(DO NOT REVIEW) - Change getBatched() to return records directly instead of using a callback.
Change getBatched() to return records directly instead of using a callback.
(DO NOT REVIEW) - Change getBatched() to return records directly instead of using a callback.
MozReview-Commit-ID: HfwPe8jSH66
--- a/services/sync/modules/bookmark_validator.js
+++ b/services/sync/modules/bookmark_validator.js
@@ -818,26 +818,24 @@ class BookmarkValidator {
}
async _getServerState(engine) {
// XXXXX - todo - we need to capture last-modified of the server here and
// ensure the repairer only applys with if-unmodified-since that date.
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
- let items = [];
- collection.recordHandler = function(item) {
- item.decrypt(collectionKey);
- items.push(item.cleartext);
- };
- let resp = await collection.getBatched();
- if (!resp.success) {
- throw resp;
+ let result = await collection.getBatched();
+ if (!result.response.success) {
+ throw result.response;
}
- return items;
+ return result.records.map(record => {
+ record.decrypt(collectionKey);
+ return record.cleartext;
+ });
}
async validate(engine) {
let start = Date.now();
let clientTree = await PlacesUtils.promiseBookmarksTree("", {
includeItemIds: true
});
let serverState = await this._getServerState(engine);
--- a/services/sync/modules/collection_validator.js
+++ b/services/sync/modules/collection_validator.js
@@ -64,26 +64,24 @@ class CollectionValidator {
emptyProblemData() {
return new CollectionProblemData();
}
async getServerItems(engine) {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
- let items = [];
- collection.recordHandler = function(item) {
- item.decrypt(collectionKey);
- items.push(item.cleartext);
- };
- let resp = await collection.getBatched();
- if (!resp.success) {
- throw resp;
+ let result = await collection.getBatched();
+ if (!result.response.success) {
+ throw result.response;
}
- return items;
+ return result.records.map(record => {
+ record.decrypt(collectionKey);
+ return record.cleartext;
+ });
}
// Should return a promise that resolves to an array of client items.
getClientItems() {
return Promise.reject("Must implement");
}
/**
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1118,17 +1118,17 @@ SyncEngine.prototype = {
}
let key = this.service.collectionKeys.keyForCollection(this.name);
// Not binding this method to 'this' for performance reasons. It gets
// called for every incoming record.
let self = this;
- newitems.recordHandler = function(item) {
+ let recordHandler = function(item) {
if (aborting) {
return;
}
// Grab a later last modified if possible
if (self.lastModified == null || item.modified > self.lastModified)
self.lastModified = item.modified;
@@ -1226,22 +1226,26 @@ SyncEngine.prototype = {
if (applyBatch.length == self.applyIncomingBatchSize) {
doApplyBatch.call(self);
}
self._store._sleep(0);
};
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
- let resp = Async.promiseSpinningly(newitems.getBatched());
+ let { response, records } = Async.promiseSpinningly(newitems.getBatched());
+ if (!response.success) {
+ response.failureCode = ENGINE_DOWNLOAD_FAIL;
+ throw response;
+ }
+
+ for (let record of records) {
+ recordHandler(record);
+ }
doApplyBatchAndPersistFailed.call(this);
- if (!resp.success) {
- resp.failureCode = ENGINE_DOWNLOAD_FAIL;
- throw resp;
- }
if (aborting) {
throw aborting;
}
}
// Mobile: check if we got the maximum that we requested; get the rest if so.
if (handled.length == newitems.limit) {
@@ -1280,23 +1284,28 @@ SyncEngine.prototype = {
while (fetchBatch.length && !aborting) {
// Reuse the original query, but get rid of the restricting params
// and batch remaining records.
newitems.limit = 0;
newitems.newer = 0;
newitems.ids = fetchBatch.slice(0, batchSize);
- // Reuse the existing record handler set earlier
let resp = Async.promiseSpinningly(newitems.get());
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
+ for (let json of resp.obj) {
+ let record = new this._recordObj();
+ record.deserialize(json);
+ recordHandler(record);
+ }
+
// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
if (failed.length) {
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
@@ -1810,25 +1819,25 @@ SyncEngine.prototype = {
// Fetch the most recently uploaded record and try to decrypt it
let test = new Collection(this.engineURL, this._recordObj, this.service);
test.limit = 1;
test.sort = "newest";
test.full = true;
let key = this.service.collectionKeys.keyForCollection(this.name);
- test.recordHandler = function recordHandler(record) {
- record.decrypt(key);
- canDecrypt = true;
- };
// Any failure fetching/decrypting will just result in false
try {
this._log.trace("Trying to decrypt a record from the server..");
- Async.promiseSpinningly(test.get());
+ let json = Async.promiseSpinningly(test.get()).obj[0];
+ let record = new this._recordObj();
+ record.deserialize(json);
+ record.decrypt(key);
+ canDecrypt = true;
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.debug("Failed test decrypt", ex);
}
return canDecrypt;
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -702,47 +702,49 @@ Collection.prototype = {
// deferring calling the record handler until we've gotten them all.
//
// Returns the last response processed, and doesn't run the record handler
// on any items if a non-success status is received while downloading the
// records (or if a network error occurs).
async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
let totalLimit = Number(this.limit) || Infinity;
- if (batchSize <= 0 || batchSize >= totalLimit) {
- // Invalid batch sizes should arguably be an error, but they're easy to handle
- return this.get();
+ if (batchSize <= 0) {
+ throw new Error("Invalid batch size");
}
if (!this.full) {
throw new Error("getBatched is unimplemented for guid-only GETs");
}
// _onComplete and _onProgress are reset after each `get` by AsyncResource.
- // We overwrite _onRecord to something that stores the data in an array
- // until the end.
- let { _onComplete, _onProgress, _onRecord } = this;
+ let { _onComplete, _onProgress } = this;
let recordBuffer = [];
let resp;
try {
- this._onRecord = r => recordBuffer.push(r);
let lastModifiedTime;
this.limit = batchSize;
do {
this._onProgress = _onProgress;
this._onComplete = _onComplete;
if (batchSize + recordBuffer.length > totalLimit) {
this.limit = totalLimit - recordBuffer.length;
}
this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
// Actually perform the request
resp = await this.get();
if (!resp.success) {
+ recordBuffer = [];
break;
}
+ for (let json of resp.obj) {
+ let record = new this._recordObj();
+ record.deserialize(json);
+ recordBuffer.push(record);
+ }
// Initialize last modified, or check that something broken isn't happening.
let lastModified = resp.headers["x-last-modified"];
if (!lastModifiedTime) {
lastModifiedTime = lastModified;
this.setHeader("X-If-Unmodified-Since", lastModified);
} else if (lastModified != lastModifiedTime) {
// Should be impossible -- We'd get a 412 in this case.
@@ -754,64 +756,22 @@ Collection.prototype = {
this.offset = resp.headers["x-weave-next-offset"];
} while (this.offset && totalLimit > recordBuffer.length);
} finally {
// Ensure we undo any temporary state so that subsequent calls to get()
// or getBatched() work properly. We do this before calling the record
// handler so that we can more convincingly pretend to be a normal get()
// call. Note: we're resetting these to the values they had before this
// function was called.
- this._onRecord = _onRecord;
this._limit = totalLimit;
this._offset = null;
delete this._headers["x-if-unmodified-since"];
this._rebuildURL();
}
- if (resp.success && Async.checkAppReady()) {
- // call the original _onRecord (e.g. the user supplied record handler)
- // for each record we've stored
- for (let record of recordBuffer) {
- this._onRecord(record);
- }
- }
- return resp;
- },
-
- set recordHandler(onRecord) {
- // Save this because onProgress is called with this as the ChannelListener
- let coll = this;
-
- // Switch to newline separated records for incremental parsing
- coll.setHeader("Accept", "application/newlines");
-
- this._onRecord = onRecord;
-
- this._onProgress = function(httpChannel) {
- let newline, length = 0, contentLength = "unknown";
-
- try {
- // Content-Length of the value of this response header
- contentLength = httpChannel.getResponseHeader("Content-Length");
- } catch (ex) { }
-
- while ((newline = this._data.indexOf("\n")) > 0) {
- // Split the json record from the rest of the data
- let json = this._data.slice(0, newline);
- this._data = this._data.slice(newline + 1);
-
- length += json.length;
- coll._log.trace("Record: Content-Length = " + contentLength +
- ", ByteCount = " + length);
-
- // Deserialize a record from json and give it to the callback
- let record = new coll._recordObj();
- record.deserialize(json);
- coll._onRecord(record);
- }
- };
+ return { response: resp, records: recordBuffer };
},
// This object only supports posting via the postQueue object.
post() {
throw new Error("Don't directly post to a collection - use newPostQueue instead");
},
newPostQueue(log, timestamp, postCallback) {
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -291,17 +291,17 @@ ServerCollection.prototype = {
for (let wbo of Object.values(this._wbos)) {
if (wbo.modified && this._inResultSet(wbo, options)) {
c++;
}
}
return c;
},
- get(options) {
+ get(options, request) {
let result;
if (options.full) {
let data = [];
for (let wbo of Object.values(this._wbos)) {
// Drop deleted.
if (wbo.modified && this._inResultSet(wbo, options)) {
data.push(wbo.get());
}
@@ -312,18 +312,23 @@ ServerCollection.prototype = {
data = data.slice(start, start + options.limit);
// use options as a backchannel to set x-weave-next-offset
if (numItemsPastOffset > options.limit) {
options.nextOffset = start + options.limit;
}
} else if (start) {
data = data.slice(start);
}
- // Our implementation of application/newlines.
- result = data.join("\n") + "\n";
+
+ if (request && request.getHeader("accept") == "application/newlines") {
+ this._log.error("Error: client requesting application/newlines content");
+ throw new Error("This server should not serve application/newlines content");
+ } else {
+ result = JSON.stringify(data);
+ }
// Use options as a backchannel to report count.
options.recordCount = data.length;
} else {
let data = [];
for (let [id, wbo] of Object.entries(this._wbos)) {
if (this._inResultSet(wbo, options)) {
data.push(id);
--- a/services/sync/tests/unit/test_bookmark_duping.js
+++ b/services/sync/tests/unit/test_bookmark_duping.js
@@ -66,17 +66,17 @@ async function createBookmark(parentId,
let bookmark = await bms.insert({ parentGuid, url, index, title });
let id = await PlacesUtils.promiseItemId(bookmark.guid);
return { id, guid: bookmark.guid };
}
function getServerRecord(collection, id) {
let wbo = collection.get({ full: true, ids: [id] });
// Whew - lots of json strings inside strings.
- return JSON.parse(JSON.parse(JSON.parse(wbo).payload).ciphertext);
+ return JSON.parse(JSON.parse(JSON.parse(JSON.parse(wbo)[0]).payload).ciphertext);
}
async function promiseNoLocalItem(guid) {
// Check there's no item with the specified guid.
let got = await bms.fetch({ guid });
ok(!got, `No record remains with GUID ${guid}`);
// and while we are here ensure the places cache doesn't still have it.
await Assert.rejects(PlacesUtils.promiseItemId(guid));
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -530,33 +530,19 @@ add_task(async function test_misreconcil
title: "Bookmarks Toolbar",
description: "Now you're for it.",
parentName: "",
parentid: "mobile", // Why not?
children: [],
};
let rec = new FakeRecord(BookmarkFolder, to_apply);
- let encrypted = encryptPayload(rec.cleartext);
- encrypted.decrypt = function() {
- for (let x in rec) {
- encrypted[x] = rec[x];
- }
- };
_("Applying record.");
- engine._processIncoming({
- getBatched() {
- return this.get();
- },
- async get() {
- this.recordHandler(encrypted);
- return {success: true}
- },
- });
+ store.applyIncoming(rec);
// Ensure that afterwards, toolbar is still there.
// As of 2012-12-05, this only passes because Places doesn't use "toolbar" as
// the real GUID, instead using a generated one. Sync does the translation.
let toolbarAfter = store.createRecord("toolbar", "bookmarks");
let parentGUIDAfter = toolbarAfter.parentid;
let parentIDAfter = store.idForGUID(parentGUIDAfter);
do_check_eq(store.GUIDForId(toolbarIDBefore), "toolbar");
--- a/services/sync/tests/unit/test_collection_getBatched.js
+++ b/services/sync/tests/unit/test_collection_getBatched.js
@@ -10,48 +10,45 @@ function run_test() {
run_next_test();
}
function recordRange(lim, offset, total) {
let res = [];
for (let i = offset; i < Math.min(lim + offset, total); ++i) {
res.push(JSON.stringify({ id: String(i), payload: "test:" + i }));
}
- return res.join("\n") + "\n";
+ return res;
}
function get_test_collection_info({ totalRecords, batchSize, lastModified,
throwAfter = Infinity,
interruptedAfter = Infinity }) {
let coll = new Collection("http://example.com/test/", WBORecord, Service);
coll.full = true;
let requests = [];
let responses = [];
- let sawRecord = false;
coll.get = async function() {
- ok(!sawRecord); // make sure we call record handler after all requests.
let limit = +this.limit;
let offset = 0;
if (this.offset) {
equal(this.offset.slice(0, 6), "foobar");
offset = +this.offset.slice(6);
}
requests.push({
limit,
offset,
spec: this.spec,
headers: Object.assign({}, this.headers)
});
if (--throwAfter === 0) {
throw "Some Network Error";
}
let body = recordRange(limit, offset, totalRecords);
- this._onProgress.call({ _data: body });
let response = {
- body,
+ obj: body,
success: true,
status: 200,
headers: {}
};
if (--interruptedAfter === 0) {
response.success = false;
response.status = 412;
response.body = "";
@@ -59,43 +56,34 @@ function get_test_collection_info({ tota
// Ensure we're treating this as an opaque string, since the docs say
// it might not be numeric.
response.headers["x-weave-next-offset"] = "foobar" + (offset + batchSize);
}
response.headers["x-last-modified"] = lastModified;
responses.push(response);
return response;
};
-
- let records = [];
- coll.recordHandler = function(record) {
- sawRecord = true;
- // ensure records are coming in in the right order
- equal(record.id, String(records.length));
- equal(record.payload, "test:" + records.length);
- records.push(record);
- };
- return { records, responses, requests, coll };
+ return { responses, requests, coll };
}
add_task(async function test_success() {
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
- let { records, responses, requests, coll } = get_test_collection_info({
+ let { responses, requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
});
- let response = await coll.getBatched(batchSize);
+ let { response, records } = await coll.getBatched(batchSize);
equal(requests.length, Math.ceil(totalRecords / batchSize));
- // records are mostly checked in recordHandler, we just care about the length
equal(records.length, totalRecords);
+ checkRecordsOrder(records);
// ensure we're returning the last response
equal(responses[responses.length - 1], response);
// check first separately since its a bit of a special case
ok(!requests[0].headers["x-if-unmodified-since"]);
ok(!requests[0].offset);
equal(requests[0].limit, batchSize);
@@ -119,23 +107,24 @@ add_task(async function test_success() {
});
add_task(async function test_total_limit() {
_("getBatched respects the (initial) value of the limit property");
const totalRecords = 100;
const recordLimit = 11;
const batchSize = 2;
const lastModified = "111111";
- let { records, requests, coll } = get_test_collection_info({
+ let { requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
});
coll.limit = recordLimit;
- await coll.getBatched(batchSize);
+ let { records } = await coll.getBatched(batchSize);
+ checkRecordsOrder(records);
equal(requests.length, Math.ceil(recordLimit / batchSize));
equal(records.length, recordLimit);
for (let i = 0; i < requests.length; ++i) {
let req = requests[i];
if (i !== requests.length - 1) {
equal(req.limit, batchSize);
@@ -147,43 +136,50 @@ add_task(async function test_total_limit
equal(coll._limit, recordLimit);
});
add_task(async function test_412() {
_("We shouldn't record records if we get a 412 in the middle of a batch");
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
- let { records, responses, requests, coll } = get_test_collection_info({
+ let { responses, requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
interruptedAfter: 3
});
- let response = await coll.getBatched(batchSize);
+ let { response, records } = await coll.getBatched(batchSize);
equal(requests.length, 3);
- equal(records.length, 0); // record handler shouldn't be called for anything
+ equal(records.length, 0); // we should not get any records
// ensure we're returning the last response
equal(responses[responses.length - 1], response);
ok(!response.success);
equal(response.status, 412);
});
add_task(async function test_get_throws() {
- _("We shouldn't record records if get() throws for some reason");
+ _("getBatched() should throw if a get() throws");
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
- let { records, requests, coll } = get_test_collection_info({
+ let { requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
throwAfter: 3
});
- await Assert.rejects(coll.getBatched(batchSize), "Some Network Error");
+ await Assert.rejects(coll.getBatched(batchSize), /Some Network Error/);
equal(requests.length, 3);
- equal(records.length, 0);
});
+
+function checkRecordsOrder(records) {
+ ok(records.length > 0);
+ for (let i = 0; i < records.length; i++) {
+ equal(records[i].id, String(i));
+ equal(records[i].payload, "test:" + i);
+ }
+}
deleted file mode 100644
--- a/services/sync/tests/unit/test_collection_inc_get.js
+++ /dev/null
@@ -1,188 +0,0 @@
-/* Any copyright is dedicated to the Public Domain.
- http://creativecommons.org/publicdomain/zero/1.0/ */
-
-_("Make sure Collection can correctly incrementally parse GET requests");
-Cu.import("resource://services-sync/record.js");
-Cu.import("resource://services-sync/service.js");
-
-function run_test() {
- let base = "http://fake/";
- let coll = new Collection("http://fake/uri/", WBORecord, Service);
- let stream = { _data: "" };
- let called, recCount, sum;
-
- _("Not-JSON, string payloads are strings");
- called = false;
- stream._data = '{"id":"hello","payload":"world"}\n';
- coll.recordHandler = function(rec) {
- called = true;
- _("Got record:", JSON.stringify(rec));
- rec.collection = "uri"; // This would be done by an engine, so do it here.
- do_check_eq(rec.collection, "uri");
- do_check_eq(rec.id, "hello");
- do_check_eq(rec.uri(base).spec, "http://fake/uri/hello");
- do_check_eq(rec.payload, "world");
- };
- coll._onProgress.call(stream);
- do_check_eq(stream._data, "");
- do_check_true(called);
- _("\n");
-
-
- _("Parse record with payload");
- called = false;
- stream._data = '{"payload":"{\\"value\\":123}"}\n';
- coll.recordHandler = function(rec) {
- called = true;
- _("Got record:", JSON.stringify(rec));
- do_check_eq(rec.payload.value, 123);
- };
- coll._onProgress.call(stream);
- do_check_eq(stream._data, "");
- do_check_true(called);
- _("\n");
-
-
- _("Parse multiple records in one go");
- called = false;
- recCount = 0;
- sum = 0;
- stream._data = '{"id":"hundred","payload":"{\\"value\\":100}"}\n{"id":"ten","payload":"{\\"value\\":10}"}\n{"id":"one","payload":"{\\"value\\":1}"}\n';
- coll.recordHandler = function(rec) {
- called = true;
- _("Got record:", JSON.stringify(rec));
- recCount++;
- sum += rec.payload.value;
- _("Incremental status: count", recCount, "sum", sum);
- rec.collection = "uri";
- switch (recCount) {
- case 1:
- do_check_eq(rec.id, "hundred");
- do_check_eq(rec.uri(base).spec, "http://fake/uri/hundred");
- do_check_eq(rec.payload.value, 100);
- do_check_eq(sum, 100);
- break;
- case 2:
- do_check_eq(rec.id, "ten");
- do_check_eq(rec.uri(base).spec, "http://fake/uri/ten");
- do_check_eq(rec.payload.value, 10);
- do_check_eq(sum, 110);
- break;
- case 3:
- do_check_eq(rec.id, "one");
- do_check_eq(rec.uri(base).spec, "http://fake/uri/one");
- do_check_eq(rec.payload.value, 1);
- do_check_eq(sum, 111);
- break;
- default:
- do_throw("unexpected number of record counts", recCount);
- break;
- }
- };
- coll._onProgress.call(stream);
- do_check_eq(recCount, 3);
- do_check_eq(sum, 111);
- do_check_eq(stream._data, "");
- do_check_true(called);
- _("\n");
-
-
- _("Handle incremental data incoming");
- called = false;
- recCount = 0;
- sum = 0;
- stream._data = '{"payl';
- coll.recordHandler = function(rec) {
- called = true;
- do_throw("shouldn't have gotten a record..");
- };
- coll._onProgress.call(stream);
- _("shouldn't have gotten anything yet");
- do_check_eq(recCount, 0);
- do_check_eq(sum, 0);
- _("leading array bracket should have been trimmed");
- do_check_eq(stream._data, '{"payl');
- do_check_false(called);
- _();
-
- _("adding more data enough for one record..");
- called = false;
- stream._data += 'oad":"{\\"value\\":100}"}\n';
- coll.recordHandler = function(rec) {
- called = true;
- _("Got record:", JSON.stringify(rec));
- recCount++;
- sum += rec.payload.value;
- };
- coll._onProgress.call(stream);
- _("should have 1 record with sum 100");
- do_check_eq(recCount, 1);
- do_check_eq(sum, 100);
- _("all data should have been consumed including trailing comma");
- do_check_eq(stream._data, "");
- do_check_true(called);
- _();
-
- _("adding more data..");
- called = false;
- stream._data += '{"payload":"{\\"value\\":10}"';
- coll.recordHandler = function(rec) {
- called = true;
- do_throw("shouldn't have gotten a record..");
- };
- coll._onProgress.call(stream);
- _("should still have 1 record with sum 100");
- do_check_eq(recCount, 1);
- do_check_eq(sum, 100);
- _("should almost have a record");
- do_check_eq(stream._data, '{"payload":"{\\"value\\":10}"');
- do_check_false(called);
- _();
-
- _("add data for two records..");
- called = false;
- stream._data += '}\n{"payload":"{\\"value\\":1}"}\n';
- coll.recordHandler = function(rec) {
- called = true;
- _("Got record:", JSON.stringify(rec));
- recCount++;
- sum += rec.payload.value;
- switch (recCount) {
- case 2:
- do_check_eq(rec.payload.value, 10);
- do_check_eq(sum, 110);
- break;
- case 3:
- do_check_eq(rec.payload.value, 1);
- do_check_eq(sum, 111);
- break;
- default:
- do_throw("unexpected number of record counts", recCount);
- break;
- }
- };
- coll._onProgress.call(stream);
- _("should have gotten all 3 records with sum 111");
- do_check_eq(recCount, 3);
- do_check_eq(sum, 111);
- _("should have consumed all data");
- do_check_eq(stream._data, "");
- do_check_true(called);
- _();
-
- _("add no extra data");
- called = false;
- stream._data += "";
- coll.recordHandler = function(rec) {
- called = true;
- do_throw("shouldn't have gotten a record..");
- };
- coll._onProgress.call(stream);
- _("should still have 3 records with sum 111");
- do_check_eq(recCount, 3);
- do_check_eq(sum, 111);
- _("should have consumed nothing but still have nothing");
- do_check_eq(stream._data, "");
- do_check_false(called);
- _("\n");
-}
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -44,17 +44,16 @@ tags = addons
[test_resource.js]
[test_resource_async.js]
[test_resource_header.js]
[test_resource_ua.js]
[test_syncstoragerequest.js]
# Generic Sync types.
[test_browserid_identity.js]
-[test_collection_inc_get.js]
[test_collection_getBatched.js]
[test_collections_recovery.js]
[test_keys.js]
[test_records_crypto.js]
[test_records_wbo.js]
# Engine APIs.
[test_engine.js]
--- a/services/sync/tps/extensions/tps/resource/tps.jsm
+++ b/services/sync/tps/extensions/tps/resource/tps.jsm
@@ -610,36 +610,38 @@ var TPS = {
}
},
/**
* Use Sync's bookmark validation code to see if we've corrupted the tree.
*/
ValidateBookmarks() {
- let getServerBookmarkState = () => {
+ let getServerBookmarkState = async () => {
let bookmarkEngine = Weave.Service.engineManager.get("bookmarks");
let collection = bookmarkEngine.itemSource();
let collectionKey = bookmarkEngine.service.collectionKeys.keyForCollection(bookmarkEngine.name);
collection.full = true;
let items = [];
- collection.recordHandler = function(item) {
- item.decrypt(collectionKey);
- items.push(item.cleartext);
- };
- Async.promiseSpinningly(collection.get());
+ let resp = await collection.get();
+ for (let json of resp.obj) {
+ let record = new collection._recordObj();
+ record.deserialize(json);
+ record.decrypt(collectionKey);
+ items.push(record.cleartext);
+ }
return items;
};
let serverRecordDumpStr;
try {
Logger.logInfo("About to perform bookmark validation");
let clientTree = Async.promiseSpinningly(PlacesUtils.promiseBookmarksTree("", {
includeItemIds: true
}));
- let serverRecords = getServerBookmarkState();
+ let serverRecords = Async.promiseSpinningly(getServerBookmarkState());
// We can't wait until catch to stringify this, since at that point it will have cycles.
serverRecordDumpStr = JSON.stringify(serverRecords);
let validator = new BookmarkValidator();
let {problemData} = Async.promiseSpinningly(validator.compareServerWithClient(serverRecords, clientTree));
for (let {name, count} of problemData.getSummary()) {
// Exclude mobile showing up on the server hackily so that we don't