Bug 1294599 - Implement batched downloads of sync collections on desktop r?rnewman,markh
MozReview-Commit-ID: 6la9t1FxQhH
--- a/services/sync/modules/bookmark_validator.js
+++ b/services/sync/modules/bookmark_validator.js
@@ -715,17 +715,20 @@ class BookmarkValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
- collection.get();
+ let resp = collection.getBatched();
+ if (!resp.success) {
+ throw resp;
+ }
return items;
}
validate(engine) {
let self = this;
return Task.spawn(function*() {
let start = Date.now();
let clientTree = yield PlacesUtils.promiseBookmarksTree("", {
--- a/services/sync/modules/collection_validator.js
+++ b/services/sync/modules/collection_validator.js
@@ -65,17 +65,20 @@ class CollectionValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
- collection.get();
+ let resp = collection.getBatched();
+ if (!resp.success) {
+ throw resp;
+ }
return items;
}
// Should return a promise that resolves to an array of client items.
getClientItems() {
return Promise.reject("Must implement");
}
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -71,16 +71,20 @@ DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE: 5
// Default batch size for applying incoming records.
DEFAULT_STORE_BATCH_SIZE: 1,
HISTORY_STORE_BATCH_SIZE: 50, // same as MOBILE_BATCH_SIZE
FORMS_STORE_BATCH_SIZE: 50, // same as MOBILE_BATCH_SIZE
PASSWORDS_STORE_BATCH_SIZE: 50, // same as MOBILE_BATCH_SIZE
ADDONS_STORE_BATCH_SIZE: 1000000, // process all addons at once
APPS_STORE_BATCH_SIZE: 50, // same as MOBILE_BATCH_SIZE
+// Default batch size for download batching
+// (how many records are fetched at a time from the server when batching is used).
+DEFAULT_DOWNLOAD_BATCH_SIZE: 1000,
+
// score thresholds for early syncs
SINGLE_USER_THRESHOLD: 1000,
MULTI_DEVICE_THRESHOLD: 300,
// Other score increment constants
SCORE_INCREMENT_SMALL: 1,
SCORE_INCREMENT_MEDIUM: 10,
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1153,17 +1153,17 @@ SyncEngine.prototype = {
if (applyBatch.length == self.applyIncomingBatchSize) {
doApplyBatch.call(self);
}
self._store._sleep(0);
};
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
- let resp = newitems.get();
+ let resp = newitems.getBatched();
doApplyBatchAndPersistFailed.call(this);
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
if (aborting) {
throw aborting;
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -526,19 +526,22 @@ this.Collection = function Collection(ur
this._service = service;
this._full = false;
this._ids = null;
this._limit = 0;
this._older = 0;
this._newer = 0;
this._data = [];
- // optional members used by batch operations.
+ // optional members used by batch upload operations.
this._batch = null;
this._commit = false;
+ // Used for batch download operations -- note that this is explicitly an
+ // opaque value and not (necessarily) a number.
+ this._offset = null;
}
Collection.prototype = {
__proto__: Resource.prototype,
_logName: "Sync.Collection",
_rebuildURL: function Coll__rebuildURL() {
// XXX should consider what happens if it's not a URL...
this.uri.QueryInterface(Ci.nsIURL);
@@ -556,16 +559,18 @@ Collection.prototype = {
if (this.ids != null)
args.push("ids=" + this.ids);
if (this.limit > 0 && this.limit != Infinity)
args.push("limit=" + this.limit);
if (this._batch)
args.push("batch=" + encodeURIComponent(this._batch));
if (this._commit)
args.push("commit=true");
+ if (this._offset)
+ args.push("offset=" + encodeURIComponent(this._offset));
this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
},
// get full items
get full() { return this._full; },
set full(value) {
this._full = value;
@@ -605,47 +610,132 @@ Collection.prototype = {
// newest (newest first)
// index
get sort() { return this._sort; },
set sort(value) {
this._sort = value;
this._rebuildURL();
},
+ get offset() { return this._offset; }, // paging token sent as the `offset=` query argument (null when unset)
+ set offset(value) {
+ this._offset = value;
+ this._rebuildURL(); // regenerate the URI query so it reflects the new offset
+ },
+
// Set information about the batch for this request.
get batch() { return this._batch; },
set batch(value) {
this._batch = value;
this._rebuildURL();
},
get commit() { return this._commit; },
set commit(value) {
this._commit = value && true;
this._rebuildURL();
},
+ // Similar to get(), but will page through the items `batchSize` at a time,
+ // deferring calling the record handler until we've gotten them all.
+ //
+ // Returns the last response processed, and doesn't run the record handler
+ // on any items if a non-success status is received while downloading the
+ // records (or if a network error occurs).
+ getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
+ let totalLimit = Number(this.limit) || Infinity;
+ if (batchSize <= 0 || batchSize >= totalLimit) {
+ // Invalid batch sizes should arguably be an error, but they're easy to handle
+ return this.get();
+ }
+
+ if (!this.full) {
+ throw new Error("getBatched is unimplemented for guid-only GETs");
+ }
+
+ // _onComplete and _onProgress are reset after each `get` by AsyncResource.
+ // We overwrite _onRecord to something that stores the data in an array
+ // until the end.
+ let { _onComplete, _onProgress, _onRecord } = this;
+ let recordBuffer = [];
+ let resp;
+ try {
+ this._onRecord = r => recordBuffer.push(r);
+ let lastModifiedTime;
+ this.limit = batchSize;
+
+ do {
+ this._onProgress = _onProgress;
+ this._onComplete = _onComplete;
+ if (batchSize + recordBuffer.length > totalLimit) {
+ this.limit = totalLimit - recordBuffer.length;
+ }
+ this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
+ // Actually perform the request
+ resp = this.get();
+ if (!resp.success) {
+ break;
+ }
+
+ // Initialize last modified, or check that something broken isn't happening.
+ let lastModified = resp.headers["x-last-modified"];
+ if (!lastModifiedTime) {
+ lastModifiedTime = lastModified;
+ this.setHeader("X-If-Unmodified-Since", lastModified);
+ } else if (lastModified != lastModifiedTime) {
+ // Should be impossible -- We'd get a 412 in this case.
+ throw new Error("X-Last-Modified changed in the middle of a download batch! " +
+ `${lastModified} => ${lastModifiedTime}`)
+ }
+
+ // If this is missing, we're finished.
+ this.offset = resp.headers["x-weave-next-offset"];
+ } while (this.offset && totalLimit > recordBuffer.length);
+ } finally {
+ // Ensure we undo any temporary state so that subsequent calls to get()
+ // or getBatched() work properly. We do this before calling the record
+ // handler so that we can more convincingly pretend to be a normal get()
+ // call. Note: we're resetting these to the values they had before this
+ // function was called.
+ this._onRecord = _onRecord;
+ this._limit = totalLimit;
+ this._offset = null;
+ delete this._headers["x-if-unmodified-since"];
+ this._rebuildURL();
+ }
+ if (resp.success && Async.checkAppReady()) {
+ // call the original _onRecord (e.g. the user supplied record handler)
+ // for each record we've stored
+ for (let record of recordBuffer) {
+ this._onRecord(record);
+ }
+ }
+ return resp;
+ },
+
set recordHandler(onRecord) {
// Save this because onProgress is called with this as the ChannelListener
let coll = this;
// Switch to newline separated records for incremental parsing
coll.setHeader("Accept", "application/newlines");
+ this._onRecord = onRecord;
+
this._onProgress = function() {
let newline;
while ((newline = this._data.indexOf("\n")) > 0) {
// Split the json record from the rest of the data
let json = this._data.slice(0, newline);
this._data = this._data.slice(newline + 1);
// Deserialize a record from json and give it to the callback
let record = new coll._recordObj();
record.deserialize(json);
- onRecord(record);
+ coll._onRecord(record);
}
};
},
// This object only supports posting via the postQueue object.
post() {
throw new Error("Don't directly post to a collection - use newPostQueue instead");
},
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -289,33 +289,45 @@ ServerCollection.prototype = {
if (options.full) {
let data = [];
for (let [id, wbo] of Object.entries(this._wbos)) {
// Drop deleted.
if (wbo.modified && this._inResultSet(wbo, options)) {
data.push(wbo.get());
}
}
+ let start = options.offset || 0;
if (options.limit) {
- data = data.slice(0, options.limit);
+ let numItemsPastOffset = data.length - start;
+ data = data.slice(start, start + options.limit);
+ // use options as a backchannel to set x-weave-next-offset
+ if (numItemsPastOffset > options.limit) {
+ options.nextOffset = start + options.limit;
+ }
+ } else if (start) {
+ data = data.slice(start);
}
// Our implementation of application/newlines.
result = data.join("\n") + "\n";
// Use options as a backchannel to report count.
options.recordCount = data.length;
} else {
let data = [];
for (let [id, wbo] of Object.entries(this._wbos)) {
if (this._inResultSet(wbo, options)) {
data.push(id);
}
}
+ let start = options.offset || 0;
if (options.limit) {
- data = data.slice(0, options.limit);
+ data = data.slice(start, start + options.limit);
+ options.nextOffset = start + options.limit;
+ } else if (start) {
+ data = data.slice(start);
}
result = JSON.stringify(data);
options.recordCount = data.length;
}
return result;
},
post: function(input) {
@@ -386,28 +398,35 @@ ServerCollection.prototype = {
options.ids = options.ids.split(",");
}
if (options.newer) {
options.newer = parseFloat(options.newer);
}
if (options.limit) {
options.limit = parseInt(options.limit, 10);
}
+ if (options.offset) {
+ options.offset = parseInt(options.offset, 10);
+ }
switch(request.method) {
case "GET":
body = self.get(options, request);
- // "If supported by the db, this header will return the number of
- // records total in the request body of any multiple-record GET
- // request."
- let records = options.recordCount;
- self._log.info("Records: " + records);
+ // see http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html
+ // for description of these headers.
+ let { recordCount: records, nextOffset } = options;
+
+ self._log.info("Records: " + records + ", nextOffset: " + nextOffset);
if (records != null) {
response.setHeader("X-Weave-Records", "" + records);
}
+ if (nextOffset) {
+ response.setHeader("X-Weave-Next-Offset", "" + nextOffset);
+ }
+ response.setHeader("X-Last-Modified", "" + self.timestamp); // `self`, like every other access in this handler
break;
case "POST":
let res = self.post(readBytesFromInputStream(request.bodyInputStream), request);
body = JSON.stringify(res);
response.newModified = res.modified;
break;
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -597,16 +597,19 @@ add_task(function* test_misreconciled_ro
encrypted.decrypt = function () {
for (let x in rec) {
encrypted[x] = rec[x];
}
};
_("Applying record.");
engine._processIncoming({
+ getBatched() {
+ return this.get();
+ },
get: function () {
this.recordHandler(encrypted);
return {success: true}
},
});
// Ensure that afterwards, toolbar is still there.
// As of 2012-12-05, this only passes because Places doesn't use "toolbar" as
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_collection_getBatched.js
@@ -0,0 +1,195 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+Cu.import("resource://services-sync/record.js");
+Cu.import("resource://services-sync/service.js");
+
+ function run_test() {
+ initTestLogging("Trace");
+ Log.repository.getLogger("Sync.Collection").level = Log.Level.Trace; // "Sync.Collection" is Collection's _logName; getBatched traces each request
+ run_next_test(); // start the add_test() functions registered below
+ }
+
+function recordRange(lim, offset, total) {
+ let res = [];
+ for (let i = offset; i < Math.min(lim + offset, total); ++i) {
+ res.push(JSON.stringify({ id: String(i), payload: "test:" + i }));
+ }
+ return res.join("\n") + "\n";
+}
+
+function get_test_collection_info({ totalRecords, batchSize, lastModified,
+ throwAfter = Infinity,
+ interruptedAfter = Infinity }) {
+ let coll = new Collection("http://example.com/test/", WBORecord, Service);
+ coll.full = true;
+ let requests = [];
+ let responses = [];
+ let sawRecord = false;
+ coll.get = function() {
+ ok(!sawRecord); // make sure we call record handler after all requests.
+ let limit = +this.limit;
+ let offset = 0;
+ if (this.offset) {
+ equal(this.offset.slice(0, 6), "foobar");
+ offset = +this.offset.slice(6);
+ }
+ requests.push({
+ limit,
+ offset,
+ spec: this.spec,
+ headers: Object.assign({}, this.headers)
+ });
+ if (--throwAfter === 0) {
+ throw "Some Network Error";
+ }
+ let body = recordRange(limit, offset, totalRecords);
+ this._onProgress.call({ _data: body });
+ let response = {
+ body,
+ success: true,
+ status: 200,
+ headers: {}
+ };
+ if (--interruptedAfter === 0) {
+ response.success = false;
+ response.status = 412;
+ response.body = "";
+ } else if (offset + limit < totalRecords) {
+ // Ensure we're treating this as an opaque string, since the docs say
+ // it might not be numeric.
+ response.headers["x-weave-next-offset"] = "foobar" + (offset + batchSize);
+ }
+ response.headers["x-last-modified"] = lastModified;
+ responses.push(response);
+ return response;
+ };
+
+ let records = [];
+ coll.recordHandler = function(record) {
+ sawRecord = true;
+ // ensure records are coming in in the right order
+ equal(record.id, String(records.length));
+ equal(record.payload, "test:" + records.length);
+ records.push(record);
+ };
+ return { records, responses, requests, coll };
+}
+
+add_test(function test_success() {
+ const totalRecords = 11;
+ const batchSize = 2;
+ const lastModified = "111111";
+ let { records, responses, requests, coll } = get_test_collection_info({
+ totalRecords,
+ batchSize,
+ lastModified,
+ });
+ let response = coll.getBatched(batchSize);
+
+ equal(requests.length, Math.ceil(totalRecords / batchSize));
+
+ // records are mostly checked in recordHandler, we just care about the length
+ equal(records.length, totalRecords);
+
+ // ensure we're returning the last response
+ equal(responses[responses.length - 1], response);
+
+ // check first separately since its a bit of a special case
+ ok(!requests[0].headers["x-if-unmodified-since"]);
+ ok(!requests[0].offset);
+ equal(requests[0].limit, batchSize);
+ let expectedOffset = 2;
+ for (let i = 1; i < requests.length; ++i) {
+ let req = requests[i];
+ equal(req.headers["x-if-unmodified-since"], lastModified);
+ equal(req.limit, batchSize);
+ if (i !== requests.length - 1) {
+ equal(req.offset, expectedOffset);
+ }
+
+ expectedOffset += batchSize;
+ }
+
+ // ensure we cleaned up anything that would break further
+ // use of this collection.
+ ok(!coll._headers["x-if-unmodified-since"]);
+ ok(!coll.offset);
+ ok(!coll.limit || (coll.limit == Infinity));
+
+ run_next_test();
+});
+
+add_test(function test_total_limit() {
+ _("getBatched respects the (initial) value of the limit property");
+ const totalRecords = 100;
+ const recordLimit = 11;
+ const batchSize = 2;
+ const lastModified = "111111";
+ let { records, responses, requests, coll } = get_test_collection_info({
+ totalRecords,
+ batchSize,
+ lastModified,
+ });
+ coll.limit = recordLimit;
+ let response = coll.getBatched(batchSize);
+
+ equal(requests.length, Math.ceil(recordLimit / batchSize));
+ equal(records.length, recordLimit);
+
+ for (let i = 0; i < requests.length; ++i) {
+ let req = requests[i];
+ if (i !== requests.length - 1) {
+ equal(req.limit, batchSize);
+ } else {
+ equal(req.limit, recordLimit % batchSize);
+ }
+ }
+
+ equal(coll._limit, recordLimit);
+
+ run_next_test();
+});
+
+add_test(function test_412() {
+ _("We shouldn't record records if we get a 412 in the middle of a batch");
+ const totalRecords = 11;
+ const batchSize = 2;
+ const lastModified = "111111";
+ let { records, responses, requests, coll } = get_test_collection_info({
+ totalRecords,
+ batchSize,
+ lastModified,
+ interruptedAfter: 3
+ });
+ let response = coll.getBatched(batchSize);
+
+ equal(requests.length, 3);
+ equal(records.length, 0); // record handler shouldn't be called for anything
+
+ // ensure we're returning the last response
+ equal(responses[responses.length - 1], response);
+
+ ok(!response.success);
+ equal(response.status, 412);
+ run_next_test();
+});
+
+add_test(function test_get_throws() {
+ _("We shouldn't record records if get() throws for some reason");
+ const totalRecords = 11;
+ const batchSize = 2;
+ const lastModified = "111111";
+ let { records, responses, requests, coll } = get_test_collection_info({
+ totalRecords,
+ batchSize,
+ lastModified,
+ throwAfter: 3
+ });
+
+ throws(() => coll.getBatched(batchSize), "Some Network Error");
+
+ equal(requests.length, 3);
+ equal(records.length, 0);
+ run_next_test();
+});
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -52,16 +52,17 @@ skip-if = os == "win" || os == "android"
[test_resource_async.js]
[test_resource_header.js]
[test_resource_ua.js]
[test_syncstoragerequest.js]
# Generic Sync types.
[test_browserid_identity.js]
[test_collection_inc_get.js]
+[test_collection_getBatched.js]
[test_collections_recovery.js]
[test_identity_manager.js]
[test_keys.js]
[test_records_crypto.js]
[test_records_wbo.js]
# Engine APIs.
[test_engine.js]