(DO NOT REVIEW) - Change getBatched() to return records directly instead of using a callback. draft
authorEdouard Oger <eoger@fastmail.com>
Tue, 30 May 2017 13:23:57 -0400
changeset 591760 87a28a5a493bd1369c9619da5eebe0bfbef40507
parent 591727 eca8d0ea03af1d2424550a037f714f14c0f7b1be
child 591761 d5663590e9f326489c086265732dcda2725b0f04
child 591774 9425f59de4092dc792ea46e357976e4a3c3ea070
push id63170
push userbmo:eoger@fastmail.com
push dateFri, 09 Jun 2017 15:24:16 +0000
milestone55.0a1
(DO NOT REVIEW) - Change getBatched() to return records directly instead of using a callback. MozReview-Commit-ID: HfwPe8jSH66
services/sync/modules/bookmark_validator.js
services/sync/modules/collection_validator.js
services/sync/modules/engines.js
services/sync/modules/record.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_bookmark_duping.js
services/sync/tests/unit/test_bookmark_engine.js
services/sync/tests/unit/test_collection_getBatched.js
services/sync/tests/unit/test_collection_inc_get.js
services/sync/tests/unit/xpcshell.ini
services/sync/tps/extensions/tps/resource/tps.jsm
--- a/services/sync/modules/bookmark_validator.js
+++ b/services/sync/modules/bookmark_validator.js
@@ -818,26 +818,24 @@ class BookmarkValidator {
   }
 
   async _getServerState(engine) {
 // XXXXX - todo - we need to capture last-modified of the server here and
 // ensure the repairer only applys with if-unmodified-since that date.
     let collection = engine.itemSource();
     let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
     collection.full = true;
-    let items = [];
-    collection.recordHandler = function(item) {
-      item.decrypt(collectionKey);
-      items.push(item.cleartext);
-    };
-    let resp = await collection.getBatched();
-    if (!resp.success) {
-      throw resp;
+    let result = await collection.getBatched();
+    if (!result.response.success) {
+      throw result.response;
     }
-    return items;
+    return result.records.map(record => {
+      record.decrypt(collectionKey);
+      return record.cleartext;
+    });
   }
 
   async validate(engine) {
     let start = Date.now();
     let clientTree = await PlacesUtils.promiseBookmarksTree("", {
       includeItemIds: true
     });
     let serverState = await this._getServerState(engine);
--- a/services/sync/modules/collection_validator.js
+++ b/services/sync/modules/collection_validator.js
@@ -64,26 +64,24 @@ class CollectionValidator {
   emptyProblemData() {
     return new CollectionProblemData();
   }
 
   async getServerItems(engine) {
     let collection = engine.itemSource();
     let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
     collection.full = true;
-    let items = [];
-    collection.recordHandler = function(item) {
-      item.decrypt(collectionKey);
-      items.push(item.cleartext);
-    };
-    let resp = await collection.getBatched();
-    if (!resp.success) {
-      throw resp;
+    let result = await collection.getBatched();
+    if (!result.response.success) {
+      throw result.response;
     }
-    return items;
+    return result.records.map(record => {
+      record.decrypt(collectionKey);
+      return record.cleartext;
+    });
   }
 
   // Should return a promise that resolves to an array of client items.
   getClientItems() {
     return Promise.reject("Must implement");
   }
 
   /**
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1118,17 +1118,17 @@ SyncEngine.prototype = {
     }
 
     let key = this.service.collectionKeys.keyForCollection(this.name);
 
     // Not binding this method to 'this' for performance reasons. It gets
     // called for every incoming record.
     let self = this;
 
-    newitems.recordHandler = function(item) {
+    let recordHandler = function(item) {
       if (aborting) {
         return;
       }
 
       // Grab a later last modified if possible
       if (self.lastModified == null || item.modified > self.lastModified)
         self.lastModified = item.modified;
 
@@ -1226,22 +1226,26 @@ SyncEngine.prototype = {
       if (applyBatch.length == self.applyIncomingBatchSize) {
         doApplyBatch.call(self);
       }
       self._store._sleep(0);
     };
 
     // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
-      let resp = Async.promiseSpinningly(newitems.getBatched());
+      let { response, records } = Async.promiseSpinningly(newitems.getBatched());
+      if (!response.success) {
+        response.failureCode = ENGINE_DOWNLOAD_FAIL;
+        throw response;
+      }
+
+      for (let record of records) {
+        recordHandler(record);
+      }
       doApplyBatchAndPersistFailed.call(this);
-      if (!resp.success) {
-        resp.failureCode = ENGINE_DOWNLOAD_FAIL;
-        throw resp;
-      }
 
       if (aborting) {
         throw aborting;
       }
     }
 
     // Mobile: check if we got the maximum that we requested; get the rest if so.
     if (handled.length == newitems.limit) {
@@ -1280,23 +1284,28 @@ SyncEngine.prototype = {
 
     while (fetchBatch.length && !aborting) {
       // Reuse the original query, but get rid of the restricting params
       // and batch remaining records.
       newitems.limit = 0;
       newitems.newer = 0;
       newitems.ids = fetchBatch.slice(0, batchSize);
 
-      // Reuse the existing record handler set earlier
       let resp = Async.promiseSpinningly(newitems.get());
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
 
+      for (let json of resp.obj) {
+        let record = new this._recordObj();
+        record.deserialize(json);
+        recordHandler(record);
+      }
+
       // This batch was successfully applied. Not using
       // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
       fetchBatch = fetchBatch.slice(batchSize);
       this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
       this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
       if (failed.length) {
         count.failed += failed.length;
         this._log.debug("Records that failed to apply: " + failed);
@@ -1810,25 +1819,25 @@ SyncEngine.prototype = {
 
     // Fetch the most recently uploaded record and try to decrypt it
     let test = new Collection(this.engineURL, this._recordObj, this.service);
     test.limit = 1;
     test.sort = "newest";
     test.full = true;
 
     let key = this.service.collectionKeys.keyForCollection(this.name);
-    test.recordHandler = function recordHandler(record) {
-      record.decrypt(key);
-      canDecrypt = true;
-    };
 
     // Any failure fetching/decrypting will just result in false
     try {
       this._log.trace("Trying to decrypt a record from the server..");
-      Async.promiseSpinningly(test.get());
+      let json = Async.promiseSpinningly(test.get()).obj[0];
+      let record = new this._recordObj();
+      record.deserialize(json);
+      record.decrypt(key);
+      canDecrypt = true;
     } catch (ex) {
       if (Async.isShutdownException(ex)) {
         throw ex;
       }
       this._log.debug("Failed test decrypt", ex);
     }
 
     return canDecrypt;
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -702,47 +702,49 @@ Collection.prototype = {
   // deferring calling the record handler until we've gotten them all.
   //
   // Returns the last response processed, and doesn't run the record handler
   // on any items if a non-success status is received while downloading the
   // records (or if a network error occurs).
   async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
     let totalLimit = Number(this.limit) || Infinity;
     if (batchSize <= 0 || batchSize >= totalLimit) {
-      // Invalid batch sizes should arguably be an error, but they're easy to handle
-      return this.get();
+      throw new Error("Invalid batch size");
     }
 
     if (!this.full) {
       throw new Error("getBatched is unimplemented for guid-only GETs");
     }
 
     // _onComplete and _onProgress are reset after each `get` by AsyncResource.
-    // We overwrite _onRecord to something that stores the data in an array
-    // until the end.
-    let { _onComplete, _onProgress, _onRecord } = this;
+    let { _onComplete, _onProgress } = this;
     let recordBuffer = [];
     let resp;
     try {
-      this._onRecord = r => recordBuffer.push(r);
       let lastModifiedTime;
       this.limit = batchSize;
 
       do {
         this._onProgress = _onProgress;
         this._onComplete = _onComplete;
         if (batchSize + recordBuffer.length > totalLimit) {
           this.limit = totalLimit - recordBuffer.length;
         }
         this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
         // Actually perform the request
         resp = await this.get();
         if (!resp.success) {
+          recordBuffer = [];
           break;
         }
+        for (let json of resp.obj) {
+          let record = new this._recordObj();
+          record.deserialize(json);
+          recordBuffer.push(record);
+        }
 
         // Initialize last modified, or check that something broken isn't happening.
         let lastModified = resp.headers["x-last-modified"];
         if (!lastModifiedTime) {
           lastModifiedTime = lastModified;
           this.setHeader("X-If-Unmodified-Since", lastModified);
         } else if (lastModified != lastModifiedTime) {
           // Should be impossible -- We'd get a 412 in this case.
@@ -754,64 +756,22 @@ Collection.prototype = {
         this.offset = resp.headers["x-weave-next-offset"];
       } while (this.offset && totalLimit > recordBuffer.length);
     } finally {
       // Ensure we undo any temporary state so that subsequent calls to get()
       // or getBatched() work properly. We do this before calling the record
       // handler so that we can more convincingly pretend to be a normal get()
       // call. Note: we're resetting these to the values they had before this
       // function was called.
-      this._onRecord = _onRecord;
       this._limit = totalLimit;
       this._offset = null;
       delete this._headers["x-if-unmodified-since"];
       this._rebuildURL();
     }
-    if (resp.success && Async.checkAppReady()) {
-      // call the original _onRecord (e.g. the user supplied record handler)
-      // for each record we've stored
-      for (let record of recordBuffer) {
-        this._onRecord(record);
-      }
-    }
-    return resp;
-  },
-
-  set recordHandler(onRecord) {
-    // Save this because onProgress is called with this as the ChannelListener
-    let coll = this;
-
-    // Switch to newline separated records for incremental parsing
-    coll.setHeader("Accept", "application/newlines");
-
-    this._onRecord = onRecord;
-
-    this._onProgress = function(httpChannel) {
-      let newline, length = 0, contentLength = "unknown";
-
-      try {
-          // Content-Length of the value of this response header
-          contentLength = httpChannel.getResponseHeader("Content-Length");
-      } catch (ex) { }
-
-      while ((newline = this._data.indexOf("\n")) > 0) {
-        // Split the json record from the rest of the data
-        let json = this._data.slice(0, newline);
-        this._data = this._data.slice(newline + 1);
-
-        length += json.length;
-        coll._log.trace("Record: Content-Length = " + contentLength +
-                        ", ByteCount = " + length);
-
-        // Deserialize a record from json and give it to the callback
-        let record = new coll._recordObj();
-        record.deserialize(json);
-        coll._onRecord(record);
-      }
-    };
+    return { response: resp, records: recordBuffer };
   },
 
   // This object only supports posting via the postQueue object.
   post() {
     throw new Error("Don't directly post to a collection - use newPostQueue instead");
   },
 
   newPostQueue(log, timestamp, postCallback) {
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -291,17 +291,17 @@ ServerCollection.prototype = {
     for (let wbo of Object.values(this._wbos)) {
       if (wbo.modified && this._inResultSet(wbo, options)) {
         c++;
       }
     }
     return c;
   },
 
-  get(options) {
+  get(options, request) {
     let result;
     if (options.full) {
       let data = [];
       for (let wbo of Object.values(this._wbos)) {
         // Drop deleted.
         if (wbo.modified && this._inResultSet(wbo, options)) {
           data.push(wbo.get());
         }
@@ -312,18 +312,23 @@ ServerCollection.prototype = {
         data = data.slice(start, start + options.limit);
         // use options as a backchannel to set x-weave-next-offset
         if (numItemsPastOffset > options.limit) {
           options.nextOffset = start + options.limit;
         }
       } else if (start) {
         data = data.slice(start);
       }
-      // Our implementation of application/newlines.
-      result = data.join("\n") + "\n";
+
+      if (request && request.getHeader("accept") == "application/newlines") {
+        this._log.error("Error: client requesting application/newlines content");
+        throw new Error("This server should not serve application/newlines content");
+      } else {
+        result = JSON.stringify(data);
+      }
 
       // Use options as a backchannel to report count.
       options.recordCount = data.length;
     } else {
       let data = [];
       for (let [id, wbo] of Object.entries(this._wbos)) {
         if (this._inResultSet(wbo, options)) {
           data.push(id);
--- a/services/sync/tests/unit/test_bookmark_duping.js
+++ b/services/sync/tests/unit/test_bookmark_duping.js
@@ -66,17 +66,17 @@ async function createBookmark(parentId, 
   let bookmark = await bms.insert({ parentGuid, url, index, title });
   let id = await PlacesUtils.promiseItemId(bookmark.guid);
   return { id, guid: bookmark.guid };
 }
 
 function getServerRecord(collection, id) {
   let wbo = collection.get({ full: true, ids: [id] });
   // Whew - lots of json strings inside strings.
-  return JSON.parse(JSON.parse(JSON.parse(wbo).payload).ciphertext);
+  return JSON.parse(JSON.parse(JSON.parse(JSON.parse(wbo)[0]).payload).ciphertext);
 }
 
 async function promiseNoLocalItem(guid) {
   // Check there's no item with the specified guid.
   let got = await bms.fetch({ guid });
   ok(!got, `No record remains with GUID ${guid}`);
   // and while we are here ensure the places cache doesn't still have it.
   await Assert.rejects(PlacesUtils.promiseItemId(guid));
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -530,33 +530,19 @@ add_task(async function test_misreconcil
     title: "Bookmarks Toolbar",
     description: "Now you're for it.",
     parentName: "",
     parentid: "mobile",   // Why not?
     children: [],
   };
 
   let rec = new FakeRecord(BookmarkFolder, to_apply);
-  let encrypted = encryptPayload(rec.cleartext);
-  encrypted.decrypt = function() {
-    for (let x in rec) {
-      encrypted[x] = rec[x];
-    }
-  };
 
   _("Applying record.");
-  engine._processIncoming({
-    getBatched() {
-      return this.get();
-    },
-    async get() {
-      this.recordHandler(encrypted);
-      return {success: true}
-    },
-  });
+  store.applyIncoming(rec);
 
   // Ensure that afterwards, toolbar is still there.
   // As of 2012-12-05, this only passes because Places doesn't use "toolbar" as
   // the real GUID, instead using a generated one. Sync does the translation.
   let toolbarAfter = store.createRecord("toolbar", "bookmarks");
   let parentGUIDAfter = toolbarAfter.parentid;
   let parentIDAfter = store.idForGUID(parentGUIDAfter);
   do_check_eq(store.GUIDForId(toolbarIDBefore), "toolbar");
--- a/services/sync/tests/unit/test_collection_getBatched.js
+++ b/services/sync/tests/unit/test_collection_getBatched.js
@@ -10,48 +10,45 @@ function run_test() {
   run_next_test();
 }
 
 function recordRange(lim, offset, total) {
   let res = [];
   for (let i = offset; i < Math.min(lim + offset, total); ++i) {
     res.push(JSON.stringify({ id: String(i), payload: "test:" + i }));
   }
-  return res.join("\n") + "\n";
+  return res;
 }
 
 function get_test_collection_info({ totalRecords, batchSize, lastModified,
                                     throwAfter = Infinity,
                                     interruptedAfter = Infinity }) {
   let coll = new Collection("http://example.com/test/", WBORecord, Service);
   coll.full = true;
   let requests = [];
   let responses = [];
-  let sawRecord = false;
   coll.get = async function() {
-    ok(!sawRecord); // make sure we call record handler after all requests.
     let limit = +this.limit;
     let offset = 0;
     if (this.offset) {
       equal(this.offset.slice(0, 6), "foobar");
       offset = +this.offset.slice(6);
     }
     requests.push({
       limit,
       offset,
       spec: this.spec,
       headers: Object.assign({}, this.headers)
     });
     if (--throwAfter === 0) {
       throw "Some Network Error";
     }
     let body = recordRange(limit, offset, totalRecords);
-    this._onProgress.call({ _data: body });
     let response = {
-      body,
+      obj: body,
       success: true,
       status: 200,
       headers: {}
     };
     if (--interruptedAfter === 0) {
       response.success = false;
       response.status = 412;
       response.body = "";
@@ -59,43 +56,34 @@ function get_test_collection_info({ tota
       // Ensure we're treating this as an opaque string, since the docs say
       // it might not be numeric.
       response.headers["x-weave-next-offset"] = "foobar" + (offset + batchSize);
     }
     response.headers["x-last-modified"] = lastModified;
     responses.push(response);
     return response;
   };
-
-  let records = [];
-  coll.recordHandler = function(record) {
-    sawRecord = true;
-    // ensure records are coming in in the right order
-    equal(record.id, String(records.length));
-    equal(record.payload, "test:" + records.length);
-    records.push(record);
-  };
-  return { records, responses, requests, coll };
+  return { responses, requests, coll };
 }
 
 add_task(async function test_success() {
   const totalRecords = 11;
   const batchSize = 2;
   const lastModified = "111111";
-  let { records, responses, requests, coll } = get_test_collection_info({
+  let { responses, requests, coll } = get_test_collection_info({
     totalRecords,
     batchSize,
     lastModified,
   });
-  let response = await coll.getBatched(batchSize);
+  let { response, records } = await coll.getBatched(batchSize);
 
   equal(requests.length, Math.ceil(totalRecords / batchSize));
 
-  // records are mostly checked in recordHandler, we just care about the length
   equal(records.length, totalRecords);
+  checkRecordsOrder(records);
 
   // ensure we're returning the last response
   equal(responses[responses.length - 1], response);
 
   // check first separately since its a bit of a special case
   ok(!requests[0].headers["x-if-unmodified-since"]);
   ok(!requests[0].offset);
   equal(requests[0].limit, batchSize);
@@ -119,23 +107,24 @@ add_task(async function test_success() {
 });
 
 add_task(async function test_total_limit() {
   _("getBatched respects the (initial) value of the limit property");
   const totalRecords = 100;
   const recordLimit = 11;
   const batchSize = 2;
   const lastModified = "111111";
-  let { records, requests, coll } = get_test_collection_info({
+  let { requests, coll } = get_test_collection_info({
     totalRecords,
     batchSize,
     lastModified,
   });
   coll.limit = recordLimit;
-  await coll.getBatched(batchSize);
+  let { records } = await coll.getBatched(batchSize);
+  checkRecordsOrder(records);
 
   equal(requests.length, Math.ceil(recordLimit / batchSize));
   equal(records.length, recordLimit);
 
   for (let i = 0; i < requests.length; ++i) {
     let req = requests[i];
     if (i !== requests.length - 1) {
       equal(req.limit, batchSize);
@@ -147,43 +136,50 @@ add_task(async function test_total_limit
   equal(coll._limit, recordLimit);
 });
 
 add_task(async function test_412() {
   _("We shouldn't record records if we get a 412 in the middle of a batch");
   const totalRecords = 11;
   const batchSize = 2;
   const lastModified = "111111";
-  let { records, responses, requests, coll } = get_test_collection_info({
+  let { responses, requests, coll } = get_test_collection_info({
     totalRecords,
     batchSize,
     lastModified,
     interruptedAfter: 3
   });
-  let response = await coll.getBatched(batchSize);
+  let { response, records } = await coll.getBatched(batchSize);
 
   equal(requests.length, 3);
-  equal(records.length, 0); // record handler shouldn't be called for anything
+  equal(records.length, 0); // we should not get any records
 
   // ensure we're returning the last response
   equal(responses[responses.length - 1], response);
 
   ok(!response.success);
   equal(response.status, 412);
 });
 
 add_task(async function test_get_throws() {
-  _("We shouldn't record records if get() throws for some reason");
+  _("getBatched() should throw if a get() throws");
   const totalRecords = 11;
   const batchSize = 2;
   const lastModified = "111111";
-  let { records, requests, coll } = get_test_collection_info({
+  let { requests, coll } = get_test_collection_info({
     totalRecords,
     batchSize,
     lastModified,
     throwAfter: 3
   });
 
-  await Assert.rejects(coll.getBatched(batchSize), "Some Network Error");
+  await Assert.rejects(coll.getBatched(batchSize), /Some Network Error/);
 
   equal(requests.length, 3);
-  equal(records.length, 0);
 });
+
+function checkRecordsOrder(records) {
+  ok(records.length > 0)
+  for (let i = 0; i < records.length; i++) {
+    equal(records[i].id, String(i));
+    equal(records[i].payload, "test:" + i);
+  }
+}
deleted file mode 100644
--- a/services/sync/tests/unit/test_collection_inc_get.js
+++ /dev/null
@@ -1,188 +0,0 @@
-/* Any copyright is dedicated to the Public Domain.
-   http://creativecommons.org/publicdomain/zero/1.0/ */
-
-_("Make sure Collection can correctly incrementally parse GET requests");
-Cu.import("resource://services-sync/record.js");
-Cu.import("resource://services-sync/service.js");
-
-function run_test() {
-  let base = "http://fake/";
-  let coll = new Collection("http://fake/uri/", WBORecord, Service);
-  let stream = { _data: "" };
-  let called, recCount, sum;
-
-  _("Not-JSON, string payloads are strings");
-  called = false;
-  stream._data = '{"id":"hello","payload":"world"}\n';
-  coll.recordHandler = function(rec) {
-    called = true;
-    _("Got record:", JSON.stringify(rec));
-    rec.collection = "uri";           // This would be done by an engine, so do it here.
-    do_check_eq(rec.collection, "uri");
-    do_check_eq(rec.id, "hello");
-    do_check_eq(rec.uri(base).spec, "http://fake/uri/hello");
-    do_check_eq(rec.payload, "world");
-  };
-  coll._onProgress.call(stream);
-  do_check_eq(stream._data, "");
-  do_check_true(called);
-  _("\n");
-
-
-  _("Parse record with payload");
-  called = false;
-  stream._data = '{"payload":"{\\"value\\":123}"}\n';
-  coll.recordHandler = function(rec) {
-    called = true;
-    _("Got record:", JSON.stringify(rec));
-    do_check_eq(rec.payload.value, 123);
-  };
-  coll._onProgress.call(stream);
-  do_check_eq(stream._data, "");
-  do_check_true(called);
-  _("\n");
-
-
-  _("Parse multiple records in one go");
-  called = false;
-  recCount = 0;
-  sum = 0;
-  stream._data = '{"id":"hundred","payload":"{\\"value\\":100}"}\n{"id":"ten","payload":"{\\"value\\":10}"}\n{"id":"one","payload":"{\\"value\\":1}"}\n';
-  coll.recordHandler = function(rec) {
-    called = true;
-    _("Got record:", JSON.stringify(rec));
-    recCount++;
-    sum += rec.payload.value;
-    _("Incremental status: count", recCount, "sum", sum);
-    rec.collection = "uri";
-    switch (recCount) {
-      case 1:
-        do_check_eq(rec.id, "hundred");
-        do_check_eq(rec.uri(base).spec, "http://fake/uri/hundred");
-        do_check_eq(rec.payload.value, 100);
-        do_check_eq(sum, 100);
-        break;
-      case 2:
-        do_check_eq(rec.id, "ten");
-        do_check_eq(rec.uri(base).spec, "http://fake/uri/ten");
-        do_check_eq(rec.payload.value, 10);
-        do_check_eq(sum, 110);
-        break;
-      case 3:
-        do_check_eq(rec.id, "one");
-        do_check_eq(rec.uri(base).spec, "http://fake/uri/one");
-        do_check_eq(rec.payload.value, 1);
-        do_check_eq(sum, 111);
-        break;
-      default:
-        do_throw("unexpected number of record counts", recCount);
-        break;
-    }
-  };
-  coll._onProgress.call(stream);
-  do_check_eq(recCount, 3);
-  do_check_eq(sum, 111);
-  do_check_eq(stream._data, "");
-  do_check_true(called);
-  _("\n");
-
-
-  _("Handle incremental data incoming");
-  called = false;
-  recCount = 0;
-  sum = 0;
-  stream._data = '{"payl';
-  coll.recordHandler = function(rec) {
-    called = true;
-    do_throw("shouldn't have gotten a record..");
-  };
-  coll._onProgress.call(stream);
-  _("shouldn't have gotten anything yet");
-  do_check_eq(recCount, 0);
-  do_check_eq(sum, 0);
-  _("leading array bracket should have been trimmed");
-  do_check_eq(stream._data, '{"payl');
-  do_check_false(called);
-  _();
-
-  _("adding more data enough for one record..");
-  called = false;
-  stream._data += 'oad":"{\\"value\\":100}"}\n';
-  coll.recordHandler = function(rec) {
-    called = true;
-    _("Got record:", JSON.stringify(rec));
-    recCount++;
-    sum += rec.payload.value;
-  };
-  coll._onProgress.call(stream);
-  _("should have 1 record with sum 100");
-  do_check_eq(recCount, 1);
-  do_check_eq(sum, 100);
-  _("all data should have been consumed including trailing comma");
-  do_check_eq(stream._data, "");
-  do_check_true(called);
-  _();
-
-  _("adding more data..");
-  called = false;
-  stream._data += '{"payload":"{\\"value\\":10}"';
-  coll.recordHandler = function(rec) {
-    called = true;
-    do_throw("shouldn't have gotten a record..");
-  };
-  coll._onProgress.call(stream);
-  _("should still have 1 record with sum 100");
-  do_check_eq(recCount, 1);
-  do_check_eq(sum, 100);
-  _("should almost have a record");
-  do_check_eq(stream._data, '{"payload":"{\\"value\\":10}"');
-  do_check_false(called);
-  _();
-
-  _("add data for two records..");
-  called = false;
-  stream._data += '}\n{"payload":"{\\"value\\":1}"}\n';
-  coll.recordHandler = function(rec) {
-    called = true;
-    _("Got record:", JSON.stringify(rec));
-    recCount++;
-    sum += rec.payload.value;
-    switch (recCount) {
-      case 2:
-        do_check_eq(rec.payload.value, 10);
-        do_check_eq(sum, 110);
-        break;
-      case 3:
-        do_check_eq(rec.payload.value, 1);
-        do_check_eq(sum, 111);
-        break;
-      default:
-        do_throw("unexpected number of record counts", recCount);
-        break;
-    }
-  };
-  coll._onProgress.call(stream);
-  _("should have gotten all 3 records with sum 111");
-  do_check_eq(recCount, 3);
-  do_check_eq(sum, 111);
-  _("should have consumed all data");
-  do_check_eq(stream._data, "");
-  do_check_true(called);
-  _();
-
-  _("add no extra data");
-  called = false;
-  stream._data += "";
-  coll.recordHandler = function(rec) {
-    called = true;
-    do_throw("shouldn't have gotten a record..");
-  };
-  coll._onProgress.call(stream);
-  _("should still have 3 records with sum 111");
-  do_check_eq(recCount, 3);
-  do_check_eq(sum, 111);
-  _("should have consumed nothing but still have nothing");
-  do_check_eq(stream._data, "");
-  do_check_false(called);
-  _("\n");
-}
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -44,17 +44,16 @@ tags = addons
 [test_resource.js]
 [test_resource_async.js]
 [test_resource_header.js]
 [test_resource_ua.js]
 [test_syncstoragerequest.js]
 
 # Generic Sync types.
 [test_browserid_identity.js]
-[test_collection_inc_get.js]
 [test_collection_getBatched.js]
 [test_collections_recovery.js]
 [test_keys.js]
 [test_records_crypto.js]
 [test_records_wbo.js]
 
 # Engine APIs.
 [test_engine.js]
--- a/services/sync/tps/extensions/tps/resource/tps.jsm
+++ b/services/sync/tps/extensions/tps/resource/tps.jsm
@@ -610,36 +610,38 @@ var TPS = {
     }
   },
 
   /**
    * Use Sync's bookmark validation code to see if we've corrupted the tree.
    */
   ValidateBookmarks() {
 
-    let getServerBookmarkState = () => {
+    let getServerBookmarkState = async () => {
       let bookmarkEngine = Weave.Service.engineManager.get("bookmarks");
       let collection = bookmarkEngine.itemSource();
       let collectionKey = bookmarkEngine.service.collectionKeys.keyForCollection(bookmarkEngine.name);
       collection.full = true;
       let items = [];
-      collection.recordHandler = function(item) {
-        item.decrypt(collectionKey);
-        items.push(item.cleartext);
-      };
-      Async.promiseSpinningly(collection.get());
+      let resp = await collection.get();
+      for (let json of resp.obj) {
+        let record = new collection._recordObj();
+        record.deserialize(json);
+        record.decrypt(collectionKey);
+        items.push(record.cleartext);
+      }
       return items;
     };
     let serverRecordDumpStr;
     try {
       Logger.logInfo("About to perform bookmark validation");
       let clientTree = Async.promiseSpinningly(PlacesUtils.promiseBookmarksTree("", {
         includeItemIds: true
       }));
-      let serverRecords = getServerBookmarkState();
+      let serverRecords = Async.promiseSpinningly(getServerBookmarkState());
       // We can't wait until catch to stringify this, since at that point it will have cycles.
       serverRecordDumpStr = JSON.stringify(serverRecords);
 
       let validator = new BookmarkValidator();
       let {problemData} = Async.promiseSpinningly(validator.compareServerWithClient(serverRecords, clientTree));
 
       for (let {name, count} of problemData.getSummary()) {
         // Exclude mobile showing up on the server hackily so that we don't