Bug 1289932 - Send/Handle push messages for send tab to device. r?markh draft
authorEdouard Oger <eoger@fastmail.com>
Tue, 02 Aug 2016 10:09:30 -0700
changeset 397964 242a22dacbf7bfa3ac5088c925e8b9e7eaeaefbf
parent 397822 e78975b53563d80c99ebfbdf8a9fbf6b829a8a48
child 527584 dab927dcf047cd0013390f3a06b76766b4d214e1
push id25442
push userbmo:eoger@fastmail.com
push dateMon, 08 Aug 2016 17:47:42 +0000
reviewersmarkh
bugs1289932
milestone51.0a1
Bug 1289932 - Send/Handle push messages for send tab to device. r?markh MozReview-Commit-ID: WD4XzRtl86
services/fxaccounts/FxAccounts.jsm
services/fxaccounts/FxAccountsClient.jsm
services/fxaccounts/FxAccountsCommon.js
services/fxaccounts/FxAccountsPush.js
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/engines/clients.js
services/sync/modules/service.js
services/sync/tests/unit/test_clients_engine.js
--- a/services/fxaccounts/FxAccounts.jsm
+++ b/services/fxaccounts/FxAccounts.jsm
@@ -45,16 +45,17 @@ var publicProperties = [
   "getKeys",
   "getOAuthToken",
   "getSignedInUser",
   "getSignedInUserProfile",
   "handleDeviceDisconnection",
   "invalidateCertificate",
   "loadAndPoll",
   "localtimeOffsetMsec",
+  "notifyDevices",
   "now",
   "promiseAccountsChangeProfileURI",
   "promiseAccountsForceSigninURI",
   "promiseAccountsManageURI",
   "removeCachedOAuthToken",
   "resendVerificationEmail",
   "resetCredentials",
   "sessionStatus",
@@ -396,16 +397,39 @@ FxAccountsInternal.prototype = {
   // A hook-point for tests who may want a mocked AccountState or mocked storage.
   newAccountState(credentials) {
     let storage = new FxAccountsStorageManager();
     storage.initialize(credentials);
     return new AccountState(storage);
   },
 
   /**
+   * Send a message to a set of devices in the same account
+   *
+   * @return Promise
+   */
+  notifyDevices: function(deviceIds, payload, TTL) {
+    if (!Array.isArray(deviceIds)) {
+      deviceIds = [deviceIds];
+    }
+    return this.currentAccountState.getUserAccountData()
+      .then(data => {
+        if (!data) {
+          throw this._error(ERROR_NO_ACCOUNT);
+        }
+        if (!data.sessionToken) {
+          throw this._error(ERROR_AUTH_ERROR,
+            "notifyDevices called without a session token");
+        }
+        return this.fxAccountsClient.notifyDevices(data.sessionToken, deviceIds,
+          payload, TTL);
+    });
+  },
+
+  /**
    * Return the current time in milliseconds as an integer.  Allows tests to
    * manipulate the date to simulate certificate expiration.
    */
   now: function() {
     return this.fxAccountsClient.now();
   },
 
   getAccountsClient: function() {
--- a/services/fxaccounts/FxAccountsClient.jsm
+++ b/services/fxaccounts/FxAccountsClient.jsm
@@ -414,16 +414,41 @@ this.FxAccountsClient.prototype = {
       body.pushPublicKey = options.pushPublicKey;
       body.pushAuthKey = options.pushAuthKey;
     }
 
     return this._request(path, "POST", creds, body);
   },
 
   /**
+   * Sends a message to other devices. Must conform with the push payload schema:
+   * https://github.com/mozilla/fxa-auth-server/blob/master/docs/pushpayloads.schema.json
+   *
+   * @method notifyDevice
+   * @param  sessionTokenHex
+   *         Session token obtained from signIn
+   * @param  deviceIds
+   *         Devices to send the message to
+   * @param  payload
+   *         Data to send with the message
+   * @return Promise
+   *         Resolves to an empty object:
+   *         {}
+   */
+  notifyDevices(sessionTokenHex, deviceIds, payload, TTL = 0) {
+    const body = {
+      to: deviceIds,
+      payload,
+      TTL
+    };
+    return this._request("/account/devices/notify", "POST",
+      deriveHawkCredentials(sessionTokenHex, "sessionToken"), body);
+  },
+
+  /**
    * Update the session or name for an existing device
    *
    * @method updateDevice
    * @param  sessionTokenHex
    *         Session token obtained from signIn
    * @param  id
    *         Device identifier
    * @param  name
--- a/services/fxaccounts/FxAccountsCommon.js
+++ b/services/fxaccounts/FxAccountsCommon.js
@@ -87,16 +87,17 @@ exports.POLL_SESSION       = 1000 * 60 *
 exports.ONLOGIN_NOTIFICATION = "fxaccounts:onlogin";
 exports.ONVERIFIED_NOTIFICATION = "fxaccounts:onverified";
 exports.ONLOGOUT_NOTIFICATION = "fxaccounts:onlogout";
 // Internal to services/fxaccounts only
 exports.ON_FXA_UPDATE_NOTIFICATION = "fxaccounts:update";
 exports.ON_DEVICE_DISCONNECTED_NOTIFICATION = "fxaccounts:device_disconnected";
 exports.ON_PASSWORD_CHANGED_NOTIFICATION = "fxaccounts:password_changed";
 exports.ON_PASSWORD_RESET_NOTIFICATION = "fxaccounts:password_reset";
+exports.ON_COLLECTION_CHANGED_NOTIFICATION = "sync:collection_changed";
 
 exports.FXA_PUSH_SCOPE_ACCOUNT_UPDATE = "chrome://fxa-device-update";
 
 exports.ON_PROFILE_CHANGE_NOTIFICATION = "fxaccounts:profilechange";
 exports.ON_ACCOUNT_STATE_CHANGE_NOTIFICATION = "fxaccounts:statechange";
 
 // UI Requests.
 exports.UI_REQUEST_SIGN_IN_FLOW = "signInFlow";
--- a/services/fxaccounts/FxAccountsPush.js
+++ b/services/fxaccounts/FxAccountsPush.js
@@ -164,16 +164,18 @@ FxAccountsPushService.prototype = {
     switch (payload.command) {
       case ON_DEVICE_DISCONNECTED_NOTIFICATION:
         return this.fxAccounts.handleDeviceDisconnection(payload.data.id);
         break;
       case ON_PASSWORD_CHANGED_NOTIFICATION:
       case ON_PASSWORD_RESET_NOTIFICATION:
         return this._onPasswordChanged();
         break;
+      case ON_COLLECTION_CHANGED_NOTIFICATION:
+        Services.obs.notifyObservers(null, ON_COLLECTION_CHANGED_NOTIFICATION, payload.data.collections);
       default:
         this.log.warn("FxA Push command unrecognized: " + payload.command);
     }
   },
   /**
    * Check the FxA session status after a password change/reset event.
    * If the session is invalid, reset credentials and notify listeners of
    * ON_ACCOUNT_STATE_CHANGE_NOTIFICATION that the account may have changed
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -40,17 +40,17 @@ SYNC_KEY_HYPHENATED_LENGTH:            3
 
 NO_SYNC_NODE_INTERVAL:                 10 * 60 * 1000, // 10 minutes
 
 MAX_ERROR_COUNT_BEFORE_BACKOFF:        3,
 MAX_IGNORE_ERROR_COUNT:                5,
 
 // Backoff intervals
 MINIMUM_BACKOFF_INTERVAL:              15 * 60 * 1000,      // 15 minutes
-MAXIMUM_BACKOFF_INTERVAL:              8 * 60 * 60 * 1000,  // 8 hours 
+MAXIMUM_BACKOFF_INTERVAL:              8 * 60 * 60 * 1000,  // 8 hours
 
 // HMAC event handling timeout.
 // 10 minutes: a compromise between the multi-desktop sync interval
 // and the mobile sync interval.
 HMAC_EVENT_INTERVAL:                   600000,
 
 // How long to wait between sync attempts if the Master Password is locked.
 MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000,   // 15 minutes
@@ -96,16 +96,19 @@ IDLE_OBSERVER_BACK_DELAY:              1
 
 // Max number of records or bytes to upload in a single POST - we'll do multiple POSTS if either
 // MAX_UPLOAD_RECORDS or MAX_UPLOAD_BYTES is hit)
 MAX_UPLOAD_RECORDS:                    100,
 MAX_UPLOAD_BYTES:                      1024 * 1023, // just under 1MB
 MAX_HISTORY_UPLOAD:                    5000,
 MAX_HISTORY_DOWNLOAD:                  5000,
 
+// TTL of the message sent to another device when sending a tab
+NOTIFY_TAB_SENT_TTL_SECS:              1 * 3600, // 1 hour
+
 // Top-level statuses:
 STATUS_OK:                             "success.status_ok",
 SYNC_FAILED:                           "error.sync.failed",
 LOGIN_FAILED:                          "error.login.failed",
 SYNC_FAILED_PARTIAL:                   "error.sync.failed_partial",
 CLIENT_NOT_CONFIGURED:                 "service.client_not_configured",
 STATUS_DISABLED:                       "service.disabled",
 MASTER_PASSWORD_LOCKED:                "service.master_password_locked",
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -16,16 +16,19 @@ Cu.import("resource://services-common/as
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-common/observers.js");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/identity.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/util.js");
 
+XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
+  "resource://gre/modules/FxAccounts.jsm");
+
 /*
  * Trackers are associated with a single engine and deal with
  * listening for changes to their particular data type.
  *
  * There are two things they keep track of:
  * 1) A score, indicating how urgently the engine wants to sync
  * 2) A list of IDs for all the changed items that need to be synced
  * and updating their 'score', indicating how urgently they
@@ -1472,20 +1475,22 @@ SyncEngine.prototype = {
         let failed_ids = Object.keys(resp.obj.failed);
         counts.failed += failed_ids.length;
         if (failed_ids.length)
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
                           + failed_ids.join(", "));
 
         // Clear successfully uploaded objects.
-        for (let key in resp.obj.success) {
-          let id = resp.obj.success[key];
+        const succeeded_ids = Object.values(resp.obj.success);
+        for (let id of succeeded_ids) {
           delete this._modified[id];
         }
+
+        this._onRecordsWritten(succeeded_ids, failed_ids);
       }
 
       let postQueue = up.newPostQueue(this._log, handleResponse);
 
       for (let id of modifiedIDs) {
         let out;
         let ok = false;
         try {
@@ -1506,16 +1511,21 @@ SyncEngine.prototype = {
         }
         this._store._sleep(0);
       }
       postQueue.flush();
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
+  _onRecordsWritten(succeeded, failed) {
+    // Implement this method to take specific actions against successfully
+    // uploaded records and failed records.
+  },
+
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   _syncFinish: function () {
     this._log.trace("Finishing up sync");
     this._tracker.resetScore();
 
     let doDelete = Utils.bind2(this, function(key, val) {
       let coll = new Collection(this.engineURL, this._recordObj, this.service);
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -43,17 +43,18 @@ ClientsRec.prototype = {
   _logName: "Sync.Record.Clients",
   ttl: CLIENTS_TTL
 };
 
 Utils.deferGetSet(ClientsRec,
                   "cleartext",
                   ["name", "type", "commands",
                    "version", "protocols",
-                   "formfactor", "os", "appPackage", "application", "device"]);
+                   "formfactor", "os", "appPackage", "application", "device",
+                   "fxaDeviceId"]);
 
 
 this.ClientEngine = function ClientEngine(service) {
   SyncEngine.call(this, "Clients", service);
 
   // Reset the client on every startup so that we fetch recent clients
   this._resetClient();
 }
@@ -171,16 +172,23 @@ ClientEngine.prototype = {
   getClientName(id) {
     if (id == this.localID) {
       return this.localName;
     }
     let client = this._store._remoteClients[id];
     return client ? client.name : "";
   },
 
+  getClientFxaDeviceId(id) {
+    if (this._store._remoteClients[id]) {
+      return this._store._remoteClients[id].fxaDeviceId;
+    }
+    return null;
+  },
+
   isMobile: function isMobile(id) {
     if (this._store._remoteClients[id])
       return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
     return false;
   },
 
   _syncStartup: function _syncStartup() {
     // Reupload new client record periodically.
@@ -232,16 +240,41 @@ ClientEngine.prototype = {
     }
   },
 
   _uploadOutgoing() {
     this._clearedCommands = null;
     SyncEngine.prototype._uploadOutgoing.call(this);
   },
 
+  _onRecordsWritten(succeeded, failed) {
+    // Notify other devices that their own client collection changed
+    const idsToNotify = succeeded.reduce((acc, id) => {
+      if (id == this.localID) {
+        return acc;
+      }
+      const fxaDeviceId = this.getClientFxaDeviceId(id);
+      return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
+    }, []);
+    if (idsToNotify.length > 0) {
+      this._notifyCollectionChanged(idsToNotify);
+    }
+  },
+
+  _notifyCollectionChanged(ids) {
+    const message = {
+      version: 1,
+      command: "sync:collection_changed",
+      data: {
+        collections: ["clients"]
+      }
+    };
+    fxAccounts.notifyDevices(ids, message, NOTIFY_TAB_SENT_TTL_SECS);
+  },
+
   _syncFinish() {
     // Record histograms for our device types, and also write them to a pref
     // so non-histogram telemetry (eg, UITelemetry) has easy access to them.
     for (let [deviceType, count] of this.deviceTypes) {
       let hid;
       let prefName = this.name + ".devices.";
       switch (deviceType) {
         case "desktop":
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -354,16 +354,17 @@ Sync11Service.prototype = {
 
     if (!this._checkCrypto()) {
       this.enabled = false;
       this._log.info("Could not load the Weave crypto component. Disabling " +
                       "Weave, since it will not work correctly.");
     }
 
     Svc.Obs.add("weave:service:setup-complete", this);
+    Svc.Obs.add("sync:collection_changed", this); // Pulled from FxAccountsCommon
     Svc.Prefs.observe("engine.", this);
 
     this.scheduler = new SyncScheduler(this);
 
     if (!this.enabled) {
       this._log.info("Firefox Sync disabled.");
     }
 
@@ -480,16 +481,23 @@ Sync11Service.prototype = {
 
   QueryInterface: XPCOMUtils.generateQI([Ci.nsIObserver,
                                          Ci.nsISupportsWeakReference]),
 
   // nsIObserver
 
   observe: function observe(subject, topic, data) {
     switch (topic) {
+      // Ideally this observer should be in the SyncScheduler, but it would require
+      // some work to know about the sync specific engines. We should move this there once it does.
+      case "sync:collection_changed":
+        if (data.includes("clients")) {
+          this.sync([]); // [] = clients collection only
+        }
+        break;
       case "weave:service:setup-complete":
         let status = this._checkSetup();
         if (status != STATUS_DISABLED && status != CLIENT_NOT_CONFIGURED)
             Svc.Obs.notify("weave:engine:start-tracking");
         break;
       case "nsPref:changed":
         if (this._ignorePrefObserver)
           return;
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -26,20 +26,20 @@ function check_record_version(user, id) 
     rec.collection = "clients";
     rec.ciphertext = payload.ciphertext;
     rec.hmac = payload.hmac;
     rec.IV = payload.IV;
 
     let cleartext = rec.decrypt(Service.collectionKeys.keyForCollection("clients"));
 
     _("Payload is " + JSON.stringify(cleartext));
-    do_check_eq(Services.appinfo.version, cleartext.version);
-    do_check_eq(2, cleartext.protocols.length);
-    do_check_eq("1.1", cleartext.protocols[0]);
-    do_check_eq("1.5", cleartext.protocols[1]);
+    equal(Services.appinfo.version, cleartext.version);
+    equal(2, cleartext.protocols.length);
+    equal("1.1", cleartext.protocols[0]);
+    equal("1.5", cleartext.protocols[1]);
 }
 
 add_test(function test_bad_hmac() {
   _("Ensure that Clients engine deletes corrupt records.");
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
                                         syncID: engine.syncID}}}},
     clients: {},
@@ -59,62 +59,62 @@ add_test(function test_bad_hmac() {
   let server = serverForUsers({"foo": "password"}, contents, callback);
   let user   = server.user("foo");
 
   function check_clients_count(expectedCount) {
     let stack = Components.stack.caller;
     let coll  = user.collection("clients");
 
     // Treat a non-existent collection as empty.
-    do_check_eq(expectedCount, coll ? coll.count() : 0, stack);
+    equal(expectedCount, coll ? coll.count() : 0, stack);
   }
 
   function check_client_deleted(id) {
     let coll = user.collection("clients");
     let wbo  = coll.wbo(id);
     return !wbo || !wbo.payload;
   }
 
   function uploadNewKeys() {
     generateNewKeys(Service.collectionKeys);
     let serverKeys = Service.collectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Service.identity.syncKeyBundle);
-    do_check_true(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
+    ok(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
   }
 
   try {
     ensureLegacyIdentityManager();
     let passphrase     = "abcdeabcdeabcdeabcdeabcdea";
     Service.serverURL  = server.baseURI;
     Service.login("foo", "ilovejane", passphrase);
 
     generateNewKeys(Service.collectionKeys);
 
     _("First sync, client record is uploaded");
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(engine.lastRecordUpload, 0);
     check_clients_count(0);
     engine._sync();
     check_clients_count(1);
-    do_check_true(engine.lastRecordUpload > 0);
+    ok(engine.lastRecordUpload > 0);
 
     // Our uploaded record has a version.
     check_record_version(user, engine.localID);
 
     // Initial setup can wipe the server, so clean up.
     deletedCollections = [];
     deletedItems       = [];
 
     _("Change our keys and our client ID, reupload keys.");
     let oldLocalID  = engine.localID;     // Preserve to test for deletion!
     engine.localID = Utils.makeGUID();
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let serverKeys = Service.collectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Service.identity.syncKeyBundle);
-    do_check_true(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
+    ok(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
 
     _("Sync.");
     engine._sync();
 
     _("Old record " + oldLocalID + " was deleted, new one uploaded.");
     check_clients_count(1);
     check_client_deleted(oldLocalID);
 
@@ -125,18 +125,18 @@ add_test(function test_bad_hmac() {
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     deletedCollections = [];
     deletedItems       = [];
     check_clients_count(1);
     engine._sync();
 
     _("Old record was not deleted, new one uploaded.");
-    do_check_eq(deletedCollections.length, 0);
-    do_check_eq(deletedItems.length, 0);
+    equal(deletedCollections.length, 0);
+    equal(deletedItems.length, 0);
     check_clients_count(2);
 
     _("Now try the scenario where our keys are wrong *and* there's a bad record.");
     // Clean up and start fresh.
     user.collection("clients")._wbos = {};
     Service.lastHMACEvent = 0;
     engine.localID = Utils.makeGUID();
     engine.resetClient();
@@ -157,41 +157,41 @@ add_test(function test_bad_hmac() {
     // as the object on the server. We'll download the new keys and also delete
     // the bad client record.
     oldLocalID  = engine.localID;         // Preserve to test for deletion!
     engine.localID = Utils.makeGUID();
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let oldKey = Service.collectionKeys.keyForCollection();
 
-    do_check_eq(deletedCollections.length, 0);
-    do_check_eq(deletedItems.length, 0);
+    equal(deletedCollections.length, 0);
+    equal(deletedItems.length, 0);
     engine._sync();
-    do_check_eq(deletedItems.length, 1);
+    equal(deletedItems.length, 1);
     check_client_deleted(oldLocalID);
     check_clients_count(1);
     let newKey = Service.collectionKeys.keyForCollection();
-    do_check_false(oldKey.equals(newKey));
+    ok(!oldKey.equals(newKey));
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     server.stop(run_next_test);
   }
 });
 
 add_test(function test_properties() {
   _("Test lastRecordUpload property");
   try {
-    do_check_eq(Svc.Prefs.get("clients.lastRecordUpload"), undefined);
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(Svc.Prefs.get("clients.lastRecordUpload"), undefined);
+    equal(engine.lastRecordUpload, 0);
 
     let now = Date.now();
     engine.lastRecordUpload = now / 1000;
-    do_check_eq(engine.lastRecordUpload, Math.floor(now / 1000));
+    equal(engine.lastRecordUpload, Math.floor(now / 1000));
   } finally {
     Svc.Prefs.resetBranch("");
     run_next_test();
   }
 });
 
 add_test(function test_full_sync() {
   _("Ensure that Clients engine fetches all records for each sync.");
@@ -283,40 +283,40 @@ add_test(function test_sync() {
 
   function clientWBO() {
     return user.collection("clients").wbo(engine.localID);
   }
 
   try {
 
     _("First sync. Client record is uploaded.");
-    do_check_eq(clientWBO(), undefined);
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(clientWBO(), undefined);
+    equal(engine.lastRecordUpload, 0);
     engine._sync();
-    do_check_true(!!clientWBO().payload);
-    do_check_true(engine.lastRecordUpload > 0);
+    ok(!!clientWBO().payload);
+    ok(engine.lastRecordUpload > 0);
 
     _("Let's time travel more than a week back, new record should've been uploaded.");
     engine.lastRecordUpload -= MORE_THAN_CLIENTS_TTL_REFRESH;
     let lastweek = engine.lastRecordUpload;
     clientWBO().payload = undefined;
     engine._sync();
-    do_check_true(!!clientWBO().payload);
-    do_check_true(engine.lastRecordUpload > lastweek);
+    ok(!!clientWBO().payload);
+    ok(engine.lastRecordUpload > lastweek);
 
     _("Remove client record.");
     engine.removeClientData();
-    do_check_eq(clientWBO().payload, undefined);
+    equal(clientWBO().payload, undefined);
 
     _("Time travel one day back, no record uploaded.");
     engine.lastRecordUpload -= LESS_THAN_CLIENTS_TTL_REFRESH;
     let yesterday = engine.lastRecordUpload;
     engine._sync();
-    do_check_eq(clientWBO().payload, undefined);
-    do_check_eq(engine.lastRecordUpload, yesterday);
+    equal(clientWBO().payload, undefined);
+    equal(engine.lastRecordUpload, yesterday);
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     server.stop(run_next_test);
   }
 });
 
@@ -331,26 +331,26 @@ add_test(function test_client_name_chang
   Svc.Obs.notify("weave:engine:start-tracking");
   _("initial name: " + initialName);
 
   // Tracker already has data, so clear it.
   tracker.clearChangedIDs();
 
   let initialScore = tracker.score;
 
-  do_check_eq(Object.keys(tracker.changedIDs).length, 0);
+  equal(Object.keys(tracker.changedIDs).length, 0);
 
   Svc.Prefs.set("client.name", "new name");
 
   _("new name: " + engine.localName);
-  do_check_neq(initialName, engine.localName);
-  do_check_eq(Object.keys(tracker.changedIDs).length, 1);
-  do_check_true(engine.localID in tracker.changedIDs);
-  do_check_true(tracker.score > initialScore);
-  do_check_true(tracker.score >= SCORE_INCREMENT_XLARGE);
+  notEqual(initialName, engine.localName);
+  equal(Object.keys(tracker.changedIDs).length, 1);
+  ok(engine.localID in tracker.changedIDs);
+  ok(tracker.score > initialScore);
+  ok(tracker.score >= SCORE_INCREMENT_XLARGE);
 
   Svc.Obs.notify("weave:engine:stop-tracking");
 
   run_next_test();
 });
 
 add_test(function test_send_command() {
   _("Verifies _sendCommandToClient puts commands in the outbound queue.");
@@ -364,25 +364,25 @@ add_test(function test_send_command() {
   let remoteRecord = store.createRecord(remoteId, "clients");
 
   let action = "testCommand";
   let args = ["foo", "bar"];
 
   engine._sendCommandToClient(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
-  do_check_neq(newRecord, undefined);
-  do_check_eq(newRecord.commands.length, 1);
+  notEqual(newRecord, undefined);
+  equal(newRecord.commands.length, 1);
 
   let command = newRecord.commands[0];
-  do_check_eq(command.command, action);
-  do_check_eq(command.args.length, 2);
-  do_check_eq(command.args, args);
+  equal(command.command, action);
+  equal(command.args.length, 2);
+  equal(command.args, args);
 
-  do_check_neq(tracker.changedIDs[remoteId], undefined);
+  notEqual(tracker.changedIDs[remoteId], undefined);
 
   run_next_test();
 });
 
 add_test(function test_command_validation() {
   _("Verifies that command validation works properly.");
 
   let store = engine._store;
@@ -406,34 +406,34 @@ add_test(function test_command_validatio
     let rec = new ClientsRec("clients", remoteId);
 
     store.create(rec);
     store.createRecord(remoteId, "clients");
 
     engine.sendCommand(action, args, remoteId);
 
     let newRecord = store._remoteClients[remoteId];
-    do_check_neq(newRecord, undefined);
+    notEqual(newRecord, undefined);
 
     if (expectedResult) {
       _("Ensuring command is sent: " + action);
-      do_check_eq(newRecord.commands.length, 1);
+      equal(newRecord.commands.length, 1);
 
       let command = newRecord.commands[0];
-      do_check_eq(command.command, action);
-      do_check_eq(command.args, args);
+      equal(command.command, action);
+      equal(command.args, args);
 
-      do_check_neq(engine._tracker, undefined);
-      do_check_neq(engine._tracker.changedIDs[remoteId], undefined);
+      notEqual(engine._tracker, undefined);
+      notEqual(engine._tracker.changedIDs[remoteId], undefined);
     } else {
       _("Ensuring command is scrubbed: " + action);
-      do_check_eq(newRecord.commands, undefined);
+      equal(newRecord.commands, undefined);
 
       if (store._tracker) {
-        do_check_eq(engine._tracker[remoteId], undefined);
+        equal(engine._tracker[remoteId], undefined);
       }
     }
 
   }
   run_next_test();
 });
 
 add_test(function test_command_duplication() {
@@ -447,46 +447,46 @@ add_test(function test_command_duplicati
 
   let action = "resetAll";
   let args = [];
 
   engine.sendCommand(action, args, remoteId);
   engine.sendCommand(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
-  do_check_eq(newRecord.commands.length, 1);
+  equal(newRecord.commands.length, 1);
 
   _("Check variant args length");
   newRecord.commands = [];
 
   action = "resetEngine";
   engine.sendCommand(action, [{ x: "foo" }], remoteId);
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
   _("Make sure we spot a real dupe argument.");
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
-  do_check_eq(newRecord.commands.length, 2);
+  equal(newRecord.commands.length, 2);
 
   run_next_test();
 });
 
 add_test(function test_command_invalid_client() {
   _("Ensures invalid client IDs are caught");
 
   let id = Utils.makeGUID();
   let error;
 
   try {
     engine.sendCommand("wipeAll", [], id);
   } catch (ex) {
     error = ex;
   }
 
-  do_check_eq(error.message.indexOf("Unknown remote client ID: "), 0);
+  equal(error.message.indexOf("Unknown remote client ID: "), 0);
 
   run_next_test();
 });
 
 add_test(function test_process_incoming_commands() {
   _("Ensures local commands are executed");
 
   engine.localCommands = [{ command: "logout", args: [] }];
@@ -501,17 +501,17 @@ add_test(function test_process_incoming_
     engine._resetClient();
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
   // logout command causes processIncomingCommands to return explicit false.
-  do_check_false(engine.processIncomingCommands());
+  ok(!engine.processIncomingCommands());
 });
 
 add_test(function test_filter_duplicate_names() {
   _("Ensure that we exclude clients with identical names that haven't synced in a week.");
 
   let now = Date.now() / 1000;
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
@@ -698,41 +698,41 @@ add_test(function test_command_sync() {
   }), Date.now() / 1000));
 
   try {
     _("Syncing.");
     engine._sync();
 
     _("Checking remote record was downloaded.");
     let clientRecord = engine._store._remoteClients[remoteId];
-    do_check_neq(clientRecord, undefined);
-    do_check_eq(clientRecord.commands.length, 0);
+    notEqual(clientRecord, undefined);
+    equal(clientRecord.commands.length, 0);
 
     _("Send a command to the remote client.");
     engine.sendCommand("wipeAll", []);
-    do_check_eq(clientRecord.commands.length, 1);
+    equal(clientRecord.commands.length, 1);
     engine._sync();
 
     _("Checking record was uploaded.");
-    do_check_neq(clientWBO(engine.localID).payload, undefined);
-    do_check_true(engine.lastRecordUpload > 0);
+    notEqual(clientWBO(engine.localID).payload, undefined);
+    ok(engine.lastRecordUpload > 0);
 
-    do_check_neq(clientWBO(remoteId).payload, undefined);
+    notEqual(clientWBO(remoteId).payload, undefined);
 
     Svc.Prefs.set("client.GUID", remoteId);
     engine._resetClient();
-    do_check_eq(engine.localID, remoteId);
+    equal(engine.localID, remoteId);
     _("Performing sync on resetted client.");
     engine._sync();
-    do_check_neq(engine.localCommands, undefined);
-    do_check_eq(engine.localCommands.length, 1);
+    notEqual(engine.localCommands, undefined);
+    equal(engine.localCommands.length, 1);
 
     let command = engine.localCommands[0];
-    do_check_eq(command.command, "wipeAll");
-    do_check_eq(command.args.length, 0);
+    equal(command.command, "wipeAll");
+    equal(command.args.length, 0);
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
 
     try {
       let collection = server.getCollection("foo", "clients");
       collection.remove(remoteId);
@@ -758,40 +758,40 @@ add_test(function test_send_uri_to_clien
   let initialScore = tracker.score;
 
   let uri = "http://www.mozilla.org/";
   let title = "Title of the Page";
   engine.sendURIToClientForDisplay(uri, remoteId, title);
 
   let newRecord = store._remoteClients[remoteId];
 
-  do_check_neq(newRecord, undefined);
-  do_check_eq(newRecord.commands.length, 1);
+  notEqual(newRecord, undefined);
+  equal(newRecord.commands.length, 1);
 
   let command = newRecord.commands[0];
-  do_check_eq(command.command, "displayURI");
-  do_check_eq(command.args.length, 3);
-  do_check_eq(command.args[0], uri);
-  do_check_eq(command.args[1], engine.localID);
-  do_check_eq(command.args[2], title);
+  equal(command.command, "displayURI");
+  equal(command.args.length, 3);
+  equal(command.args[0], uri);
+  equal(command.args[1], engine.localID);
+  equal(command.args[2], title);
 
-  do_check_true(tracker.score > initialScore);
-  do_check_true(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
+  ok(tracker.score > initialScore);
+  ok(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
 
   _("Ensure unknown client IDs result in exception.");
   let unknownId = Utils.makeGUID();
   let error;
 
   try {
     engine.sendURIToClientForDisplay(uri, unknownId);
   } catch (ex) {
     error = ex;
   }
 
-  do_check_eq(error.message.indexOf("Unknown remote client ID: "), 0);
+  equal(error.message.indexOf("Unknown remote client ID: "), 0);
 
   Svc.Prefs.resetBranch("");
   Service.recordManager.clearCache();
   engine._resetClient();
 
   run_next_test();
 });
 
@@ -814,52 +814,52 @@ add_test(function test_receive_display_u
 
   // Received 'displayURI' command should result in the topic defined below
   // being called.
   let ev = "weave:engine:clients:display-uris";
 
   let handler = function(subject, data) {
     Svc.Obs.remove(ev, handler);
 
-    do_check_eq(subject[0].uri, uri);
-    do_check_eq(subject[0].clientId, remoteId);
-    do_check_eq(subject[0].title, title);
-    do_check_eq(data, null);
+    equal(subject[0].uri, uri);
+    equal(subject[0].clientId, remoteId);
+    equal(subject[0].title, title);
+    equal(data, null);
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
-  do_check_true(engine.processIncomingCommands());
+  ok(engine.processIncomingCommands());
 
   Svc.Prefs.resetBranch("");
   Service.recordManager.clearCache();
   engine._resetClient();
 });
 
 add_test(function test_optional_client_fields() {
   _("Ensure that we produce records with the fields added in Bug 1097222.");
 
   const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
   let local = engine._store.createRecord(engine.localID, "clients");
-  do_check_eq(local.name, engine.localName);
-  do_check_eq(local.type, engine.localType);
-  do_check_eq(local.version, Services.appinfo.version);
-  do_check_array_eq(local.protocols, SUPPORTED_PROTOCOL_VERSIONS);
+  equal(local.name, engine.localName);
+  equal(local.type, engine.localType);
+  equal(local.version, Services.appinfo.version);
+  deepEqual(local.protocols, SUPPORTED_PROTOCOL_VERSIONS);
 
   // Optional fields.
   // Make sure they're what they ought to be...
-  do_check_eq(local.os, Services.appinfo.OS);
-  do_check_eq(local.appPackage, Services.appinfo.ID);
+  equal(local.os, Services.appinfo.OS);
+  equal(local.appPackage, Services.appinfo.ID);
 
   // ... and also that they're non-empty.
-  do_check_true(!!local.os);
-  do_check_true(!!local.appPackage);
-  do_check_true(!!local.application);
+  ok(!!local.os);
+  ok(!!local.appPackage);
+  ok(!!local.application);
 
   // We don't currently populate device or formfactor.
   // See Bug 1100722, Bug 1100723.
 
   engine._resetClient();
   run_next_test();
 });
 
@@ -1065,13 +1065,85 @@ add_test(function test_send_uri_ack() {
     try {
       server.deleteCollections("foo");
     } finally {
       server.stop(run_next_test);
     }
   }
 });
 
+add_test(function test_command_sync() {
+  _("Notify other clients when writing their record.");
+
+  engine._store.wipe();
+  generateNewKeys(Service.collectionKeys);
+
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server    = serverForUsers({"foo": "password"}, contents);
+  new SyncTestingInfrastructure(server.server);
+
+  let user       = server.user("foo");
+  let collection = server.getCollection("foo", "clients");
+  let remoteId   = Utils.makeGUID();
+  let remoteId2  = Utils.makeGUID();
+
+  function clientWBO(id) {
+    return user.collection("clients").wbo(id);
+  }
+
+  _("Create remote client record 1");
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+    id: remoteId,
+    name: "Remote client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"]
+  }), Date.now() / 1000));
+
+  _("Create remote client record 2");
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId2, encryptPayload({
+    id: remoteId2,
+    name: "Remote client 2",
+    type: "mobile",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"]
+  }), Date.now() / 1000));
+
+  try {
+    equal(collection.count(), 2, "2 remote records written");
+    engine._sync();
+    equal(collection.count(), 3, "3 remote records written (+1 for the synced local record)");
+
+    let notifiedIds;
+    engine.sendCommand("wipeAll", []);
+    engine._tracker.addChangedID(engine.localID);
+    engine.getClientFxaDeviceId = (id) => "fxa-" + id;
+    engine._notifyCollectionChanged = (ids) => (notifiedIds = ids);
+    _("Syncing.");
+    engine._sync();
+    deepEqual(notifiedIds, ["fxa-fake-guid-00","fxa-fake-guid-01"]);
+    ok(!notifiedIds.includes(engine.getClientFxaDeviceId(engine.localID)),
+      "We never notify the local device");
+
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
   run_next_test();
 }