Bug 1392409 - set engine.lastModified to avoid intermittent orange. r?tcsc draft
authorMark Hammond <mhammond@skippinet.com.au>
Wed, 23 Aug 2017 13:59:16 +1000
changeset 653990 b45f7d6b28f1b8002b117b560e09d9f75d27a7fb
parent 653766 d10c97627b51a226e19d0fa801201897fe1932f6
child 728462 5cd96af44c1ec33aa9fcd3d4af5b394d122575a9
push id76462
push userbmo:markh@mozilla.com
push dateMon, 28 Aug 2017 07:54:30 +0000
reviewerstcsc
bugs1392409
milestone57.0a1
Bug 1392409 - set engine.lastModified to avoid intermittent orange. r?tcsc MozReview-Commit-ID: 4kPzk9nSWxw
services/sync/tests/unit/test_clients_engine.js
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -43,16 +43,21 @@ function compareCommands(actual, expecte
   let tweakedActual = JSON.parse(JSON.stringify(actual));
   tweakedActual.map(elt => delete elt.flowID);
   deepEqual(tweakedActual, expected, description);
   // each item must have a unique flowID.
   let allIDs = new Set(actual.map(elt => elt.flowID).filter(fid => !!fid));
   equal(allIDs.size, actual.length, "all items have unique IDs");
 }
 
