--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1,15 +1,14 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
this.EXPORTED_SYMBOLS = [
"EngineManager",
- "Engine",
"SyncEngine",
"Tracker",
"Store",
"Changeset"
];
ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
ChromeUtils.import("resource://gre/modules/JSONFile.jsm");
@@ -309,17 +308,17 @@ Store.prototype = {
async applyIncomingBatch(records) {
let failed = [];
let maybeYield = Async.jankYielder();
for (let record of records) {
await maybeYield();
try {
await this.applyIncoming(record);
} catch (ex) {
- if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
+ if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) {
// This kind of exception should have a 'cause' attribute, which is an
// originating exception.
// ex.cause will carry its stack with it when rethrown.
throw ex.cause;
}
if (Async.isShutdownException(ex)) {
throw ex;
}
@@ -605,17 +604,17 @@ EngineManager.prototype = {
name = name.name || "";
this._log.error(`Could not initialize engine ${name}`, ex);
}
},
async unregister(val) {
let name = val;
- if (val instanceof Engine) {
+ if (val instanceof SyncEngine) {
name = val.name;
}
if (name in this._engines) {
let engine = this._engines[name];
delete this._engines[name];
await engine.finalize();
}
},
@@ -624,164 +623,51 @@ EngineManager.prototype = {
for (let name in this._engines) {
let engine = this._engines[name];
delete this._engines[name];
await engine.finalize();
}
},
};
-this.Engine = function Engine(name, service) {
+this.SyncEngine = function SyncEngine(name, service) {
if (!service) {
- throw new Error("Engine must be associated with a Service instance.");
+ throw new Error("SyncEngine must be associated with a Service instance.");
}
this.Name = name || "Unnamed";
this.name = name.toLowerCase();
this.service = service;
this._notify = Utils.notify("weave:engine:");
this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`);
this._modified = this.emptyChangeset();
this._tracker; // initialize tracker to load previously changed IDs
this._log.debug("Engine constructed");
- XPCOMUtils.defineLazyPreferenceGetter(this, "_enabled",
- `services.sync.engine.${this.prefName}`, false,
- (data, previous, latest) =>
- // We do not await on the promise onEngineEnabledChanged returns.
- this._tracker.onEngineEnabledChanged(latest));
-};
-Engine.prototype = {
- // _storeObj, and _trackerObj should to be overridden in subclasses
- _storeObj: Store,
- _trackerObj: Tracker,
-
- // Override this method to return a new changeset type.
- emptyChangeset() {
- return new Changeset();
- },
-
- // Local 'constant'.
- // Signal to the engine that processing further records is pointless.
- eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
-
- // Should we keep syncing if we find a record that cannot be uploaded (ever)?
- // If this is false, we'll throw, otherwise, we'll ignore the record and
- // continue. This currently can only happen due to the record being larger
- // than the record upload limit.
- allowSkippedRecord: true,
-
- get prefName() {
- return this.name;
- },
-
- get enabled() {
- return this._enabled;
- },
-
- set enabled(val) {
- if (!!val != this._enabled) {
- Svc.Prefs.set("engine." + this.prefName, !!val);
- }
- },
-
- get score() {
- return this._tracker.score;
- },
-
- get _store() {
- let store = new this._storeObj(this.Name, this);
- this.__defineGetter__("_store", () => store);
- return store;
- },
-
- get _tracker() {
- let tracker = new this._trackerObj(this.Name, this);
- this.__defineGetter__("_tracker", () => tracker);
- return tracker;
- },
-
- startTracking() {
- this._tracker.start();
- },
-
- // Returns a promise
- stopTracking() {
- return this._tracker.stop();
- },
-
- async sync() {
- if (!this.enabled) {
- return false;
- }
-
- if (!this._sync) {
- throw new Error("engine does not implement _sync method");
- }
-
- return this._notify("sync", this.name, this._sync)();
- },
-
- /**
- * Get rid of any local meta-data.
- */
- async resetClient() {
- if (!this._resetClient) {
- throw new Error("engine does not implement _resetClient method");
- }
-
- return this._notify("reset-client", this.name, this._resetClient)();
- },
-
- async _wipeClient() {
- await this.resetClient();
- this._log.debug("Deleting all local data");
- this._tracker.ignoreAll = true;
- await this._store.wipe();
- this._tracker.ignoreAll = false;
- await this._tracker.clearChangedIDs();
- },
-
- async wipeClient() {
- return this._notify("wipe-client", this.name, this._wipeClient)();
- },
-
- /**
- * If one exists, initialize and return a validator for this engine (which
- * must have a `validate(engine)` method that returns a promise to an object
- * with a getSummary method). Otherwise return null.
- */
- getValidator() {
- return null;
- },
-
- async finalize() {
- await this._tracker.finalize();
- },
-};
-
-this.SyncEngine = function SyncEngine(name, service) {
- Engine.call(this, name || "SyncEngine", service);
-
this._toFetchStorage = new JSONFile({
path: Utils.jsonFilePath("toFetch/" + this.name),
dataPostProcessor: json => this._metadataPostProcessor(json),
beforeSave: () => this._beforeSaveMetadata(),
});
this._previousFailedStorage = new JSONFile({
path: Utils.jsonFilePath("failed/" + this.name),
dataPostProcessor: json => this._metadataPostProcessor(json),
beforeSave: () => this._beforeSaveMetadata(),
});
Utils.defineLazyIDProperty(this, "syncID", `services.sync.${this.name}.syncID`);
+ XPCOMUtils.defineLazyPreferenceGetter(this, "_enabled",
+ `services.sync.engine.${this.prefName}`, false,
+ (data, previous, latest) =>
+ // We do not await on the promise onEngineEnabledChanged returns.
+ this._tracker.onEngineEnabledChanged(latest));
XPCOMUtils.defineLazyPreferenceGetter(this, "_lastSync",
`services.sync.${this.name}.lastSync`,
"0", null,
v => parseFloat(v));
XPCOMUtils.defineLazyPreferenceGetter(this, "_lastSyncLocal",
`services.sync.${this.name}.lastSyncLocal`,
"0", null,
v => parseInt(v, 10));
@@ -820,20 +706,32 @@ this.SyncEngine = function SyncEngine(na
// Attached to the constructor to allow use as a kind of static enumeration.
SyncEngine.kRecoveryStrategy = {
ignore: "ignore",
retry: "retry",
error: "error"
};
SyncEngine.prototype = {
- __proto__: Engine.prototype,
_recordObj: CryptoWrapper,
+ // _storeObj, and _trackerObj should to be overridden in subclasses
+ _storeObj: Store,
+ _trackerObj: Tracker,
version: 1,
+ // Local 'constant'.
+ // Signal to the engine that processing further records is pointless.
+ eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
+
+ // Should we keep syncing if we find a record that cannot be uploaded (ever)?
+ // If this is false, we'll throw, otherwise, we'll ignore the record and
+ // continue. This currently can only happen due to the record being larger
+ // than the record upload limit.
+ allowSkippedRecord: true,
+
// Which sortindex to use when retrieving records for this engine.
_defaultSort: undefined,
_metadataPostProcessor(json) {
if (Array.isArray(json)) {
// Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
// an object, so we wrap the array for consistency.
json = { ids: json };
@@ -870,32 +768,88 @@ SyncEngine.prototype = {
downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
async initialize() {
await this._toFetchStorage.load();
await this._previousFailedStorage.load();
this._log.debug("SyncEngine initialized", this.name);
},
+ get prefName() {
+ return this.name;
+ },
+
+ get enabled() {
+ return this._enabled;
+ },
+
+ set enabled(val) {
+ if (!!val != this._enabled) {
+ Svc.Prefs.set("engine." + this.prefName, !!val);
+ }
+ },
+
+ get score() {
+ return this._tracker.score;
+ },
+
+ get _store() {
+ let store = new this._storeObj(this.Name, this);
+ this.__defineGetter__("_store", () => store);
+ return store;
+ },
+
+ get _tracker() {
+ let tracker = new this._trackerObj(this.Name, this);
+ this.__defineGetter__("_tracker", () => tracker);
+ return tracker;
+ },
+
get storageURL() {
return this.service.storageURL;
},
get engineURL() {
return this.storageURL + this.name;
},
get cryptoKeysURL() {
return this.storageURL + "crypto/keys";
},
get metaURL() {
return this.storageURL + "meta/global";
},
+ startTracking() {
+ this._tracker.start();
+ },
+
+ // Returns a promise
+ stopTracking() {
+ return this._tracker.stop();
+ },
+
+ async sync() {
+ if (!this.enabled) {
+ return false;
+ }
+
+ if (!this._sync) {
+ throw new Error("engine does not implement _sync method");
+ }
+
+ return this._notify("sync", this.name, this._sync)();
+ },
+
+ // Override this method to return a new changeset type.
+ emptyChangeset() {
+ return new Changeset();
+ },
+
/*
* lastSync is a timestamp in server time.
*/
async getLastSync() {
return this.lastSync;
},
async setLastSync(lastSync) {
this.lastSync = lastSync;
@@ -1338,17 +1292,17 @@ SyncEngine.prototype = {
await this._deleteId(item.id);
return { shouldApply: false, error: null };
}
let shouldApply;
try {
shouldApply = await this._reconcile(item);
} catch (ex) {
- if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
+ if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) {
this._log.warn("Reconciliation failed: aborting incoming processing.");
throw ex.cause;
} else if (!Async.isShutdownException(ex)) {
this._log.warn("Failed to reconcile incoming record " + item.id, ex);
return { shouldApply: false, error: ex };
} else {
throw ex;
}
@@ -1917,18 +1871,51 @@ SyncEngine.prototype = {
*
*/
async trackRemainingChanges() {
for (let [id, change] of this._modified.entries()) {
await this._tracker.addChangedID(id, change);
}
},
+ /**
+ * Get rid of any local meta-data.
+ */
+ async resetClient() {
+ if (!this._resetClient) {
+ throw new Error("engine does not implement _resetClient method");
+ }
+
+ return this._notify("reset-client", this.name, this._resetClient)();
+ },
+
+ async wipeClient() {
+ return this._notify("wipe-client", this.name, this._wipeClient)();
+ },
+
+ async _wipeClient() {
+ await this.resetClient();
+ this._log.debug("Deleting all local data");
+ this._tracker.ignoreAll = true;
+ await this._store.wipe();
+ this._tracker.ignoreAll = false;
+ await this._tracker.clearChangedIDs();
+ },
+
+ /**
+ * If one exists, initialize and return a validator for this engine (which
+ * must have a `validate(engine)` method that returns a promise to an object
+ * with a getSummary method). Otherwise return null.
+ */
+ getValidator() {
+ return null;
+ },
+
async finalize() {
- await super.finalize();
+ await this._tracker.finalize();
await this._toFetchStorage.finalize();
await this._previousFailedStorage.finalize();
},
};
/**
* A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
* and stores opaque change data for tracked IDs. The default implementation
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -560,17 +560,17 @@ BookmarksEngine.prototype = {
try {
return this._guidMap = await this._buildGUIDMap();
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.warn("Error while building GUID map, skipping all other incoming items", ex);
// eslint-disable-next-line no-throw-literal
- throw {code: Engine.prototype.eEngineAbortApplyIncoming,
+ throw {code: SyncEngine.prototype.eEngineAbortApplyIncoming,
cause: ex};
}
},
async _deletePending() {
// Delete pending items -- See the comment above BookmarkStore's deletePending
let newlyModified = await this._store.deletePending();
if (newlyModified) {
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -547,17 +547,17 @@ add_task(async function test_bookmark_gu
_("We get an error if building _guidMap fails in use.");
let err;
try {
_(await engine.getGuidMap());
} catch (ex) {
err = ex;
}
- Assert.equal(err.code, Engine.prototype.eEngineAbortApplyIncoming);
+ Assert.equal(err.code, SyncEngine.prototype.eEngineAbortApplyIncoming);
Assert.equal(err.cause, "Nooo");
_("We get an error and abort during processIncoming.");
err = undefined;
try {
await engine._processIncoming();
} catch (ex) {
err = ex;
--- a/services/sync/tests/unit/test_declined.js
+++ b/services/sync/tests/unit/test_declined.js
@@ -12,17 +12,17 @@ PetrolEngine.prototype.name = "petrol";
function DieselEngine() {}
DieselEngine.prototype.name = "diesel";
function DummyEngine() {}
DummyEngine.prototype.name = "dummy";
function ActualEngine() {}
-ActualEngine.prototype = {__proto__: Engine.prototype,
+ActualEngine.prototype = {__proto__: SyncEngine.prototype,
name: "actual"};
function getEngineManager() {
let manager = new EngineManager(Service);
Service.engineManager = manager;
manager._engines = {
"petrol": new PetrolEngine(),
"diesel": new DieselEngine(),
--- a/services/sync/tests/unit/test_engine.js
+++ b/services/sync/tests/unit/test_engine.js
@@ -24,22 +24,22 @@ function SteamTracker(name, engine) {
Tracker.call(this, name || "Steam", engine);
}
SteamTracker.prototype = {
__proto__: Tracker.prototype,
persistChangedIDs: false,
};
function SteamEngine(name, service) {
- Engine.call(this, name, service);
+ SyncEngine.call(this, name, service);
this.wasReset = false;
this.wasSynced = false;
}
SteamEngine.prototype = {
- __proto__: Engine.prototype,
+ __proto__: SyncEngine.prototype,
_storeObj: SteamStore,
_trackerObj: SteamTracker,
async _resetClient() {
this.wasReset = true;
},
async _sync() {
--- a/services/sync/tests/unit/test_engine_abort.js
+++ b/services/sync/tests/unit/test_engine_abort.js
@@ -25,17 +25,17 @@ add_task(async function test_processInco
_("Create some server data.");
let meta_global = Service.recordManager.set(engine.metaURL,
new WBORecord(engine.metaURL));
meta_global.payload.engines = {rotary: {version: engine.version,
syncID: engine.syncID}};
_("Fake applyIncoming to abort.");
engine._store.applyIncoming = async function(record) {
- let ex = {code: Engine.prototype.eEngineAbortApplyIncoming,
+ let ex = {code: SyncEngine.prototype.eEngineAbortApplyIncoming,
cause: "Nooo"};
_("Throwing: " + JSON.stringify(ex));
throw ex;
};
_("Trying _processIncoming. It will throw after aborting.");
let err;
try {
--- a/services/sync/tests/unit/test_enginemanager.js
+++ b/services/sync/tests/unit/test_enginemanager.js
@@ -11,19 +11,21 @@ PetrolEngine.prototype.finalize = async
function DieselEngine() {}
DieselEngine.prototype.name = "diesel";
DieselEngine.prototype.finalize = async function() {};
function DummyEngine() {}
DummyEngine.prototype.name = "dummy";
DummyEngine.prototype.finalize = async function() {};
-function ActualEngine() {}
-ActualEngine.prototype = {__proto__: Engine.prototype,
- name: "actual"};
+class ActualEngine extends SyncEngine {
+ constructor(service) {
+ super("Actual", service);
+ }
+}
add_task(async function test_basics() {
_("We start out with a clean slate");
let manager = new EngineManager(Service);
let engines = await manager.getAll();
Assert.equal(engines.length, 0);
@@ -98,14 +100,14 @@ add_task(async function test_basics() {
Assert.equal(engines.length, 2);
Assert.equal(engines.indexOf(dummy), -1);
_("Unregister an engine by value");
// manager.unregister() checks for instanceof Engine, so let's make one:
await manager.register(ActualEngine);
let actual = await manager.get("actual");
Assert.ok(actual instanceof ActualEngine);
- Assert.ok(actual instanceof Engine);
+ Assert.ok(actual instanceof SyncEngine);
await manager.unregister(actual);
Assert.equal((await manager.get("actual")), undefined);
});
--- a/services/sync/tests/unit/test_telemetry.js
+++ b/services/sync/tests/unit/test_telemetry.js
@@ -31,33 +31,33 @@ function SteamTracker(name, engine) {
}
SteamTracker.prototype = {
__proto__: Tracker.prototype,
persistChangedIDs: false,
};
function SteamEngine(service) {
- Engine.call(this, "steam", service);
+ SyncEngine.call(this, "steam", service);
}
SteamEngine.prototype = {
- __proto__: Engine.prototype,
+ __proto__: SyncEngine.prototype,
_storeObj: SteamStore,
_trackerObj: SteamTracker,
_errToThrow: null,
async _sync() {
if (this._errToThrow) {
throw this._errToThrow;
}
}
};
function BogusEngine(service) {
- Engine.call(this, "bogus", service);
+ SyncEngine.call(this, "bogus", service);
}
BogusEngine.prototype = Object.create(SteamEngine.prototype);
async function cleanAndGo(engine, server) {
await engine._tracker.clearChangedIDs();
Svc.Prefs.resetBranch("");
syncTestLogging();
--- a/tools/lint/eslint/modules.json
+++ b/tools/lint/eslint/modules.json
@@ -46,17 +46,17 @@
"declined.js": ["DeclinedEngines"],
"dispatcher.js": ["dispatcher"],
"distribution.js": ["DistributionCustomizer"],
"DNSTypes.jsm": ["DNS_QUERY_RESPONSE_CODES", "DNS_AUTHORITATIVE_ANSWER_CODES", "DNS_CLASS_CODES", "DNS_RECORD_TYPES"],
"doctor.js": ["Doctor"],
"DOMRequestHelper.jsm": ["DOMRequestIpcHelper"],
"DownloadCore.jsm": ["Download", "DownloadSource", "DownloadTarget", "DownloadError", "DownloadSaver", "DownloadCopySaver", "DownloadLegacySaver", "DownloadPDFSaver"],
"DownloadList.jsm": ["DownloadList", "DownloadCombinedList", "DownloadSummary"],
- "engines.js": ["EngineManager", "Engine", "SyncEngine", "Tracker", "Store", "Changeset"],
+ "engines.js": ["EngineManager", "SyncEngine", "Tracker", "Store", "Changeset"],
"enginesync.js": ["EngineSynchronizer"],
"evaluate.js": ["evaluate", "sandbox", "Sandboxes"],
"event-emitter.js": ["EventEmitter"],
"Extension.jsm": ["Extension", "ExtensionData"],
"ExtensionAPI.jsm": ["ExtensionAPI", "ExtensionAPIs"],
"ExtensionsUI.jsm": ["ExtensionsUI"],
"ExtensionTabs.jsm": ["TabTrackerBase", "TabManagerBase", "TabBase", "WindowTrackerBase", "WindowManagerBase", "WindowBase"],
"extension-storage.js": ["ExtensionStorageEngine", "EncryptionRemoteTransformer", "KeyRingEncryptionRemoteTransformer"],