Bug 1337978 - Unify the multiple notions of 'weak upload' in sync. r?kitcambridge,markh draft
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Tue, 18 Jul 2017 14:18:04 -0400
changeset 614553 d0cc9e7b08dda3b509c5a11fca41547e41acb50f
parent 610704 aa6ecfc0651723beb1afb51cb0594cb1429ee432
child 638895 88bd3707e28cef050632767be1d1f045e51dcb4b
push id70044
push userbmo:tchiovoloni@mozilla.com
push dateMon, 24 Jul 2017 18:57:28 +0000
reviewerskitcambridge, markh
bugs1337978
milestone56.0a1
Bug 1337978 - Unify the multiple notions of 'weak upload' in sync. r?kitcambridge,markh MozReview-Commit-ID: 5vTCAhUfMzm
services/sync/modules/bookmark_repair.js
services/sync/modules/engines.js
services/sync/modules/engines/bookmarks.js
services/sync/tests/unit/test_bookmark_engine.js
services/sync/tests/unit/test_bookmark_repair_responder.js
services/sync/tests/unit/test_bookmark_tracker.js
toolkit/components/places/PlacesSyncUtils.jsm
--- a/services/sync/modules/bookmark_repair.js
+++ b/services/sync/modules/bookmark_repair.js
@@ -181,17 +181,17 @@ class BookmarkRepairRequestor extends Co
   }
 
   tryServerOnlyRepairs(validationInfo) {
     if (this._countServerOnlyFixableProblems(validationInfo) == 0) {
       return false;
     }
     let engine = this.service.engineManager.get("bookmarks");
     for (let id of validationInfo.problems.serverMissing) {
-      engine._modified.setWeak(id, { tombstone: false });
+      engine.addForWeakUpload(id);
     }
     let toFetch = engine.toFetch.concat(validationInfo.problems.clientMissing,
                                         validationInfo.problems.serverDeleted);
     engine.toFetch = Array.from(new Set(toFetch));
     return true;
   }
 
   /* See if the repairer is willing and able to begin a repair process given
@@ -569,34 +569,35 @@ class BookmarkRepairResponder extends Co
     // progress as we don't do anything too smart that could cause problems,
     // but just upload items. If we get any smarter we should re-think this
     // (but when we do, note that checking this._currentState isn't enough as
     // this responder is not a singleton)
 
     this._currentState = {
       request,
       rawCommand,
+      processedCommand: false,
       ids: [],
     }
 
     try {
       let engine = this.service.engineManager.get("bookmarks");
       let { toUpload, toDelete } = await this._fetchItemsToUpload(request);
 
       if (toUpload.size || toDelete.size) {
         log.debug(`repair request will upload ${toUpload.size} items and delete ${toDelete.size} items`);
         // whew - now add these items to the tracker "weakly" (ie, they will not
         // persist in the case of a restart, but that's OK - we'll then end up here
         // again) and also record them in the response we send back.
         for (let id of toUpload) {
-          engine._modified.setWeak(id, { tombstone: false });
+          engine.addForWeakUpload(id);
           this._currentState.ids.push(id);
         }
         for (let id of toDelete) {
-          engine._modified.setWeak(id, { tombstone: true });
+          engine.addForWeakUpload(id, { forceTombstone: true });
           this._currentState.ids.push(id);
         }
 
         // We have arranged for stuff to be uploaded, so wait until that's done.
         Svc.Obs.add("weave:engine:sync:uploaded", this.onUploaded, this);
         // and record in telemetry that we got this far - just incase we never
         // end up doing the upload for some obscure reason.
         let eventExtra = {
@@ -685,30 +686,33 @@ class BookmarkRepairResponder extends Co
     return { toUpload, toDelete };
   }
 
   onUploaded(subject, data) {
     if (data != "bookmarks") {
       return;
     }
     Svc.Obs.remove("weave:engine:sync:uploaded", this.onUploaded, this);
-    log.debug(`bookmarks engine has uploaded stuff - creating a repair response`);
+    if (subject.failed) {
+      return;
+    }
+    log.debug(`bookmarks engine has uploaded stuff - creating a repair response`, subject);
     Async.promiseSpinningly(this._finishRepair());
   }
 
   async _finishRepair() {
     let clientsEngine = this.service.clientsEngine;
     let flowID = this._currentState.request.flowID;
     let response = {
       request: this._currentState.request.request,
       collection: "bookmarks",
       clientID: clientsEngine.localID,
       flowID,
       ids: this._currentState.ids,
-    }
+    };
     let clientID = this._currentState.request.requestor;
     await clientsEngine.sendCommand("repairResponse", [response], clientID, { flowID });
     // and nuke the request from our client.
     await clientsEngine.removeLocalCommand(this._currentState.rawCommand);
     let eventExtra = {
       flowID,
       numIDs: response.ids.length.toString(),
     }
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -760,32 +760,43 @@ Engine.prototype = {
   },
 };
 
 this.SyncEngine = function SyncEngine(name, service) {
   Engine.call(this, name || "SyncEngine", service);
 
   // Async initializations can be made in the initialize() method.
 
-  // The set of records needing a weak reupload.
-  // The difference between this and a "normal" reupload is that these records
-  // are only tracked in memory, and if the reupload attempt fails (shutdown,
-  // 412, etc), we abort uploading the "weak" set.
+  // The map of ids => metadata for records needing a weak upload.
+  //
+  // Currently the "metadata" fields are:
+  //
+  // - forceTombstone: whether or not we should ignore the local information
+  //   about the record, and write a tombstone for it anyway -- e.g. in the case
+  //   of records that should exist locally, but should never be uploaded to the
+  //   server (note that not all sync engines support tombstones)
+  //
+  // The difference between this and a "normal" upload is that these records
+  // are only tracked in memory, and if the upload attempt fails (shutdown,
+  // 412, etc), we abort uploading the "weak" set (by clearing the map).
   //
   // The rationale here is for the cases where we receive a record from the
   // server that we know is wrong in some (small) way. For example, the
   // dateAdded field on bookmarks -- maybe we have a better date, or the server
   // record is entirely missing the date, etc.
   //
   // In these cases, we fix our local copy of the record, and mark it for
-  // weak reupload. A normal ("strong") reupload is problematic here because
+  // weak upload. A normal ("strong") upload is problematic here because
   // in the case of a conflict from the server, there's a window where our
   // record would be marked as modified more recently than a change that occurs
   // on another device change, and we lose data from the user.
-  this._needWeakReupload = new Set();
+  //
+  // Additionally, we use this as the set of items to upload for bookmark
+  // repair reponse, which has similar constraints.
+  this._needWeakUpload = new Map();
 }
 
 // Enumeration to define approaches to handling bad records.
 // Attached to the constructor to allow use as a kind of static enumeration.
 SyncEngine.kRecoveryStrategy = {
   ignore: "ignore",
   retry:  "retry",
   error:  "error"
@@ -961,16 +972,20 @@ SyncEngine.prototype = {
   _createTombstone(id) {
     let tombstone = new this._recordObj(this.name, id);
     tombstone.id = id;
     tombstone.collection = this.name;
     tombstone.deleted = true;
     return tombstone;
   },
 
+  addForWeakUpload(id, { forceTombstone = false } = {}) {
+    this._needWeakUpload.set(id, { forceTombstone });
+  },
+
   // Any setup that needs to happen at the beginning of each sync.
   async _syncStartup() {
 
     // Determine if we need to wipe on outdated versions
     let metaGlobal = await this.service.recordManager.get(this.metaURL);
     let engines = metaGlobal.payload.engines || {};
     let engineData = engines[this.name] || {};
 
@@ -1029,17 +1044,16 @@ SyncEngine.prototype = {
     // to upload back.
     this._tracker.clearChangedIDs();
 
     this._log.info(this._modified.count() +
                    " outgoing items pre-reconciliation");
 
     // Keep track of what to delete at the end of sync
     this._delete = {};
-    this._needWeakReupload.clear();
   },
 
   /**
    * A tiny abstraction to make it easier to test incoming record
    * application.
    */
   itemSource() {
     return new Collection(this.engineURL, this._recordObj, this.service);
@@ -1579,23 +1593,26 @@ SyncEngine.prototype = {
   },
 
   // Upload outgoing records.
   async _uploadOutgoing() {
     this._log.trace("Uploading local changes to server.");
 
     // collection we'll upload
     let up = new Collection(this.engineURL, null, this.service);
-    let modifiedIDs = this._modified.ids();
+    let modifiedIDs = new Set(this._modified.ids());
+    for (let id of this._needWeakUpload.keys()) {
+      modifiedIDs.add(id);
+    }
     let counts = { failed: 0, sent: 0 };
-    if (modifiedIDs.length) {
-      this._log.trace("Preparing " + modifiedIDs.length +
+    if (modifiedIDs.size) {
+      this._log.trace("Preparing " + modifiedIDs.size +
                       " outgoing records");
 
-      counts.sent = modifiedIDs.length;
+      counts.sent = modifiedIDs.size;
 
       let failed = [];
       let successful = [];
       let handleResponse = async (resp, batchOngoing = false) => {
         // Note: We don't want to update this.lastSync, or this._modified until
         // the batch is complete, however we want to remember success/failure
         // indicators for when that happens.
         if (!resp.success) {
@@ -1637,116 +1654,68 @@ SyncEngine.prototype = {
       };
 
       let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
 
       for (let id of modifiedIDs) {
         let out;
         let ok = false;
         try {
-          out = await this._createRecord(id);
+          let { forceTombstone = false } = this._needWeakUpload.get(id) || {};
+          if (forceTombstone) {
+            out = await this._createTombstone(id);
+          } else {
+            out = await this._createRecord(id);
+          }
           if (this._log.level <= Log.Level.Trace)
             this._log.trace("Outgoing: " + out);
 
           out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
           let payloadLength = JSON.stringify(out.payload).length;
           if (payloadLength > this.maxRecordPayloadBytes) {
             if (this.allowSkippedRecord) {
               this._modified.delete(id); // Do not attempt to sync that record again
             }
             throw new Error(`Payload too big: ${payloadLength} bytes`);
           }
           ok = true;
         } catch (ex) {
           this._log.warn("Error creating record", ex);
           ++counts.failed;
           if (Async.isShutdownException(ex) || !this.allowSkippedRecord) {
-            Observers.notify("weave:engine:sync:uploaded", counts, this.name);
+            if (!this.allowSkippedRecord) {
+              // Don't bother for shutdown errors
+              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
+            }
             throw ex;
           }
         }
-        this._needWeakReupload.delete(id);
         if (ok) {
           let { enqueued, error } = await postQueue.enqueue(out);
           if (!enqueued) {
             ++counts.failed;
             if (!this.allowSkippedRecord) {
+              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
               throw error;
             }
             this._modified.delete(id);
             this._log.warn(`Failed to enqueue record "${id}" (skipping)`, error);
           }
         }
         await Async.promiseYield();
       }
       await postQueue.flush(true);
     }
+    this._needWeakUpload.clear();
 
-    if (this._needWeakReupload.size) {
-      try {
-        const { sent, failed } = await this._weakReupload(up);
-        counts.sent += sent;
-        counts.failed += failed;
-      } catch (e) {
-        if (Async.isShutdownException(e)) {
-          throw e;
-        }
-        this._log.warn("Weak reupload failed", e);
-      }
-    }
     if (counts.sent || counts.failed) {
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
-  async _weakReupload(collection) {
-    const counts = { sent: 0, failed: 0 };
-    let pendingSent = 0;
-    let postQueue = collection.newPostQueue(this._log, this.lastSync, (resp, batchOngoing = false) => {
-      if (!resp.success) {
-        this._needWeakReupload.clear();
-        this._log.warn("Uploading records (weak) failed: " + resp);
-        resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
-        throw resp;
-      }
-      if (!batchOngoing) {
-        counts.sent += pendingSent;
-        pendingSent = 0;
-      }
-    });
-
-    let pendingWeakReupload = await this.buildWeakReuploadMap(this._needWeakReupload);
-    for (let [id, encodedRecord] of pendingWeakReupload) {
-      try {
-        this._log.trace("Outgoing (weak)", encodedRecord);
-        encodedRecord.encrypt(this.service.collectionKeys.keyForCollection(this.name));
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        this._log.warn(`Failed to encrypt record "${id}" during weak reupload`, ex);
-        ++counts.failed;
-        continue;
-      }
-      // Note that general errors (network error, 412, etc.) will throw here.
-      // `enqueued` is only false if the specific item failed to enqueue, but
-      // other items should be/are fine. For example, enqueued will be false if
-      // it is larger than the max post or WBO size.
-      let { enqueued } = await postQueue.enqueue(encodedRecord);
-      if (!enqueued) {
-        ++counts.failed;
-      } else {
-        ++pendingSent;
-      }
-      await Async.promiseYield();
-    }
-    await postQueue.flush(true);
-    return counts;
-  },
-
   async _onRecordsWritten(succeeded, failed) {
     // Implement this method to take specific actions against successfully
     // uploaded records and failed records.
   },
 
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   async _syncFinish() {
@@ -1772,17 +1741,17 @@ SyncEngine.prototype = {
           await doDelete(key, val.slice(0, 100));
           val = val.slice(100);
         }
       }
     }
   },
 
   async _syncCleanup() {
-    this._needWeakReupload.clear();
+    this._needWeakUpload.clear();
     if (!this._modified) {
       return;
     }
 
     try {
       // Mark failed WBOs as changed again so they are reuploaded next time.
       await this.trackRemainingChanges();
     } finally {
@@ -1832,16 +1801,17 @@ SyncEngine.prototype = {
 
     return canDecrypt;
   },
 
   async _resetClient() {
     this.resetLastSync();
     this.previousFailed = [];
     this.toFetch = [];
+    this._needWeakUpload.clear();
   },
 
   async wipeServer() {
     let response = await this.service.resource(this.engineURL).delete();
     if (response.status != 200 && response.status != 404) {
       throw response;
     }
     await this._resetClient();
@@ -1910,40 +1880,16 @@ SyncEngine.prototype = {
    * items that failed to upload. This method is called at the end of each sync.
    *
    */
   async trackRemainingChanges() {
     for (let [id, change] of this._modified.entries()) {
       this._tracker.addChangedID(id, change);
     }
   },
-
-  /**
-   * Returns a map of (id, unencrypted record) that will be used to perform
-   * the weak reupload. Subclasses may override this to filter out items we
-   * shouldn't upload as part of a weak reupload (items that have changed,
-   * for example).
-   */
-  async buildWeakReuploadMap(idSet) {
-    let result = new Map();
-    let maybeYield = Async.jankYielder();
-    for (let id of idSet) {
-      await maybeYield();
-      try {
-        let record = await this._createRecord(id);
-        result.set(id, record);
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        this._log.warn("createRecord failed during weak reupload", ex);
-      }
-    }
-    return result;
-  }
 };
 
 /**
  * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
  * and stores opaque change data for tracked IDs. The default implementation
  * only records timestamps, though engines can extend this to store additional
  * data for each entry.
  */
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -546,43 +546,16 @@ BookmarksEngine.prototype = {
       // Make sure deleted items are marked as tombstones. We do this here
       // in addition to the `isTombstone` call above because it's possible
       // a changed bookmark might be deleted during a sync (bug 1313967).
       this._modified.setTombstone(record.id);
     }
     return record;
   },
 
-  async buildWeakReuploadMap(idSet) {
-    // We want to avoid uploading records which have changed, since that could
-    // cause an inconsistent state on the server.
-    //
-    // Strictly speaking, it would be correct to just call getChangedIds() after
-    // building the initial weak reupload map, however this is quite slow, since
-    // we might end up doing createRecord() (which runs at least one, and
-    // sometimes multiple database queries) for a potentially large number of
-    // items.
-    //
-    // Since the call to getChangedIds is relatively cheap, we do it once before
-    // building the weakReuploadMap (which is where the calls to createRecord()
-    // occur) as an optimization, and once after for correctness, to handle the
-    // unlikely case that a record was modified while we were building the map.
-    let initialChanges = await PlacesSyncUtils.bookmarks.getChangedIds();
-    for (let changed of initialChanges) {
-      idSet.delete(changed);
-    }
-
-    let map = await SyncEngine.prototype.buildWeakReuploadMap.call(this, idSet);
-    let changes = await PlacesSyncUtils.bookmarks.getChangedIds();
-    for (let id of changes) {
-      map.delete(id);
-    }
-    return map;
-  },
-
   async _findDupe(item) {
     this._log.trace("Finding dupe for " + item.id +
                     " (already duped: " + item.hasDupe + ").");
 
     // Don't bother finding a dupe if the incoming item has duplicates.
     if (item.hasDupe) {
       this._log.trace(item.id + " already a dupe: not finding one.");
       return null;
@@ -721,34 +694,34 @@ BookmarksStore.prototype = {
     // This can throw if we're inserting an invalid or incomplete bookmark.
     // That's fine; the exception will be caught by `applyIncomingBatch`
     // without aborting further processing.
     let item = await PlacesSyncUtils.bookmarks.insert(info);
     if (item) {
       this._log.trace(`Created ${item.kind} ${item.syncId} under ${
         item.parentSyncId}`, item);
       if (item.dateAdded != record.dateAdded) {
-        this.engine._needWeakReupload.add(item.syncId);
+        this.engine.addForWeakUpload(item.syncId);
       }
     }
   },
 
   async remove(record) {
     this._log.trace(`Buffering removal of item "${record.id}".`);
     this._itemsToDelete.add(record.id);
   },
 
   async update(record) {
     let info = record.toSyncBookmark();
     let item = await PlacesSyncUtils.bookmarks.update(info);
     if (item) {
       this._log.trace(`Updated ${item.kind} ${item.syncId} under ${
         item.parentSyncId}`, item);
       if (item.dateAdded != record.dateAdded) {
-        this.engine._needWeakReupload.add(item.syncId);
+        this.engine.addForWeakUpload(item.syncId);
       }
     }
   },
 
   async _orderChildren() {
     for (let syncID in this._childrenToOrder) {
       let children = this._childrenToOrder[syncID];
       try {
@@ -1139,109 +1112,69 @@ BookmarksTracker.prototype = {
       this.score += SCORE_INCREMENT_XLARGE;
       this._batchSawScoreIncrement = false;
     }
   },
   onItemVisited() {}
 };
 
 class BookmarksChangeset extends Changeset {
-  constructor() {
-    super();
-    // Weak changes are part of the changeset, but don't bump the change
-    // counter, and aren't persisted anywhere.
-    this.weakChanges = {};
-  }
 
   getStatus(id) {
     let change = this.changes[id];
     if (!change) {
       return PlacesUtils.bookmarks.SYNC_STATUS.UNKNOWN;
     }
     return change.status;
   }
 
   getModifiedTimestamp(id) {
     let change = this.changes[id];
     if (change) {
       // Pretend the change doesn't exist if we've already synced or
       // reconciled it.
       return change.synced ? Number.NaN : change.modified;
     }
-    if (this.weakChanges[id]) {
-      // For weak changes, we use a timestamp from long ago to ensure we always
-      // prefer the remote version in case of conflicts.
-      return 0;
-    }
     return Number.NaN;
   }
 
-  setWeak(id, { tombstone = false } = {}) {
-    this.weakChanges[id] = { tombstone };
-  }
-
   has(id) {
     let change = this.changes[id];
     if (change) {
       return !change.synced;
     }
-    return !!this.weakChanges[id];
+    return false;
   }
 
   setTombstone(id) {
     let change = this.changes[id];
     if (change) {
       change.tombstone = true;
     }
-    let weakChange = this.weakChanges[id];
-    if (weakChange) {
-      // Not strictly necessary, since we never persist weak changes, but may
-      // be useful for bookkeeping.
-      weakChange.tombstone = true;
-    }
   }
 
   delete(id) {
     let change = this.changes[id];
     if (change) {
       // Mark the change as synced without removing it from the set. We do this
       // so that we can update Places in `trackRemainingChanges`.
       change.synced = true;
     }
-    delete this.weakChanges[id];
-  }
-
-  changeID(oldID, newID) {
-    super.changeID(oldID, newID);
-    this.weakChanges[newID] = this.weakChanges[oldID];
-    delete this.weakChanges[oldID];
   }
 
   ids() {
     let results = new Set();
     for (let id in this.changes) {
       if (!this.changes[id].synced) {
         results.add(id);
       }
     }
-    for (let id in this.weakChanges) {
-      results.add(id);
-    }
     return [...results];
   }
 
-  clear() {
-    super.clear();
-    this.weakChanges = {};
-  }
-
   isTombstone(id) {
     let change = this.changes[id];
     if (change) {
       return change.tombstone;
     }
-    let weakChange = this.weakChanges[id];
-    if (weakChange) {
-      return weakChange.tombstone;
-    }
     return false;
   }
 }
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -639,23 +639,16 @@ add_task(async function test_sync_dateAd
     let item6 = new Bookmark("bookmarks", item6GUID);
     item6.bmkUri = "https://example.com/6";
     item6.title = "asdf6";
     item6.parentName = "Bookmarks Toolbar";
     item6.parentid = "toolbar";
     const item6LastModified = (now - oneYearMS) / 1000;
     collection.insert(item6GUID, encryptPayload(item6.cleartext), item6LastModified);
 
-    let origBuildWeakReuploadMap = engine.buildWeakReuploadMap;
-    engine.buildWeakReuploadMap = async (set) => {
-      let fullMap = await origBuildWeakReuploadMap.call(engine, set);
-      fullMap.delete(item6GUID);
-      return fullMap;
-    };
-
     await sync_engine_and_validate_telem(engine, false);
 
     let record1 = await store.createRecord(item1GUID);
     let record2 = await store.createRecord(item2GUID);
 
     equal(item1.dateAdded, record1.dateAdded, "dateAdded in past should be synced");
     equal(record2.dateAdded, item2LastModified * 1000, "dateAdded in future should be ignored in favor of last modified");
 
@@ -670,25 +663,16 @@ add_task(async function test_sync_dateAd
     let record4 = await store.createRecord(item4GUID);
     equal(record4.dateAdded, item4LastModified * 1000,
           "If no dateAdded is provided, lastModified should be used");
 
     let record5 = await store.createRecord(item5GUID);
     equal(record5.dateAdded, item5LastModified * 1000,
           "If no dateAdded is provided, lastModified should be used (even if it's in the future)");
 
-    let item6WBO = JSON.parse(JSON.parse(collection._wbos[item6GUID].payload).ciphertext);
-    ok(!item6WBO.dateAdded,
-       "If we think an item has been modified locally, we don't upload it to the server");
-
-    let record6 = await store.createRecord(item6GUID);
-    equal(record6.dateAdded, item6LastModified * 1000,
-       "We still remember the more accurate dateAdded if we don't upload a record due to local changes");
-    engine.buildWeakReuploadMap = origBuildWeakReuploadMap;
-
     // Update item2 and try resyncing it.
     item2.dateAdded = now - 100000;
     collection.insert(item2GUID, encryptPayload(item2.cleartext), now / 1000 - 50);
 
 
     // Also, add a local bookmark and make sure it's date added makes it up to the server
     let bzid = PlacesUtils.bookmarks.insertBookmark(
       PlacesUtils.bookmarksMenuFolderId, Utils.makeURI("https://bugzilla.mozilla.org/"),
--- a/services/sync/tests/unit/test_bookmark_repair_responder.js
+++ b/services/sync/tests/unit/test_bookmark_repair_responder.js
@@ -520,11 +520,96 @@ add_task(async function test_aborts_unkn
       extra: { flowID: request.flowID,
                reason: "Don't understand request type 'not-upload'",
              },
     },
   ]);
   await cleanup(server);
 });
 
+add_task(async function test_upload_fail() {
+  let server = await makeServer();
+
+  // Pretend we've already synced this bookmark, so that we can ensure it's
+  // uploaded in response to our repair request.
+  let bm = await PlacesUtils.bookmarks.insert({ parentGuid: PlacesUtils.bookmarks.unfiledGuid,
+                                                title: "Get Firefox",
+                                                url: "http://getfirefox.com/",
+                                                source: PlacesUtils.bookmarks.SOURCES.SYNC });
+
+  await Service.sync();
+  let request = {
+    request: "upload",
+    ids: [bm.guid],
+    flowID: Utils.makeGUID(),
+  }
+  let responder = new BookmarkRepairResponder();
+  await responder.repair(request, null);
+
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "uploading",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ]);
+
+  // This sync would normally upload the item - arrange for it to fail.
+  let engine = Service.engineManager.get("bookmarks");
+  let oldCreateRecord = engine._createRecord;
+  engine._createRecord = async function(id) {
+    return "anything"; // doesn't have an "encrypt"
+  }
+
+  let numFailures = 0;
+  let numSuccesses = 0;
+  function onUploaded(subject, data) {
+    if (data != "bookmarks") {
+      return;
+    }
+    if (subject.failed) {
+      numFailures += 1;
+    } else {
+      numSuccesses += 1;
+    }
+  }
+  Svc.Obs.add("weave:engine:sync:uploaded", onUploaded, this);
+
+  await Service.sync();
+
+  equal(numFailures, 1);
+  equal(numSuccesses, 0);
+
+  // should be no recorded events
+  checkRecordedEvents([]);
+
+  // restore the error injection so next sync succeeds - the repair should
+  // restart
+  engine._createRecord = oldCreateRecord;
+  await responder.repair(request, null);
+
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "uploading",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ]);
+
+  await Service.sync();
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "finished",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ])
+
+  equal(numFailures, 1);
+  equal(numSuccesses, 1);
+
+  Svc.Obs.remove("weave:engine:sync:uploaded", onUploaded, this);
+  await cleanup(server);
+});
+
 add_task(async function teardown() {
   Svc.Prefs.reset("engine.bookmarks.validation.enabled");
 });
--- a/services/sync/tests/unit/test_bookmark_tracker.js
+++ b/services/sync/tests/unit/test_bookmark_tracker.js
@@ -40,17 +40,17 @@ async function verifyTrackerEmpty() {
 
 async function resetTracker() {
   await PlacesTestUtils.markBookmarksAsSynced();
   tracker.resetScore();
 }
 
 async function cleanup() {
   engine.lastSync = 0;
-  engine._needWeakReupload.clear()
+  engine._needWeakUpload.clear()
   await store.wipe();
   await resetTracker();
   await stopTracking();
 }
 
 // startTracking is a signal that the test wants to notice things that happen
 // after this is called (ie, things already tracked should be discarded.)
 async function startTracking() {
--- a/toolkit/components/places/PlacesSyncUtils.jsm
+++ b/toolkit/components/places/PlacesSyncUtils.jsm
@@ -109,21 +109,18 @@ const BookmarkSyncUtils = PlacesSyncUtil
   },
 
   /**
    * Resolves to an array of the syncIDs of bookmarks that have a nonzero change
    * counter
    */
   async getChangedIds() {
     let db = await PlacesUtils.promiseDBConnection();
-    let result = await db.executeCached(`
-      SELECT guid FROM moz_bookmarks
-      WHERE syncChangeCounter >= 1`);
-    return result.map(row =>
-      BookmarkSyncUtils.guidToSyncId(row.getResultByName("guid")));
+    let changes = await pullSyncChanges(db);
+    return Object.keys(changes);
   },
 
   /**
    * Fetches the sync IDs for a folder's children, ordered by their position
    * within the folder.
    */
   async fetchChildSyncIds(parentSyncId) {
     PlacesUtils.SYNC_BOOKMARK_VALIDATORS.syncId(parentSyncId);
@@ -1722,19 +1719,18 @@ function addRowToChangeRecords(row, chan
     counter: row.getResultByName("syncChangeCounter"),
     status: row.getResultByName("syncStatus"),
     tombstone: !!row.getResultByName("tombstone"),
     synced: false,
   };
 }
 
 /**
- * Queries the database for synced bookmarks and tombstones, updates the sync
- * status of all "NEW" bookmarks to "NORMAL", and returns a changeset for the
- * Sync bookmarks engine.
+ * Queries the database for synced bookmarks and tombstones, and returns a
+ * changeset for the Sync bookmarks engine.
  *
  * @param db
  *        The Sqlite.jsm connection handle.
  * @return {Promise} resolved once all items have been fetched.
  * @resolves to an object containing records for changed bookmarks, keyed by
  *           the sync ID.
  */
 var pullSyncChanges = async function(db) {