+async function syncClientsEngine(server) {
+  engine.lastModified = server.getCollection("foo", "clients").timestamp;
+  await engine._sync();
+}
+
 add_task(async function setup() {
   engine = Service.clientsEngine;
 });
 
 async function cleanup() {
   Svc.Prefs.resetBranch("");
   engine._tracker.clearChangedIDs();
   await engine._resetClient();
@@ -101,17 +106,17 @@ add_task(async function test_bad_hmac() 
     await configureIdentity({username: "foo"}, server);
     await Service.login();
 
     generateNewKeys(Service.collectionKeys);
 
     _("First sync, client record is uploaded");
     equal(engine.lastRecordUpload, 0);
     check_clients_count(0);
-    await engine._sync();
+    await syncClientsEngine(server);
     check_clients_count(1);
     ok(engine.lastRecordUpload > 0);
 
     // Our uploaded record has a version.
     check_record_version(user, engine.localID);
 
     // Initial setup can wipe the server, so clean up.
     deletedCollections = [];
@@ -122,33 +127,32 @@ add_task(async function test_bad_hmac() 
     engine.localID = Utils.makeGUID();
     await engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let serverKeys = Service.collectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Service.identity.syncKeyBundle);
     ok((await serverKeys.upload(Service.resource(Service.cryptoKeysURL))).success);
 
     _("Sync.");
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Old record " + oldLocalID + " was deleted, new one uploaded.");
     check_clients_count(1);
     check_client_deleted(oldLocalID);
 
     _("Now change our keys but don't upload them. " +
       "That means we get an HMAC error but redownload keys.");
     Service.lastHMACEvent = 0;
     engine.localID = Utils.makeGUID();
     await engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     deletedCollections = [];
     deletedItems       = [];
     check_clients_count(1);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Old record was not deleted, new one uploaded.");
     equal(deletedCollections.length, 0);
     equal(deletedItems.length, 0);
     check_clients_count(2);
 
     _("Now try the scenario where our keys are wrong *and* there's a bad record.");
     // Clean up and start fresh.
@@ -158,35 +162,34 @@ add_task(async function test_bad_hmac() 
     await engine.resetClient();
     deletedCollections = [];
     deletedItems       = [];
     check_clients_count(0);
 
     await uploadNewKeys();
 
     // Sync once to upload a record.
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     check_clients_count(1);
 
     // Generate and upload new keys, so the old client record is wrong.
     await uploadNewKeys();
 
     // Create a new client record and new keys. Now our keys are wrong, as well
     // as the object on the server. We'll download the new keys and also delete
     // the bad client record.
     oldLocalID  = engine.localID;         // Preserve to test for deletion!
     engine.localID = Utils.makeGUID();
     await engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let oldKey = Service.collectionKeys.keyForCollection();
 
     equal(deletedCollections.length, 0);
     equal(deletedItems.length, 0);
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(deletedItems.length, 1);
     check_client_deleted(oldLocalID);
     check_clients_count(1);
     let newKey = Service.collectionKeys.keyForCollection();
     ok(!oldKey.equals(newKey));
 
   } finally {
     await cleanup();
@@ -238,33 +241,31 @@ add_task(async function test_full_sync()
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     let store = engine._store;
 
     _("First sync. 2 records downloaded; our record uploaded.");
     strictEqual(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     ok(engine.lastRecordUpload > 0);
     deepEqual(user.collection("clients").keys().sort(),
               [activeID, deletedID, engine.localID].sort(),
               "Our record should be uploaded on first sync");
     let ids = await store.getAllIDs();
     deepEqual(Object.keys(ids).sort(),
               [activeID, deletedID, engine.localID].sort(),
               "Other clients should be downloaded on first sync");
 
     _("Delete a record, then sync again");
     let collection = server.getCollection("foo", "clients");
     collection.remove(deletedID);
     // Simulate a timestamp update in info/collections.
-    engine.lastModified = now;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Record should be updated");
     ids = await store.getAllIDs();
     deepEqual(Object.keys(ids).sort(),
               [activeID, engine.localID].sort(),
               "Deleted client should be removed on next sync");
   } finally {
     await cleanup();
@@ -290,38 +291,36 @@ add_task(async function test_sync() {
     return user.collection("clients").wbo(engine.localID);
   }
 
   try {
 
     _("First sync. Client record is uploaded.");
     equal(clientWBO(), undefined);
     equal(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     ok(!!clientWBO().payload);
     ok(engine.lastRecordUpload > 0);
 
     _("Let's time travel more than a week back, new record should've been uploaded.");
     engine.lastRecordUpload -= MORE_THAN_CLIENTS_TTL_REFRESH;
     let lastweek = engine.lastRecordUpload;
     clientWBO().payload = undefined;
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     ok(!!clientWBO().payload);
     ok(engine.lastRecordUpload > lastweek);
 
     _("Remove client record.");
     await engine.removeClientData();
     equal(clientWBO().payload, undefined);
 
     _("Time travel one day back, no record uploaded.");
     engine.lastRecordUpload -= LESS_THAN_CLIENTS_TTL_REFRESH;
     let yesterday = engine.lastRecordUpload;
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(clientWBO().payload, undefined);
     equal(engine.lastRecordUpload, yesterday);
 
   } finally {
     await cleanup();
     await promiseStopServer(server);
   }
 });
@@ -377,18 +376,17 @@ add_task(async function test_last_modifi
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     let collection = user.collection("clients");
 
     _("Sync to download the record");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     equal(engine._store._remoteClients[activeID].serverLastModified, now - 10,
           "last modified in the local record is correctly the server last-modified");
 
     _("Modify the record and re-upload it");
     // set a new name to make sure we really did upload.
     engine._store._remoteClients[activeID].name = "New name";
     engine._modified.set(activeID, 0);
@@ -634,18 +632,17 @@ add_task(async function test_filter_dupl
     protocols: ["1.5"],
   }), now - 604820));
 
   try {
     let store = engine._store;
 
     _("First sync");
     strictEqual(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     ok(engine.lastRecordUpload > 0);
     deepEqual(user.collection("clients").keys().sort(),
               [recentID, dupeID, oldID, engine.localID].sort(),
               "Our record should be uploaded on first sync");
 
     let ids = await store.getAllIDs();
     deepEqual(Object.keys(ids).sort(),
               [recentID, dupeID, oldID, engine.localID].sort(),
@@ -670,26 +667,26 @@ add_task(async function test_filter_dupl
 
     // Check that a subsequent Sync doesn't report anything as being processed.
     let counts;
     Svc.Obs.add("weave:engine:sync:applied", function observe(subject, data) {
       Svc.Obs.remove("weave:engine:sync:applied", observe);
       counts = subject;
     });
 
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(counts.applied, 0); // We didn't report applying any records.
     equal(counts.reconciled, 4); // We reported reconcilliation for all records
     equal(counts.succeeded, 0);
     equal(counts.failed, 0);
     equal(counts.newFailed, 0);
 
     _("Broadcast logout to all clients");
     await engine.sendCommand("logout", []);
-    await engine._sync();
+    await syncClientsEngine(server);
 
     let collection = server.getCollection("foo", "clients");
     let recentPayload = JSON.parse(JSON.parse(collection.payload(recentID)).ciphertext);
     compareCommands(recentPayload.commands, [{ command: "logout", args: [] }],
                     "Should send commands to the recent client");
 
     let oldPayload = JSON.parse(JSON.parse(collection.payload(oldID)).ciphertext);
     compareCommands(oldPayload.commands, [{ command: "logout", args: [] }],
@@ -705,17 +702,17 @@ add_task(async function test_filter_dupl
       name: engine.localName,
       type: "desktop",
       commands: [],
       version: "48",
       protocols: ["1.5"],
     }), now - 10));
 
     _("Second sync.");
-    await engine._sync();
+    await syncClientsEngine(server);
 
     ids = await store.getAllIDs();
     deepEqual(Object.keys(ids).sort(),
               [recentID, oldID, dupeID, engine.localID].sort(),
               "Stale client synced, so it should no longer be marked as a dupe");
 
     ok(engine.remoteClientExists(dupeID), "Dupe ID should appear as it synced.");
 
@@ -766,40 +763,40 @@ add_task(async function test_command_syn
     type: "desktop",
     commands: [],
     version: "48",
     protocols: ["1.5"],
   }), Date.now() / 1000));
 
   try {
     _("Syncing.");
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Checking remote record was downloaded.");
     let clientRecord = engine._store._remoteClients[remoteId];
     notEqual(clientRecord, undefined);
     equal(clientRecord.commands.length, 0);
 
     _("Send a command to the remote client.");
     await engine.sendCommand("wipeAll", []);
     let clientCommands = (await engine._readCommands())[remoteId];
     equal(clientCommands.length, 1);
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Checking record was uploaded.");
     notEqual(clientWBO(engine.localID).payload, undefined);
     ok(engine.lastRecordUpload > 0);
 
     notEqual(clientWBO(remoteId).payload, undefined);
 
     Svc.Prefs.set("client.GUID", remoteId);
     engine._resetClient();
     equal(engine.localID, remoteId);
     _("Performing sync on resetted client.");
-    await engine._sync();
+    await syncClientsEngine(server);
     notEqual(engine.localCommands, undefined);
     equal(engine.localCommands.length, 1);
 
     let command = engine.localCommands[0];
     equal(command.command, "wipeAll");
     equal(command.args.length, 0);
 
   } finally {
@@ -850,17 +847,17 @@ add_task(async function test_clients_not
   engine.fxAccounts = {
     notifyDevices() { return Promise.resolve(true); },
     getDeviceId() { return fxAccounts.getDeviceId(); },
     getDeviceList() { return Promise.resolve([{ id: remoteId }]); }
   };
 
   try {
     _("Syncing.");
-    await engine._sync();
+    await syncClientsEngine(server);
 
     ok(!engine._store._remoteClients[remoteId].stale);
     ok(engine._store._remoteClients[remoteId2].stale);
 
   } finally {
     engine.fxAccounts = fxAccounts;
     await cleanup();
 
@@ -1027,22 +1024,21 @@ add_task(async function test_merge_comma
     }],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     _("First sync. 2 records downloaded.");
     strictEqual(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Broadcast logout to all clients");
     await engine.sendCommand("logout", []);
-    await engine._sync();
+    await syncClientsEngine(server);
 
     let collection = server.getCollection("foo", "clients");
     let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
     compareCommands(desktopPayload.commands, [{
       command: "displayURI",
       args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
     }, {
       command: "logout",
@@ -1080,37 +1076,35 @@ add_task(async function test_duplicate_r
     commands: [],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     _("First sync. 1 record downloaded.");
     strictEqual(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Send tab to client");
     await engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"]);
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Simulate the desktop client consuming the command and syncing to the server");
     server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
       id: desktopID,
       name: "Desktop client",
       type: "desktop",
       commands: [],
       version: "48",
       protocols: ["1.5"],
     }), now - 10));
 
     _("Send another tab to the desktop client");
     await engine.sendCommand("displayURI", ["https://foobar.com", engine.localID, "Foo bar!"], desktopID);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     let collection = server.getCollection("foo", "clients");
     let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
     compareCommands(desktopPayload.commands, [{
       command: "displayURI",
       args: ["https://foobar.com", engine.localID, "Foo bar!"],
     }], "Should only send the second command to the desktop client");
   } finally {
@@ -1154,25 +1148,24 @@ add_task(async function test_upload_afte
     commands: [],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     _("First sync. 2 records downloaded.");
     strictEqual(engine.lastRecordUpload, 0);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Send tab to client");
     await engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"], deviceBID);
 
     const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
     SyncEngine.prototype._uploadOutgoing = async () => engine._onRecordsWritten([], [deviceBID]);
-    await engine._sync();
+    await syncClientsEngine(server);
 
     let collection = server.getCollection("foo", "clients");
     let deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
     compareCommands(deviceBPayload.commands, [{
       command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
     }], "Should be the same because the upload failed");
 
     _("Simulate the client B consuming the command and syncing to the server");
@@ -1185,17 +1178,17 @@ add_task(async function test_upload_afte
       protocols: ["1.5"],
     }), now - 10));
 
     // Simulate reboot
     SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
     engine = Service.clientsEngine = new ClientEngine(Service);
     await engine.initialize();
 
-    await engine._sync();
+    await syncClientsEngine(server);
 
     deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
     compareCommands(deviceBPayload.commands, [{
       command: "displayURI",
       args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
     }], "Should only had written our outgoing command");
   } finally {
     await cleanup();
@@ -1258,17 +1251,17 @@ add_task(async function test_keep_cleare
     strictEqual(engine.lastRecordUpload, 0);
 
     let collection = server.getCollection("foo", "clients");
     const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
     SyncEngine.prototype._uploadOutgoing = async () => engine._onRecordsWritten([], [deviceBID]);
     let commandsProcessed = 0;
     engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
 
-    await engine._sync();
+    await syncClientsEngine(server);
     await engine.processIncomingCommands(); // Not called by the engine.sync(), gotta call it ourselves
     equal(commandsProcessed, 2, "We processed 2 commands");
 
     let localRemoteRecord = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     compareCommands(localRemoteRecord.commands, [{
       command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
     },
     {
@@ -1301,18 +1294,17 @@ add_task(async function test_keep_cleare
 
     // Simulate reboot
     SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
     engine = Service.clientsEngine = new ClientEngine(Service);
     await engine.initialize();
 
     commandsProcessed = 0;
     engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     await engine.processIncomingCommands();
     equal(commandsProcessed, 1, "We processed one command (the other were cleared)");
 
     localRemoteRecord = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
     deepEqual(localRemoteRecord.commands, [], "Should be empty");
   } finally {
     await cleanup();
 
@@ -1355,27 +1347,25 @@ add_task(async function test_deleted_com
     type: "desktop",
     commands: [],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     _("First sync. 2 records downloaded.");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     _("Delete a record on the server.");
     let collection = server.getCollection("foo", "clients");
     collection.remove(deletedID);
 
     _("Broadcast a command to all clients");
     await engine.sendCommand("logout", []);
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
 
     deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
       "Should not reupload deleted clients");
 
     let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
     compareCommands(activePayload.commands, [{ command: "logout", args: [] }],
       "Should send the command to the active client");
   } finally {
@@ -1397,46 +1387,44 @@ add_task(async function test_send_uri_ac
 
   await SyncTestingInfrastructure(server);
   generateNewKeys(Service.collectionKeys);
 
   try {
     let fakeSenderID = Utils.makeGUID();
 
     _("Initial sync for empty clients collection");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     let collection = server.getCollection("foo", "clients");
     let ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     ok(ourPayload, "Should upload our client record");
 
     _("Send a URL to the device on the server");
     ourPayload.commands = [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
       flowID: Utils.makeGUID(),
     }];
     server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload(ourPayload), now));
 
     _("Sync again");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     compareCommands(engine.localCommands, [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
     }], "Should receive incoming URI");
     ok((await engine.processIncomingCommands()), "Should process incoming commands");
     const clearedCommands = (await engine._readCommands())[engine.localID];
     compareCommands(clearedCommands, [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
     }], "Should mark the commands as cleared after processing");
 
     _("Check that the command was removed on the server");
-    await engine._sync();
+    await syncClientsEngine(server);
     ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     ok(ourPayload, "Should upload the synced client record");
     deepEqual(ourPayload.commands, [], "Should not reupload cleared commands");
   } finally {
     await cleanup();
 
     try {
       server.deleteCollections("foo");
@@ -1476,28 +1464,27 @@ add_task(async function test_command_syn
     type: "mobile",
     commands: [],
     version: "48",
     protocols: ["1.5"]
   }), Date.now() / 1000));
 
   try {
     equal(collection.count(), 2, "2 remote records written");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(collection.count(), 3, "3 remote records written (+1 for the synced local record)");
 
     await engine.sendCommand("wipeAll", []);
     engine._tracker.addChangedID(engine.localID);
     const getClientFxaDeviceId = sinon.stub(engine, "getClientFxaDeviceId", (id) => "fxa-" + id);
     const engineMock = sinon.mock(engine);
     let _notifyCollectionChanged = engineMock.expects("_notifyCollectionChanged")
                                              .withArgs(["fxa-" + remoteId, "fxa-" + remoteId2]);
     _("Syncing.");
-    await engine._sync();
+    await syncClientsEngine(server);
     _notifyCollectionChanged.verify();
 
     engineMock.restore();
     getClientFxaDeviceId.restore();
   } finally {
     await cleanup();
     engine._tracker.clearChangedIDs();
 
@@ -1540,58 +1527,57 @@ add_task(async function ensureSameFlowID
       id: remoteId2,
       name: "Remote client 2",
       type: "mobile",
       commands: [],
       version: "48",
       protocols: ["1.5"]
     }), Date.now() / 1000));
 
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     await engine.sendCommand("wipeAll", []);
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(events.length, 2);
     // we don't know what the flowID is, but do know it should be the same.
     equal(events[0].extra.flowID, events[1].extra.flowID);
     // Wipe remote clients to ensure deduping doesn't prevent us from adding the command.
     for (let client of Object.values(engine._store._remoteClients)) {
       client.commands = [];
     }
     // check it's correctly used when we specify a flow ID
     events.length = 0;
     let flowID = Utils.makeGUID();
     await engine.sendCommand("wipeAll", [], null, { flowID });
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(events.length, 2);
     equal(events[0].extra.flowID, flowID);
     equal(events[1].extra.flowID, flowID);
 
     // Wipe remote clients to ensure deduping doesn't prevent us from adding the command.
     for (let client of Object.values(engine._store._remoteClients)) {
       client.commands = [];
     }
 
     // and that it works when something else is in "extra"
     events.length = 0;
     await engine.sendCommand("wipeAll", [], null, { reason: "testing" });
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(events.length, 2);
     equal(events[0].extra.flowID, events[1].extra.flowID);
     equal(events[0].extra.reason, "testing");
     equal(events[1].extra.reason, "testing");
     // Wipe remote clients to ensure deduping doesn't prevent us from adding the command.
     for (let client of Object.values(engine._store._remoteClients)) {
       client.commands = [];
     }
 
     // and when both are specified.
     events.length = 0;
     await engine.sendCommand("wipeAll", [], null, { reason: "testing", flowID });
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(events.length, 2);
     equal(events[0].extra.flowID, flowID);
     equal(events[1].extra.flowID, flowID);
     equal(events[0].extra.reason, "testing");
     equal(events[1].extra.reason, "testing");
     // Wipe remote clients to ensure deduping doesn't prevent us from adding the command.
     for (let client of Object.values(engine._store._remoteClients)) {
       client.commands = [];
@@ -1633,22 +1619,22 @@ add_task(async function test_duplicate_c
       id: remoteId2,
       name: "Remote client 2",
       type: "mobile",
       commands: [],
       version: "48",
       protocols: ["1.5"]
     }), Date.now() / 1000));
 
