Bug 1366067 - Use `JSONFile.jsm` to persist the backlog and previously failed IDs. r=tcsc draft
authorKit Cambridge <kit@yakshaving.ninja>
Wed, 01 Nov 2017 11:10:47 -0700
changeset 693023 c86692ad68694cd864204ea552896428f51fe8c4
parent 692869 b235bfc81d73b50e54686f5b34816d36d6012e51
child 738918 b1fb2c79bd3e69754eb63579b9d2a917a2772839
push id87674
push userbmo:kit@mozilla.com
push dateFri, 03 Nov 2017 21:09:34 +0000
reviewerstcsc
bugs1366067
milestone58.0a1
Bug 1366067 - Use `JSONFile.jsm` to persist the backlog and previously failed IDs. r=tcsc MozReview-Commit-ID: DAwMrtWI8eh
services/sync/modules/engines.js
services/sync/tests/unit/test_syncengine.js
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -25,16 +25,21 @@ Cu.import("resource://services-sync/reso
 Cu.import("resource://services-sync/util.js");
 
 XPCOMUtils.defineLazyModuleGetters(this, {
   fxAccounts: "resource://gre/modules/FxAccounts.jsm",
   OS: "resource://gre/modules/osfile.jsm",
   PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.jsm",
 });
 
+function ensureDirectory(path) {
+  let basename = OS.Path.dirname(path);
+  return OS.File.makeDir(basename, { from: OS.Constants.Path.profileDir });
+}
+
 /*
  * Trackers are associated with a single engine and deal with
  * listening for changes to their particular data type.
  *
  * There are two things they keep track of:
  * 1) A score, indicating how urgently the engine wants to sync
  * 2) A list of IDs for all the changed items that need to be synced
  * and updating their 'score', indicating how urgently they
@@ -53,18 +58,16 @@ this.Tracker = function Tracker(name, en
   this._log = Log.repository.getLogger("Sync.Tracker." + name);
   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
   this._log.level = Log.Level[level];
 
   this._score = 0;
   this._ignored = [];
   this._storage = new JSONFile({
     path: Utils.jsonFilePath("changes/" + this.file),
-    // We use arrow functions instead of `.bind(this)` so that tests can
-    // easily override these hooks.
     dataPostProcessor: json => this._dataPostProcessor(json),
     beforeSave: () => this._beforeSave(),
   });
   this.ignoreAll = false;
 
   Svc.Obs.add("weave:engine:start-tracking", this);
   Svc.Obs.add("weave:engine:stop-tracking", this);
 
@@ -88,18 +91,17 @@ Tracker.prototype = {
 
   // Default to an empty object if the file doesn't exist.
   _dataPostProcessor(json) {
     return typeof json == "object" && json || {};
   },
 
   // Ensure the Weave storage directory exists before writing the file.
   _beforeSave() {
-    let basename = OS.Path.dirname(this._storage.path);
-    return OS.File.makeDir(basename, { from: OS.Constants.Path.profileDir });
+    return ensureDirectory(this._storage.path);
   },
 
   get changedIDs() {
     Async.promiseSpinningly(this._storage.load());
     return this._storage.data;
   },
 
   set score(value) {
@@ -760,16 +762,28 @@ Engine.prototype = {
   async finalize() {
     await this._tracker.finalize();
   },
 };
 
 this.SyncEngine = function SyncEngine(name, service) {
   Engine.call(this, name || "SyncEngine", service);
 
+  this._toFetchStorage = new JSONFile({
+    path: Utils.jsonFilePath("toFetch/" + this.name),
+    dataPostProcessor: json => this._metadataPostProcessor(json),
+    beforeSave: () => this._beforeSaveMetadata(),
+  });
+
+  this._previousFailedStorage = new JSONFile({
+    path: Utils.jsonFilePath("failed/" + this.name),
+    dataPostProcessor: json => this._metadataPostProcessor(json),
+    beforeSave: () => this._beforeSaveMetadata(),
+  });
+
   // Async initializations can be made in the initialize() method.
 
   // The map of ids => metadata for records needing a weak upload.
   //
   // Currently the "metadata" fields are:
   //
   // - forceTombstone: whether or not we should ignore the local information
   //   about the record, and write a tombstone for it anyway -- e.g. in the case
@@ -807,16 +821,33 @@ SyncEngine.kRecoveryStrategy = {
 SyncEngine.prototype = {
   __proto__: Engine.prototype,
   _recordObj: CryptoWrapper,
   version: 1,
 
   // Which sortindex to use when retrieving records for this engine.
   _defaultSort: undefined,
 
+  _metadataPostProcessor(json) {
+    if (Array.isArray(json)) {
+      // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
+      // an object, so we wrap the array for consistency.
+      return { ids: json };
+    }
+    if (!json.ids) {
+      json.ids = [];
+    }
+    return json;
+  },
+
+  async _beforeSaveMetadata() {
+    await ensureDirectory(this._toFetchStorage.path);
+    await ensureDirectory(this._previousFailedStorage.path);
+  },
+
   // A relative priority to use when computing an order
   // for engines to be synced. Higher-priority engines
   // (lower numbers) are synced first.
   // It is recommended that a unique value be used for each engine,
   // in order to guarantee a stable sequence.
   syncPriority: 0,
 
   // How many records to pull in a single sync. This is primarily to avoid very
@@ -825,18 +856,18 @@ SyncEngine.prototype = {
 
   // How many records to pull at one time when specifying IDs. This is to avoid
   // URI length limitations.
   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
 
   downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
 
   async initialize() {
-    await this.loadToFetch();
-    await this.loadPreviousFailed();
+    await this._toFetchStorage.load();
+    await this._previousFailedStorage.load();
     this._log.debug("SyncEngine initialized", this.name);
   },
 
   get storageURL() {
     return this.service.storageURL;
   },
 
   get engineURL() {
@@ -875,74 +906,32 @@ SyncEngine.prototype = {
   resetLastSync() {
     this._log.debug("Resetting " + this.name + " last sync time");
     Svc.Prefs.reset(this.name + ".lastSync");
     Svc.Prefs.set(this.name + ".lastSync", "0");
     this.lastSyncLocal = 0;
   },
 
   get toFetch() {
-    return this._toFetch;
-  },
-  set toFetch(val) {
-    // Coerce the array to a string for more efficient comparison.
-    if (val + "" == this._toFetch) {
-      return;
-    }
-    this._toFetch = val;
-    CommonUtils.namedTimer(function() {
-      try {
-        Async.promiseSpinningly(Utils.jsonSave("toFetch/" + this.name, this, this._toFetch));
-      } catch (error) {
-        this._log.error("Failed to read JSON records to fetch", error);
-      }
-      // Notify our tests that we finished writing the file.
-      Observers.notify("sync-testing:file-saved:toFetch", null, this.name);
-    }, 0, this, "_toFetchDelay");
+    this._toFetchStorage.ensureDataReady();
+    return this._toFetchStorage.data.ids;
   },
 
-  async loadToFetch() {
-    // Initialize to empty if there's no file.
-    this._toFetch = [];
-    let toFetch = await Utils.jsonLoad("toFetch/" + this.name, this);
-    if (toFetch) {
-      this._toFetch = toFetch;
-    }
+  set toFetch(ids) {
+    this._toFetchStorage.data = { ids };
+    this._toFetchStorage.saveSoon();
   },
 
   get previousFailed() {
-    return this._previousFailed;
+    this._previousFailedStorage.ensureDataReady();
+    return this._previousFailedStorage.data.ids;
   },
-  set previousFailed(val) {
-    // Coerce the array to a string for more efficient comparison.
-    if (val + "" == this._previousFailed) {
-      return;
-    }
-    this._previousFailed = val;
-    CommonUtils.namedTimer(function() {
-      Utils.jsonSave("failed/" + this.name, this, this._previousFailed).then(() => {
-        this._log.debug("Successfully wrote previousFailed.");
-      })
-      .catch((error) => {
-        this._log.error("Failed to set previousFailed", error);
-      })
-      .then(() => {
-        // Notify our tests that we finished writing the file.
-        Observers.notify("sync-testing:file-saved:previousFailed", null, this.name);
-      });
-    }, 0, this, "_previousFailedDelay");
-  },
-
-  async loadPreviousFailed() {
-    // Initialize to empty if there's no file
-    this._previousFailed = [];
-    let previousFailed = await Utils.jsonLoad("failed/" + this.name, this);
-    if (previousFailed) {
-      this._previousFailed = previousFailed;
-    }
+  set previousFailed(ids) {
+    this._previousFailedStorage.data = { ids };
+    this._previousFailedStorage.saveSoon();
   },
 
   /*
    * lastSyncLocal is a timestamp in local time.
    */
   get lastSyncLocal() {
     return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
   },
@@ -1901,16 +1890,22 @@ SyncEngine.prototype = {
    * items that failed to upload. This method is called at the end of each sync.
    *
    */
   async trackRemainingChanges() {
     for (let [id, change] of this._modified.entries()) {
       this._tracker.addChangedID(id, change);
     }
   },
+
+  async finalize() {
+    await super.finalize();
+    await this._toFetchStorage.finalize();
+    await this._previousFailedStorage.finalize();
+  },
 };
 
 /**
  * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
  * and stores opaque change data for tracked IDs. The default implementation
  * only records timestamps, though engines can extend this to store additional
  * data for each entry.
  */
--- a/services/sync/tests/unit/test_syncengine.js
+++ b/services/sync/tests/unit/test_syncengine.js
@@ -1,22 +1,48 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
+Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://testing-common/services/sync/utils.js");
 
 async function makeSteamEngine() {
   let engine = new SyncEngine("Steam", Service);
   await engine.initialize();
   return engine;
 }
 
+async function testSteamEngineStorage(test) {
+  try {
+    let setupEngine = await makeSteamEngine();
+
+    if (test.setup) {
+      await test.setup(setupEngine);
+    }
+
+    // Finalize the engine to flush the backlog and previous failed to disk.
+    await setupEngine.finalize();
+
+    if (test.beforeCheck) {
+      await test.beforeCheck();
+    }
+
+    let checkEngine = await makeSteamEngine();
+    await test.check(checkEngine);
+
+    await checkEngine.resetClient();
+    await checkEngine.finalize();
+  } finally {
+    Svc.Prefs.resetBranch("");
+  }
+}
+
 let server;
 
 add_task(async function setup() {
   server = httpd_setup({});
 });
 
 add_task(async function test_url_attributes() {
   _("SyncEngine url attributes");
@@ -79,104 +105,117 @@ add_task(async function test_lastSync() 
     do_check_eq(Svc.Prefs.get("steam.lastSync"), "0");
   } finally {
     Svc.Prefs.resetBranch("");
   }
 });
 
 add_task(async function test_toFetch() {
   _("SyncEngine.toFetch corresponds to file on disk");
-  let syncTesting = await SyncTestingInfrastructure(server);
+  await SyncTestingInfrastructure(server);
   const filename = "weave/toFetch/steam.json";
-  let engine = await makeSteamEngine();
-  try {
-    // Ensure pristine environment
-    do_check_eq(engine.toFetch.length, 0);
+
+  await testSteamEngineStorage({
+    toFetch: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    setup(engine) {
+      // Ensure pristine environment
+      do_check_eq(engine.toFetch.length, 0);
 
-    // Write file to disk
-    let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    let wrotePromise = promiseOneObserver("sync-testing:file-saved:toFetch");
-    engine.toFetch = toFetch;
-    do_check_eq(engine.toFetch, toFetch);
-    // toFetch is written asynchronously
-    await wrotePromise;
-    let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
-    do_check_eq(fakefile, JSON.stringify(toFetch));
+      // Write file to disk
+      engine.toFetch = this.toFetch;
+      do_check_eq(engine.toFetch, this.toFetch);
+    },
+    check(engine) {
+      // toFetch is written asynchronously
+      do_check_matches(engine.toFetch, this.toFetch);
+    },
+  });
 
-    // Make sure it work for consecutive writes before the callback is executed.
-    toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    let toFetch2 = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    wrotePromise = promiseOneObserver("sync-testing:file-saved:toFetch");
-
-    engine.toFetch = toFetch;
-    do_check_eq(engine.toFetch, toFetch);
+  await testSteamEngineStorage({
+    toFetch: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    toFetch2: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    setup(engine) {
+      // Make sure it work for consecutive writes before the callback is executed.
+      engine.toFetch = this.toFetch;
+      do_check_eq(engine.toFetch, this.toFetch);
 
-    engine.toFetch = toFetch2;
-    do_check_eq(engine.toFetch, toFetch2);
-    // Note that do to the way CommonUtils.namedTimer works, we won't get a 2nd callback.
-    await wrotePromise;
-    fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
-    do_check_eq(fakefile, JSON.stringify(toFetch2));
+      engine.toFetch = this.toFetch2;
+      do_check_eq(engine.toFetch, this.toFetch2);
+    },
+    check(engine) {
+      do_check_matches(engine.toFetch, this.toFetch2);
+    },
+  });
 
-    // Read file from disk
-    toFetch = [Utils.makeGUID(), Utils.makeGUID()];
-    syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(toFetch);
-    await engine.loadToFetch();
-    do_check_eq(engine.toFetch.length, 2);
-    do_check_eq(engine.toFetch[0], toFetch[0]);
-    do_check_eq(engine.toFetch[1], toFetch[1]);
-  } finally {
-    Svc.Prefs.resetBranch("");
-  }
+  await testSteamEngineStorage({
+    toFetch: [Utils.makeGUID(), Utils.makeGUID()],
+    async beforeCheck() {
+      let toFetchPath = OS.Path.join(OS.Constants.Path.profileDir, filename);
+      let bytes = new TextEncoder().encode(JSON.stringify(this.toFetch));
+      await OS.File.writeAtomic(toFetchPath, bytes,
+                                { tmpPath: toFetchPath + ".tmp" });
+    },
+    check(engine) {
+      // Read file from disk
+      do_check_matches(engine.toFetch, this.toFetch);
+    },
+  });
 });
 
 add_task(async function test_previousFailed() {
   _("SyncEngine.previousFailed corresponds to file on disk");
-  let syncTesting = await SyncTestingInfrastructure(server);
+  await SyncTestingInfrastructure(server);
   const filename = "weave/failed/steam.json";
-  let engine = await makeSteamEngine();
-  try {
-    // Ensure pristine environment
-    do_check_eq(engine.previousFailed.length, 0);
+
+  await testSteamEngineStorage({
+    previousFailed: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    setup(engine) {
+      // Ensure pristine environment
+      do_check_eq(engine.previousFailed.length, 0);
 
-    // Write file to disk
-    let previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    let wrotePromise = promiseOneObserver("sync-testing:file-saved:previousFailed");
-    engine.previousFailed = previousFailed;
-    do_check_eq(engine.previousFailed, previousFailed);
-    // previousFailed is written asynchronously
-    await wrotePromise;
-    let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
-    do_check_eq(fakefile, JSON.stringify(previousFailed));
+      // Write file to disk
+      engine.previousFailed = this.previousFailed;
+      do_check_eq(engine.previousFailed, this.previousFailed);
+    },
+    check(engine) {
+      // previousFailed is written asynchronously
+      do_check_matches(engine.previousFailed, this.previousFailed);
+    },
+  });
 
-    // Make sure it work for consecutive writes before the callback is executed.
-    previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    let previousFailed2 = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    wrotePromise = promiseOneObserver("sync-testing:file-saved:previousFailed");
-
-    engine.previousFailed = previousFailed;
-    do_check_eq(engine.previousFailed, previousFailed);
+  await testSteamEngineStorage({
+    previousFailed: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    previousFailed2: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+    setup(engine) {
+      // Make sure it work for consecutive writes before the callback is executed.
+      engine.previousFailed = this.previousFailed;
+      do_check_eq(engine.previousFailed, this.previousFailed);
 
-    engine.previousFailed = previousFailed2;
-    do_check_eq(engine.previousFailed, previousFailed2);
-    // Note that do to the way CommonUtils.namedTimer works, we're only notified once.
-    await wrotePromise;
-    fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
-    do_check_eq(fakefile, JSON.stringify(previousFailed2));
+      engine.previousFailed = this.previousFailed2;
+      do_check_eq(engine.previousFailed, this.previousFailed2);
+    },
+    check(engine) {
+      do_check_matches(engine.previousFailed, this.previousFailed2);
+    },
+  });
 
-    // Read file from disk
-    previousFailed = [Utils.makeGUID(), Utils.makeGUID()];
-    syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(previousFailed);
-    await engine.loadPreviousFailed();
-    do_check_eq(engine.previousFailed.length, 2);
-    do_check_eq(engine.previousFailed[0], previousFailed[0]);
-    do_check_eq(engine.previousFailed[1], previousFailed[1]);
-  } finally {
-    Svc.Prefs.resetBranch("");
-  }
+  await testSteamEngineStorage({
+    previousFailed: [Utils.makeGUID(), Utils.makeGUID()],
+    async beforeCheck() {
+      let previousFailedPath = OS.Path.join(OS.Constants.Path.profileDir,
+                                            filename);
+      let bytes = new TextEncoder().encode(JSON.stringify(this.previousFailed));
+      await OS.File.writeAtomic(previousFailedPath, bytes,
+                                { tmpPath: previousFailedPath + ".tmp" });
+    },
+    check(engine) {
+      // Read file from disk
+      do_check_matches(engine.previousFailed, this.previousFailed);
+    },
+  });
 });
 
 add_task(async function test_resetClient() {
   _("SyncEngine.resetClient resets lastSync and toFetch");
   await SyncTestingInfrastructure(server);
   let engine = await makeSteamEngine();
   try {
     // Ensure pristine environment