Bug 1435007 - (WIP) Kill toFetch, replace with BackfillManager f?markh,kitcambridge draft
author       Thom Chiovoloni <tchiovoloni@mozilla.com>
Thu, 01 Feb 2018 02:47:05 -0500
changeset 779368 244209a3fb15ea09f21a29b128b96fd5b02feb82
parent 779350 15678b283f0f62b7a27afba6e572ef8961d46c69
push id      105757
push user    bmo:tchiovoloni@mozilla.com
push date    Mon, 09 Apr 2018 19:45:15 +0000
bugs         1435007
milestone    61.0a1
Bug 1435007 - (WIP) Kill toFetch, replace with BackfillManager f?markh,kitcambridge MozReview-Commit-ID: 6UCcBJ9jLVR
services/sync/modules-testing/rotaryengine.js
services/sync/modules/bookmark_repair.js
services/sync/modules/engines.js
services/sync/modules/engines/history.js
services/sync/tests/unit/test_bookmark_engine.js
--- a/services/sync/modules-testing/rotaryengine.js
+++ b/services/sync/modules-testing/rotaryengine.js
@@ -93,19 +93,19 @@ RotaryTracker.prototype = {
   __proto__: Tracker.prototype,
   persistChangedIDs: false,
 };
 
 
 function RotaryEngine(service) {
   SyncEngine.call(this, "Rotary", service);
   // Ensure that the engine starts with a clean slate.
-  this.toFetch        = new SerializableSet();
-  this.previousFailed = new SerializableSet();
+  this.backfillManager.clear();
 }
+
 RotaryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: RotaryStore,
   _trackerObj: RotaryTracker,
   _recordObj: RotaryRecord,
 
   async _findDupe(item) {
     // This is a Special Value® used for testing proper reconciling on dupe
--- a/services/sync/modules/bookmark_repair.js
+++ b/services/sync/modules/bookmark_repair.js
@@ -187,21 +187,18 @@ class BookmarkRepairRequestor extends Co
   tryServerOnlyRepairs(validationInfo) {
     if (this._countServerOnlyFixableProblems(validationInfo) == 0) {
       return false;
     }
     let engine = this.service.engineManager.get("bookmarks");
     for (let id of validationInfo.problems.serverMissing) {
       engine.addForWeakUpload(id);
     }
-    engine.toFetch = Utils.setAddAll(
-      Utils.setAddAll(engine.toFetch,
-                      validationInfo.problems.clientMissing),
-      validationInfo.problems.serverDeleted
-    );
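+    // Queue the ids the client is missing, and the ids deleted on the server,
+    // to be fetched during the bookmarks engine's next backfill.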
+    engine.backfillManager.addToFetch(validationInfo.problems.clientMissing);
+    engine.backfillManager.addToFetch(validationInfo.problems.serverDeleted);
     return true;
   }
 
   /* See if the repairer is willing and able to begin a repair process given
      the specified validation information.
      Returns true if a repair was started and false otherwise.
   */
   async startRepairs(validationInfo, flowID) {
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -641,27 +641,17 @@ function SyncEngine(name, service) {
   this._notify = Utils.notify("weave:engine:");
   this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
   this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`);
 
   this._modified = this.emptyChangeset();
   this._tracker; // initialize tracker to load previously changed IDs
   this._log.debug("Engine constructed");
 
-  this._toFetchStorage = new JSONFile({
-    path: Utils.jsonFilePath("toFetch/" + this.name),
-    dataPostProcessor: json => this._metadataPostProcessor(json),
-    beforeSave: () => this._beforeSaveMetadata(),
-  });
-
-  this._previousFailedStorage = new JSONFile({
-    path: Utils.jsonFilePath("failed/" + this.name),
-    dataPostProcessor: json => this._metadataPostProcessor(json),
-    beforeSave: () => this._beforeSaveMetadata(),
-  });
+  this.backfillManager = new BackfillManager(this);
 
   XPCOMUtils.defineLazyPreferenceGetter(this, "_enabled",
     `services.sync.engine.${this.prefName}`, false,
     (data, previous, latest) =>
       // We do not await on the promise onEngineEnabledChanged returns.
       this._tracker.onEngineEnabledChanged(latest));
   XPCOMUtils.defineLazyPreferenceGetter(this, "_syncID",
                                         `services.sync.${this.name}.syncID`,
@@ -724,36 +714,16 @@ SyncEngine.prototype = {
   // If this is false, we'll throw, otherwise, we'll ignore the record and
   // continue. This currently can only happen due to the record being larger
   // than the record upload limit.
   allowSkippedRecord: true,
 
   // Which sortindex to use when retrieving records for this engine.
   _defaultSort: undefined,
 
-  _metadataPostProcessor(json) {
-    if (Array.isArray(json)) {
-      // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
-      // an object, so we wrap the array for consistency.
-      json = { ids: json };
-    }
-    if (!json.ids) {
-      json.ids = [];
-    }
-    // The set serializes the same way as an array, but offers more efficient
-    // methods of manipulation.
-    json.ids = new SerializableSet(json.ids);
-    return json;
-  },
-
-  async _beforeSaveMetadata() {
-    await ensureDirectory(this._toFetchStorage.path);
-    await ensureDirectory(this._previousFailedStorage.path);
-  },
-
   // A relative priority to use when computing an order
   // for engines to be synced. Higher-priority engines
   // (lower numbers) are synced first.
   // It is recommended that a unique value be used for each engine,
   // in order to guarantee a stable sequence.
   syncPriority: 0,
 
   // How many records to pull in a single sync. This is primarily to avoid very
@@ -762,18 +732,17 @@ SyncEngine.prototype = {
 
   // How many records to pull at one time when specifying IDs. This is to avoid
   // URI length limitations.
   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
 
   downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
 
   async initialize() {
-    await this._toFetchStorage.load();
-    await this._previousFailedStorage.load();
+    await this.backfillManager.load();
     this._log.debug("SyncEngine initialized", this.name);
   },
 
   get prefName() {
     return this.name;
   },
 
   get enabled() {
@@ -901,58 +870,35 @@ SyncEngine.prototype = {
    * as a first sync.
    *
    * @return the new sync ID.
    */
   async resetLocalSyncID() {
     return this.ensureCurrentSyncID(Utils.makeGUID());
   },
 
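+  // Queue ids to be fetched during a later backfill (Stage 3 of
+  // _processIncoming).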
+  addToFetch(ids) {
+    this.backfillManager.addToFetch(ids);
+  },
+
   /*
    * lastSync is a timestamp in server time.
    */
   async getLastSync() {
     return this._lastSync;
   },
   async setLastSync(lastSync) {
     // Store the value as a string to keep floating point precision
     Svc.Prefs.set(this.name + ".lastSync", lastSync.toString());
   },
   async resetLastSync() {
     this._log.debug("Resetting " + this.name + " last sync time");
     await this.setLastSync(0);
   },
 
-  get toFetch() {
-    this._toFetchStorage.ensureDataReady();
-    return this._toFetchStorage.data.ids;
-  },
-
-  set toFetch(ids) {
-    if (ids.constructor.name != "SerializableSet") {
-      throw new Error("Bug: Attempted to set toFetch to something that isn't a SerializableSet");
-    }
-    this._toFetchStorage.data = { ids };
-    this._toFetchStorage.saveSoon();
-  },
-
-  get previousFailed() {
-    this._previousFailedStorage.ensureDataReady();
-    return this._previousFailedStorage.data.ids;
-  },
-
-  set previousFailed(ids) {
-    if (ids.constructor.name != "SerializableSet") {
-      throw new Error(
-        "Bug: Attempted to set previousFailed to something that isn't a SerializableSet");
-    }
-    this._previousFailedStorage.data = { ids };
-    this._previousFailedStorage.saveSoon();
-  },
-
   /*
    * Returns a changeset for this sync. Engine implementations can override this
    * method to bypass the tracker for certain or all changed items.
    */
   async getChangedIDs() {
     return this._tracker.getChangedIDs();
   },
 
@@ -1105,17 +1051,16 @@ SyncEngine.prototype = {
     // newFailed  => number of items that failed for the first time in this sync.
     // reconciled => number of items that were reconciled.
     let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
     let recordsToApply = [];
     let failedInCurrentSync = new SerializableSet();
 
     let oldestModified = this.lastModified;
     let downloadedIDs = new Set();
-
     // Stage 1: Fetch new records from the server, up to the download limit.
     if (this.lastModified == null || this.lastModified > lastSync) {
       let { response, records } = await newitems.getBatched(this.downloadBatchSize);
       if (!response.success) {
         response.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw response;
       }
 
@@ -1147,116 +1092,67 @@ SyncEngine.prototype = {
       // `applied` is a bit of a misnomer: it counts records that *should* be
       // applied, so it also includes records that we tried to apply and failed.
       // `recordsToApply.length - failedToApply.length` is the number of records
       // that we *successfully* applied.
       count.failed += failedToApply.length;
       count.applied += recordsToApply.length;
     }
 
-    // Stage 2: If we reached our download limit, we might still have records
-    // on the server that changed since the last sync. Fetch the IDs for the
-    // remaining records, and add them to the backlog. Note that this stage
-    // only runs for engines that set a download limit.
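+    // Stage 2: If we reached our download limit, there may still be records on
+    // the server that changed since the last sync. Queue a backfill chunk for
+    // the remaining range; it's drained in Stage 3 and on subsequent syncs.
+    // Note that this stage only runs for engines that set a download limit.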
     if (downloadedIDs.size == downloadLimit) {
-      let guidColl = this.itemSource();
-
-      guidColl.newer = lastSync;
-      guidColl.older = oldestModified;
-      guidColl.sort  = "oldest";
-
-      let guids = await guidColl.get();
-      if (!guids.success)
-        throw guids;
-
-      // Filtering out already downloaded IDs here isn't necessary. We only do
-      // that in case the Sync server doesn't support `older` (bug 1316110).
-      let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id));
-      if (remainingIDs.length > 0) {
-        this.toFetch = Utils.setAddAll(this.toFetch, remainingIDs);
-      }
+      this.backfillManager.pushBackfillChunk(lastSync, oldestModified);
     }
 
     // Fast-foward the lastSync timestamp since we have backlogged the
     // remaining items.
     if (lastSync < this.lastModified) {
       lastSync = this.lastModified;
       await this.setLastSync(lastSync);
     }
 
     // Stage 3: Backfill records from the backlog, and those that failed to
     // decrypt or apply during the last sync. We only backfill up to the
     // download limit, to prevent a large backlog for one engine from blocking
     // the others. We'll keep processing the backlog on subsequent engine syncs.
-    let failedInPreviousSync = this.previousFailed;
-    let idsToBackfill = Array.from(
-      Utils.setAddAll(Utils.subsetOfSize(this.toFetch, downloadLimit),
-                      failedInPreviousSync));
-
-    // Note that we intentionally overwrite the previously failed list here.
-    // Records that fail to decrypt or apply in two consecutive syncs are likely
-    // corrupt; we remove them from the list because retrying and failing on
-    // every subsequent sync just adds noise.
-    this.previousFailed = failedInCurrentSync;
-
-    let backfilledItems = this.itemSource();
-
-    backfilledItems.sort = "newest";
-    backfilledItems.full = true;
-
-    // `getBatched` includes the list of IDs as a query parameter, so we need to fetch
-    // records in chunks to avoid exceeding URI length limits.
-    for (let ids of PlacesSyncUtils.chunkArray(idsToBackfill, this.guidFetchBatchSize)) {
-      backfilledItems.ids = ids;
-
-      let {response, records} = await backfilledItems.getBatched(this.downloadBatchSize);
-      if (!response.success) {
-        response.failureCode = ENGINE_DOWNLOAD_FAIL;
-        throw response;
-      }
-
-      let maybeYield = Async.jankYielder();
+    let maybeYield = Async.jankYielder();
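+    // Hand the backfill off to the BackfillManager. The callback reconciles
+    // and applies each fetched batch, updates the counters, and returns the
+    // ids that failed to apply so the manager can queue them for retry.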
+    await this.backfillManager.stepBackfill(downloadLimit, failedInCurrentSync,
+                                            async (records, failedBefore) => {
       let backfilledRecordsToApply = [];
       let failedInBackfill = [];
 
       for (let record of records) {
         await maybeYield();
 
         let { shouldApply, error } = await this._maybeReconcile(record);
         if (error) {
           failedInBackfill.push(record.id);
           count.failed++;
           continue;
         }
         if (!shouldApply) {
           count.reconciled++;
           continue;
         }
         backfilledRecordsToApply.push(record);
       }
+
       let failedToApply = await this._applyRecords(backfilledRecordsToApply);
       failedInBackfill.push(...failedToApply);
-
       count.failed += failedToApply.length;
-      count.applied += backfilledRecordsToApply.length;
-
-      this.toFetch = Utils.setDeleteAll(this.toFetch, ids);
-      this.previousFailed = Utils.setAddAll(this.previousFailed, failedInBackfill);
+      if (!failedBefore) {
+        count.newFailed += failedInBackfill.length;
+      }
 
-      if (lastSync < this.lastModified) {
-        lastSync = this.lastModified;
-        await this.setLastSync(lastSync);
-      }
-    }
+      count.applied += backfilledRecordsToApply.length;
+      return failedInBackfill;
+    });
 
-    count.newFailed = 0;
-    for (let item of this.previousFailed) {
-      if (!failedInPreviousSync.has(item)) {
-        ++count.newFailed;
-      }
+    if (lastSync < this.lastModified) {
+      lastSync = this.lastModified;
+      await this.setLastSync(lastSync);
     }
 
     count.succeeded = Math.max(0, count.applied - count.failed);
     this._log.info(["Records:",
                     count.applied, "applied,",
                     count.succeeded, "successfully,",
                     count.failed, "failed to apply,",
                     count.newFailed, "newly failed to apply,",
@@ -1921,19 +1817,18 @@ SyncEngine.prototype = {
    * Removes all local Sync metadata for this engine, but keeps all existing
    * local user data.
    */
   async resetClient() {
     return this._notify("reset-client", this.name, this._resetClient)();
   },
 
   async _resetClient() {
-    await this.resetLastSync();
-    this.previousFailed = new SerializableSet();
-    this.toFetch = new SerializableSet();
+    await this.resetLastSync();
+    this.backfillManager.clear();
     this._needWeakUpload.clear();
   },
 
   /**
    * Removes all local Sync metadata and user data for this engine.
    */
   async wipeClient() {
     return this._notify("wipe-client", this.name, this._wipeClient)();
@@ -1954,18 +1849,17 @@ SyncEngine.prototype = {
    * with a getSummary method). Otherwise return null.
    */
   getValidator() {
     return null;
   },
 
   async finalize() {
     await this._tracker.finalize();
-    await this._toFetchStorage.finalize();
-    await this._previousFailedStorage.finalize();
+    await this.backfillManager.finalize();
   },
 };
 
 /**
  * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
  * and stores opaque change data for tracked IDs. The default implementation
  * only records timestamps, though engines can extend this to store additional
  * data for each entry.
@@ -2031,8 +1925,302 @@ class Changeset {
     return this.ids().length;
   }
 
   // Clears the changeset.
   clear() {
     this.changes = {};
   }
 }
+
+// This class is responsible for backfilling records for engines that require a
+// record backfill. It handles three types of backfill at the same time:
+//
+// 1. `chunks`, which is a list of time ranges that contains records we are
+//    missing (more on this below).
+//
+// 2. `previousFailed`, a set of individual ids that failed to apply last sync.
+//    We attempt to fetch these a second time; ids that fail twice in a row are
+//    discarded.
+//
+// 3. `toFetch`, also a set of individual ids we need to request. Unlike
+//    `previousFailed`, these are either explicitly requested by the engine (as
+//    is the case with the bookmarks engine during repair), or were written to
+//    the toFetch file before the chunked backfill mechanism existed, back when
+//    `toFetch` served this purpose instead.
+//
+// Each of these is persisted in a separate file, as they're typically updated
+// separately, and the updates may happen multiple times over the course of a
+// sync.
+//
+// More on chunks: The behavior here is similar to Android and iOS Sync's
+// notion of a high water mark, which maintains the `offset` and `older`
+// parameters needed for the next fetch.
+//
+// Where this code differs is that desktop doesn't want to re-download records
+// it has already synced, so it stores the `older` parameter as the timestamp
+// of the oldest item synced outside of the backfill. However, if we haven't
+// finished the previous backfill by the time another one is required, we may
+// need multiple ranges (chunks) of `{older, newer, offset}` to backfill.
+// (See also Bug 1435007.)
+//
+// These "chunks" are persisted to/loaded from the toFetch file at startup.
+// Previous versions of the toFetch file stored a set of IDs. Conceptually this
+// class maintains multiple ongoing `getBatched` calls, which can continue
+// across restarts.
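+//
+// On disk, the chunks file looks roughly like the following (the timestamps
+// and offset value are illustrative):
+//
+//   { "chunks": [{ "older": 1523301915.99, "newer": 1523298315.21, "offset": "25" }] }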
+class BackfillManager {
+
+  constructor(engine, limit = engine.downloadLimit || Infinity) {
+    this._log = Log.repository.getLogger(`Sync.Engine.${engine.Name}.Backfill`);
+    this.engine = engine;
+    this.limit = limit;
+
+    this.failedStorage  = this._makeJSONFile("failed", BackfillManager._onLoadIDs);
+    this.toFetchStorage = this._makeJSONFile("toFetch", BackfillManager._onLoadIDs);
+    this.chunkStorage   = this._makeJSONFile("chunks", BackfillManager._onLoadChunks);
+  }
+
+  _makeJSONFile(folder, dataPostProcessor) {
+    const path = Utils.jsonFilePath(folder + "/" + this.engine.name);
+    return new JSONFile({
+      path,
+      dataPostProcessor,
+      beforeSave: () => ensureDirectory(path)
+    });
+  }
+
+  static _onLoadIDs(json) {
+    if (Array.isArray(json)) {
+      // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
+      // an object, so we wrap the array for consistency.
+      json = { ids: json };
+    }
+    if (!json.ids) {
+      json.ids = [];
+    }
+    // The set serializes the same way as an array, but offers more efficient
+    // methods of manipulation.
+    json.ids = new SerializableSet(json.ids);
+    return json;
+  }
+
+  static _onLoadChunks(json) {
+    if (!json.chunks) {
+      json.chunks = [];
+    }
+    return json;
+  }
+
+  async load() {
+    await Promise.all([
+      this.chunkStorage.load(),
+      this.failedStorage.load(),
+      this.toFetchStorage.load(),
+    ]);
+  }
+
+  async finalize() {
+    await Promise.all([
+      this.failedStorage.finalize(),
+      this.chunkStorage.finalize(),
+      this.toFetchStorage.finalize(),
+    ]);
+  }
+
+  clear() {
+    this.previousFailed = new SerializableSet();
+    this.toFetch = new SerializableSet();
+    this.chunks = [];
+  }
+
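+  // Queue a backfill over the records modified between `newer` (the previous
+  // lastSync) and `older` (the oldest record downloaded this sync).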
+  pushBackfillChunk(newer, older) {
+    let chunk = { older, newer, offset: null };
+    this.chunks.push(chunk);
+    this.chunkStorage.saveSoon();
+  }
+
+  get chunks() {
+    this.chunkStorage.ensureDataReady();
+    return this.chunkStorage.data.chunks;
+  }
+
+  set chunks(chunks) {
+    this.chunkStorage.data = { chunks };
+    this.chunkStorage.saveSoon();
+  }
+
+  get previousFailed() {
+    this.failedStorage.ensureDataReady();
+    return this.failedStorage.data.ids;
+  }
+
+  set previousFailed(ids) {
+    if (ids.constructor.name != "SerializableSet") {
+      throw new Error("Bug: Attempted to set previousFailed to something that isn't a SerializableSet");
+    }
+    this.failedStorage.data = { ids };
+    this.failedStorage.saveSoon();
+  }
+
+  get toFetch() {
+    this.toFetchStorage.ensureDataReady();
+    return this.toFetchStorage.data.ids;
+  }
+
+  set toFetch(ids) {
+    if (ids.constructor.name != "SerializableSet") {
+      throw new Error("Bug: Attempted to set toFetch to something that isn't a SerializableSet");
+    }
+    this.toFetchStorage.data = { ids };
+    this.toFetchStorage.saveSoon();
+  }
+
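+  // Record ids that failed to apply so they can be retried on the next sync.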
+  addFailed(ids) {
+    if (ids.length) {
+      this.previousFailed = Utils.setAddAll(this.previousFailed, ids);
+    }
+  }
+
+  addToFetch(ids) {
+    if (ids.length) {
+      this.toFetch = Utils.setAddAll(this.toFetch, ids);
+    }
+  }
+
+  removeToFetch(ids) {
+    // If we have a lot of items in toFetch, this gets called frequently enough
+    // that it's worth avoiding triggering the saveSoon in the case where there
+    // were no changes.
+    let startSize = this.toFetch.size;
+    let toFetch = Utils.setDeleteAll(this.toFetch, ids);
+    if (toFetch.size != startSize) {
+      this.toFetch = toFetch;
+    }
+  }
+
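+  // Fetch the given item source and deserialize the response into record
+  // objects. Throws the response if the request failed.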
+  async _fetch(resource) {
+    // For backfilling we don't want XIUS semantics, so we don't use getBatched.
+    // (We also handle the batching ourselves, but in a resumable way).
+    let response = await resource.get();
+    if (!response.success) {
+      response.failureCode = ENGINE_DOWNLOAD_FAIL;
+      throw response;
+    }
+    let records = response.obj.map(json => {
+      let record = new this.engine._recordObj();
+      record.deserialize(json);
+      return record;
+    });
+    return { records, response };
+  }
+
+  /**
+   * Fetch and apply items from the backfill.
+   *
+   * @param limit {number}
+   *        The maximum number of items to fetch
+   * @param failedInCurrentSync {SerializableSet}
+   *        Set of ids that failed in the current sync.
+   * @param applyRecords {async(records: Array, failedBefore: boolean) => Array}
+   *        A callback that takes the list of records and whether or not these
+   *        records have failed before (for recording newFailed in telemetry).
+   *        It should return an array of ids of records that failed to apply.
+   */
+  async stepBackfill(limit, failedInCurrentSync, applyRecords) {
+    let downloadLimit = Math.min(this.limit, limit);
+
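+    // First, backfill individual ids: records that failed to apply last sync
+    // and anything queued in toFetch.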
+    await this._processIDs(failedInCurrentSync, downloadLimit, async (records, failedBefore) => {
+      downloadLimit -= records.length;
+      return applyRecords(records, failedBefore);
+    });
+
+    // While we still have chunks, process the next chunks, but don't exceed the batch size.
+    while (downloadLimit > 0 && this.chunks.length) {
+      let chunkLimit = Math.min(downloadLimit, this.engine.downloadBatchSize);
+      await this._processNextChunk(chunkLimit, async records => {
+        downloadLimit -= records.length;
+        let failed = await applyRecords(records, false);
+        this.addFailed(failed);
+      });
+    }
+  }
+
+  // yields [ids, haveFailedBefore]
+  * _idGroups(failedPrevArray, toFetchArray) {
+    let groupSize = this.engine.guidFetchBatchSize;
+    for (let ids of PlacesSyncUtils.chunkArray(failedPrevArray, groupSize)) {
+      if (ids.length) {
+        yield [ids, true];
+      }
+    }
+
+    for (let ids of PlacesSyncUtils.chunkArray(toFetchArray, groupSize)) {
+      if (ids.length) {
+        yield [ids, false];
+      }
+    }
+  }
+
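+  // Fetch full records for the ids that failed last sync and for the queued
+  // toFetch ids (the latter capped at `limit`), in guidFetchBatchSize groups,
+  // handing each group to applyCallback.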
+  async _processIDs(failedInCurrentSync, limit, applyCallback) {
+    // Note that we intentionally overwrite the previously failed list here.
+    // Records that fail to decrypt or apply in two consecutive syncs are likely
+    // corrupt; we remove them from the list because retrying and failing on
+    // every subsequent sync just adds noise.
+    let failedPrev = this.previousFailed;
+    this.previousFailed = failedInCurrentSync;
+    let toFetch = this.toFetch;
+    if (!failedPrev.size && !toFetch.size) {
+      return;
+    }
+
+    let idResource = this.engine.itemSource();
+    idResource.sort = "newest";
+    idResource.full = true;
+
+    let toFetchArray = Array.from(Utils.subsetOfSize(toFetch, limit));
+    let prevFailedArray = Array.from(failedPrev);
+    for (let [ids, failedBefore] of this._idGroups(prevFailedArray, toFetchArray)) {
+      idResource.ids = ids;
+      let { records } = await this._fetch(idResource);
+      let failed = await applyCallback(records, failedBefore);
+      this.removeToFetch(ids);
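+      // Ids that already failed once (failedBefore) and fail again are
+      // dropped; only first-time failures are queued for a retry next sync.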
+      if (!failedBefore) {
+        this.addFailed(failed);
+      }
+    }
+  }
+
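+  // Fetch the next page of the oldest pending chunk and apply it. Afterwards,
+  // either advance the chunk's offset (so the backfill can resume later, even
+  // across restarts) or drop the chunk once the server has no more records in
+  // its range.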
+  async _processNextChunk(limit, applyCallback) {
+    if (!this.chunks.length) {
+      return;
+    }
+    let chunk = this.chunks[0];
+    let resource = this.engine.itemSource();
+    limit = Math.min(limit, this.limit);
+
+    resource.full = true;
+    resource.sort = "oldest";
+    resource.limit = limit;
+    resource.older = chunk.older;
+    resource.newer = chunk.newer;
+
+    if (chunk.offset) {
+      resource.offset = chunk.offset;
+    }
+
+    let { records, response } = await this._fetch(resource);
+
+    // This should throw if applying failed badly enough that we shouldn't
+    // update our chunkStorage.
+    await applyCallback(records);
+
+    let nextOffset = response.headers["x-weave-next-offset"];
+    if (records.length < limit || !nextOffset) {
+      this.chunks.shift();
+    } else {
+      Object.assign(this.chunks[0], {
+        offset: nextOffset,
+        // newer: records[records.length - 1].modified,
+      });
+    }
+    this.chunkStorage.saveSoon();
+  }
+}
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -39,16 +39,17 @@ function HistoryEngine(service) {
 HistoryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _recordObj: HistoryRec,
   _storeObj: HistoryStore,
   _trackerObj: HistoryTracker,
   downloadLimit: MAX_HISTORY_DOWNLOAD,
 
   syncPriority: 7,
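+  // WIP: opt this engine into chunked (time-range based) backfill.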
+  useChunkedBackfill: true,
 
   _migratedSyncMetadata: false,
   async _migrateSyncMetadata() {
     if (this._migratedSyncMetadata) {
       return;
     }
     // Migrate the history sync ID and last sync time from prefs, to avoid
     // triggering a full sync on upgrade. This can be removed in bug 1443021.
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -203,17 +203,17 @@ add_bookmark_test(async function test_pr
     let bogus_record = collection.insert(BOGUS_GUID, "I'm a bogus record!");
     bogus_record.get = function get() {
       throw new Error("Sync this!");
     };
 
     // Make the 10 minutes old so it will only be synced in the toFetch phase.
     bogus_record.modified = Date.now() / 1000 - 60 * 10;
     await engine.setLastSync(Date.now() / 1000 - 60);
-    engine.toFetch = new SerializableSet([BOGUS_GUID]);
+    engine.addToFetch([BOGUS_GUID]);
 
     let error;
     try {
       await sync_engine_and_validate_telem(engine, true);
     } catch (ex) {
       error = ex;
     }
     ok(!!error);