-    await engine._sync();
+    await syncClientsEngine(server);
     // Make sure deduping works before syncing
     await engine.sendURIToClientForDisplay("https://example.com", remoteId, "Example");
     await engine.sendURIToClientForDisplay("https://example.com", remoteId, "Example");
     equal(events.length, 1);
-    await engine._sync();
+    await syncClientsEngine(server);
     // And after syncing.
     await engine.sendURIToClientForDisplay("https://example.com", remoteId, "Example");
     equal(events.length, 1);
     // Ensure we aren't deduping commands to different clients
     await engine.sendURIToClientForDisplay("https://example.com", remoteId2, "Example");
     equal(events.length, 2);
   } finally {
     Service.recordTelemetryEvent = origRecordTelemetryEvent;
@@ -1675,22 +1661,21 @@ add_task(async function test_other_clien
       calls++;
       return Promise.resolve(true);
     }
   };
 
   try {
     engine.lastRecordUpload = 0;
     _("First sync, should notify other clients");
-    engine.lastModified = server.getCollection("foo", "clients").timestamp;
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(calls, 1);
 
     _("Second sync, should not notify other clients");
-    await engine._sync();
+    await syncClientsEngine(server);
     equal(calls, 1);
   } finally {
     engine.fxAccounts = fxAccounts;
     cleanup();
     await promiseStopServer(server);
   }
 });