Bug 1313967 - Buffer outgoing Sync records in memory. r?markh draft
authorKit Cambridge <kit@yakshaving.ninja>
Tue, 01 Nov 2016 18:27:09 -0700
changeset 432432 91bfee385dfbb5ba731a05ecb9561a8b1658527f
parent 431855 37ab1d54a08e7e1431660b22377428b74dcd090a
child 432434 5901e84839442eced6a72f29c7242b9cb754f906
push id34311
push userbmo:kcambridge@mozilla.com
push dateWed, 02 Nov 2016 02:38:42 +0000
reviewersmarkh
bugs1313967, 1303679
milestone52.0a1
Bug 1313967 - Buffer outgoing Sync records in memory. r?markh The current design of `Engine::_uploadOutgoing` makes it hard to upload records that aren't in `Engine::_modified`. This commit extracts the `_createRecord` calls into a `fetchOutgoingRecords` method that returns an array of encrypted records for upload. Engines can override this method to include additional records. This is used in bug 1303679 to add a cleanup stage that writes tombstones for records that shouldn't be on the server. MozReview-Commit-ID: 9iePxJzWl09
services/sync/modules/engines.js
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1486,22 +1486,22 @@ SyncEngine.prototype = {
                    item.id);
     return remoteIsNewer;
   },
 
   // Upload outgoing records.
   _uploadOutgoing: function () {
     this._log.trace("Uploading local changes to server.");
 
-    let modifiedIDs = this._modified.ids();
-    if (modifiedIDs.length) {
-      this._log.trace("Preparing " + modifiedIDs.length +
+    let records = this.fetchOutgoingRecords();
+    if (records.length) {
+      this._log.trace("Uploading " + records.length +
                       " outgoing records");
 
-      let counts = { sent: modifiedIDs.length, failed: 0 };
+      let counts = { sent: records.length, failed: 0 };
 
       // collection we'll upload
       let up = new Collection(this.engineURL, null, this.service);
 
       let failed = [];
       let successful = [];
       let handleResponse = (resp, batchOngoing = false) => {
         // Note: We don't want to update this.lastSync, or this._modified until
@@ -1542,48 +1542,58 @@ SyncEngine.prototype = {
 
         // clear for next batch
         failed.length = 0;
         successful.length = 0;
       };
 
       let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
 
-      for (let id of modifiedIDs) {
-        let out;
-        let ok = false;
-        try {
-          out = this._createRecord(id);
-          if (this._log.level <= Log.Level.Trace)
-            this._log.trace("Outgoing: " + out);
-
-          out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
-          ok = true;
-        } catch (ex) {
-          if (Async.isShutdownException(ex)) {
-            throw ex;
-          }
-          this._log.warn("Error creating record", ex);
-        }
-        if (ok) {
-          let { enqueued, error } = postQueue.enqueue(out);
-          if (!enqueued) {
-            ++counts.failed;
-            if (!this.allowSkippedRecord) {
-              throw error;
-            }
+      for (let out of records) {
+        let { enqueued, error } = postQueue.enqueue(out);
+        if (!enqueued) {
+          ++counts.failed;
+          if (!this.allowSkippedRecord) {
+            throw error;
           }
         }
-        this._store._sleep(0);
       }
       postQueue.flush(true);
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
+  fetchOutgoingRecords() {
+    let modifiedIDs = this._modified.ids();
+    this._log.trace("Fetching " + modifiedIDs.length +
+                    " outgoing records");
+
+    let records = [];
+    for (let id of modifiedIDs) {
+      try {
+        let out = this._createRecord(id);
+        if (this._log.level <= Log.Level.Trace) {
+          this._log.trace("Outgoing: " + out);
+        }
+        out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
+        records.push(out);
+      } catch (ex) {
+        if (Async.isShutdownException(ex)) {
+          throw ex;
+        }
+        this._log.warn(`Error creating record for ${id}`, ex);
+      }
+      // `_createRecord` might perform synchronous I/O, so we yield back to the
+      // main thread. See bug 636312.
+      this._store._sleep(0);
+    }
+
+    return records;
+  },
+
   _onRecordsWritten(succeeded, failed) {
     // Implement this method to take specific actions against successfully
     // uploaded records and failed records.
   },
 
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   _syncFinish: function () {