Bug 1366067 - Use `JSONFile.jsm` to persist the backlog and previously failed IDs. r=tcsc
MozReview-Commit-ID: DAwMrtWI8eh
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -25,16 +25,21 @@ Cu.import("resource://services-sync/reso
Cu.import("resource://services-sync/util.js");
XPCOMUtils.defineLazyModuleGetters(this, {
fxAccounts: "resource://gre/modules/FxAccounts.jsm",
OS: "resource://gre/modules/osfile.jsm",
PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.jsm",
});
+function ensureDirectory(path) {
+ let basename = OS.Path.dirname(path);
+ return OS.File.makeDir(basename, { from: OS.Constants.Path.profileDir });
+}
+
/*
* Trackers are associated with a single engine and deal with
* listening for changes to their particular data type.
*
* There are two things they keep track of:
* 1) A score, indicating how urgently the engine wants to sync
* 2) A list of IDs for all the changed items that need to be synced
* and updating their 'score', indicating how urgently they
@@ -53,18 +58,16 @@ this.Tracker = function Tracker(name, en
this._log = Log.repository.getLogger("Sync.Tracker." + name);
let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
this._log.level = Log.Level[level];
this._score = 0;
this._ignored = [];
this._storage = new JSONFile({
path: Utils.jsonFilePath("changes/" + this.file),
- // We use arrow functions instead of `.bind(this)` so that tests can
- // easily override these hooks.
dataPostProcessor: json => this._dataPostProcessor(json),
beforeSave: () => this._beforeSave(),
});
this.ignoreAll = false;
Svc.Obs.add("weave:engine:start-tracking", this);
Svc.Obs.add("weave:engine:stop-tracking", this);
@@ -88,18 +91,17 @@ Tracker.prototype = {
// Default to an empty object if the file doesn't exist.
_dataPostProcessor(json) {
return typeof json == "object" && json || {};
},
// Ensure the Weave storage directory exists before writing the file.
_beforeSave() {
- let basename = OS.Path.dirname(this._storage.path);
- return OS.File.makeDir(basename, { from: OS.Constants.Path.profileDir });
+ return ensureDirectory(this._storage.path);
},
get changedIDs() {
Async.promiseSpinningly(this._storage.load());
return this._storage.data;
},
set score(value) {
@@ -760,16 +762,28 @@ Engine.prototype = {
async finalize() {
await this._tracker.finalize();
},
};
this.SyncEngine = function SyncEngine(name, service) {
Engine.call(this, name || "SyncEngine", service);
+ this._toFetchStorage = new JSONFile({
+ path: Utils.jsonFilePath("toFetch/" + this.name),
+ dataPostProcessor: json => this._metadataPostProcessor(json),
+ beforeSave: () => this._beforeSaveMetadata(),
+ });
+
+ this._previousFailedStorage = new JSONFile({
+ path: Utils.jsonFilePath("failed/" + this.name),
+ dataPostProcessor: json => this._metadataPostProcessor(json),
+ beforeSave: () => this._beforeSaveMetadata(),
+ });
+
// Async initializations can be made in the initialize() method.
// The map of ids => metadata for records needing a weak upload.
//
// Currently the "metadata" fields are:
//
// - forceTombstone: whether or not we should ignore the local information
// about the record, and write a tombstone for it anyway -- e.g. in the case
@@ -807,16 +821,33 @@ SyncEngine.kRecoveryStrategy = {
SyncEngine.prototype = {
__proto__: Engine.prototype,
_recordObj: CryptoWrapper,
version: 1,
// Which sortindex to use when retrieving records for this engine.
_defaultSort: undefined,
+ _metadataPostProcessor(json) {
+ if (Array.isArray(json)) {
+ // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
+ // an object, so we wrap the array for consistency.
+ return { ids: json };
+ }
+ if (!json.ids) {
+ json.ids = [];
+ }
+ return json;
+ },
+
+ async _beforeSaveMetadata() {
+ await ensureDirectory(this._toFetchStorage.path);
+ await ensureDirectory(this._previousFailedStorage.path);
+ },
+
// A relative priority to use when computing an order
// for engines to be synced. Higher-priority engines
// (lower numbers) are synced first.
// It is recommended that a unique value be used for each engine,
// in order to guarantee a stable sequence.
syncPriority: 0,
// How many records to pull in a single sync. This is primarily to avoid very
@@ -825,18 +856,18 @@ SyncEngine.prototype = {
// How many records to pull at one time when specifying IDs. This is to avoid
// URI length limitations.
guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
async initialize() {
- await this.loadToFetch();
- await this.loadPreviousFailed();
+ await this._toFetchStorage.load();
+ await this._previousFailedStorage.load();
this._log.debug("SyncEngine initialized", this.name);
},
get storageURL() {
return this.service.storageURL;
},
get engineURL() {
@@ -875,74 +906,32 @@ SyncEngine.prototype = {
resetLastSync() {
this._log.debug("Resetting " + this.name + " last sync time");
Svc.Prefs.reset(this.name + ".lastSync");
Svc.Prefs.set(this.name + ".lastSync", "0");
this.lastSyncLocal = 0;
},
get toFetch() {
- return this._toFetch;
- },
- set toFetch(val) {
- // Coerce the array to a string for more efficient comparison.
- if (val + "" == this._toFetch) {
- return;
- }
- this._toFetch = val;
- CommonUtils.namedTimer(function() {
- try {
- Async.promiseSpinningly(Utils.jsonSave("toFetch/" + this.name, this, this._toFetch));
- } catch (error) {
- this._log.error("Failed to read JSON records to fetch", error);
- }
- // Notify our tests that we finished writing the file.
- Observers.notify("sync-testing:file-saved:toFetch", null, this.name);
- }, 0, this, "_toFetchDelay");
+ this._toFetchStorage.ensureDataReady();
+ return this._toFetchStorage.data.ids;
},
- async loadToFetch() {
- // Initialize to empty if there's no file.
- this._toFetch = [];
- let toFetch = await Utils.jsonLoad("toFetch/" + this.name, this);
- if (toFetch) {
- this._toFetch = toFetch;
- }
+ set toFetch(ids) {
+ this._toFetchStorage.data = { ids };
+ this._toFetchStorage.saveSoon();
},
get previousFailed() {
- return this._previousFailed;
+ this._previousFailedStorage.ensureDataReady();
+ return this._previousFailedStorage.data.ids;
},
- set previousFailed(val) {
- // Coerce the array to a string for more efficient comparison.
- if (val + "" == this._previousFailed) {
- return;
- }
- this._previousFailed = val;
- CommonUtils.namedTimer(function() {
- Utils.jsonSave("failed/" + this.name, this, this._previousFailed).then(() => {
- this._log.debug("Successfully wrote previousFailed.");
- })
- .catch((error) => {
- this._log.error("Failed to set previousFailed", error);
- })
- .then(() => {
- // Notify our tests that we finished writing the file.
- Observers.notify("sync-testing:file-saved:previousFailed", null, this.name);
- });
- }, 0, this, "_previousFailedDelay");
- },
-
- async loadPreviousFailed() {
- // Initialize to empty if there's no file
- this._previousFailed = [];
- let previousFailed = await Utils.jsonLoad("failed/" + this.name, this);
- if (previousFailed) {
- this._previousFailed = previousFailed;
- }
+ set previousFailed(ids) {
+ this._previousFailedStorage.data = { ids };
+ this._previousFailedStorage.saveSoon();
},
/*
* lastSyncLocal is a timestamp in local time.
*/
get lastSyncLocal() {
return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
},
@@ -1901,16 +1890,22 @@ SyncEngine.prototype = {
* items that failed to upload. This method is called at the end of each sync.
*
*/
async trackRemainingChanges() {
for (let [id, change] of this._modified.entries()) {
this._tracker.addChangedID(id, change);
}
},
+
+ async finalize() {
+ await super.finalize();
+ await this._toFetchStorage.finalize();
+ await this._previousFailedStorage.finalize();
+ },
};
/**
* A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
* and stores opaque change data for tracked IDs. The default implementation
* only records timestamps, though engines can extend this to store additional
* data for each entry.
*/
--- a/services/sync/tests/unit/test_syncengine.js
+++ b/services/sync/tests/unit/test_syncengine.js
@@ -1,22 +1,48 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
+Cu.import("resource://gre/modules/osfile.jsm");
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/service.js");
Cu.import("resource://services-sync/util.js");
Cu.import("resource://testing-common/services/sync/utils.js");
async function makeSteamEngine() {
let engine = new SyncEngine("Steam", Service);
await engine.initialize();
return engine;
}
+async function testSteamEngineStorage(test) {
+ try {
+ let setupEngine = await makeSteamEngine();
+
+ if (test.setup) {
+ await test.setup(setupEngine);
+ }
+
+ // Finalize the engine to flush the backlog and previously failed IDs to disk.
+ await setupEngine.finalize();
+
+ if (test.beforeCheck) {
+ await test.beforeCheck();
+ }
+
+ let checkEngine = await makeSteamEngine();
+ await test.check(checkEngine);
+
+ await checkEngine.resetClient();
+ await checkEngine.finalize();
+ } finally {
+ Svc.Prefs.resetBranch("");
+ }
+}
+
let server;
add_task(async function setup() {
server = httpd_setup({});
});
add_task(async function test_url_attributes() {
_("SyncEngine url attributes");
@@ -79,104 +105,117 @@ add_task(async function test_lastSync()
do_check_eq(Svc.Prefs.get("steam.lastSync"), "0");
} finally {
Svc.Prefs.resetBranch("");
}
});
add_task(async function test_toFetch() {
_("SyncEngine.toFetch corresponds to file on disk");
- let syncTesting = await SyncTestingInfrastructure(server);
+ await SyncTestingInfrastructure(server);
const filename = "weave/toFetch/steam.json";
- let engine = await makeSteamEngine();
- try {
- // Ensure pristine environment
- do_check_eq(engine.toFetch.length, 0);
+
+ await testSteamEngineStorage({
+ toFetch: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ setup(engine) {
+ // Ensure pristine environment
+ do_check_eq(engine.toFetch.length, 0);
- // Write file to disk
- let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- let wrotePromise = promiseOneObserver("sync-testing:file-saved:toFetch");
- engine.toFetch = toFetch;
- do_check_eq(engine.toFetch, toFetch);
- // toFetch is written asynchronously
- await wrotePromise;
- let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
- do_check_eq(fakefile, JSON.stringify(toFetch));
+ // Write file to disk
+ engine.toFetch = this.toFetch;
+ do_check_eq(engine.toFetch, this.toFetch);
+ },
+ check(engine) {
+ // toFetch is written asynchronously
+ do_check_matches(engine.toFetch, this.toFetch);
+ },
+ });
- // Make sure it work for consecutive writes before the callback is executed.
- toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- let toFetch2 = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- wrotePromise = promiseOneObserver("sync-testing:file-saved:toFetch");
-
- engine.toFetch = toFetch;
- do_check_eq(engine.toFetch, toFetch);
+ await testSteamEngineStorage({
+ toFetch: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ toFetch2: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ setup(engine) {
+ // Make sure it works for consecutive writes before the callback is executed.
+ engine.toFetch = this.toFetch;
+ do_check_eq(engine.toFetch, this.toFetch);
- engine.toFetch = toFetch2;
- do_check_eq(engine.toFetch, toFetch2);
- // Note that do to the way CommonUtils.namedTimer works, we won't get a 2nd callback.
- await wrotePromise;
- fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
- do_check_eq(fakefile, JSON.stringify(toFetch2));
+ engine.toFetch = this.toFetch2;
+ do_check_eq(engine.toFetch, this.toFetch2);
+ },
+ check(engine) {
+ do_check_matches(engine.toFetch, this.toFetch2);
+ },
+ });
- // Read file from disk
- toFetch = [Utils.makeGUID(), Utils.makeGUID()];
- syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(toFetch);
- await engine.loadToFetch();
- do_check_eq(engine.toFetch.length, 2);
- do_check_eq(engine.toFetch[0], toFetch[0]);
- do_check_eq(engine.toFetch[1], toFetch[1]);
- } finally {
- Svc.Prefs.resetBranch("");
- }
+ await testSteamEngineStorage({
+ toFetch: [Utils.makeGUID(), Utils.makeGUID()],
+ async beforeCheck() {
+ let toFetchPath = OS.Path.join(OS.Constants.Path.profileDir, filename);
+ let bytes = new TextEncoder().encode(JSON.stringify(this.toFetch));
+ await OS.File.writeAtomic(toFetchPath, bytes,
+ { tmpPath: toFetchPath + ".tmp" });
+ },
+ check(engine) {
+ // Read file from disk
+ do_check_matches(engine.toFetch, this.toFetch);
+ },
+ });
});
add_task(async function test_previousFailed() {
_("SyncEngine.previousFailed corresponds to file on disk");
- let syncTesting = await SyncTestingInfrastructure(server);
+ await SyncTestingInfrastructure(server);
const filename = "weave/failed/steam.json";
- let engine = await makeSteamEngine();
- try {
- // Ensure pristine environment
- do_check_eq(engine.previousFailed.length, 0);
+
+ await testSteamEngineStorage({
+ previousFailed: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ setup(engine) {
+ // Ensure pristine environment
+ do_check_eq(engine.previousFailed.length, 0);
- // Write file to disk
- let previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- let wrotePromise = promiseOneObserver("sync-testing:file-saved:previousFailed");
- engine.previousFailed = previousFailed;
- do_check_eq(engine.previousFailed, previousFailed);
- // previousFailed is written asynchronously
- await wrotePromise;
- let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
- do_check_eq(fakefile, JSON.stringify(previousFailed));
+ // Write file to disk
+ engine.previousFailed = this.previousFailed;
+ do_check_eq(engine.previousFailed, this.previousFailed);
+ },
+ check(engine) {
+ // previousFailed is written asynchronously
+ do_check_matches(engine.previousFailed, this.previousFailed);
+ },
+ });
- // Make sure it work for consecutive writes before the callback is executed.
- previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- let previousFailed2 = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
- wrotePromise = promiseOneObserver("sync-testing:file-saved:previousFailed");
-
- engine.previousFailed = previousFailed;
- do_check_eq(engine.previousFailed, previousFailed);
+ await testSteamEngineStorage({
+ previousFailed: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ previousFailed2: [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()],
+ setup(engine) {
+ // Make sure it works for consecutive writes before the callback is executed.
+ engine.previousFailed = this.previousFailed;
+ do_check_eq(engine.previousFailed, this.previousFailed);
- engine.previousFailed = previousFailed2;
- do_check_eq(engine.previousFailed, previousFailed2);
- // Note that do to the way CommonUtils.namedTimer works, we're only notified once.
- await wrotePromise;
- fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
- do_check_eq(fakefile, JSON.stringify(previousFailed2));
+ engine.previousFailed = this.previousFailed2;
+ do_check_eq(engine.previousFailed, this.previousFailed2);
+ },
+ check(engine) {
+ do_check_matches(engine.previousFailed, this.previousFailed2);
+ },
+ });
- // Read file from disk
- previousFailed = [Utils.makeGUID(), Utils.makeGUID()];
- syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(previousFailed);
- await engine.loadPreviousFailed();
- do_check_eq(engine.previousFailed.length, 2);
- do_check_eq(engine.previousFailed[0], previousFailed[0]);
- do_check_eq(engine.previousFailed[1], previousFailed[1]);
- } finally {
- Svc.Prefs.resetBranch("");
- }
+ await testSteamEngineStorage({
+ previousFailed: [Utils.makeGUID(), Utils.makeGUID()],
+ async beforeCheck() {
+ let previousFailedPath = OS.Path.join(OS.Constants.Path.profileDir,
+ filename);
+ let bytes = new TextEncoder().encode(JSON.stringify(this.previousFailed));
+ await OS.File.writeAtomic(previousFailedPath, bytes,
+ { tmpPath: previousFailedPath + ".tmp" });
+ },
+ check(engine) {
+ // Read file from disk
+ do_check_matches(engine.previousFailed, this.previousFailed);
+ },
+ });
});
add_task(async function test_resetClient() {
_("SyncEngine.resetClient resets lastSync and toFetch");
await SyncTestingInfrastructure(server);
let engine = await makeSteamEngine();
try {
// Ensure pristine environment