Bug 633062 p3 - Remove event loop spinning from the tracker base implementation. r?markh draft
authorEdouard Oger <eoger@fastmail.com>
Thu, 04 Jan 2018 16:03:18 -0500
changeset 716436 48b1ba2f43eb24e881b3f3206c3cada34f3e36d5
parent 716435 915713f4388f56c5067b33eb7d0cac723d276681
child 716437 2a8451b0e1ce639431a22526582f721efb85e7d4
push id94444
push userbmo:eoger@fastmail.com
push dateFri, 05 Jan 2018 20:12:03 +0000
reviewersmarkh
bugs633062
milestone59.0a1
Bug 633062 p3 - Remove event loop spinning from the tracker base implementation. r?markh MozReview-Commit-ID: BNtrRpmgZzq
services/sync/modules/engines.js
services/sync/tests/unit/test_engine.js
services/sync/tests/unit/test_syncengine_sync.js
services/sync/tests/unit/test_telemetry.js
services/sync/tests/unit/test_tracker_addChanged.js
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -67,16 +67,21 @@ this.Tracker = function Tracker(name, en
   this.ignoreAll = false;
 
   Svc.Obs.add("weave:engine:start-tracking", this);
   Svc.Obs.add("weave:engine:stop-tracking", this);
 
 };
 
 Tracker.prototype = {
+  // Always call this at least once before using any of the other methods here!
+  initialize() {
+    return this._storage.load();
+  },
+
   /*
    * Score can be called as often as desired to decide which engines to sync
    *
    * Valid values for score:
    * -1: Do not sync unless the user specifically requests it (almost disabled)
    * 0: Nothing has changed
    * 100: Please sync me ASAP!
    *
@@ -92,17 +97,16 @@ Tracker.prototype = {
   },
 
   // Ensure the Weave storage directory exists before writing the file.
   _beforeSave() {
     return ensureDirectory(this._storage.path);
   },
 
   get changedIDs() {
-    Async.promiseSpinningly(this._storage.load());
     return this._storage.data;
   },
 
   set score(value) {
     this._score = value;
     Observers.notify("weave:engine:score:updated", this.name);
   },
 
@@ -707,16 +711,20 @@ Engine.prototype = {
   },
 
   get _tracker() {
     let tracker = new this._trackerObj(this.Name, this);
     this.__defineGetter__("_tracker", () => tracker);
     return tracker;
   },
 
+  async initialize() {
+    await this._tracker.initialize();
+  },
+
   async sync() {
     if (!this.enabled) {
       return false;
     }
 
     if (!this._sync) {
       throw new Error("engine does not implement _sync method");
     }
@@ -863,16 +871,17 @@ SyncEngine.prototype = {
 
   // How many records to pull at one time when specifying IDs. This is to avoid
   // URI length limitations.
   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
 
   downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
 
   async initialize() {
+    await Engine.prototype.initialize.call(this);
     await this._toFetchStorage.load();
     await this._previousFailedStorage.load();
     this._log.debug("SyncEngine initialized", this.name);
   },
 
   get storageURL() {
     return this.service.storageURL;
   },
--- a/services/sync/tests/unit/test_engine.js
+++ b/services/sync/tests/unit/test_engine.js
@@ -41,16 +41,22 @@ SteamEngine.prototype = {
     this.wasReset = true;
   },
 
   async _sync() {
     this.wasSynced = true;
   }
 };
 
+async function makeSteamEngine() {
+  const engine = new SteamEngine("Steam", Service);
+  await engine.initialize();
+  return engine;
+}
+
 var engineObserver = {
   topics: [],
 
   observe(subject, topic, data) {
     Assert.equal(data, "steam");
     this.topics.push(topic);
   },
 
@@ -71,75 +77,56 @@ async function cleanup(engine) {
   engine.wasSynced = false;
   engineObserver.reset();
   engine._tracker.clearChangedIDs();
   await engine.finalize();
 }
 
 add_task(async function test_members() {
   _("Engine object members");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   Assert.equal(engine.Name, "Steam");
   Assert.equal(engine.prefName, "steam");
   Assert.ok(engine._store instanceof SteamStore);
   Assert.ok(engine._tracker instanceof SteamTracker);
 });
 
 add_task(async function test_score() {
   _("Engine.score corresponds to tracker.score and is readonly");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   Assert.equal(engine.score, 0);
   engine._tracker.score += 5;
   Assert.equal(engine.score, 5);
 
   try {
     engine.score = 10;
   } catch (ex) {
     // Setting an attribute that has a getter produces an error in
     // Firefox <= 3.6 and is ignored in later versions.  Either way,
     // the attribute's value won't change.
   }
   Assert.equal(engine.score, 5);
 });
 
 add_task(async function test_resetClient() {
   _("Engine.resetClient calls _resetClient");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   Assert.ok(!engine.wasReset);
 
   await engine.resetClient();
   Assert.ok(engine.wasReset);
   Assert.equal(engineObserver.topics[0], "weave:engine:reset-client:start");
   Assert.equal(engineObserver.topics[1], "weave:engine:reset-client:finish");
 
   await cleanup(engine);
 });
 
-add_task(async function test_invalidChangedIDs() {
-  _("Test that invalid changed IDs on disk don't end up live.");
-  let engine = new SteamEngine("Steam", Service);
-  let tracker = engine._tracker;
-
-  await tracker._beforeSave();
-  await OS.File.writeAtomic(tracker._storage.path, new TextEncoder().encode("5"),
-                            { tmpPath: tracker._storage.path + ".tmp" });
-
-  ok(!tracker._storage.dataReady);
-  tracker.changedIDs.placeholder = true;
-  deepEqual(tracker.changedIDs, { placeholder: true },
-    "Accessing changed IDs should load changes from disk as a side effect");
-  ok(tracker._storage.dataReady);
-
-  Assert.ok(tracker.changedIDs.placeholder);
-  await cleanup(engine);
-});
-
 add_task(async function test_wipeClient() {
   _("Engine.wipeClient calls resetClient, wipes store, clears changed IDs");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   Assert.ok(!engine.wasReset);
   Assert.ok(!engine._store.wasWiped);
   Assert.ok(engine._tracker.addChangedID("a-changed-id"));
   Assert.ok("a-changed-id" in engine._tracker.changedIDs);
 
   await engine.wipeClient();
   Assert.ok(engine.wasReset);
   Assert.ok(engine._store.wasWiped);
@@ -149,31 +136,31 @@ add_task(async function test_wipeClient(
   Assert.equal(engineObserver.topics[2], "weave:engine:reset-client:finish");
   Assert.equal(engineObserver.topics[3], "weave:engine:wipe-client:finish");
 
   await cleanup(engine);
 });
 
 add_task(async function test_enabled() {
   _("Engine.enabled corresponds to preference");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   try {
     Assert.ok(!engine.enabled);
     Svc.Prefs.set("engine.steam", true);
     Assert.ok(engine.enabled);
 
     engine.enabled = false;
     Assert.ok(!Svc.Prefs.get("engine.steam"));
   } finally {
     await cleanup(engine);
   }
 });
 
 add_task(async function test_sync() {
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   try {
     _("Engine.sync doesn't call _sync if it's not enabled");
     Assert.ok(!engine.enabled);
     Assert.ok(!engine.wasSynced);
     await engine.sync();
 
     Assert.ok(!engine.wasSynced);
 
@@ -186,17 +173,17 @@ add_task(async function test_sync() {
     Assert.equal(engineObserver.topics[1], "weave:engine:sync:finish");
   } finally {
     await cleanup(engine);
   }
 });
 
 add_task(async function test_disabled_no_track() {
   _("When an engine is disabled, its tracker is not tracking.");
-  let engine = new SteamEngine("Steam", Service);
+  let engine = await makeSteamEngine();
   let tracker = engine._tracker;
   Assert.equal(engine, tracker.engine);
 
   Assert.ok(!engine.enabled);
   Assert.ok(!tracker._isTracking);
   do_check_empty(tracker.changedIDs);
 
   Assert.ok(!tracker.engineIsEnabled());
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -6,18 +6,20 @@ Cu.import("resource://services-sync/engi
 Cu.import("resource://services-sync/main.js");
 Cu.import("resource://services-sync/policies.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://testing-common/services/sync/rotaryengine.js");
 
-function makeRotaryEngine() {
-  return new RotaryEngine(Service);
+async function makeRotaryEngine() {
+  const engine = new RotaryEngine(Service);
+  await engine.initialize();
+  return engine;
 }
 
 async function clean(engine) {
   Svc.Prefs.resetBranch("");
   Svc.Prefs.set("log.logger.engine.rotary", "Trace");
   Service.recordManager.clearCache();
   engine._tracker.clearChangedIDs();
   await engine.finalize();
@@ -29,17 +31,17 @@ async function cleanAndGo(engine, server
 }
 
 async function promiseClean(engine, server) {
   await clean(engine);
   await promiseStopServer(server);
 }
 
 async function createServerAndConfigureClient() {
-  let engine = new RotaryEngine(Service);
+  let engine = await makeRotaryEngine();
 
   let contents = {
     meta: {global: {engines: {rotary: {version: engine.version,
                                        syncID:  engine.syncID}}}},
     crypto: {},
     rotary: {}
   };
 
@@ -88,17 +90,17 @@ add_task(async function test_syncStartup
                                     denomination: "Flying Scotsman"}));
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine._store.items = {rekolok: "Rekonstruktionslokomotive"};
   try {
 
     // Confirm initial environment
     Assert.equal(engine._tracker.changedIDs.rekolok, undefined);
     let metaGlobal = await Service.recordManager.get(engine.metaURL);
     Assert.equal(metaGlobal.payload.engines, undefined);
     Assert.ok(!!collection.payload("flying"));
@@ -131,17 +133,17 @@ add_task(async function test_syncStartup
 
   let global = new ServerWBO("global", {engines: {rotary: {version: 23456}}});
   let server = httpd_setup({
       "/1.1/foo/storage/meta/global": global.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
 
     // The server has a newer version of the data and our engine can
     // handle.  That should give us an exception.
     let error;
     try {
       await engine._syncStartup();
     } catch (ex) {
@@ -158,17 +160,17 @@ add_task(async function test_syncStartup
 add_task(async function test_syncStartup_syncIDMismatchResetsClient() {
   _("SyncEngine._syncStartup resets sync if syncIDs don't match");
 
   let server = sync_httpd_setup({});
 
   await SyncTestingInfrastructure(server);
 
   // global record with a different syncID than our engine has
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   let global = new ServerWBO("global",
                              {engines: {rotary: {version: engine.version,
                                                 syncID: "foobar"}}});
   server.registerPathHandler("/1.1/foo/storage/meta/global", global.handler());
 
   try {
 
     // Confirm initial environment
@@ -196,17 +198,17 @@ add_task(async function test_processInco
 
   let collection = new ServerCollection();
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
 
     // Merely ensure that this code path is run without any errors
     await engine._processIncoming();
     Assert.equal(engine.lastSync, 0);
 
   } finally {
     await cleanAndGo(engine, server);
@@ -236,17 +238,17 @@ add_task(async function test_processInco
       "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
       "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler()
   });
 
   await SyncTestingInfrastructure(server);
 
   await generateNewKeys(Service.collectionKeys);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   let meta_global = Service.recordManager.set(engine.metaURL,
                                               new WBORecord(engine.metaURL));
   meta_global.payload.engines = {rotary: {version: engine.version,
                                          syncID: engine.syncID}};
 
   try {
 
     // Confirm initial environment
@@ -318,17 +320,17 @@ add_task(async function test_processInco
                                     deleted: true}));
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine._store.items = {newerserver: "New data, but not as new as server!",
                          olderidentical: "Older but identical",
                          updateclient: "Got data?",
                          original: "Original Entry",
                          long_original: "Long Original Entry",
                          nukeme: "Nuke me!"};
   // Make this record 1 min old, thus older than the one on the server
   engine._tracker.addChangedID("newerserver", Date.now() / 1000 - 60);
@@ -620,17 +622,17 @@ add_task(async function test_processInco
     collection.insertWBO(wbo);
   }
 
   collection.wbo("flying").modified =
     collection.wbo("scotsman").modified = LASTSYNC - 10;
   collection._wbos.rekolok.modified = LASTSYNC + 10;
 
   // Time travel 10 seconds into the future but still download the above WBOs.
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.lastSync = LASTSYNC;
   engine.toFetch = ["flying", "scotsman"];
   engine.previousFailed = ["failed0", "failed1", "failed2"];
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
@@ -665,17 +667,17 @@ add_task(async function test_processInco
 
 
 add_task(async function test_processIncoming_notify_count() {
   _("Ensure that failed records are reported only once.");
 
   const NUMBER_OF_RECORDS = 15;
 
   // Engine that fails every 5 records.
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
     let sortedRecords = records.sort((a, b) => a.id > b.id ? 1 : -1);
     let recordsToApply = [], recordsToFail = [];
     for (let i = 0; i < sortedRecords.length; i++) {
       (i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]);
     }
     await engine._store._applyIncomingBatch(recordsToApply);
@@ -753,17 +755,17 @@ add_task(async function test_processInco
 
 
 add_task(async function test_processIncoming_previousFailed() {
   _("Ensure that failed records are retried.");
 
   const NUMBER_OF_RECORDS = 14;
 
   // Engine that alternates between failing and applying every 2 records.
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
     let sortedRecords = records.sort((a, b) => a.id > b.id ? 1 : -1);
     let recordsToApply = [], recordsToFail = [];
     let chunks = Array.from(PlacesSyncUtils.chunkArray(sortedRecords, 2));
     for (let i = 0; i < chunks.length; i++) {
       (i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]);
     }
@@ -852,17 +854,17 @@ add_task(async function test_processInco
   const BOGUS_RECORDS = ["record-no-" + 42,
                          "record-no-" + 23,
                          "record-no-" + (42 + APPLY_BATCH_SIZE),
                          "record-no-" + (23 + APPLY_BATCH_SIZE),
                          "record-no-" + (42 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (23 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (2 + APPLY_BATCH_SIZE * 3),
                          "record-no-" + (1 + APPLY_BATCH_SIZE * 3)];
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
 
   engine.__reconcile = engine._reconcile;
   engine._reconcile = async function _reconcile(record) {
     if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
       throw new Error("I don't like this record! Baaaaaah!");
     }
     return this.__reconcile.apply(this, arguments);
   };
@@ -976,17 +978,17 @@ add_task(async function test_processInco
     if (ciphertext == "Decrypt this!") {
       throw new Error(
           "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz.");
     }
     return this._decrypt.apply(this, arguments);
   };
 
   // Some broken records also exist locally.
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.enabled = true;
   engine._store.items = {nojson: "Valid JSON",
                          nodecrypt: "Valid ciphertext"};
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
@@ -1044,17 +1046,17 @@ add_task(async function test_uploadOutgo
       "/1.1/foo/storage/rotary": collection.handler(),
       "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
       "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler()
   });
 
   await SyncTestingInfrastructure(server);
   await generateNewKeys(Service.collectionKeys);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine._store.items = {flying: "LNER Class A3 4472",
                          scotsman: "Flying Scotsman"};
   // Mark one of these records as changed
   engine._tracker.addChangedID("scotsman", 0);
 
   let meta_global = Service.recordManager.set(engine.metaURL,
                                               new WBORecord(engine.metaURL));
@@ -1100,17 +1102,17 @@ async function test_uploadOutgoing_max_r
       "/1.1/foo/storage/rotary": collection.handler(),
       "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
       "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
   });
 
   await SyncTestingInfrastructure(server);
   await generateNewKeys(Service.collectionKeys);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.allowSkippedRecord = allowSkippedRecord;
   engine.lastSync = 1;
   engine._store.items = { flying: "a".repeat(1024 * 1024), scotsman: "abcd" };
 
   engine._tracker.addChangedID("flying", 1000);
   engine._tracker.addChangedID("scotsman", 1000);
 
   let meta_global = Service.recordManager.set(engine.metaURL,
@@ -1174,17 +1176,17 @@ add_task(async function test_uploadOutgo
   collection._wbos.flying = new ServerWBO("flying");
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine._store.items = {flying: "LNER Class A3 4472",
                          scotsman: "Flying Scotsman",
                          peppercorn: "Peppercorn Class"};
   // Mark these records as changed
   const FLYING_CHANGED = 12345;
   const SCOTSMAN_CHANGED = 23456;
   const PEPPERCORN_CHANGED = 34567;
@@ -1233,17 +1235,17 @@ async function createRecordFailTelemetry
   collection._wbos.scotsman = new ServerWBO("scotsman");
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.allowSkippedRecord = allowSkippedRecord;
   let oldCreateRecord = engine._store.createRecord;
   engine._store.createRecord = async (id, col) => {
     if (id != "flying") {
       throw new Error("oops");
     }
     return oldCreateRecord.call(engine._store, id, col);
   };
@@ -1316,17 +1318,17 @@ add_task(async function test_uploadOutgo
   await createRecordFailTelemetry(false);
 });
 
 add_task(async function test_uploadOutgoing_largeRecords() {
   _("SyncEngine._uploadOutgoing throws on records larger than the max record payload size");
 
   let collection = new ServerCollection();
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.allowSkippedRecord = false;
   engine._store.items["large-item"] = "Y".repeat(Service.getMaxRecordPayloadSize() * 2);
   engine._tracker.addChangedID("large-item", 0);
   collection.insert("large-item");
 
 
   let meta_global = Service.recordManager.set(engine.metaURL,
                                               new WBORecord(engine.metaURL));
@@ -1368,17 +1370,17 @@ add_task(async function test_syncFinish_
       "rekolok", encryptPayload({id: "rekolok",
                                 denomination: "Rekonstruktionslokomotive"}));
 
   let server = httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
     engine._delete = {ids: ["flying", "rekolok"]};
     await engine._syncFinish();
 
     // The 'flying' and 'rekolok' records were deleted while the
     // 'scotsman' one wasn't.
     Assert.equal(collection.payload("flying"), undefined);
     Assert.ok(!!collection.payload("scotsman"));
@@ -1418,17 +1420,17 @@ add_task(async function test_syncFinish_
   }
 
   let server = httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
 
     // Confirm initial environment
     Assert.equal(noOfUploads, 0);
 
     // Declare what we want to have deleted: all records no. 100 and
     // up and all records that are less than 200 mins old (which are
     // records 0 thru 90).
@@ -1472,17 +1474,17 @@ add_task(async function test_sync_partia
   });
   let oldServerConfiguration = Service.serverConfiguration;
   Service.serverConfiguration = {
     max_post_records: 100
   };
   await SyncTestingInfrastructure(server);
   await generateNewKeys(Service.collectionKeys);
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine.lastSyncLocal = 456;
 
   // Let the third upload fail completely
   var noOfUploads = 0;
   collection.post = (function(orig) {
     return function() {
       if (noOfUploads == 2)
@@ -1552,17 +1554,17 @@ add_task(async function test_canDecrypt_
       "flying", encryptPayload({id: "flying",
                                 denomination: "LNER Class A3 4472"}));
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
 
     Assert.equal(false, (await engine.canDecrypt()));
 
   } finally {
     await cleanAndGo(engine, server);
   }
 });
@@ -1577,31 +1579,31 @@ add_task(async function test_canDecrypt_
       "flying", encryptPayload({id: "flying",
                                 denomination: "LNER Class A3 4472"}));
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
   try {
 
     Assert.ok((await engine.canDecrypt()));
 
   } finally {
     await cleanAndGo(engine, server);
   }
 
 });
 
 add_task(async function test_syncapplied_observer() {
   const NUMBER_OF_RECORDS = 10;
 
-  let engine = makeRotaryEngine();
+  let engine = await makeRotaryEngine();
 
   // Create a batch of server side records.
   let collection = new ServerCollection();
   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
     let id = "record-no-" + i;
     let payload = encryptPayload({id, denomination: "Record No. " + id});
     collection.insert(id, payload);
   }
--- a/services/sync/tests/unit/test_telemetry.js
+++ b/services/sync/tests/unit/test_telemetry.js
@@ -204,16 +204,17 @@ add_task(async function test_upload_fail
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   await SyncTestingInfrastructure(server);
   await configureIdentity({ username: "foo" }, server);
 
   let engine = new RotaryEngine(Service);
+  await engine.initialize();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine.lastSyncLocal = 456;
   engine._store.items = {
     flying: "LNER Class A3 4472",
     scotsman: "Flying Scotsman",
     peppercorn: "Peppercorn Class"
   };
   const FLYING_CHANGED = 12345;
@@ -256,16 +257,17 @@ add_task(async function test_sync_partia
   let collection = new ServerCollection();
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
   await SyncTestingInfrastructure(server);
   await generateNewKeys(Service.collectionKeys);
 
   let engine = new RotaryEngine(Service);
+  await engine.initialize();
   engine.lastSync = 123;
   engine.lastSyncLocal = 456;
 
 
   // Create a bunch of records (and server side handlers)
   for (let i = 0; i < 234; i++) {
     let id = "record-no-" + i;
     engine._store.items[id] = "Record No. " + i;
@@ -424,17 +426,17 @@ add_task(async function test_engine_fail
     await cleanAndGo(engine, server);
     await Service.engineManager.unregister(engine);
   }
 });
 
 add_task(async function test_clean_urls() {
   enableValidationPrefs();
 
-  Service.engineManager.register(SteamEngine);
+  await Service.engineManager.register(SteamEngine);
   let engine = Service.engineManager.get("steam");
   engine.enabled = true;
   let server = await serverForFoo(engine);
   await SyncTestingInfrastructure(server);
   engine._errToThrow = new TypeError("http://www.google .com is not a valid URL.");
 
   try {
     _(`test_clean_urls: Steam tracker contents: ${
--- a/services/sync/tests/unit/test_tracker_addChanged.js
+++ b/services/sync/tests/unit/test_tracker_addChanged.js
@@ -2,16 +2,17 @@
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
 
 add_task(async function test_tracker_basics() {
   let tracker = new Tracker("Tracker", Service);
+  await tracker.initialize();
   tracker.persistChangedIDs = false;
 
   let id = "the_id!";
 
   _("Make sure nothing exists yet..");
   Assert.equal(tracker.changedIDs[id], null);
 
   _("Make sure adding of time 0 works");
@@ -28,16 +29,17 @@ add_task(async function test_tracker_bas
 
   _("Adding without time defaults to current time");
   tracker.addChangedID(id);
   Assert.ok(tracker.changedIDs[id] > 10);
 });
 
 add_task(async function test_tracker_persistence() {
   let tracker = new Tracker("Tracker", Service);
+  await tracker.initialize();
   let id = "abcdef";
 
   tracker.persistChangedIDs = true;
 
   let promiseSave = new Promise((resolve, reject) => {
     let save = tracker._storage._save;
     tracker._storage._save = function() {
       save.call(tracker._storage).then(resolve, reject);