--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -641,27 +641,17 @@ function SyncEngine(name, service) {
this._notify = Utils.notify("weave:engine:");
this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`);
this._modified = this.emptyChangeset();
this._tracker; // initialize tracker to load previously changed IDs
this._log.debug("Engine constructed");
- this._toFetchStorage = new JSONFile({
- path: Utils.jsonFilePath("toFetch/" + this.name),
- dataPostProcessor: json => this._metadataPostProcessor(json),
- beforeSave: () => this._beforeSaveMetadata(),
- });
-
- this._previousFailedStorage = new JSONFile({
- path: Utils.jsonFilePath("failed/" + this.name),
- dataPostProcessor: json => this._metadataPostProcessor(json),
- beforeSave: () => this._beforeSaveMetadata(),
- });
+ this.backfillManager = new BackfillManager(this);
XPCOMUtils.defineLazyPreferenceGetter(this, "_enabled",
`services.sync.engine.${this.prefName}`, false,
(data, previous, latest) =>
// We do not await on the promise onEngineEnabledChanged returns.
this._tracker.onEngineEnabledChanged(latest));
XPCOMUtils.defineLazyPreferenceGetter(this, "_syncID",
`services.sync.${this.name}.syncID`,
@@ -724,36 +714,16 @@ SyncEngine.prototype = {
// If this is false, we'll throw, otherwise, we'll ignore the record and
// continue. This currently can only happen due to the record being larger
// than the record upload limit.
allowSkippedRecord: true,
// Which sortindex to use when retrieving records for this engine.
_defaultSort: undefined,
- _metadataPostProcessor(json) {
- if (Array.isArray(json)) {
- // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
- // an object, so we wrap the array for consistency.
- json = { ids: json };
- }
- if (!json.ids) {
- json.ids = [];
- }
- // The set serializes the same way as an array, but offers more efficient
- // methods of manipulation.
- json.ids = new SerializableSet(json.ids);
- return json;
- },
-
- async _beforeSaveMetadata() {
- await ensureDirectory(this._toFetchStorage.path);
- await ensureDirectory(this._previousFailedStorage.path);
- },
-
// A relative priority to use when computing an order
// for engines to be synced. Higher-priority engines
// (lower numbers) are synced first.
// It is recommended that a unique value be used for each engine,
// in order to guarantee a stable sequence.
syncPriority: 0,
// How many records to pull in a single sync. This is primarily to avoid very
@@ -762,18 +732,17 @@ SyncEngine.prototype = {
// How many records to pull at one time when specifying IDs. This is to avoid
// URI length limitations.
guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
async initialize() {
- await this._toFetchStorage.load();
- await this._previousFailedStorage.load();
+ await this.backfillManager.load();
this._log.debug("SyncEngine initialized", this.name);
},
get prefName() {
return this.name;
},
get enabled() {
@@ -901,58 +870,35 @@ SyncEngine.prototype = {
* as a first sync.
*
* @return the new sync ID.
*/
async resetLocalSyncID() {
return this.ensureCurrentSyncID(Utils.makeGUID());
},
+ addToFetch(coll) {
+ this.backfillManager.addToFetch(coll);
+ },
+
/*
* lastSync is a timestamp in server time.
*/
async getLastSync() {
return this._lastSync;
},
async setLastSync(lastSync) {
// Store the value as a string to keep floating point precision
Svc.Prefs.set(this.name + ".lastSync", lastSync.toString());
},
async resetLastSync() {
this._log.debug("Resetting " + this.name + " last sync time");
await this.setLastSync(0);
},
- get toFetch() {
- this._toFetchStorage.ensureDataReady();
- return this._toFetchStorage.data.ids;
- },
-
- set toFetch(ids) {
- if (ids.constructor.name != "SerializableSet") {
- throw new Error("Bug: Attempted to set toFetch to something that isn't a SerializableSet");
- }
- this._toFetchStorage.data = { ids };
- this._toFetchStorage.saveSoon();
- },
-
- get previousFailed() {
- this._previousFailedStorage.ensureDataReady();
- return this._previousFailedStorage.data.ids;
- },
-
- set previousFailed(ids) {
- if (ids.constructor.name != "SerializableSet") {
- throw new Error(
- "Bug: Attempted to set previousFailed to something that isn't a SerializableSet");
- }
- this._previousFailedStorage.data = { ids };
- this._previousFailedStorage.saveSoon();
- },
-
/*
* Returns a changeset for this sync. Engine implementations can override this
* method to bypass the tracker for certain or all changed items.
*/
async getChangedIDs() {
return this._tracker.getChangedIDs();
},
@@ -1105,17 +1051,16 @@ SyncEngine.prototype = {
// newFailed => number of items that failed for the first time in this sync.
// reconciled => number of items that were reconciled.
let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
let recordsToApply = [];
let failedInCurrentSync = new SerializableSet();
let oldestModified = this.lastModified;
let downloadedIDs = new Set();
-
// Stage 1: Fetch new records from the server, up to the download limit.
if (this.lastModified == null || this.lastModified > lastSync) {
let { response, records } = await newitems.getBatched(this.downloadBatchSize);
if (!response.success) {
response.failureCode = ENGINE_DOWNLOAD_FAIL;
throw response;
}
@@ -1147,116 +1092,67 @@ SyncEngine.prototype = {
// `applied` is a bit of a misnomer: it counts records that *should* be
// applied, so it also includes records that we tried to apply and failed.
// `recordsToApply.length - failedToApply.length` is the number of records
// that we *successfully* applied.
count.failed += failedToApply.length;
count.applied += recordsToApply.length;
}
- // Stage 2: If we reached our download limit, we might still have records
- // on the server that changed since the last sync. Fetch the IDs for the
- // remaining records, and add them to the backlog. Note that this stage
- // only runs for engines that set a download limit.
if (downloadedIDs.size == downloadLimit) {
- let guidColl = this.itemSource();
-
- guidColl.newer = lastSync;
- guidColl.older = oldestModified;
- guidColl.sort = "oldest";
-
- let guids = await guidColl.get();
- if (!guids.success)
- throw guids;
-
- // Filtering out already downloaded IDs here isn't necessary. We only do
- // that in case the Sync server doesn't support `older` (bug 1316110).
- let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id));
- if (remainingIDs.length > 0) {
- this.toFetch = Utils.setAddAll(this.toFetch, remainingIDs);
- }
+ this.backfillManager.pushBackfillChunk(lastSync, oldestModified);
}
// Fast-foward the lastSync timestamp since we have backlogged the
// remaining items.
if (lastSync < this.lastModified) {
lastSync = this.lastModified;
await this.setLastSync(lastSync);
}
// Stage 3: Backfill records from the backlog, and those that failed to
// decrypt or apply during the last sync. We only backfill up to the
// download limit, to prevent a large backlog for one engine from blocking
// the others. We'll keep processing the backlog on subsequent engine syncs.
- let failedInPreviousSync = this.previousFailed;
- let idsToBackfill = Array.from(
- Utils.setAddAll(Utils.subsetOfSize(this.toFetch, downloadLimit),
- failedInPreviousSync));
-
- // Note that we intentionally overwrite the previously failed list here.
- // Records that fail to decrypt or apply in two consecutive syncs are likely
- // corrupt; we remove them from the list because retrying and failing on
- // every subsequent sync just adds noise.
- this.previousFailed = failedInCurrentSync;
-
- let backfilledItems = this.itemSource();
-
- backfilledItems.sort = "newest";
- backfilledItems.full = true;
-
- // `getBatched` includes the list of IDs as a query parameter, so we need to fetch
- // records in chunks to avoid exceeding URI length limits.
- for (let ids of PlacesSyncUtils.chunkArray(idsToBackfill, this.guidFetchBatchSize)) {
- backfilledItems.ids = ids;
-
- let {response, records} = await backfilledItems.getBatched(this.downloadBatchSize);
- if (!response.success) {
- response.failureCode = ENGINE_DOWNLOAD_FAIL;
- throw response;
- }
-
- let maybeYield = Async.jankYielder();
+ let maybeYield = Async.jankYielder();
+ await this.backfillManager.stepBackfill(downloadLimit, failedInCurrentSync,
+ async (records, failedBefore) => {
let backfilledRecordsToApply = [];
let failedInBackfill = [];
for (let record of records) {
await maybeYield();
- let { shouldApply, error } = await this._maybeReconcile(record);
+ let { shouldApply, error } = await this.engine._maybeReconcile(record);
if (error) {
failedInBackfill.push(record.id);
count.failed++;
continue;
}
if (!shouldApply) {
count.reconciled++;
continue;
}
backfilledRecordsToApply.push(record);
}
- let failedToApply = await this._applyRecords(backfilledRecordsToApply);
+
+ let failedToApply = await this.engine._applyRecords(backfilledRecordsToApply);
failedInBackfill.push(...failedToApply);
-
count.failed += failedToApply.length;
- count.applied += backfilledRecordsToApply.length;
-
- this.toFetch = Utils.setDeleteAll(this.toFetch, ids);
- this.previousFailed = Utils.setAddAll(this.previousFailed, failedInBackfill);
+ if (!failedBefore) {
+ count.newFailed += failedInBackfill.length;
+ }
- if (lastSync < this.lastModified) {
- lastSync = this.lastModified;
- await this.setLastSync(lastSync);
- }
- }
+ count.applied += backfilledRecordsToApply.length;
+ return failedInBackfill;
+ });
- count.newFailed = 0;
- for (let item of this.previousFailed) {
- if (!failedInPreviousSync.has(item)) {
- ++count.newFailed;
- }
+ if (lastSync < this.lastModified) {
+ lastSync = this.lastModified;
+ await this.setLastSync(lastSync);
}
count.succeeded = Math.max(0, count.applied - count.failed);
this._log.info(["Records:",
count.applied, "applied,",
count.succeeded, "successfully,",
count.failed, "failed to apply,",
count.newFailed, "newly failed to apply,",
@@ -1921,19 +1817,18 @@ SyncEngine.prototype = {
* Removes all local Sync metadata for this engine, but keeps all existing
* local user data.
*/
async resetClient() {
return this._notify("reset-client", this.name, this._resetClient)();
},
async _resetClient() {
- await this.resetLastSync();
- this.previousFailed = new SerializableSet();
- this.toFetch = new SerializableSet();
+ this.resetLastSync();
+ this.backfillManager.clear();
this._needWeakUpload.clear();
},
/**
* Removes all local Sync metadata and user data for this engine.
*/
async wipeClient() {
return this._notify("wipe-client", this.name, this._wipeClient)();
@@ -1954,18 +1849,17 @@ SyncEngine.prototype = {
* with a getSummary method). Otherwise return null.
*/
getValidator() {
return null;
},
async finalize() {
await this._tracker.finalize();
- await this._toFetchStorage.finalize();
- await this._previousFailedStorage.finalize();
+ await this.backfillManager.finalize();
},
};
/**
* A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
* and stores opaque change data for tracked IDs. The default implementation
* only records timestamps, though engines can extend this to store additional
* data for each entry.
@@ -2031,8 +1925,302 @@ class Changeset {
return this.ids().length;
}
// Clears the changeset.
clear() {
this.changes = {};
}
}
+
+// This class is responsible for backfilling records for engines that require a
+// record backfill. It handles three types of backfill at the same time:
+//
+// 1. `chunks`, which is a list of time ranges that contains records we are
+// missing (more on this below).
+//
+// 2. `previousFailed`, which is a set of individual ids that failed to apply
+// which we attempt to fetch a second time (but after failing twice are
+// discarded)
+//
+// 3. `toFetch`, which is also a set of individual ids we need to request.
+// Unlike `previousFailed`, these are either explicitly requested by the
+// engine, (as is the case with the bookmarks engine during repair), or they
+// were written to the `toFetch` before we introduced the chunked backfill
+// mechanism (before that `toFetch` was used instead).
+//
+// Each of these is persisted in separate files, as they're typically updated
+// separately, and the updates may happen multiple times across the duration of
+// a sync.
+//
+// More on chunks: The behavior here is similar to Android and iOS sync's notion
+// of a high water mark, which maintains the `offset`, and `older` parameters
+// needed for the next fetch.
+//
+// Where this code differs is that desktop doesn't want to backfill records it
+// has already synced, so it stores the `newer` parameter as the oldest item
+// synced outside of the backfill. However, if we haven't completed the previous
+// backfill when another backfill would be required, we may need multiple ranges
+// (chunks) of `{older, newer, offset}` for backfilling. (See also Bug 1435007)
+//
+// These "chunks" are persisted to/loaded from the toFetch file at startup.
+// Previous versions of the toFetch file stored a set of IDs. Conceptually this
+// class maintains multiple ongoing `getBatched` calls, which can continue
+// across restarts.
+class BackfillManager {
+
+ constructor(engine, limit = engine.downloadLimit) {
+ this._log = Log.repository.getLogger(`Sync.Engine.${engine.Name}.Backfill`);
+ this.engine = engine;
+ this.limit = limit;
+
+ this.failedStorage = this._makeJSONFile("failed", BackfillManager._onLoadIDs);
+ this.toFetchStorage = this._makeJSONFile("toFetch", BackfillManager._onLoadIDs);
+ this.chunksStorage = this._makeJSONFile("chunks", BackfillManager._onLoadChunks);
+ }
+
+ _makeJSONFile(folder, dataPostProcessor) {
+ const path = Utils.jsonFilePath(folder + "/" + this.engine.name);
+ return new JSONFile({
+ path,
+ dataPostProcessor,
+ beforeSave: () => ensureDirectory(path)
+ });
+ }
+
+ static _onLoadIDs(json) {
+ if (Array.isArray(json)) {
+ // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
+ // an object, so we wrap the array for consistency.
+ json = { ids: json };
+ }
+ if (!json.ids) {
+ json.ids = [];
+ }
+ // The set serializes the same way as an array, but offers more efficient
+ // methods of manipulation.
+ json.ids = new SerializableSet(json.ids);
+ return json;
+ }
+
+ static _onLoadChunks(json) {
+ if (!json.chunks) {
+ json.chunks = [];
+ }
+ return json;
+ }
+
+ async load() {
+ await Promise.all([
+ this.chunkStorage.load(),
+ this.failedStorage.load(),
+ this.toFetchStorage.load(),
+ ]);
+ }
+
+ async finalize() {
+ await Promise.all([
+ this.failedStorage.finalize(),
+ this.chunkStorage.finalize(),
+ this.toFetchStorage.finalize(),
+ ]);
+ }
+
+ reset() {
+ this.previousFailed = new SerializableSet();
+ this.toFetch = new SerializableSet();
+ this.chunks = [];
+ }
+
+ pushBackfillChunk(older, newer) {
+ let chunk = { older, newer, offset: null };
+ this.chunks.push(chunk);
+ this.chunkStorage.saveSoon();
+ }
+
+ get chunks() {
+ this.chunkStorage.ensureDataReady();
+ return this.chunkStorage.data.chunks;
+ }
+
+ set chunks(chunks) {
+ this.chunkStorage.data = { chunks };
+ this.chunkStorage.saveSoon();
+ }
+
+ get previousFailed() {
+ this.failedStorage.ensureDataReady();
+ return this.failedStorage.data.ids;
+ }
+
+ set previousFailed(ids) {
+ if (ids.constructor.name != "SerializableSet") {
+ throw new Error("Bug: Attempted to set previousFailed to something that isn't a SerializableSet");
+ }
+ this.failedStorage.data = { ids };
+ this.failedStorage.saveSoon();
+ }
+
+ get toFetch() {
+ this.toFetchStorage.ensureDataReady();
+ return this.toFetchStorage.data.ids;
+ }
+
+ set toFetch(ids) {
+ if (ids.constructor.name != "SerializableSet") {
+ throw new Error("Bug: Attempted to set toFetch to something that isn't a SerializableSet");
+ }
+ this.toFetchStorage.data = { ids };
+ this.toFetchStorage.saveSoon();
+ }
+
+ addFailed(ids) {
+ if (ids.length) {
+ this.previousFailed = Utils.setAddAll(this.previousFailed, ids);
+ }
+ }
+
+ addToFetch(ids) {
+ if (ids.length) {
+ this.toFetch = Utils.setAddAll(this.toFetch, ids);
+ }
+ }
+
+ removeToFetch(ids) {
+ // If we have a lot of items in toFetch, this gets called frequently enough
+ // that it's worth avoiding triggering the saveSoon in the case where there
+ // were no changes.
+ let startSize = this.toFetch.size;
+ let toFetch = Utils.setDeleteAll(this.toFetch, ids);
+ if (toFetch.size != startSize) {
+ this.toFetch = toFetch;
+ }
+ }
+
+ async _fetch(resource) {
+ // For backfilling we don't want XIUS semantics, so we don't use getBatched.
+ // (We also handle the batching ourselves, but in a resumable way).
+ let response = await resource.get();
+ if (!response.success) {
+ response.failureCode = ENGINE_DOWNLOAD_FAIL;
+ throw response;
+ }
+ let records = response.obj.map(json => {
+ let record = new this.engine._recordObj();
+ record.deserialize(json);
+ return record;
+ });
+ return { records, response };
+ }
+
+ /**
+ * Fetch and apply items from the backfill.
+ *
+ * @param limit {number}
+ * The maximum number of items to fetch
+ * @param failedInCurrentSync {SerializableSet}
+ * Set of ids that failed in the current sync.
+ * @param applyRecords {async(records: Array, failedBefore: boolean) => Array}
+ * A callback that takes the list of records and whether or not these
+ * records have failed before (for recording newFailed in telemetry).
+ * It should return an array of ids of records that failed to apply.
+ */
+ async stepBackfill(limit, failedInCurrentSync, applyRecords) {
+ let downloadLimit = Math.min(this.limit, limit);
+
+ await this._processIDs(failedInCurrentSync, downloadLimit, async (records, failedBefore) => {
+ downloadLimit -= records.length;
+ return applyRecords(records, failedBefore);
+ });
+
+ // While we still have chunks, process the next chunks, but don't exceed the batch size.
+ while (downloadLimit > 0 && this.chunks.length) {
+ let limit = Math.min(downloadLimit, this.engine.downloadBatchSize);
+ await this._processNextChunk(limit, async records => {
+ downloadLimit -= records.length;
+ let failed = await applyRecords(records, false);
+ this.addFailed(failed);
+ });
+ }
+ }
+
+ // yields [ids, haveFailedBefore]
+ * _idGroups(failedPrevArray, toFetchArray) {
+ let groupSize = this.engine.guidFetchBatchSize;
+ for (let ids of PlacesSyncUtils.chunkArray(failedPrevArray, groupSize)) {
+ if (ids.length) {
+ yield [ids, true];
+ }
+ }
+
+ for (let ids of PlacesSyncUtils.chunkArray(toFetchArray, groupSize)) {
+ if (ids.length) {
+ yield [ids, false];
+ }
+ }
+ }
+
+ async _processIDs(failedInCurrentSync, limit, applyCallback) {
+ // Note that we intentionally overwrite the previously failed list here.
+ // Records that fail to decrypt or apply in two consecutive syncs are likely
+ // corrupt; we remove them from the list because retrying and failing on
+ // every subsequent sync just adds noise.
+ let failedPrev = this.previousFailed;
+ this.previousFailed = failedInCurrentSync;
+ let toFetch = this.toFetch;
+ if (!failedPrev.size && !toFetch.size) {
+ return;
+ }
+
+ let idResource = this.engine.itemSource();
+ idResource.sort = "newest";
+ idResource.full = true;
+
+ let toFetchArray = Array.from(Utils.subsetOfSize(toFetch, limit));
+ let prevFailedArray = Array.from(failedPrev);
+ for (let [ids, failedBefore] in this._idGroups(prevFailedArray, toFetchArray)) {
+ idResource.ids = ids;
+ let { records } = await this._fetch(idResource);
+ let failed = await applyCallback(records, failedBefore);
+ this.removeToFetch(records);
+ if (!failedBefore) {
+ this.addFailed(failed);
+ }
+ }
+ }
+
+ async _processNextChunk(limit, applyCallback) {
+ if (!this.chunks.length) {
+ return;
+ }
+ let chunk = this.chunks[0];
+ let resource = this.engine.itemSource();
+ limit = Math.min(limit, this.limit);
+
+ resource.full = true;
+ resource.sort = "oldest";
+ resource.limit = limit;
+ resource.older = chunk.older;
+ resource.newer = chunk.newer;
+
+ if (chunk.offset) {
+ resource.offset = chunk.offset;
+ }
+
+ let response = await resource.get();
+ let records = this._getRecordsFromResponse(response);
+
+ // This should throw if applying failed badly enough that we shouldn't
+ // update our chunkStorage.
+ await applyCallback(records);
+
+ let nextOffset = response.headers["x-weave-next-offset"];
+ if (records.length < limit || !nextOffset) {
+ this.chunks.shift();
+ } else {
+ Object.assign(this.chunks[0], {
+ offset: nextOffset,
+ // newer: records[records.length - 1].modified,
+ });
+ }
+ this.chunkStorage.saveSoon();
+ }
+}