Bug 1294599 - Implement batched downloads of sync collections on desktop r?rnewman,markh draft
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Thu, 06 Oct 2016 17:52:27 -0400
changeset 427734 a30b1656f0a2ede8dc3e0d8496e28d672d98bdde
parent 427613 e2d125de7b243ca88d04bd7af6059b5db7a2249f
child 534548 71cd794f3759f9c4ab573b7aa613fd7f84c81d39
push id33105
push userbmo:tchiovoloni@mozilla.com
push dateThu, 20 Oct 2016 20:53:48 +0000
reviewersrnewman, markh
bugs1294599
milestone52.0a1
Bug 1294599 - Implement batched downloads of sync collections on desktop r?rnewman,markh MozReview-Commit-ID: 6la9t1FxQhH
services/sync/modules/bookmark_validator.js
services/sync/modules/collection_validator.js
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/record.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_bookmark_engine.js
services/sync/tests/unit/test_collection_getBatched.js
services/sync/tests/unit/xpcshell.ini
--- a/services/sync/modules/bookmark_validator.js
+++ b/services/sync/modules/bookmark_validator.js
@@ -715,17 +715,20 @@ class BookmarkValidator {
     let collection = engine.itemSource();
     let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
     collection.full = true;
     let items = [];
     collection.recordHandler = function(item) {
       item.decrypt(collectionKey);
       items.push(item.cleartext);
     };
-    collection.get();
+    let resp = collection.getBatched();
+    if (!resp.success) {
+      throw resp;
+    }
     return items;
   }
 
   validate(engine) {
     let self = this;
     return Task.spawn(function*() {
       let start = Date.now();
       let clientTree = yield PlacesUtils.promiseBookmarksTree("", {
--- a/services/sync/modules/collection_validator.js
+++ b/services/sync/modules/collection_validator.js
@@ -65,17 +65,20 @@ class CollectionValidator {
     let collection = engine.itemSource();
     let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
     collection.full = true;
     let items = [];
     collection.recordHandler = function(item) {
       item.decrypt(collectionKey);
       items.push(item.cleartext);
     };
-    collection.get();
+    let resp = collection.getBatched();
+    if (!resp.success) {
+      throw resp;
+    }
     return items;
   }
 
   // Should return a promise that resolves to an array of client items.
   getClientItems() {
     return Promise.reject("Must implement");
   }
 
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -71,16 +71,20 @@ DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE:  5
 // Default batch size for applying incoming records.
 DEFAULT_STORE_BATCH_SIZE:              1,
 HISTORY_STORE_BATCH_SIZE:              50,      // same as MOBILE_BATCH_SIZE
 FORMS_STORE_BATCH_SIZE:                50,      // same as MOBILE_BATCH_SIZE
 PASSWORDS_STORE_BATCH_SIZE:            50,      // same as MOBILE_BATCH_SIZE
 ADDONS_STORE_BATCH_SIZE:               1000000, // process all addons at once
 APPS_STORE_BATCH_SIZE:                 50,      // same as MOBILE_BATCH_SIZE
 
+// Default batch size for download batching
+// (how many records are fetched at a time from the server when batching is used).
+DEFAULT_DOWNLOAD_BATCH_SIZE:           1000,
+
 // score thresholds for early syncs
 SINGLE_USER_THRESHOLD:                 1000,
 MULTI_DEVICE_THRESHOLD:                300,
 
 // Other score increment constants
 SCORE_INCREMENT_SMALL:                 1,
 SCORE_INCREMENT_MEDIUM:                10,
 
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1153,17 +1153,17 @@ SyncEngine.prototype = {
       if (applyBatch.length == self.applyIncomingBatchSize) {
         doApplyBatch.call(self);
       }
       self._store._sleep(0);
     };
 
     // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
-      let resp = newitems.get();
+      let resp = newitems.getBatched();
       doApplyBatchAndPersistFailed.call(this);
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
 
       if (aborting) {
         throw aborting;
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -526,19 +526,22 @@ this.Collection = function Collection(ur
   this._service = service;
 
   this._full = false;
   this._ids = null;
   this._limit = 0;
   this._older = 0;
   this._newer = 0;
   this._data = [];
-  // optional members used by batch operations.
+  // optional members used by batch upload operations.
   this._batch = null;
   this._commit = false;
+  // Used for batch download operations -- note that this is explicitly an
+  // opaque value and not (necessarily) a number.
+  this._offset = null;
 }
 Collection.prototype = {
   __proto__: Resource.prototype,
   _logName: "Sync.Collection",
 
   _rebuildURL: function Coll__rebuildURL() {
     // XXX should consider what happens if it's not a URL...
     this.uri.QueryInterface(Ci.nsIURL);
@@ -556,16 +559,18 @@ Collection.prototype = {
     if (this.ids != null)
       args.push("ids=" + this.ids);
     if (this.limit > 0 && this.limit != Infinity)
       args.push("limit=" + this.limit);
     if (this._batch)
       args.push("batch=" + encodeURIComponent(this._batch));
     if (this._commit)
       args.push("commit=true");
+    if (this._offset)
+      args.push("offset=" + encodeURIComponent(this._offset));
 
     this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
   },
 
   // get full items
   get full() { return this._full; },
   set full(value) {
     this._full = value;
@@ -605,47 +610,132 @@ Collection.prototype = {
   // newest (newest first)
   // index
   get sort() { return this._sort; },
   set sort(value) {
     this._sort = value;
     this._rebuildURL();
   },
 
+  get offset() { return this._offset; },
+  set offset(value) {
+    this._offset = value;
+    this._rebuildURL();
+  },
+
   // Set information about the batch for this request.
   get batch() { return this._batch; },
   set batch(value) {
     this._batch = value;
     this._rebuildURL();
   },
 
   get commit() { return this._commit; },
   set commit(value) {
     this._commit = value && true;
     this._rebuildURL();
   },
 
+  // Similar to get(), but will page through the items `batchSize` at a time,
+  // deferring calling the record handler until we've gotten them all.
+  //
+  // Returns the last response processed, and doesn't run the record handler
+  // on any items if a non-success status is received while downloading the
+  // records (or if a network error occurs).
+  getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
+    let totalLimit = Number(this.limit) || Infinity;
+    if (batchSize <= 0 || batchSize >= totalLimit) {
+      // Invalid batch sizes should arguably be an error, but they're easy to handle
+      return this.get();
+    }
+
+    if (!this.full) {
+      throw new Error("getBatched is unimplemented for guid-only GETs");
+    }
+
+    // _onComplete and _onProgress are reset after each `get` by AsyncResource.
+    // We overwrite _onRecord to something that stores the data in an array
+    // until the end.
+    let { _onComplete, _onProgress, _onRecord } = this;
+    let recordBuffer = [];
+    let resp;
+    try {
+      this._onRecord = r => recordBuffer.push(r);
+      let lastModifiedTime;
+      this.limit = batchSize;
+
+      do {
+        this._onProgress = _onProgress;
+        this._onComplete = _onComplete;
+        if (batchSize + recordBuffer.length > totalLimit) {
+          this.limit = totalLimit - recordBuffer.length;
+        }
+        this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
+        // Actually perform the request
+        resp = this.get();
+        if (!resp.success) {
+          break;
+        }
+
+        // Initialize last modified, or check that something broken isn't happening.
+        let lastModified = resp.headers["x-last-modified"];
+        if (!lastModifiedTime) {
+          lastModifiedTime = lastModified;
+          this.setHeader("X-If-Unmodified-Since", lastModified);
+        } else if (lastModified != lastModifiedTime) {
+          // Should be impossible -- We'd get a 412 in this case.
+          throw new Error("X-Last-Modified changed in the middle of a download batch! " +
+                          `${lastModifiedTime} => ${lastModified}`);
+        }
+
+        // If this is missing, we're finished.
+        this.offset = resp.headers["x-weave-next-offset"];
+      } while (this.offset && totalLimit > recordBuffer.length);
+    } finally {
+      // Ensure we undo any temporary state so that subsequent calls to get()
+      // or getBatched() work properly. We do this before calling the record
+      // handler so that we can more convincingly pretend to be a normal get()
+      // call. Note: we're resetting these to the values they had before this
+      // function was called.
+      this._onRecord = _onRecord;
+      this._limit = totalLimit;
+      this._offset = null;
+      delete this._headers["x-if-unmodified-since"];
+      this._rebuildURL();
+    }
+    if (resp.success && Async.checkAppReady()) {
+      // call the original _onRecord (e.g. the user supplied record handler)
+      // for each record we've stored
+      for (let record of recordBuffer) {
+        this._onRecord(record);
+      }
+    }
+    return resp;
+  },
+
   set recordHandler(onRecord) {
     // Save this because onProgress is called with this as the ChannelListener
     let coll = this;
 
     // Switch to newline separated records for incremental parsing
     coll.setHeader("Accept", "application/newlines");
 
+    this._onRecord = onRecord;
+
     this._onProgress = function() {
       let newline;
       while ((newline = this._data.indexOf("\n")) > 0) {
         // Split the json record from the rest of the data
         let json = this._data.slice(0, newline);
         this._data = this._data.slice(newline + 1);
 
         // Deserialize a record from json and give it to the callback
         let record = new coll._recordObj();
         record.deserialize(json);
-        onRecord(record);
+        coll._onRecord(record);
       }
     };
   },
 
   // This object only supports posting via the postQueue object.
   post() {
     throw new Error("Don't directly post to a collection - use newPostQueue instead");
   },
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -289,33 +289,45 @@ ServerCollection.prototype = {
     if (options.full) {
       let data = [];
       for (let [id, wbo] of Object.entries(this._wbos)) {
         // Drop deleted.
         if (wbo.modified && this._inResultSet(wbo, options)) {
           data.push(wbo.get());
         }
       }
+      let start = options.offset || 0;
       if (options.limit) {
-        data = data.slice(0, options.limit);
+        let numItemsPastOffset = data.length - start;
+        data = data.slice(start, start + options.limit);
+        // use options as a backchannel to set x-weave-next-offset
+        if (numItemsPastOffset > options.limit) {
+          options.nextOffset = start + options.limit;
+        }
+      } else if (start) {
+        data = data.slice(start);
       }
       // Our implementation of application/newlines.
       result = data.join("\n") + "\n";
 
       // Use options as a backchannel to report count.
       options.recordCount = data.length;
     } else {
       let data = [];
       for (let [id, wbo] of Object.entries(this._wbos)) {
         if (this._inResultSet(wbo, options)) {
           data.push(id);
         }
       }
+      let start = options.offset || 0;
       if (options.limit) {
-        data = data.slice(0, options.limit);
+        if (data.length > start + options.limit) options.nextOffset = start + options.limit;
+        data = data.slice(start, start + options.limit);
+      } else if (start) {
+        data = data.slice(start);
       }
       result = JSON.stringify(data);
       options.recordCount = data.length;
     }
     return result;
   },
 
   post: function(input) {
@@ -386,28 +398,35 @@ ServerCollection.prototype = {
         options.ids = options.ids.split(",");
       }
       if (options.newer) {
         options.newer = parseFloat(options.newer);
       }
       if (options.limit) {
         options.limit = parseInt(options.limit, 10);
       }
+      if (options.offset) {
+        options.offset = parseInt(options.offset, 10);
+      }
 
       switch(request.method) {
         case "GET":
           body = self.get(options, request);
-          // "If supported by the db, this header will return the number of
-          // records total in the request body of any multiple-record GET
-          // request."
-          let records = options.recordCount;
-          self._log.info("Records: " + records);
+          // see http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html
+          // for description of these headers.
+          let { recordCount: records, nextOffset } = options;
+
+          self._log.info("Records: " + records + ", nextOffset: " + nextOffset);
           if (records != null) {
             response.setHeader("X-Weave-Records", "" + records);
           }
+          if (nextOffset) {
+            response.setHeader("X-Weave-Next-Offset", "" + nextOffset);
+          }
+          response.setHeader("X-Last-Modified", "" + self.timestamp);
           break;
 
         case "POST":
           let res = self.post(readBytesFromInputStream(request.bodyInputStream), request);
           body = JSON.stringify(res);
           response.newModified = res.modified;
           break;
 
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -597,16 +597,19 @@ add_task(function* test_misreconciled_ro
   encrypted.decrypt = function () {
     for (let x in rec) {
       encrypted[x] = rec[x];
     }
   };
 
   _("Applying record.");
   engine._processIncoming({
+    getBatched() {
+      return this.get();
+    },
     get: function () {
       this.recordHandler(encrypted);
       return {success: true}
     },
   });
 
   // Ensure that afterwards, toolbar is still there.
   // As of 2012-12-05, this only passes because Places doesn't use "toolbar" as
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_collection_getBatched.js
@@ -0,0 +1,195 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+Cu.import("resource://services-sync/record.js");
+Cu.import("resource://services-sync/service.js");
+
+function run_test() {
+  initTestLogging("Trace");
+  Log.repository.getLogger("Sync.Collection").level = Log.Level.Trace;
+  run_next_test();
+}
+
+function recordRange(lim, offset, total) {
+  let res = [];
+  for (let i = offset; i < Math.min(lim + offset, total); ++i) {
+    res.push(JSON.stringify({ id: String(i), payload: "test:" + i }));
+  }
+  return res.join("\n") + "\n";
+}
+
+function get_test_collection_info({ totalRecords, batchSize, lastModified,
+                                    throwAfter = Infinity,
+                                    interruptedAfter = Infinity }) {
+  let coll = new Collection("http://example.com/test/", WBORecord, Service);
+  coll.full = true;
+  let requests = [];
+  let responses = [];
+  let sawRecord = false;
+  coll.get = function() {
+    ok(!sawRecord); // make sure we call record handler after all requests.
+    let limit = +this.limit;
+    let offset = 0;
+    if (this.offset) {
+      equal(this.offset.slice(0, 6), "foobar");
+      offset = +this.offset.slice(6);
+    }
+    requests.push({
+      limit,
+      offset,
+      spec: this.spec,
+      headers: Object.assign({}, this.headers)
+    });
+    if (--throwAfter === 0) {
+      throw "Some Network Error";
+    }
+    let body = recordRange(limit, offset, totalRecords);
+    this._onProgress.call({ _data: body });
+    let response = {
+      body,
+      success: true,
+      status: 200,
+      headers: {}
+    };
+    if (--interruptedAfter === 0) {
+      response.success = false;
+      response.status = 412;
+      response.body = "";
+    } else if (offset + limit < totalRecords) {
+      // Ensure we're treating this as an opaque string, since the docs say
+      // it might not be numeric.
+      response.headers["x-weave-next-offset"] = "foobar" + (offset + batchSize);
+    }
+    response.headers["x-last-modified"] = lastModified;
+    responses.push(response);
+    return response;
+  };
+
+  let records = [];
+  coll.recordHandler = function(record) {
+    sawRecord = true;
+    // ensure records are coming in in the right order
+    equal(record.id, String(records.length));
+    equal(record.payload, "test:" + records.length);
+    records.push(record);
+  };
+  return { records, responses, requests, coll };
+}
+
+add_test(function test_success() {
+  const totalRecords = 11;
+  const batchSize = 2;
+  const lastModified = "111111";
+  let { records, responses, requests, coll } = get_test_collection_info({
+    totalRecords,
+    batchSize,
+    lastModified,
+  });
+  let response = coll.getBatched(batchSize);
+
+  equal(requests.length, Math.ceil(totalRecords / batchSize));
+
+  // records are mostly checked in recordHandler, we just care about the length
+  equal(records.length, totalRecords);
+
+  // ensure we're returning the last response
+  equal(responses[responses.length - 1], response);
+
+  // check first separately since it's a bit of a special case
+  ok(!requests[0].headers["x-if-unmodified-since"]);
+  ok(!requests[0].offset);
+  equal(requests[0].limit, batchSize);
+  let expectedOffset = 2;
+  for (let i = 1; i < requests.length; ++i) {
+    let req = requests[i];
+    equal(req.headers["x-if-unmodified-since"], lastModified);
+    equal(req.limit, batchSize);
+    if (i !== requests.length - 1) {
+      equal(req.offset, expectedOffset);
+    }
+
+    expectedOffset += batchSize;
+  }
+
+  // ensure we cleaned up anything that would break further
+  // use of this collection.
+  ok(!coll._headers["x-if-unmodified-since"]);
+  ok(!coll.offset);
+  ok(!coll.limit || (coll.limit == Infinity));
+
+  run_next_test();
+});
+
+add_test(function test_total_limit() {
+  _("getBatched respects the (initial) value of the limit property");
+  const totalRecords = 100;
+  const recordLimit = 11;
+  const batchSize = 2;
+  const lastModified = "111111";
+  let { records, responses, requests, coll } = get_test_collection_info({
+    totalRecords,
+    batchSize,
+    lastModified,
+  });
+  coll.limit = recordLimit;
+  let response = coll.getBatched(batchSize);
+
+  equal(requests.length, Math.ceil(recordLimit / batchSize));
+  equal(records.length, recordLimit);
+
+  for (let i = 0; i < requests.length; ++i) {
+    let req = requests[i];
+    if (i !== requests.length - 1) {
+      equal(req.limit, batchSize);
+    } else {
+      equal(req.limit, recordLimit % batchSize);
+    }
+  }
+
+  equal(coll._limit, recordLimit);
+
+  run_next_test();
+});
+
+add_test(function test_412() {
+  _("We shouldn't record records if we get a 412 in the middle of a batch");
+  const totalRecords = 11;
+  const batchSize = 2;
+  const lastModified = "111111";
+  let { records, responses, requests, coll } = get_test_collection_info({
+    totalRecords,
+    batchSize,
+    lastModified,
+    interruptedAfter: 3
+  });
+  let response = coll.getBatched(batchSize);
+
+  equal(requests.length, 3);
+  equal(records.length, 0); // record handler shouldn't be called for anything
+
+  // ensure we're returning the last response
+  equal(responses[responses.length - 1], response);
+
+  ok(!response.success);
+  equal(response.status, 412);
+  run_next_test();
+});
+
+add_test(function test_get_throws() {
+  _("We shouldn't record records if get() throws for some reason");
+  const totalRecords = 11;
+  const batchSize = 2;
+  const lastModified = "111111";
+  let { records, responses, requests, coll } = get_test_collection_info({
+    totalRecords,
+    batchSize,
+    lastModified,
+    throwAfter: 3
+  });
+
+  throws(() => coll.getBatched(batchSize), "Some Network Error");
+
+  equal(requests.length, 3);
+  equal(records.length, 0);
+  run_next_test();
+});
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -52,16 +52,17 @@ skip-if = os == "win" || os == "android"
 [test_resource_async.js]
 [test_resource_header.js]
 [test_resource_ua.js]
 [test_syncstoragerequest.js]
 
 # Generic Sync types.
 [test_browserid_identity.js]
 [test_collection_inc_get.js]
+[test_collection_getBatched.js]
 [test_collections_recovery.js]
 [test_identity_manager.js]
 [test_keys.js]
 [test_records_crypto.js]
 [test_records_wbo.js]
 
 # Engine APIs.
 [test_engine.js]