Bug 1253051 - wip. (draft)
author     Mark Hammond <mhammond@skippinet.com.au>
date       Tue, 19 Jul 2016 13:01:58 +1000
changeset  392742 481e4a1238e72e6a3da0d0d2cb09918b40883dea
parent     392698 94968a940273882150fc98556d4abf961b287ad8
child      526405 cadf177253f1a985f199c0bb81a5ca3751da03aa
push id    24114
push user  bmo:markh@mozilla.com
push date  Tue, 26 Jul 2016 08:20:42 +0000
bugs       1253051
milestone  50.0a1

Bug 1253051 - wip. MozReview-Commit-ID: CLE0LaE8x5a
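
For context, a minimal sketch of how an engine is expected to drive the new
PostQueue (the shapes follow this patch; the record source, logger and the
error handling in the callback are illustrative only):

    let { Collection } = Cu.import("resource://services-sync/record.js", {});

    let up = new Collection(engine.engineURL, null, engine.service);
    let postQueue = up.newPostQueue(log, response => {
      // Called once per POST - possibly several times before flush() returns.
      if (!response.success) {
        throw response;
      }
    });
    for (let record of outgoingRecords) {
      postQueue.enqueue(record); // may POST (and maybe commit a batch) as a side-effect
    }
    postQueue.flush(); // POSTs anything still queued and commits the final batch
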
services/sync/modules/engines.js
services/sync/modules/record.js
services/sync/modules/service.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_postqueue.js
services/sync/tests/unit/test_syncengine_sync.js
services/sync/tests/unit/xpcshell.ini
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1453,16 +1453,21 @@ SyncEngine.prototype = {
       this._log.trace("Preparing " + modifiedIDs.length +
                       " outgoing records");
 
       let counts = { sent: modifiedIDs.length, failed: 0 };
 
       // collection we'll upload
       let up = new Collection(this.engineURL, null, this.service);
       let handleResponse = resp => {
+        // XXX - TODO - this needs to be aware of batch semantics somehow:
+        // * We don't want to update this.lastSync until the batch is complete.
+        // * We still need to remember the success/failure indicators, but
+        //   not actually save them to this._modified etc until a batch
+        //   is complete.
         if (!resp.success) {
           this._log.debug("Uploading records failed: " + resp);
           resp.failureCode = ENGINE_UPLOAD_FAIL;
           throw resp;
         }
 
         // Update server timestamp from the upload.
         let modified = resp.headers["x-weave-timestamp"];
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -526,16 +526,19 @@ this.Collection = function Collection(ur
   this._service = service;
 
   this._full = false;
   this._ids = null;
   this._limit = 0;
   this._older = 0;
   this._newer = 0;
   this._data = [];
+  // optional members used by batch operations.
+  this._batch = null;
+  this._commit = false;
 }
 Collection.prototype = {
   __proto__: Resource.prototype,
   _logName: "Sync.Collection",
 
   _rebuildURL: function Coll__rebuildURL() {
     // XXX should consider what happens if it's not a URL...
     this.uri.QueryInterface(Ci.nsIURL);
@@ -549,16 +552,20 @@ Collection.prototype = {
     if (this.full)
       args.push('full=1');
     if (this.sort)
       args.push('sort=' + this.sort);
     if (this.ids != null)
       args.push("ids=" + this.ids);
     if (this.limit > 0 && this.limit != Infinity)
       args.push("limit=" + this.limit);
+    if (this._batch)
+      args.push("batch=" + this._batch);
+    if (this._commit)
+      args.push("commit=true");
 
     this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
   },
 
   // get full items
   get full() { return this._full; },
   set full(value) {
     this._full = value;
@@ -598,16 +605,29 @@ Collection.prototype = {
   // newest (newest first)
   // index
   get sort() { return this._sort; },
   set sort(value) {
     this._sort = value;
     this._rebuildURL();
   },
 
+  // Set information about the batch for this request.
+  get batch() { throw new Error("Can't get the batch ID via a getter"); },
+  set batch(value) {
+    this._batch = value;
+    this._rebuildURL();
+  },
+
+  get commit() { throw new Error("Can't get the commit flag via a getter"); },
+  set commit(value) {
+    this._commit = value;
+    this._rebuildURL();
+  },
+
   set recordHandler(onRecord) {
     // Save this because onProgress is called with this as the ChannelListener
     let coll = this;
 
     // Switch to newline separated records for incremental parsing
     coll.setHeader("Accept", "application/newlines");
 
     this._onProgress = function() {
@@ -626,44 +646,91 @@ Collection.prototype = {
   },
 
   // This object only supports posting via the postQueue object.
   post() {
     throw new Error("Don't directly post to a collection - use newPostQueue instead");
   },
 
   newPostQueue(log, postCallback) {
-    let poster = data => {
+    let poster = (data, headers, batch, commit) => {
+      this.batch = batch;
+      this.commit = commit;
+      for (let [header, value] of headers) {
+        this.setHeader(header, value);
+      }
       return Resource.prototype.post.call(this, data);
     }
-    return new PostQueue(poster, log, postCallback);
+    let getConfig = (name, defaultVal) => {
+      return this._service.serverConfiguration && this._service.serverConfiguration[name] ?
+             this._service.serverConfiguration[name] :
+             defaultVal;
+    }
+    let config = {
+      max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES),
+      max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS),
+      // XXX - docs for this feature suggest "max_batch_bytes" etc, but in
+      // reality the server is currently sending "max_total_bytes".
+      // We still use max_batch_* internally as the intent is clearer.
+      max_batch_bytes: getConfig("max_total_bytes", Infinity),
+      max_batch_records: getConfig("max_total_records", Infinity),
+    }
+    return new PostQueue(poster, config, log, postCallback);
   },
 };
 
 /* A helper to manage the posting of records while respecting the various
    size limits.
+
+   This supports the concept of a server-side "batch". The general idea is:
+   * We queue as many records as allowed in memory, then make a single POST.
+   * This first POST (optionally) gives us a batch ID, which we use for
+     all subsequent POSTs, until...
+   * At some point we hit a batch-maximum, and jump through a few hoops to
+     commit the current batch (i.e., all previous POSTs) and start a new one.
+   * Eventually we commit the final batch.
+
+   In most cases we expect there to be exactly one batch, consisting of
+   possibly multiple POSTs.
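+
+   For a batch-capable server, the sequence of requests ends up looking
+   roughly like this (batch IDs and timestamps are illustrative only):
+     POST ...?batch=true             -> 202, {batch: "1234"}, X-Last-Modified
+     POST ...?batch=1234             -> 202  (sent with X-If-Unmodified-Since)
+     POST ...?batch=1234&commit=true -> the server applies every POST in the batch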
 */
-function PostQueue(poster, log, postCallback) {
+function PostQueue(poster, config, log, postCallback) {
   // The "post" function we should use when it comes time to do the post.
   this.poster = poster;
   this.log = log;
 
+  // The config we use
+  this.config = config;
+
   // The callback we make with the response when we do get around to making the
   // post (which could be during any of the enqueue() calls or the final flush())
   // This callback may be called multiple times and must not add new items to
   // the queue.
   this.postCallback = postCallback;
 
   // The string where we are capturing the stringified version of the records
   // queued so far. It will always be invalid JSON as it is always missing the
-  // close bracket.
+  // closing bracket.
   this.queued = "";
 
-  // The number of records we've queued so far.
+  // The number of records we've queued so far but are yet to POST.
   this.numQueued = 0;
+
+  // The number of records/bytes we've processed in previous POSTs for our
+  // current batch. Does *not* include records currently queued for the next POST.
+  this.numAlreadyBatched = 0;
+  this.bytesAlreadyBatched = 0;
+
+  // The ID of our current batch. Can be undefined (meaning we are yet to make
+  // the first post of a batch, so don't know if we have a batch), or null
+  // (meaning we've made the first post but the server response indicated no
+  // batching semantics); otherwise we have made the first post and it holds
+  // the batch ID returned from the server.
+  this.batchID = undefined;
+  // The X-Last-Modified header from the response that created the batch.
+  this.batchLastModified = null;
 }
 
 PostQueue.prototype = {
   enqueue(record) {
     // We want to ensure the record has a .toJSON() method defined - even
     // though JSON.stringify() would implicitly call it, the stringify might
     // still work even if it isn't defined, which isn't what we want.
     let jsonRepr = record.toJSON();
@@ -673,33 +740,103 @@ PostQueue.prototype = {
     let bytes = JSON.stringify(jsonRepr);
     // Note that we purposely don't check if a single record would exceed our
     // limit - we still attempt the post and if it sees a 413 like we think it
     // will, we just let that do whatever it does (which is probably cause
     // ongoing sync failures for that engine - bug 1241356 exists to fix this)
     // (Note that counter-intuitively, the post of the oversized record will
     // not happen here but on the next .enqueue/.flush.)
 
-    // Do a flush if we can't add this record without exceeding our limits.
+    // Do a flush if we can't add this record without exceeding our single-request
+    // limits, or without exceeding the total limit for a single batch.
     let newLength = this.queued.length + bytes.length + 1; // extra 1 for trailing "]"
-    if (this.numQueued >= MAX_UPLOAD_RECORDS || newLength >= MAX_UPLOAD_BYTES) {
-      this.log.trace("PostQueue flushing"); // flush logs more info...
-      // We need to write the queue out before handling this one.
-      this.flush();
+    let postSizeExceeded = this.numQueued >= this.config.max_post_records ||
+                           newLength >= this.config.max_post_bytes;
+    let batchSizeExceeded = this.numQueued + this.numAlreadyBatched >= this.config.max_batch_records ||
+                            newLength + this.bytesAlreadyBatched >= this.config.max_batch_bytes;
+
+    if (postSizeExceeded || batchSizeExceeded) {
+      this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}`);
+      // We need to write the queue out before handling this one, but we only
+      // commit the batch (and thus start a new one) if the batch is full.
+      this.flush(batchSizeExceeded);
     }
     // Either a ',' or a '[' depending on whether this is the first record.
     this.queued += this.numQueued ? "," : "[";
     this.queued += bytes;
     this.numQueued++;
   },
 
-  flush() {
+  flush(finalBatchPost = true) {
     if (!this.queued) {
-      // nothing queued.
+      // nothing queued - we can't be in a batch, and something has gone very
+      // wrong if we think we are.
+      if (this.batchID) {
+        throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
+      }
       return;
     }
-    this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes`);
+    // The batch query-param and headers we'll send.
+    let batch;
+    let headers = [];
+    if (this.batchID === undefined) {
+      batch = "true";
+    } else if (this.batchID) {
+      // We have an existing batch.
+      batch = this.batchID;
+      headers.push(["x-if-unmodified-since", this.batchLastModified]);
+    } else {
+      // Not the first post and we know we have no batch semantics.
+      batch = null;
+    }
+
+    this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes with batch=${batch}`);
     let queued = this.queued + "]";
+    if (finalBatchPost) {
+      this.bytesAlreadyBatched = 0;
+      this.numAlreadyBatched = 0;
+    } else {
+      this.bytesAlreadyBatched += queued.length;
+      this.numAlreadyBatched += this.numQueued;
+    }
     this.queued = "";
     this.numQueued = 0;
-    this.postCallback(this.poster(queued));
+    let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
+    if (response.success) {
+      if (finalBatchPost) {
+        this.log.trace("Committed batch", this.batchID);
+        this.batchID = undefined; // we are now in "first post for the batch" state.
+      } else {
+        if (response.status == 202) {
+          // this response is saying the server has batch semantics - we should
+          // always have a batch ID in the response.
+          let responseBatchID = response.obj.batch;
+          this.log.trace("Server responsed 202 with batch", responseBatchID);
+          if (!responseBatchID) {
+            throw new Error("Invalid server response: 202 without a batch ID", response);
+          }
+          if (this.batchID === undefined) {
+            this.batchID = responseBatchID;
+            this.batchLastModified = response.headers["x-last-modified"];
+            if (!this.batchLastModified) {
+              throw new Error("Batch response without x-last-modified");
+            }
+          } else {
+            if (this.batchID != responseBatchID) {
+              throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
+            }
+          }
+        } else {
+          if (this.batchID) {
+            throw new Error("Server responded non-202 success code while a batch was in progress", response);
+          }
+          this.batchID = null; // no batch semantics are in place.
+        }
+      }
+    } else {
+      this.log.trace("Server error response during a batch", response);
+      // It's not clear what we should do here - we expect the consumer of
+      // this to abort by throwing in the postCallback below. We should
+      // probably also reset the batch indicators, just in case...
+    }
+    this.postCallback(response);
   },
 }
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -1051,21 +1051,53 @@ Sync11Service.prototype = {
     try {
       cb.wait();
       return null;
     } catch (ex) {
       return this.errorHandler.errorStr(ex.body);
     }
   },
 
+  _fetchServerConfiguration() {
+    this.serverConfiguration = null;
+    // This is similar to _fetchInfo, but with different error handling.
+
+    let infoURL = this.userBaseURL + "info/configuration";
+    this._log.debug("Fetching server configuration", infoURL);
+    let configResponse;
+    // At this stage we ignore almost all errors fetching the config as it
+    // is an optional facility.
+    try {
+      configResponse = this.resource(infoURL).get();
+    } catch (ex) {
+      // This is probably a network or similar error.
+      this._log.warn("Failed to fetch info/configuration", ex);
+      this.errorHandler.checkServerError(ex);
+      return;
+    }
+
+    if (configResponse.status == 404) {
+      // This server doesn't support the URL yet - that's OK.
+      this._log.debug("info/configuration returned 404 - using default upload semantics");
+    } else if (configResponse.status != 200) {
+      this._log.warn(`info/configuration returned ${configResponse.status} - using default configuration`);
+      this.errorHandler.checkServerError(configResponse);
+    } else {
+      this.serverConfiguration = configResponse.obj;
+    }
+    this._log.trace("info/configuration for this server", this.serverConfiguration);
+  },
+
   // Stuff we need to do after login, before we can really do
   // anything (e.g. key setup).
   _remoteSetup: function _remoteSetup(infoResponse) {
     let reset = false;
 
+    this._fetchServerConfiguration();
+
     this._log.debug("Fetching global metadata record");
     let meta = this.recordManager.get(this.metaURL);
 
     // Checking modified time of the meta record.
     if (infoResponse &&
         (infoResponse.obj.meta != this.metaModified) &&
         (!meta || !meta.isNew)) {
 
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -389,36 +389,36 @@ ServerCollection.prototype = {
         options.newer = parseFloat(options.newer);
       }
       if (options.limit) {
         options.limit = parseInt(options.limit, 10);
       }
 
       switch(request.method) {
         case "GET":
-          body = self.get(options);
+          body = self.get(options, request);
           // "If supported by the db, this header will return the number of
           // records total in the request body of any multiple-record GET
           // request."
           let records = options.recordCount;
           self._log.info("Records: " + records);
           if (records != null) {
             response.setHeader("X-Weave-Records", "" + records);
           }
           break;
 
         case "POST":
-          let res = self.post(readBytesFromInputStream(request.bodyInputStream));
+          let res = self.post(readBytesFromInputStream(request.bodyInputStream), request);
           body = JSON.stringify(res);
           response.newModified = res.modified;
           break;
 
         case "DELETE":
           self._log.debug("Invoking ServerCollection.DELETE.");
-          let deleted = self.delete(options);
+          let deleted = self.delete(options, request);
           let ts = new_timestamp();
           body = JSON.stringify(ts);
           response.newModified = ts;
           response.deleted = deleted;
           break;
       }
       response.setHeader("X-Weave-Timestamp",
                          "" + new_timestamp(),
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_postqueue.js
@@ -0,0 +1,264 @@
+/* Any copyright is dedicated to the Public Domain.
+ * http://creativecommons.org/publicdomain/zero/1.0/ */
+
+let { PostQueue } = Cu.import("resource://services-sync/record.js", {});
+
+initTestLogging("Trace");
+
+function makeRecord(nbytes) {
+  // Make the string 2 bytes short - the quotes added by JSON.stringify()
+  // bring its JSON representation to exactly nbytes.
+  return {
+    toJSON: () => "x".repeat(nbytes - 2),
+  }
+}
+
+function makePostQueue(config, responseGenerator) {
+  let stats = {
+    posts: [],
+  }
+  let poster = (data, headers, batch, commit) => {
+    let thisPost = { nbytes: data.length, batch, commit };
+    if (headers.length) {
+      thisPost.headers = headers;
+    }
+    stats.posts.push(thisPost);
+    return responseGenerator.next().value;
+  }
+
+  let done = () => {}
+  let pq = new PostQueue(poster, config, getTestLogger(), done);
+  return { pq, stats };
+}
+
+add_test(function test_simple() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 100,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 200 };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(10));
+  pq.flush();
+
+  deepEqual(stats.posts, [{
+    nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
+    commit: true, // we don't know if we have batch semantics, so committed.
+    batch: "true"}]);
+
+  run_next_test();
+});
+
+// Test we do the right thing when we need to make multiple posts when there
+// are no batch semantics
+add_test(function test_max_post_bytes_no_batch() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 4,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 200 };
+    yield { success: true, status: 200 };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  pq.enqueue(makeRecord(20)); // this will exceed our byte limit, so will be in the 2nd POST.
+  pq.flush();
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+    },{
+      nbytes: 22,
+      commit: false, // we know we aren't in a batch, so never commit.
+      batch: null,
+    }
+  ]);
+
+  run_next_test();
+});
+
+// Similar to the above, but we've hit max_records instead of max_bytes.
+add_test(function test_max_post_records_no_batch() {
+  let config = {
+    max_post_bytes: 100,
+    max_post_records: 2,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 200 };
+    yield { success: true, status: 200 };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  pq.enqueue(makeRecord(20)); // this will exceed our records limit, so will be in the 2nd POST.
+  pq.flush();
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+    },{
+      nbytes: 22,
+      commit: false, // we know we aren't in a batch, so never commit.
+      batch: null,
+    }
+  ]);
+
+  run_next_test();
+});
+
+// Batch tests.
+
+// Test making a single post when batch semantics are in place.
+add_test(function test_single_batch() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 100,
+    max_batch_bytes: 2000,
+    max_batch_records: 200,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': 11111111 },
+    };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(10));
+  pq.flush();
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
+      commit: true, // we don't know if we have batch semantics, so committed.
+      batch: "true",
+    }
+  ]);
+
+  run_next_test();
+});
+
+// Test we do the right thing when we need to make multiple posts when there
+// are batch semantics in place.
+add_test(function test_max_post_bytes_batch() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 4,
+    max_batch_bytes: 5000,
+    max_batch_records: 100,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': 11111111 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': 11111111 },
+   };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  pq.enqueue(makeRecord(20)); // this will exceed our byte limit, so will be in the 2nd POST.
+  pq.flush();
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+    },{
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', 11111111]],
+    }
+  ]);
+
+  run_next_test();
+});
+
+// Test we do the right thing when the batch bytes limit is exceeded.
+add_test(function test_max_batch_bytes() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 20,
+    max_batch_bytes: 70,
+    max_batch_records: 100,
+  }
+
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': 11111111 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': 11111111 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': 22222222 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': 22222222 },
+    };
+  }
+
+  let { pq, stats } = makePostQueue(config, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  // this will exceed our POST byte limit, so will be in the 2nd POST - but still in the first batch.
+  pq.enqueue(makeRecord(20)); // 22 bytes for 2nd post, 55 bytes in the batch.
+  // this will exceed our batch byte limit, so will be in a new batch.
+  pq.enqueue(makeRecord(20)); // 22 bytes in 3rd post/2nd batch
+  pq.enqueue(makeRecord(20)); // 43 bytes in 3rd post/2nd batch
+  // This will exceed POST byte limit, so will be in the 4th post, part of the 2nd batch.
+  pq.enqueue(makeRecord(20)); // 22 bytes for 4th post/2nd batch
+  pq.flush();
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+    },{
+      // second post of 22 bytes in the first batch, committing it.
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', 11111111]],
+    }, {
+      // 3rd post of 43 bytes in a new batch, not yet committing it.
+      nbytes: 43,
+      commit: false,
+      batch: "true",
+    },{
+      // 4th post of 22 bytes in second batch, committing it.
+      nbytes: 22,
+      commit: true,
+      batch: 5678,
+      headers: [['x-if-unmodified-since', 22222222]],
+    },
+  ]);
+
+  run_next_test();
+});
+
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -1426,29 +1426,41 @@ add_task(function *test_uploadOutgoing_f
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
 
   } finally {
     yield promiseClean(server);
   }
 });
 
-
+/* A couple of "functional" tests to ensure we split records into appropriate
+   POST requests. More comprehensive unit-tests for this "batching" are in
+   test_postqueue.js.
+*/
 add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
   _("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_RECORDS");
 
   Service.identity.username = "foo";
   let collection = new ServerCollection();
 
   // Let's count how many times the client posts to the server
   var noOfUploads = 0;
   collection.post = (function(orig) {
-    return function() {
+    return function(data, request) {
+      // This test doesn't arrange for batch semantics - so we expect the
+      // first request to come in with batch=true and the others to have no
+      // batch-related query parameters at all (as the first response did
+      // not provide a batch ID).
+      if (noOfUploads == 0) {
+        do_check_eq(request.queryString, "batch=true");
+      } else {
+        do_check_eq(request.queryString, "");
+      }
       noOfUploads++;
-      return orig.apply(this, arguments);
+      return orig.call(this, data, request);
     };
   }(collection.post));
 
   // Create a bunch of records (and server side handlers)
   let engine = makeRotaryEngine();
   for (var i = 0; i < 234; i++) {
     let id = 'record-no-' + i;
     engine._store.items[id] = "Record No. " + i;
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -175,16 +175,17 @@ skip-if = debug
 [test_prefs_store.js]
 support-files = prefs_test_prefs_store.js
 [test_prefs_tracker.js]
 [test_tab_engine.js]
 [test_tab_store.js]
 [test_tab_tracker.js]
 
 [test_warn_on_truncated_response.js]
+[test_postqueue.js]
 
 # FxA migration
 [test_fxa_migration.js]
 
 # Synced tabs.
 [test_syncedtabs.js]
 
 [test_telemetry.js]