Bug 1368209 - Refactor `Engine::_processIncoming` into three stages. r=eoger,tcsc
author Kit Cambridge <kit@yakshaving.ninja>
Wed, 01 Nov 2017 11:09:57 -0700
changeset 692869 b235bfc81d73b50e54686f5b34816d36d6012e51
parent 692825 91a81dccbd167dc1974476ad44318146132166d3
child 692870 695f61169cd512482aa74370ebcf6476d888ed7c
child 693021 2ab00bab6ce950b048903976a89f96499bc1a562
child 693023 c86692ad68694cd864204ea552896428f51fe8c4
push id 87630
push user bmo:kit@mozilla.com
push date Fri, 03 Nov 2017 17:31:38 +0000
reviewers eoger, tcsc
bugs 1368209
milestone 58.0a1
Bug 1368209 - Refactor `Engine::_processIncoming` into three stages. r=eoger,tcsc

* In the first stage, we fetch changed records, newest first, up to the download limit. We keep track of the oldest record modified time we see.

* Once we've fetched all records, we reconcile, noting records that fail to decrypt or reconcile for the next sync. We then ask the store to apply all remaining records. Previously, `applyIncomingBatchSize` specified how many records to apply at a time. I removed this because it added an extra layer of indirection that's no longer necessary, now that download batching buffers all records in memory, and all stores are async.

* In the second stage, we fetch IDs for all remaining records changed between the last sync and the oldest modified time we saw in the first stage. We *don't* set the download limit here, to ensure we add *all* changed records to our backlog, and we use the `"oldest"` sort order instead of `"index"`.

* In the third stage, we backfill as before. We don't want large deltas to delay other engines from syncing, so we still only take IDs up to the download limit from the backlog, and include failed IDs from the previous sync. On subsequent syncs, we'll keep fetching from the backlog until it's empty.

Other changes to note in this patch:

* `Collection::_rebuildURL` now allows callers to specify both `older` and `newer`. According to :rfkelly, this is explicitly and intentionally supported.

* Tests that exercise `applyIncomingBatchSize` are gone, since that's no longer a thing.

* The test server now shuffles records if the sort order is unspecified.

MozReview-Commit-ID: 4EXvNOa8mIo
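For orientation before the diff, here is a minimal, self-contained sketch of the three-stage flow described above. It is illustrative only: the function name `processIncoming`, its parameters, and the in-memory `serverRecords` list are hypothetical stand-ins rather than the patch's actual SyncEngine/Collection API; the chunking generator is copied from the PlacesSyncUtils.jsm hunk below.

// Generator copied from the PlacesSyncUtils.jsm hunk in this patch.
function* chunkArray(array, chunkLength) {
  if (!array.length || chunkLength <= 0) {
    return;
  }
  let startIndex = 0;
  while (startIndex < array.length) {
    yield array.slice(startIndex, startIndex + chunkLength);
    startIndex += chunkLength;
  }
}

// Hypothetical sketch of the three stages; `serverRecords` stands in for the
// records in the server-side collection.
function processIncoming(serverRecords, lastSync, downloadLimit, guidFetchBatchSize) {
  let changed = serverRecords.filter(record => record.modified > lastSync);

  // Stage 1: fetch full records, newest first, up to the download limit,
  // tracking the oldest modified time we see.
  let newestFirst = changed.slice().sort((a, b) => b.modified - a.modified);
  let downloaded = newestFirst.slice(0, downloadLimit);
  let downloadedIDs = new Set(downloaded.map(record => record.id));
  let oldestModified = downloaded.length ?
    downloaded[downloaded.length - 1].modified : Infinity;

  // Stage 2: if we hit the limit, backlog the IDs of the remaining records
  // changed between the last sync and the oldest modified time, oldest first.
  let backlog = [];
  if (downloaded.length == downloadLimit) {
    backlog = changed
      .filter(record => record.modified < oldestModified &&
                        !downloadedIDs.has(record.id))
      .sort((a, b) => a.modified - b.modified)
      .map(record => record.id);
  }

  // Stage 3: backfill the backlogged IDs in chunks, to avoid exceeding URL
  // length limits when requesting records by ID.
  let backfillRequests = Array.from(chunkArray(backlog, guidFetchBatchSize));
  return { downloaded, backlog, backfillRequests };
}

The real patch additionally reconciles and applies each downloaded record, fast-forwards the last sync time, merges previously failed IDs into the backfill, and caps how many backlogged IDs it takes per sync; see the engines.js hunk below for the full flow.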
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/engines/forms.js
services/sync/modules/engines/history.js
services/sync/modules/engines/passwords.js
services/sync/modules/record.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_history_engine.js
services/sync/tests/unit/test_syncengine_sync.js
testing/modules/TestUtils.jsm
toolkit/components/places/PlacesSyncUtils.jsm
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -49,22 +49,16 @@ HMAC_EVENT_INTERVAL:                   6
 MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000,   // 15 minutes
 
 // 50 is hardcoded here because of URL length restrictions.
 // (GUIDs can be up to 64 chars long.)
 // Individual engines can set different values for their limit if their
 // identifiers are shorter.
 DEFAULT_GUID_FETCH_BATCH_SIZE:         50,
 
-// Default batch size for applying incoming records.
-DEFAULT_STORE_BATCH_SIZE:              1,
-HISTORY_STORE_BATCH_SIZE:              50,
-FORMS_STORE_BATCH_SIZE:                50,
-PASSWORDS_STORE_BATCH_SIZE:            50,
-
 // Default batch size for download batching
 // (how many records are fetched at a time from the server when batching is used).
 DEFAULT_DOWNLOAD_BATCH_SIZE:           1000,
 
 // score thresholds for early syncs
 SINGLE_USER_THRESHOLD:                 1000,
 MULTI_DEVICE_THRESHOLD:                300,
 
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -19,20 +19,21 @@ Cu.import("resource://gre/modules/Log.js
 Cu.import("resource://services-common/async.js");
 Cu.import("resource://services-common/observers.js");
 Cu.import("resource://services-common/utils.js");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/util.js");
 
-XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
-  "resource://gre/modules/FxAccounts.jsm");
-XPCOMUtils.defineLazyModuleGetter(this, "OS",
-                                  "resource://gre/modules/osfile.jsm");
+XPCOMUtils.defineLazyModuleGetters(this, {
+  fxAccounts: "resource://gre/modules/FxAccounts.jsm",
+  OS: "resource://gre/modules/osfile.jsm",
+  PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.jsm",
+});
 
 /*
  * Trackers are associated with a single engine and deal with
  * listening for changes to their particular data type.
  *
  * There are two things they keep track of:
  * 1) A score, indicating how urgently the engine wants to sync
  * 2) A list of IDs for all the changed items that need to be synced
@@ -821,19 +822,16 @@ SyncEngine.prototype = {
   // How many records to pull in a single sync. This is primarily to avoid very
   // long first syncs against profiles with many history records.
   downloadLimit: null,
 
   // How many records to pull at one time when specifying IDs. This is to avoid
   // URI length limitations.
   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
 
-  // How many records to process in a single batch.
-  applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
-
   downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
 
   async initialize() {
     await this.loadToFetch();
     await this.loadPreviousFailed();
     this._log.debug("SyncEngine initialized", this.name);
   },
 
@@ -1060,314 +1058,324 @@ SyncEngine.prototype = {
    * A tiny abstraction to make it easier to test incoming record
    * application.
    */
   itemSource() {
     return new Collection(this.engineURL, this._recordObj, this.service);
   },
 
   /**
-   * Process incoming records.
-   * In the most awful and untestable way possible.
-   * This now accepts something that makes testing vaguely less impossible.
+   * Download and apply remote records changed since the last sync. This
+   * happens in three stages.
+   *
+   * In the first stage, we fetch full records for all changed items, newest
+   * first, up to the download limit. The limit lets us make progress for large
+   * collections, where the sync is likely to be interrupted before we
+   * can fetch everything.
+   *
+   * In the second stage, we fetch the IDs of any remaining records changed
+   * since the last sync, add them to our backlog, and fast-forward our last
+   * sync time.
+   *
+   * In the third stage, we fetch and apply records for all backlogged IDs,
+   * as well as any records that failed to apply during the last sync. We
+   * request records for the IDs in chunks, to avoid exceeding URL length
+   * limits, then remove successfully applied records from the backlog, and
+   * record IDs of any records that failed to apply to retry on the next sync.
    */
-  async _processIncoming(newitems) {
+  async _processIncoming() {
     this._log.trace("Downloading & applying server changes");
 
-    // Figure out how many total items to fetch this sync; do less on mobile.
-    let batchSize = this.downloadLimit || Infinity;
-
-    if (!newitems) {
-      newitems = this.itemSource();
-    }
-
-    if (this._defaultSort) {
-      newitems.sort = this._defaultSort;
-    }
+    let newitems = this.itemSource();
 
     newitems.newer = this.lastSync;
     newitems.full  = true;
-    newitems.limit = batchSize;
+
+    let downloadLimit = Infinity;
+    if (this.downloadLimit) {
+      // Fetch new records up to the download limit. Currently, only the history
+      // engine sets a limit, since the history collection has the highest volume
+      // of changed records between syncs. The other engines fetch all records
+      // changed since the last sync.
+      if (this._defaultSort) {
+        // A download limit with a custom sort order doesn't make sense: if we
+        // don't fetch newest-first, we won't know which records to backfill.
+        throw new Error("Can't specify download limit with default sort order");
+      }
+      newitems.sort = "newest";
+      downloadLimit = newitems.limit = this.downloadLimit;
+    } else if (this._defaultSort) {
+      // The bookmarks engine fetches records by sort index; other engines leave
+      // the order unspecified. We can remove `_defaultSort` entirely after bug
+      // 1305563: the sort index won't matter because we'll buffer all bookmarks
+      // before applying.
+      newitems.sort = this._defaultSort;
+    }
 
     // applied    => number of items that should be applied.
     // failed     => number of items that failed in this sync.
     // newFailed  => number of items that failed for the first time in this sync.
     // reconciled => number of items that were reconciled.
     let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
-    let handled = [];
-    let applyBatch = [];
-    let failed = [];
-    let failedInPreviousSync = this.previousFailed;
-    let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
-    // Reset previousFailed for each sync since previously failed items may not fail again.
-    this.previousFailed = [];
-
-    // Used (via exceptions) to allow the record handler/reconciliation/etc.
-    // methods to signal that they would like processing of incoming records to
-    // cease.
-    let aborting = undefined;
-
-    async function doApplyBatch() {
-      this._tracker.ignoreAll = true;
-      try {
-        failed = failed.concat((await this._store.applyIncomingBatch(applyBatch)));
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        // Catch any error that escapes from applyIncomingBatch. At present
-        // those will all be abort events.
-        this._log.warn("Got exception, aborting processIncoming", ex);
-        aborting = ex;
-      }
-      this._tracker.ignoreAll = false;
-      applyBatch = [];
-    }
-
-    async function doApplyBatchAndPersistFailed() {
-      // Apply remaining batch.
-      if (applyBatch.length) {
-        await doApplyBatch.call(this);
-      }
-      // Persist failed items so we refetch them.
-      if (failed.length) {
-        this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
-        count.failed += failed.length;
-        this._log.debug("Records that failed to apply: " + failed);
-        failed = [];
-      }
-    }
-
-    let key = this.service.collectionKeys.keyForCollection(this.name);
-
-    // Not binding this method to 'this' for performance reasons. It gets
-    // called for every incoming record.
-    let self = this;
-
-    let recordHandler = async function(item) {
-      if (aborting) {
-        return;
-      }
-
-      // Grab a later last modified if possible
-      if (self.lastModified == null || item.modified > self.lastModified)
-        self.lastModified = item.modified;
-
-      // Track the collection for the WBO.
-      item.collection = self.name;
-
-      // Remember which records were processed
-      handled.push(item.id);
+    let recordsToApply = [];
+    let failedInCurrentSync = [];
 
-      try {
-        try {
-          await item.decrypt(key);
-        } catch (ex) {
-          if (!Utils.isHMACMismatch(ex)) {
-            throw ex;
-          }
-          let strategy = await self.handleHMACMismatch(item, true);
-          if (strategy == SyncEngine.kRecoveryStrategy.retry) {
-            // You only get one retry.
-            try {
-              // Try decrypting again, typically because we've got new keys.
-              self._log.info("Trying decrypt again...");
-              key = self.service.collectionKeys.keyForCollection(self.name);
-              await item.decrypt(key);
-              strategy = null;
-            } catch (ex) {
-              if (!Utils.isHMACMismatch(ex)) {
-                throw ex;
-              }
-              strategy = await self.handleHMACMismatch(item, false);
-            }
-          }
+    let oldestModified = this.lastModified;
+    let downloadedIDs = new Set();
 
-          switch (strategy) {
-            case null:
-              // Retry succeeded! No further handling.
-              break;
-            case SyncEngine.kRecoveryStrategy.retry:
-              self._log.debug("Ignoring second retry suggestion.");
-              // Fall through to error case.
-            case SyncEngine.kRecoveryStrategy.error:
-              self._log.warn("Error decrypting record", ex);
-              failed.push(item.id);
-              return;
-            case SyncEngine.kRecoveryStrategy.ignore:
-              self._log.debug("Ignoring record " + item.id +
-                              " with bad HMAC: already handled.");
-              return;
-          }
-        }
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        self._log.warn("Error decrypting record", ex);
-        failed.push(item.id);
-        return;
-      }
-
-      if (self._shouldDeleteRemotely(item)) {
-        self._log.trace("Deleting item from server without applying", item);
-        self._deleteId(item.id);
-        return;
-      }
-
-      let shouldApply;
-      try {
-        shouldApply = await self._reconcile(item);
-      } catch (ex) {
-        if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
-          self._log.warn("Reconciliation failed: aborting incoming processing.");
-          failed.push(item.id);
-          aborting = ex.cause;
-        } else if (!Async.isShutdownException(ex)) {
-          self._log.warn("Failed to reconcile incoming record " + item.id, ex);
-          failed.push(item.id);
-          return;
-        } else {
-          throw ex;
-        }
-      }
-
-      if (shouldApply) {
-        count.applied++;
-        applyBatch.push(item);
-      } else {
-        count.reconciled++;
-        self._log.trace("Skipping reconciled incoming item " + item.id);
-      }
-
-      if (applyBatch.length == self.applyIncomingBatchSize) {
-        await doApplyBatch.call(self);
-      }
-    };
-
-    // Only bother getting data from the server if there's new things
+    // Stage 1: Fetch new records from the server, up to the download limit.
     if (this.lastModified == null || this.lastModified > this.lastSync) {
       let { response, records } = await newitems.getBatched(this.downloadBatchSize);
       if (!response.success) {
         response.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw response;
       }
 
       let maybeYield = Async.jankYielder();
       for (let record of records) {
         await maybeYield();
-        await recordHandler(record);
-      }
-      await doApplyBatchAndPersistFailed.call(this);
+        downloadedIDs.add(record.id);
+
+        if (record.modified < oldestModified) {
+          oldestModified = record.modified;
+        }
 
-      if (aborting) {
-        throw aborting;
+        let { shouldApply, error } = await this._maybeReconcile(record);
+        if (error) {
+          failedInCurrentSync.push(record.id);
+          count.failed++;
+          continue;
+        }
+        if (!shouldApply) {
+          count.reconciled++;
+          continue;
+        }
+        recordsToApply.push(record);
       }
+
+      let failedToApply = await this._applyRecords(recordsToApply);
+      failedInCurrentSync.push(...failedToApply);
+
+      // `applied` is a bit of a misnomer: it counts records that *should* be
+      // applied, so it also includes records that we tried to apply and failed.
+      // `recordsToApply.length - failedToApply.length` is the number of records
+      // that we *successfully* applied.
+      count.failed += failedToApply.length;
+      count.applied += recordsToApply.length;
     }
 
-    // History: check if we got the maximum that we requested; get the rest if so.
-    if (handled.length == newitems.limit) {
-      // XXX - this block appears to have no test coverage (eg, throwing here,
-      // or commenting the entire block causes no tests to fail.)
-      // See bug 1368951 comment 3 for some insightful analysis of why this
-      // might not be doing what we expect anyway, so it may be the case that
-      // this needs both fixing *and* tests.
-      let guidColl = new Collection(this.engineURL, null, this.service);
+    // Stage 2: If we reached our download limit, we might still have records
+    // on the server that changed since the last sync. Fetch the IDs for the
+    // remaining records, and add them to the backlog. Note that this stage
+    // only runs for engines that set a download limit.
+    if (downloadedIDs.size == downloadLimit) {
+      let guidColl = this.itemSource();
 
-      // Sort and limit so that we only get the last X records.
-      guidColl.limit = this.downloadLimit;
       guidColl.newer = this.lastSync;
-
-      // index: Orders by the sortindex descending (highest weight first).
-      guidColl.sort  = "index";
+      guidColl.older = oldestModified;
+      guidColl.sort  = "oldest";
 
       let guids = await guidColl.get();
       if (!guids.success)
         throw guids;
 
-      // Figure out which guids weren't just fetched then remove any guids that
-      // were already waiting and prepend the new ones
-      let extra = Utils.arraySub(guids.obj, handled);
-      if (extra.length > 0) {
-        fetchBatch = Utils.arrayUnion(extra, fetchBatch);
-        this.toFetch = Utils.arrayUnion(extra, this.toFetch);
+      // Filtering out already downloaded IDs here shouldn't be necessary: the
+      // `older` param should exclude them. We filter anyway, in case the Sync
+      // server doesn't support `older` (bug 1316110).
+      let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id));
+      if (remainingIDs.length > 0) {
+        this.toFetch = Utils.arrayUnion(this.toFetch, remainingIDs);
       }
     }
 
-    // Fast-foward the lastSync timestamp since we have stored the
-    // remaining items in toFetch.
+    // Fast-forward the lastSync timestamp since we have backlogged the
+    // remaining items.
     if (this.lastSync < this.lastModified) {
       this.lastSync = this.lastModified;
     }
 
-    // Process any backlog of GUIDs.
-    // At this point we impose an upper limit on the number of items to fetch
-    // in a single request, even for desktop, to avoid hitting URI limits.
-    batchSize = this.guidFetchBatchSize;
+    // Stage 3: Backfill records from the backlog, and those that failed to
+    // decrypt or apply during the last sync. We only backfill up to the
+    // download limit, to prevent a large backlog for one engine from blocking
+    // the others. We'll keep processing the backlog on subsequent engine syncs.
+    let failedInPreviousSync = this.previousFailed;
+    let idsToBackfill = Utils.arrayUnion(this.toFetch.slice(0, downloadLimit),
+      failedInPreviousSync);
 
-    while (batchSize && fetchBatch.length && !aborting) {
-      // Reuse the original query, but get rid of the restricting params
-      // and batch remaining records.
-      newitems.limit = 0;
-      newitems.newer = 0;
-      newitems.ids = fetchBatch.slice(0, batchSize);
+    // Note that we intentionally overwrite the previously failed list here.
+    // Records that fail to decrypt or apply in two consecutive syncs are likely
+    // corrupt; we remove them from the list because retrying and failing on
+    // every subsequent sync just adds noise.
+    this.previousFailed = failedInCurrentSync;
+
+    let backfilledItems = this.itemSource();
 
-      let resp = await newitems.get();
+    backfilledItems.sort = "newest";
+    backfilledItems.full = true;
+
+    // `get` includes the list of IDs as a query parameter, so we need to fetch
+    // records in chunks to avoid exceeding URI length limits.
+    for (let ids of PlacesSyncUtils.chunkArray(idsToBackfill, this.guidFetchBatchSize)) {
+      backfilledItems.ids = ids;
+
+      let resp = await backfilledItems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
 
       let maybeYield = Async.jankYielder();
+      let backfilledRecordsToApply = [];
+      let failedInBackfill = [];
+
       for (let json of resp.obj) {
         await maybeYield();
         let record = new this._recordObj();
         record.deserialize(json);
-        await recordHandler(record);
-      }
 
-      // This batch was successfully applied. Not using
-      // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
-      fetchBatch = fetchBatch.slice(batchSize);
-      this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
-      this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
-      if (failed.length) {
-        count.failed += failed.length;
-        this._log.debug("Records that failed to apply: " + failed);
+        let { shouldApply, error } = await this._maybeReconcile(record);
+        if (error) {
+          failedInBackfill.push(record.id);
+          count.failed++;
+          continue;
+        }
+        if (!shouldApply) {
+          count.reconciled++;
+          continue;
+        }
+        backfilledRecordsToApply.push(record);
       }
-      failed = [];
+      let failedToApply = await this._applyRecords(backfilledRecordsToApply);
+      failedInBackfill.push(...failedToApply);
 
-      if (aborting) {
-        throw aborting;
-      }
+      count.failed += failedToApply.length;
+      count.applied += backfilledRecordsToApply.length;
+
+      this.toFetch = Utils.arraySub(this.toFetch, ids);
+      this.previousFailed = Utils.arrayUnion(this.previousFailed, failedInBackfill);
 
       if (this.lastSync < this.lastModified) {
         this.lastSync = this.lastModified;
       }
     }
 
-    // Apply remaining items.
-    await doApplyBatchAndPersistFailed.call(this);
-
     count.newFailed = this.previousFailed.reduce((count, engine) => {
       if (failedInPreviousSync.indexOf(engine) == -1) {
         count++;
       }
       return count;
     }, 0);
     count.succeeded = Math.max(0, count.applied - count.failed);
     this._log.info(["Records:",
                     count.applied, "applied,",
                     count.succeeded, "successfully,",
                     count.failed, "failed to apply,",
                     count.newFailed, "newly failed to apply,",
                     count.reconciled, "reconciled."].join(" "));
     Observers.notify("weave:engine:sync:applied", count, this.name);
   },
 
+  async _maybeReconcile(item) {
+    let key = this.service.collectionKeys.keyForCollection(this.name);
+
+    // Grab a later last modified if possible
+    if (this.lastModified == null || item.modified > this.lastModified) {
+      this.lastModified = item.modified;
+    }
+
+    try {
+      try {
+        await item.decrypt(key);
+      } catch (ex) {
+        if (!Utils.isHMACMismatch(ex)) {
+          throw ex;
+        }
+        let strategy = await this.handleHMACMismatch(item, true);
+        if (strategy == SyncEngine.kRecoveryStrategy.retry) {
+          // You only get one retry.
+          try {
+            // Try decrypting again, typically because we've got new keys.
+            this._log.info("Trying decrypt again...");
+            key = this.service.collectionKeys.keyForCollection(this.name);
+            await item.decrypt(key);
+            strategy = null;
+          } catch (ex) {
+            if (!Utils.isHMACMismatch(ex)) {
+              throw ex;
+            }
+            strategy = await this.handleHMACMismatch(item, false);
+          }
+        }
+
+        switch (strategy) {
+          case null:
+            // Retry succeeded! No further handling.
+            break;
+          case SyncEngine.kRecoveryStrategy.retry:
+            this._log.debug("Ignoring second retry suggestion.");
+            // Fall through to error case.
+          case SyncEngine.kRecoveryStrategy.error:
+            this._log.warn("Error decrypting record", ex);
+            return { shouldApply: false, error: ex };
+          case SyncEngine.kRecoveryStrategy.ignore:
+            this._log.debug("Ignoring record " + item.id +
+                            " with bad HMAC: already handled.");
+            return { shouldApply: false, error: null };
+        }
+      }
+    } catch (ex) {
+      if (Async.isShutdownException(ex)) {
+        throw ex;
+      }
+      this._log.warn("Error decrypting record", ex);
+      return { shouldApply: false, error: ex };
+    }
+
+    if (this._shouldDeleteRemotely(item)) {
+      this._log.trace("Deleting item from server without applying", item);
+      this._deleteId(item.id);
+      return { shouldApply: false, error: null };
+    }
+
+    let shouldApply;
+    try {
+      shouldApply = await this._reconcile(item);
+    } catch (ex) {
+      if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
+        this._log.warn("Reconciliation failed: aborting incoming processing.");
+        throw ex.cause;
+      } else if (!Async.isShutdownException(ex)) {
+        this._log.warn("Failed to reconcile incoming record " + item.id, ex);
+        return { shouldApply: false, error: ex };
+      } else {
+        throw ex;
+      }
+    }
+
+    if (!shouldApply) {
+      this._log.trace("Skipping reconciled incoming item " + item.id);
+    }
+
+    return { shouldApply, error: null };
+  },
+
+  async _applyRecords(records) {
+    this._tracker.ignoreAll = true;
+    try {
+      let failedIDs = await this._store.applyIncomingBatch(records);
+      return failedIDs;
+    } catch (ex) {
+      // Catch any error that escapes from applyIncomingBatch. At present
+      // those will all be abort events.
+      this._log.warn("Got exception, aborting processIncoming", ex);
+      throw ex;
+    } finally {
+      this._tracker.ignoreAll = false;
+    }
+  },
+
   // Indicates whether an incoming item should be deleted from the server at
   // the end of the sync. Engines can override this method to clean up records
   // that shouldn't be on the server.
   _shouldDeleteRemotely(remoteItem) {
     return false;
   },
 
   /**
--- a/services/sync/modules/engines/forms.js
+++ b/services/sync/modules/engines/forms.js
@@ -102,17 +102,16 @@ var FormWrapper = {
 this.FormEngine = function FormEngine(service) {
   SyncEngine.call(this, "Forms", service);
 };
 FormEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: FormStore,
   _trackerObj: FormTracker,
   _recordObj: FormRec,
-  applyIncomingBatchSize: FORMS_STORE_BATCH_SIZE,
 
   syncPriority: 6,
 
   get prefName() {
     return "history";
   },
 
   async _findDupe(item) {
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -42,17 +42,16 @@ this.HistoryEngine = function HistoryEng
   SyncEngine.call(this, "History", service);
 };
 HistoryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _recordObj: HistoryRec,
   _storeObj: HistoryStore,
   _trackerObj: HistoryTracker,
   downloadLimit: MAX_HISTORY_DOWNLOAD,
-  applyIncomingBatchSize: HISTORY_STORE_BATCH_SIZE,
 
   syncPriority: 7,
 
   async _processIncoming(newitems) {
     // We want to notify history observers that a batch operation is underway
     // so they don't do lots of work for each incoming record.
     let observers = PlacesUtils.history.getObservers();
     function notifyHistoryObservers(notification) {
--- a/services/sync/modules/engines/passwords.js
+++ b/services/sync/modules/engines/passwords.js
@@ -72,18 +72,16 @@ this.PasswordEngine = function PasswordE
   SyncEngine.call(this, "Passwords", service);
 };
 PasswordEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: PasswordStore,
   _trackerObj: PasswordTracker,
   _recordObj: LoginRec,
 
-  applyIncomingBatchSize: PASSWORDS_STORE_BATCH_SIZE,
-
   syncPriority: 2,
 
   async _syncFinish() {
     await SyncEngine.prototype._syncFinish.call(this);
 
     // Delete the Weave credentials from the server once.
     if (!Svc.Prefs.get("deletePwdFxA", false)) {
       try {
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -613,35 +613,43 @@ Collection.prototype = {
   __proto__: Resource.prototype,
   _logName: "Sync.Collection",
 
   _rebuildURL: function Coll__rebuildURL() {
     // XXX should consider what happens if it's not a URL...
     this.uri.QueryInterface(Ci.nsIURL);
 
     let args = [];
-    if (this.older)
+    if (this.older) {
       args.push("older=" + this.older);
-    else if (this.newer) {
+    }
+    if (this.newer) {
       args.push("newer=" + this.newer);
     }
-    if (this.full)
+    if (this.full) {
       args.push("full=1");
-    if (this.sort)
+    }
+    if (this.sort) {
       args.push("sort=" + this.sort);
-    if (this.ids != null)
+    }
+    if (this.ids != null) {
       args.push("ids=" + this.ids);
-    if (this.limit > 0 && this.limit != Infinity)
+    }
+    if (this.limit > 0 && this.limit != Infinity) {
       args.push("limit=" + this.limit);
-    if (this._batch)
+    }
+    if (this._batch) {
       args.push("batch=" + encodeURIComponent(this._batch));
-    if (this._commit)
+    }
+    if (this._commit) {
       args.push("commit=true");
-    if (this._offset)
+    }
+    if (this._offset) {
       args.push("offset=" + encodeURIComponent(this._offset));
+    }
 
     this.uri.query = (args.length > 0) ? "?" + args.join("&") : "";
   },
 
   // get full items
   get full() { return this._full; },
   set full(value) {
     this._full = value;
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -2,16 +2,17 @@
 /* import-globals-from ../../../common/tests/unit/head_helpers.js */
 /* import-globals-from head_helpers.js */
 
 var Cm = Components.manager;
 
 // Shared logging for all HTTP server functions.
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-common/utils.js");
+Cu.import("resource://testing-common/TestUtils.jsm");
 const SYNC_HTTP_LOGGER = "Sync.Test.Server";
 
 // While the sync code itself uses 1.5, the tests hard-code 1.1,
 // so we're sticking with 1.1 here.
 const SYNC_API_VERSION = "1.1";
 
 // Use the same method that record.js does, which mirrors the server.
 // The server returns timestamps with 1/100 sec granularity. Note that this is
@@ -323,19 +324,19 @@ ServerCollection.prototype = {
         break;
 
       default:
         if (options.sort) {
           this._log.error("Error: client requesting unknown sort order",
                           options.sort);
           throw new Error("Unknown sort order");
         }
-        // If the client didn't request a sort order, sort newest first,
-        // since `test_history_engine` currently depends on this.
-        data.sort((a, b) => b.modified - a.modified);
+        // If the client didn't request a sort order, shuffle the records
+        // to ensure that we don't accidentally depend on the default order.
+        data = TestUtils.shuffle(data);
     }
     if (options.full) {
       data = data.map(wbo => wbo.get());
       let start = options.offset || 0;
       if (options.limit) {
         let numItemsPastOffset = data.length - start;
         data = data.slice(start, start + options.limit);
         // use options as a backchannel to set x-weave-next-offset
--- a/services/sync/tests/unit/test_history_engine.js
+++ b/services/sync/tests/unit/test_history_engine.js
@@ -33,47 +33,47 @@ add_task(async function test_history_dow
         type: PlacesUtils.history.TRANSITIONS.LINK,
       }],
     }), lastSync + 1 + i);
     wbo.sortindex = 15 - i;
     collection.insertWBO(wbo);
   }
 
   // We have 15 records on the server since the last sync, but our download
-  // limit is 5 records at a time.
+  // limit is 5 records at a time. We should eventually fetch all 15.
   engine.lastSync = lastSync;
   engine.downloadBatchSize = 4;
   engine.downloadLimit = 5;
 
   // Don't actually fetch any backlogged records, so that we can inspect
   // the backlog between syncs.
   engine.guidFetchBatchSize = 0;
 
   let ping = await sync_engine_and_validate_telem(engine, false);
   deepEqual(ping.engines[0].incoming, { applied: 5 });
 
   let backlogAfterFirstSync = engine.toFetch.slice(0);
   deepEqual(backlogAfterFirstSync, ["place0000000", "place0000001",
-    "place0000002", "place0000003", "place0000004"]);
+    "place0000002", "place0000003", "place0000004", "place0000005",
+    "place0000006", "place0000007", "place0000008", "place0000009"]);
 
   // We should have fast-forwarded the last sync time.
   equal(engine.lastSync, lastSync + 15);
 
   engine.lastModified = collection.modified;
   ping = await sync_engine_and_validate_telem(engine, false);
   ok(!ping.engines[0].incoming);
 
   // After the second sync, our backlog still contains the same GUIDs: we
   // weren't able to make progress on fetching them, since our
   // `guidFetchBatchSize` is 0.
   let backlogAfterSecondSync = engine.toFetch.slice(0);
   deepEqual(backlogAfterFirstSync, backlogAfterSecondSync);
 
-  // Now add a newer record to the server. We should download and apply it, even
-  // though we've backlogged records with higher sort indices.
+  // Now add a newer record to the server.
   let newWBO = new ServerWBO("placeAAAAAAA", encryptPayload({
     id: "placeAAAAAAA",
     histUri: "http://example.com/a",
     title: "New Page A",
     visits: [{
       date: Date.now() * 1000,
       type: PlacesUtils.history.TRANSITIONS.TYPED,
     }],
@@ -94,13 +94,18 @@ add_task(async function test_history_dow
   // Bump the fetch batch size to let the backlog make progress. We should
   // make 3 requests to fetch 5 backlogged GUIDs.
   engine.guidFetchBatchSize = 2;
 
   engine.lastModified = collection.modified;
   ping = await sync_engine_and_validate_telem(engine, false);
   deepEqual(ping.engines[0].incoming, { applied: 5 });
 
-  deepEqual(engine.toFetch, []);
+  deepEqual(engine.toFetch, ["place0000005", "place0000006", "place0000007",
+    "place0000008", "place0000009"]);
 
-  // Note that we'll only have fetched *at most* 10 records: we'll never fetch
-  // the remaining 5, because they're not in our backlog.
+  // Sync again to clear out the backlog.
+  engine.lastModified = collection.modified;
+  ping = await sync_engine_and_validate_telem(engine, false);
+  deepEqual(ping.engines[0].incoming, { applied: 5 });
+
+  deepEqual(engine.toFetch, []);
 });
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -1,11 +1,12 @@
 /* Any copyright is dedicated to the Public Domain.
  * http://creativecommons.org/publicdomain/zero/1.0/ */
 
+Cu.import("resource://gre/modules/PlacesSyncUtils.jsm");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/main.js");
 Cu.import("resource://services-sync/policies.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
@@ -660,141 +661,38 @@ add_task(async function test_processInco
     do_check_eq(engine._store.items.failed2, "Record No. 2");
     do_check_eq(engine.previousFailed.length, 0);
   } finally {
     await cleanAndGo(engine, server);
   }
 });
 
 
-add_task(async function test_processIncoming_applyIncomingBatchSize_smaller() {
-  _("Ensure that a number of incoming items less than applyIncomingBatchSize is still applied.");
-
-  // Engine that doesn't like the first and last record it's given.
-  const APPLY_BATCH_SIZE = 10;
-  let engine = makeRotaryEngine();
-  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
-  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
-  engine._store.applyIncomingBatch = async function(records) {
-    let failed1 = records.shift();
-    let failed2 = records.pop();
-    await this._applyIncomingBatch(records);
-    return [failed1.id, failed2.id];
-  };
-
-  // Let's create less than a batch worth of server side records.
-  let collection = new ServerCollection();
-  for (let i = 0; i < APPLY_BATCH_SIZE - 1; i++) {
-    let id = "record-no-" + i;
-    let payload = encryptPayload({id, denomination: "Record No. " + id});
-    collection.insert(id, payload);
-  }
-
-  let server = sync_httpd_setup({
-      "/1.1/foo/storage/rotary": collection.handler()
-  });
-
-  await SyncTestingInfrastructure(server);
-
-  let meta_global = Service.recordManager.set(engine.metaURL,
-                                              new WBORecord(engine.metaURL));
-  meta_global.payload.engines = {rotary: {version: engine.version,
-                                         syncID: engine.syncID}};
-  try {
-
-    // Confirm initial environment
-    do_check_empty(engine._store.items);
-
-    await engine._syncStartup();
-    await engine._processIncoming();
-
-    // Records have been applied and the expected failures have failed.
-    do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE - 1 - 2);
-    do_check_eq(engine.toFetch.length, 0);
-    do_check_eq(engine.previousFailed.length, 2);
-    do_check_eq(engine.previousFailed[0], "record-no-0");
-    do_check_eq(engine.previousFailed[1], "record-no-8");
-
-  } finally {
-    await cleanAndGo(engine, server);
-  }
-});
-
-
-add_task(async function test_processIncoming_applyIncomingBatchSize_multiple() {
-  _("Ensure that incoming items are applied according to applyIncomingBatchSize.");
-
-  const APPLY_BATCH_SIZE = 10;
-
-  // Engine that applies records in batches.
-  let engine = makeRotaryEngine();
-  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
-  let batchCalls = 0;
-  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
-  engine._store.applyIncomingBatch = async function(records) {
-    batchCalls += 1;
-    do_check_eq(records.length, APPLY_BATCH_SIZE);
-    await this._applyIncomingBatch.apply(this, arguments);
-  };
-
-  // Let's create three batches worth of server side records.
-  let collection = new ServerCollection();
-  for (let i = 0; i < APPLY_BATCH_SIZE * 3; i++) {
-    let id = "record-no-" + i;
-    let payload = encryptPayload({id, denomination: "Record No. " + id});
-    collection.insert(id, payload);
-  }
-
-  let server = sync_httpd_setup({
-      "/1.1/foo/storage/rotary": collection.handler()
-  });
-
-  await SyncTestingInfrastructure(server);
-
-  let meta_global = Service.recordManager.set(engine.metaURL,
-                                              new WBORecord(engine.metaURL));
-  meta_global.payload.engines = {rotary: {version: engine.version,
-                                         syncID: engine.syncID}};
-  try {
-
-    // Confirm initial environment
-    do_check_empty(engine._store.items);
-
-    await engine._syncStartup();
-    await engine._processIncoming();
-
-    // Records have been applied in 3 batches.
-    do_check_eq(batchCalls, 3);
-    do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE * 3);
-
-  } finally {
-    await cleanAndGo(engine, server);
-  }
-});
-
-
 add_task(async function test_processIncoming_notify_count() {
   _("Ensure that failed records are reported only once.");
 
-  const APPLY_BATCH_SIZE = 5;
   const NUMBER_OF_RECORDS = 15;
 
-  // Engine that fails the first record.
+  // Engine that fails every fifth record.
   let engine = makeRotaryEngine();
-  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
-    await engine._store._applyIncomingBatch(records.slice(1));
-    return [records[0].id];
+    let sortedRecords = records.sort((a, b) => a.id > b.id ? 1 : -1);
+    let recordsToApply = [], recordsToFail = [];
+    for (let i = 0; i < sortedRecords.length; i++) {
+      (i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]);
+    }
+    await engine._store._applyIncomingBatch(recordsToApply);
+    return recordsToFail.map(record => record.id);
   };
 
   // Create a batch of server side records.
   let collection = new ServerCollection();
   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
-    let id = "record-no-" + i;
+    let id = "record-no-" + i.toString(10).padStart(2, "0");
     let payload = encryptPayload({id, denomination: "Record No. " + id});
     collection.insert(id, payload);
   }
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
@@ -821,69 +719,69 @@ add_task(async function test_processInco
     Svc.Obs.add("weave:engine:sync:applied", onApplied);
 
     // Do sync.
     await engine._syncStartup();
     await engine._processIncoming();
 
     // Confirm failures.
     do_check_attribute_count(engine._store.items, 12);
-    do_check_eq(engine.previousFailed.length, 3);
-    do_check_eq(engine.previousFailed[0], "record-no-0");
-    do_check_eq(engine.previousFailed[1], "record-no-5");
-    do_check_eq(engine.previousFailed[2], "record-no-10");
+    do_check_matches(engine.previousFailed, ["record-no-00", "record-no-05",
+      "record-no-10"]);
 
     // There are newly failed records and they are reported.
     do_check_eq(called, 1);
     do_check_eq(counts.failed, 3);
     do_check_eq(counts.applied, 15);
     do_check_eq(counts.newFailed, 3);
     do_check_eq(counts.succeeded, 12);
 
     // Sync again, 1 of the failed items are the same, the rest didn't fail.
     await engine._processIncoming();
 
     // Confirming removed failures.
     do_check_attribute_count(engine._store.items, 14);
-    do_check_eq(engine.previousFailed.length, 1);
-    do_check_eq(engine.previousFailed[0], "record-no-0");
+    do_check_matches(engine.previousFailed, ["record-no-00"]);
 
     do_check_eq(called, 2);
     do_check_eq(counts.failed, 1);
     do_check_eq(counts.applied, 3);
     do_check_eq(counts.newFailed, 0);
     do_check_eq(counts.succeeded, 2);
 
     Svc.Obs.remove("weave:engine:sync:applied", onApplied);
   } finally {
     await cleanAndGo(engine, server);
   }
 });
 
 
 add_task(async function test_processIncoming_previousFailed() {
   _("Ensure that failed records are retried.");
-  Svc.Prefs.set("client.type", "mobile");
 
-  const APPLY_BATCH_SIZE = 4;
   const NUMBER_OF_RECORDS = 14;
 
-  // Engine that fails the first 2 records.
+  // Engine that alternates between failing and applying every 2 records.
   let engine = makeRotaryEngine();
-  engine.mobileGUIDFetchBatchSize = engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
-    await engine._store._applyIncomingBatch(records.slice(2));
-    return [records[0].id, records[1].id];
+    let sortedRecords = records.sort((a, b) => a.id > b.id ? 1 : -1);
+    let recordsToApply = [], recordsToFail = [];
+    let chunks = Array.from(PlacesSyncUtils.chunkArray(sortedRecords, 2));
+    for (let i = 0; i < chunks.length; i++) {
+      (i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]);
+    }
+    await engine._store._applyIncomingBatch(recordsToApply);
+    return recordsToFail.map(record => record.id);
   };
 
   // Create a batch of server side records.
   let collection = new ServerCollection();
   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
-    let id = "record-no-" + i;
+    let id = "record-no-" + i.toString(10).padStart(2, "0");
     let payload = encryptPayload({id, denomination: "Record No. " + i});
     collection.insert(id, payload);
   }
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
@@ -906,41 +804,32 @@ add_task(async function test_processInco
     do_check_eq(engine.previousFailed, previousFailed);
 
     // Do sync.
     await engine._syncStartup();
     await engine._processIncoming();
 
     // Expected result: 4 sync batches with 2 failures each => 8 failures
     do_check_attribute_count(engine._store.items, 6);
-    do_check_eq(engine.previousFailed.length, 8);
-    do_check_eq(engine.previousFailed[0], "record-no-0");
-    do_check_eq(engine.previousFailed[1], "record-no-1");
-    do_check_eq(engine.previousFailed[2], "record-no-4");
-    do_check_eq(engine.previousFailed[3], "record-no-5");
-    do_check_eq(engine.previousFailed[4], "record-no-8");
-    do_check_eq(engine.previousFailed[5], "record-no-9");
-    do_check_eq(engine.previousFailed[6], "record-no-12");
-    do_check_eq(engine.previousFailed[7], "record-no-13");
+    do_check_matches(engine.previousFailed, ["record-no-00", "record-no-01",
+      "record-no-04", "record-no-05", "record-no-08", "record-no-09",
+      "record-no-12", "record-no-13"]);
 
     // Sync again with the same failed items (records 0, 1, 8, 9).
     await engine._processIncoming();
 
     // A second sync with the same failed items should not add the same items again.
     // Items that did not fail a second time should no longer be in previousFailed.
     do_check_attribute_count(engine._store.items, 10);
-    do_check_eq(engine.previousFailed.length, 4);
-    do_check_eq(engine.previousFailed[0], "record-no-0");
-    do_check_eq(engine.previousFailed[1], "record-no-1");
-    do_check_eq(engine.previousFailed[2], "record-no-8");
-    do_check_eq(engine.previousFailed[3], "record-no-9");
+    do_check_matches(engine.previousFailed, ["record-no-00", "record-no-01",
+      "record-no-08", "record-no-09"]);
 
     // Refetched items that didn't fail the second time are in engine._store.items.
-    do_check_eq(engine._store.items["record-no-4"], "Record No. 4");
-    do_check_eq(engine._store.items["record-no-5"], "Record No. 5");
+    do_check_eq(engine._store.items["record-no-04"], "Record No. 4");
+    do_check_eq(engine._store.items["record-no-05"], "Record No. 5");
     do_check_eq(engine._store.items["record-no-12"], "Record No. 12");
     do_check_eq(engine._store.items["record-no-13"], "Record No. 13");
   } finally {
     await cleanAndGo(engine, server);
   }
 });
 
 
@@ -966,17 +855,16 @@ add_task(async function test_processInco
                          "record-no-" + 23,
                          "record-no-" + (42 + APPLY_BATCH_SIZE),
                          "record-no-" + (23 + APPLY_BATCH_SIZE),
                          "record-no-" + (42 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (23 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (2 + APPLY_BATCH_SIZE * 3),
                          "record-no-" + (1 + APPLY_BATCH_SIZE * 3)];
   let engine = makeRotaryEngine();
-  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
 
   engine.__reconcile = engine._reconcile;
   engine._reconcile = async function _reconcile(record) {
     if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
       throw new Error("I don't like this record! Baaaaaah!");
     }
     return this.__reconcile.apply(this, arguments);
   };
@@ -1030,21 +918,17 @@ add_task(async function test_processInco
     await engine._processIncoming();
 
     // Ensure that all records but the bogus 4 have been applied.
     do_check_attribute_count(engine._store.items,
                              NUMBER_OF_RECORDS - BOGUS_RECORDS.length);
 
     // Ensure that the bogus records will be fetched again on the next sync.
     do_check_eq(engine.previousFailed.length, BOGUS_RECORDS.length);
-    engine.previousFailed.sort();
-    BOGUS_RECORDS.sort();
-    for (let i = 0; i < engine.previousFailed.length; i++) {
-      do_check_eq(engine.previousFailed[i], BOGUS_RECORDS[i]);
-    }
+    do_check_matches(engine.previousFailed.sort(), BOGUS_RECORDS.sort());
 
     // Ensure the observer was notified
     do_check_eq(observerData, engine.name);
     do_check_eq(observerSubject.failed, BOGUS_RECORDS.length);
     do_check_eq(observerSubject.newFailed, BOGUS_RECORDS.length);
 
     // Testing batching of failed item fetches.
     // Try to sync again. Ensure that we split the request into chunks to avoid
--- a/testing/modules/TestUtils.jsm
+++ b/testing/modules/TestUtils.jsm
@@ -137,9 +137,19 @@ this.TestUtils = {
         if (conditionPassed) {
           clearInterval(intervalID);
           resolve();
         }
         tries++;
       }, interval);
     });
   },
+
+  // Returns a new array containing the elements of `array` in random order
+  // (an "inside-out" Fisher-Yates shuffle); the input array is not modified.
+  shuffle(array) {
+    let results = [];
+    for (let i = 0; i < array.length; ++i) {
+      let randomIndex = Math.floor(Math.random() * (i + 1));
+      results[i] = results[randomIndex];
+      results[randomIndex] = array[i];
+    }
+    return results;
+  },
 };
--- a/toolkit/components/places/PlacesSyncUtils.jsm
+++ b/toolkit/components/places/PlacesSyncUtils.jsm
@@ -19,17 +19,37 @@ XPCOMUtils.defineLazyModuleGetter(this, 
                                   "resource://gre/modules/PlacesUtils.jsm");
 
 /**
  * This module exports functions for Sync to use when applying remote
  * records. The calls are similar to those in `Bookmarks.jsm` and
  * `nsINavBookmarksService`, with special handling for smart bookmarks,
  * tags, keywords, synced annotations, and missing parents.
  */
-var PlacesSyncUtils = {};
+var PlacesSyncUtils = {
+  /**
+   * Auxiliary generator function that yields an array in chunks
+   *
+   * @param  array
+   * @param  chunkLength
+   * @yields {Array}
+   *         A new array with the next chunkLength elements of array. If the
+   *         array has fewer than chunkLength elements remaining, yields all
+   *         of them.
+   */
+  * chunkArray(array, chunkLength) {
+    if (!array.length || chunkLength <= 0) {
+      return;
+    }
+    let startIndex = 0;
+    while (startIndex < array.length) {
+      yield array.slice(startIndex, startIndex + chunkLength);
+      startIndex += chunkLength;
+    }
+  },
+};
 
 const { SOURCE_SYNC } = Ci.nsINavBookmarksService;
 
 const MICROSECONDS_PER_SECOND = 1000000;
 const SQLITE_MAX_VARIABLE_NUMBER = 999;
 
 const ORGANIZER_QUERY_ANNO = "PlacesOrganizer/OrganizerQuery";
 const ORGANIZER_ALL_BOOKMARKS_ANNO_VALUE = "AllBookmarks";
@@ -55,30 +75,16 @@ XPCOMUtils.defineLazyGetter(this, "ROOT_
   [PlacesUtils.bookmarks.unfiledGuid]: "unfiled",
   [PlacesUtils.bookmarks.mobileGuid]: "mobile",
 }));
 
 XPCOMUtils.defineLazyGetter(this, "ROOTS", () =>
   Object.keys(ROOT_SYNC_ID_TO_GUID)
 );
 
-/**
- * Auxiliary generator function that yields an array in chunks
- *
- * @param array
- * @param chunkLength
- * @yields {Array} New Array with the next chunkLength elements of array. If the array has less than chunkLength elements, yields all of them
- */
-function* chunkArray(array, chunkLength) {
-  let startIndex = 0;
-  while (startIndex < array.length) {
-    yield array.slice(startIndex, startIndex += chunkLength);
-  }
-}
-
 const HistorySyncUtils = PlacesSyncUtils.history = Object.freeze({
   /**
    * Clamps a history visit date between the current date and the earliest
    * sensible date.
    *
    * @param {Date} visitDate
    *        The visit date.
    * @return {Date} The clamped visit date.
@@ -124,17 +130,17 @@ const HistorySyncUtils = PlacesSyncUtils
    */
   async determineNonSyncableGuids(guids) {
     // Filter out hidden pages and `TRANSITION_FRAMED_LINK` visits. These are
     // excluded when rendering the history menu, so we use the same constraints
     // for Sync. We also don't want to sync `TRANSITION_EMBED` visits, but those
     // aren't stored in the database.
     let db = await PlacesUtils.promiseDBConnection();
     let nonSyncableGuids = [];
-    for (let chunk of chunkArray(guids, SQLITE_MAX_VARIABLE_NUMBER)) {
+    for (let chunk of PlacesSyncUtils.chunkArray(guids, SQLITE_MAX_VARIABLE_NUMBER)) {
       let rows = await db.execute(`
         SELECT DISTINCT p.guid FROM moz_places p
         JOIN moz_historyvisits v ON p.id = v.place_id
         WHERE p.guid IN (${new Array(chunk.length).fill("?").join(",")}) AND
             (p.hidden = 1 OR v.visit_type IN (0,
               ${PlacesUtils.history.TRANSITION_FRAMED_LINK}))
       `, chunk);
       nonSyncableGuids = nonSyncableGuids.concat(rows.map(row => row.getResultByName("guid")));