Bug 1253111 - Part 2: Add support for batching uploads r=rnewman draft
authorGrigory Kruglov <gkruglov@mozilla.com>
Fri, 26 Aug 2016 14:05:47 -0700
changeset 407736 4a6a2db595fe72473b033acd4f3180fa799319d4
parent 407735 e029d60a063b2e1dd9061362c4d4a647263dd3ec
child 529941 8803d9a124963c96884003ea465e2577042499c8
push id28035
push usergkruglov@mozilla.com
push dateTue, 30 Aug 2016 23:11:36 +0000
reviewersrnewman
bugs1253111
milestone51.0a1
Bug 1253111 - Part 2: Add support for batching uploads r=rnewman - Introduce a new BatchingUploader class to handle storing records and keep track of batches/payloads - Refactor upload runnable and upload delegate into their own classes - Introduce Last-Modified and X-I-U-S handling into non-batching mode MozReview-Commit-ID: 3JLExwQvYzM
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageResponse.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BufferSizeTracker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/MayUploadProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/Payload.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnable.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/MockRecord.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnableTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -1010,16 +1010,23 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/repositories/Repository.java',
     'sync/repositories/RepositorySession.java',
     'sync/repositories/RepositorySessionBundle.java',
     'sync/repositories/Server11Repository.java',
     'sync/repositories/Server11RepositorySession.java',
     'sync/repositories/StoreFailedException.java',
     'sync/repositories/StoreTracker.java',
     'sync/repositories/StoreTrackingRepositorySession.java',
+    'sync/repositories/uploaders/BatchingUploader.java',
+    'sync/repositories/uploaders/BatchMeta.java',
+    'sync/repositories/uploaders/BufferSizeTracker.java',
+    'sync/repositories/uploaders/MayUploadProvider.java',
+    'sync/repositories/uploaders/Payload.java',
+    'sync/repositories/uploaders/PayloadUploadDelegate.java',
+    'sync/repositories/uploaders/RecordUploadRunnable.java',
     'sync/Server11PreviousPostFailedException.java',
     'sync/Server11RecordPostFailedException.java',
     'sync/setup/activities/ActivityUtils.java',
     'sync/setup/activities/WebURLFinder.java',
     'sync/setup/Constants.java',
     'sync/setup/InvalidSyncKeyException.java',
     'sync/SharedPreferencesClientsDataDelegate.java',
     'sync/stage/AbstractNonRepositorySyncStage.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
@@ -4,34 +4,42 @@
 
 package org.mozilla.gecko.sync.net;
 
 import org.mozilla.gecko.sync.Utils;
 
 import ch.boye.httpclientandroidlib.HttpResponse;
 
 public class SyncResponse extends MozResponse {
+  public static final String X_WEAVE_BACKOFF = "x-weave-backoff";
+  public static final String X_BACKOFF = "x-backoff";
+  public static final String X_LAST_MODIFIED = "x-last-modified";
+  public static final String X_WEAVE_TIMESTAMP = "x-weave-timestamp";
+  public static final String X_WEAVE_RECORDS = "x-weave-records";
+  public static final String X_WEAVE_QUOTA_REMAINING = "x-weave-quota-remaining";
+  public static final String X_WEAVE_ALERT = "x-weave-alert";
+
   public SyncResponse(HttpResponse res) {
     super(res);
   }
 
   /**
    * @return A number of seconds, or -1 if the 'X-Weave-Backoff' header was not
    *         present.
    */
   public int weaveBackoffInSeconds() throws NumberFormatException {
-    return this.getIntegerHeader("x-weave-backoff");
+    return this.getIntegerHeader(X_WEAVE_BACKOFF);
   }
 
   /**
    * @return A number of seconds, or -1 if the 'X-Backoff' header was not
    *         present.
    */
   public int xBackoffInSeconds() throws NumberFormatException {
-    return this.getIntegerHeader("x-backoff");
+    return this.getIntegerHeader(X_BACKOFF);
   }
 
   /**
    * Extract a number of seconds, or -1 if none of the specified headers were present.
    *
    * @param includeRetryAfter
    *          if <code>true</code>, the Retry-After header is excluded. This is
    *          useful for processing non-error responses where a Retry-After
@@ -75,41 +83,46 @@ public class SyncResponse extends MozRes
     long totalBackoff = totalBackoffInSeconds(true);
     if (totalBackoff < 0) {
       return -1;
     } else {
       return 1000 * totalBackoff;
     }
   }
 
+  public long normalizedWeaveTimestamp() {
+    return normalizedTimestampForHeader(X_WEAVE_TIMESTAMP);
+  }
+
   /**
-   * The timestamp returned from a Sync server is a decimal number of seconds,
+   * Timestamps returned from a Sync server are decimal numbers of seconds,
    * e.g., 1323393518.04.
    *
    * We want milliseconds since epoch.
    *
    * @return milliseconds since the epoch, as a long, or -1 if the header
    *         was missing or invalid.
    */
-  public long normalizedWeaveTimestamp() {
-    String h = "x-weave-timestamp";
-    if (!this.hasHeader(h)) {
+  public long normalizedTimestampForHeader(String header) {
+    if (!this.hasHeader(header)) {
       return -1;
     }
 
-    return Utils.decimalSecondsToMilliseconds(this.response.getFirstHeader(h).getValue());
+    return Utils.decimalSecondsToMilliseconds(
+            this.response.getFirstHeader(header).getValue()
+    );
   }
 
   public int weaveRecords() throws NumberFormatException {
-    return this.getIntegerHeader("x-weave-records");
+    return this.getIntegerHeader(X_WEAVE_RECORDS);
   }
 
   public int weaveQuotaRemaining() throws NumberFormatException {
-    return this.getIntegerHeader("x-weave-quota-remaining");
+    return this.getIntegerHeader(X_WEAVE_QUOTA_REMAINING);
   }
 
   public String weaveAlert() {
-    if (this.hasHeader("x-weave-alert")) {
-      return this.response.getFirstHeader("x-weave-alert").getValue();
+    if (this.hasHeader(X_WEAVE_ALERT)) {
+      return this.response.getFirstHeader(X_WEAVE_ALERT).getValue();
     }
     return null;
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
@@ -115,17 +115,19 @@ public class SyncStorageRequest implemen
     }
 
     @Override
     public void handleHttpResponse(HttpResponse response) {
       Logger.debug(LOG_TAG, "SyncStorageResourceDelegate handling response: " + response.getStatusLine() + ".");
       SyncStorageRequestDelegate d = this.request.delegate;
       SyncStorageResponse res = new SyncStorageResponse(response);
       // It is the responsibility of the delegate handlers to completely consume the response.
-      if (res.wasSuccessful()) {
+      // In the context of a Sync storage response, success is either a 200 OK or 202 Accepted.
+      // 202 is returned during uploads of data in a batching mode, indicating that more is expected.
+      if (res.getStatusCode() == 200 || res.getStatusCode() == 202) {
         d.handleRequestSuccess(res);
       } else {
         Logger.warn(LOG_TAG, "HTTP request failed.");
         try {
           Logger.warn(LOG_TAG, "HTTP response body: " + res.getErrorMessage());
         } catch (Exception e) {
           Logger.error(LOG_TAG, "Can't fetch HTTP response body.", e);
         }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageResponse.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageResponse.java
@@ -62,11 +62,24 @@ public class SyncStorageResponse extends
   public SyncStorageResponse(HttpResponse res) {
     super(res);
   }
 
   public String getErrorMessage() throws IllegalStateException, IOException {
     return SyncStorageResponse.getServerErrorMessage(this.body().trim());
   }
 
+  /**
+   * This header gives the last-modified time of the target resource as seen during processing of
+   * the request, and will be included in all success responses (200, 201, 204).
+   * When given in response to a write request, this will be equal to the server’s current time and
+   * to the new last-modified time of any BSOs created or changed by the request.
+   */
+  public String getLastModified() {
+    if (!response.containsHeader(X_LAST_MODIFIED)) {
+      return null;
+    }
+    return response.getFirstHeader(X_LAST_MODIFIED).getValue();
+  }
+
   // TODO: Content-Type and Content-Length validation.
 
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
@@ -2,31 +2,32 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 
 /**
  * A kind of Server11Repository that supports explicit setting of limit and sort on operations.
  *
  * @author rnewman
  *
  */
 public class ConstrainedServer11Repository extends Server11Repository {
 
   private String sort = null;
   private long limit  = -1;
 
-  public ConstrainedServer11Repository(String collection, String storageURL, AuthHeaderProvider authHeaderProvider, InfoCollections infoCollections, long limit, String sort) throws URISyntaxException {
-    super(collection, storageURL, authHeaderProvider, infoCollections);
+  public ConstrainedServer11Repository(String collection, String storageURL, AuthHeaderProvider authHeaderProvider, InfoCollections infoCollections, InfoConfiguration infoConfiguration, long limit, String sort) throws URISyntaxException {
+    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration);
     this.limit = limit;
     this.sort  = sort;
   }
 
   @Override
   protected String getDefaultSort() {
     return sort;
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
@@ -4,57 +4,63 @@
 
 package org.mozilla.gecko.sync.repositories;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 
 import android.content.Context;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
 
 /**
  * A Server11Repository implements fetching and storing against the Sync 1.1 API.
  * It doesn't do crypto: that's the job of the middleware.
  *
  * @author rnewman
  */
 public class Server11Repository extends Repository {
   protected String collection;
   protected URI collectionURI;
   protected final AuthHeaderProvider authHeaderProvider;
   protected final InfoCollections infoCollections;
 
+  private final InfoConfiguration infoConfiguration;
+
   /**
    * Construct a new repository that fetches and stores against the Sync 1.1. API.
    *
    * @param collection name.
    * @param storageURL full URL to storage endpoint.
    * @param authHeaderProvider to use in requests; may be null.
    * @param infoCollections instance; must not be null.
    * @throws URISyntaxException
    */
-  public Server11Repository(String collection, String storageURL, AuthHeaderProvider authHeaderProvider, InfoCollections infoCollections) throws URISyntaxException {
+  public Server11Repository(@NonNull String collection, @NonNull String storageURL, AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections, @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
     if (collection == null) {
       throw new IllegalArgumentException("collection must not be null");
     }
     if (storageURL == null) {
       throw new IllegalArgumentException("storageURL must not be null");
     }
     if (infoCollections == null) {
       throw new IllegalArgumentException("infoCollections must not be null");
     }
     this.collection = collection;
     this.collectionURI = new URI(storageURL + (storageURL.endsWith("/") ? collection : "/" + collection));
     this.authHeaderProvider = authHeaderProvider;
     this.infoCollections = infoCollections;
+    this.infoConfiguration = infoConfiguration;
   }
 
   @Override
   public void createSession(RepositorySessionCreationDelegate delegate,
                             Context context) {
     delegate.onSessionCreated(new Server11RepositorySession(this));
   }
 
@@ -114,9 +120,18 @@ public class Server11Repository extends 
 
   public AuthHeaderProvider getAuthHeaderProvider() {
     return authHeaderProvider;
   }
 
   public boolean updateNeeded(long lastSyncTimestamp) {
     return infoCollections.updateNeeded(collection, lastSyncTimestamp);
   }
+
+  @Nullable
+  public Long getCollectionLastModified() {
+    return infoCollections.getTimestamp(collection);
+  }
+
+  public InfoConfiguration getInfoConfiguration() {
+    return infoConfiguration;
+  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
@@ -1,102 +1,40 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.json.simple.JSONArray;
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.DelayedWorkTracker;
-import org.mozilla.gecko.sync.ExtendedJSONObject;
 import org.mozilla.gecko.sync.HTTPFailureException;
-import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
-import org.mozilla.gecko.sync.Server11RecordPostFailedException;
-import org.mozilla.gecko.sync.UnexpectedJSONException;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
-import org.mozilla.gecko.sync.net.SyncStorageRequest;
-import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import ch.boye.httpclientandroidlib.entity.ContentProducer;
-import ch.boye.httpclientandroidlib.entity.EntityTemplate;
+import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
 
 public class Server11RepositorySession extends RepositorySession {
-  private static byte[] recordsStart;
-  private static byte[] recordSeparator;
-  private static byte[] recordsEnd;
-
-  static {
-    try {
-      recordsStart    = "[\n".getBytes("UTF-8");
-      recordSeparator = ",\n".getBytes("UTF-8");
-      recordsEnd      = "\n]\n".getBytes("UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      // These won't fail.
-    }
-  }
-
   public static final String LOG_TAG = "Server11Session";
 
-  private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024;    // 1MB.
-  private static final int UPLOAD_ITEM_THRESHOLD = 50;
-  private static final int PER_RECORD_OVERHEAD   = 2;              // Comma, newline.
-  // {}, newlines, but we get to skip one record overhead.
-  private static final int PER_BATCH_OVERHEAD    = 5 - PER_RECORD_OVERHEAD;
-
-  /**
-   * Return the X-Weave-Timestamp header from <code>response</code>, or the
-   * current time if it is missing.
-   * <p>
-   * <b>Warning:</b> this can cause the timestamp of <code>response</code> to
-   * cross domains (from server clock to local clock), which could cause records
-   * to be skipped on account of clock drift. This should never happen, because
-   * <i>every</i> server response should have a well-formed X-Weave-Timestamp
-   * header.
-   *
-   * @param response
-   *          The <code>SyncStorageResponse</code> to interrogate.
-   * @return Normalized timestamp in milliseconds.
-   */
-  public static long getNormalizedTimestamp(SyncStorageResponse response) {
-    long normalizedTimestamp = -1;
-    try {
-      normalizedTimestamp = response.normalizedWeaveTimestamp();
-    } catch (NumberFormatException e) {
-      Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e);
-    }
-    if (-1 == normalizedTimestamp) {
-      Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped.");
-      normalizedTimestamp = System.currentTimeMillis();
-    }
-    return normalizedTimestamp;
-  }
-
   /**
    * Used to track outstanding requests, so that we can abort them as needed.
    */
   private final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
 
   @Override
   public void abort() {
     super.abort();
@@ -145,17 +83,18 @@ public class Server11RepositorySession e
       return null;
     }
 
     @Override
     public void handleRequestSuccess(SyncStorageResponse response) {
       Logger.debug(LOG_TAG, "Fetch done.");
       removeRequestFromPending();
 
-      final long normalizedTimestamp = getNormalizedTimestamp(response);
+      // This will change to use X_LAST_MODIFIED once Bug 730142 lands.
+      final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_WEAVE_TIMESTAMP);
       Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
 
       // When we're done processing other events, finish.
       workTracker.delayWorkItem(new Runnable() {
         @Override
         public void run() {
           Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
           // TODO: verify number of returned records.
@@ -200,37 +139,36 @@ public class Server11RepositorySession e
 
     // TODO: this implies that we've screwed up our inheritance chain somehow.
     @Override
     public KeyBundle keyBundle() {
       return null;
     }
   }
 
-
   Server11Repository serverRepository;
-  AtomicLong uploadTimestamp = new AtomicLong(0);
-
-  private void bumpUploadTimestamp(long ts) {
-    while (true) {
-      long existing = uploadTimestamp.get();
-      if (existing > ts) {
-        return;
-      }
-      if (uploadTimestamp.compareAndSet(existing, ts)) {
-        return;
-      }
-    }
-  }
+  private BatchingUploader uploader;
 
   public Server11RepositorySession(Repository repository) {
     super(repository);
     serverRepository = (Server11Repository) repository;
   }
 
+  public Server11Repository getServerRepository() {
+    return serverRepository;
+  }
+
+  @Override
+  public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
+    this.delegate = delegate;
+
+    // Now that we have the delegate, we can initialize our uploader.
+    this.uploader = new BatchingUploader(this, storeWorkQueue, delegate);
+  }
+
   private String flattenIDs(String[] guids) {
     // Consider using Utils.toDelimitedString if and when the signature changes
     // to Collection<String> guids.
     if (guids.length == 0) {
       return "";
     }
     if (guids.length == 1) {
       return guids[0];
@@ -309,309 +247,39 @@ public class Server11RepositorySession e
   public void wipe(RepositorySessionWipeDelegate delegate) {
     if (!isActive()) {
       delegate.onWipeFailed(new InactiveSessionException(null));
       return;
     }
     // TODO: implement wipe.
   }
 
-  protected Object recordsBufferMonitor = new Object();
-
-  /**
-   * Data of outbound records.
-   * <p>
-   * We buffer the data (rather than the <code>Record</code>) so that we can
-   * flush the buffer based on outgoing transmission size.
-   * <p>
-   * Access should be synchronized on <code>recordsBufferMonitor</code>.
-   */
-  protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>();
-
-  /**
-   * GUIDs of outbound records.
-   * <p>
-   * Used to fail entire outgoing uploads.
-   * <p>
-   * Access should be synchronized on <code>recordsBufferMonitor</code>.
-   */
-  protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>();
-  protected int byteCount = PER_BATCH_OVERHEAD;
-
   @Override
   public void store(Record record) throws NoStoreDelegateException {
     if (delegate == null) {
       throw new NoStoreDelegateException();
     }
-    this.enqueue(record);
-  }
-
-  /**
-   * Batch incoming records until some reasonable threshold (e.g., 50),
-   * some size limit is hit (probably way less than 3MB!), or storeDone
-   * is received.
-   * @param record
-   */
-  protected void enqueue(Record record) {
-    // JSONify and store the bytes, rather than the record.
-    byte[] json = record.toJSONBytes();
-    int delta   = json.length;
-    synchronized (recordsBufferMonitor) {
-      if ((delta + byteCount     > UPLOAD_BYTE_THRESHOLD) ||
-          (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) {
 
-        // POST the existing contents, then enqueue.
-        flush();
-      }
-      recordsBuffer.add(json);
-      recordGuidsBuffer.add(record.guid);
-      byteCount += PER_RECORD_OVERHEAD + delta;
+    // If delegate was set, this shouldn't happen.
+    if (uploader == null) {
+      throw new IllegalStateException("Uploader hasn't been initialized");
     }
-  }
 
-  // Asynchronously upload records.
-  // Must be locked!
-  protected void flush() {
-    if (recordsBuffer.size() > 0) {
-      final ArrayList<byte[]> outgoing = recordsBuffer;
-      final ArrayList<String> outgoingGuids = recordGuidsBuffer;
-      RepositorySessionStoreDelegate uploadDelegate = this.delegate;
-      storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
-
-      recordsBuffer = new ArrayList<byte[]>();
-      recordGuidsBuffer = new ArrayList<String>();
-      byteCount = PER_BATCH_OVERHEAD;
-    }
+    uploader.process(record);
   }
 
   @Override
   public void storeDone() {
     Logger.debug(LOG_TAG, "storeDone().");
-    synchronized (recordsBufferMonitor) {
-      flush();
-      // Do this in a Runnable so that the timestamp is grabbed after any upload.
-      final Runnable r = new Runnable() {
-        @Override
-        public void run() {
-          synchronized (recordsBufferMonitor) {
-            final long end = uploadTimestamp.get();
-            Logger.debug(LOG_TAG, "Calling storeDone with " + end);
-            storeDone(end);
-          }
-        }
-      };
-      storeWorkQueue.execute(r);
-    }
-  }
 
-  /**
-   * <code>true</code> if a record upload has failed this session.
-   * <p>
-   * This is only set in begin and possibly by <code>RecordUploadRunnable</code>.
-   * Since those are executed serially, we can use an unsynchronized
-   * volatile boolean here.
-   */
-  protected volatile boolean recordUploadFailed;
-
-  @Override
-  public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
-    recordUploadFailed = false;
-    super.begin(delegate);
-  }
-
-  /**
-   * Make an HTTP request, and convert HTTP request delegate callbacks into
-   * store callbacks within the context of this RepositorySession.
-   *
-   * @author rnewman
-   *
-   */
-  protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate {
-
-    public final String LOG_TAG = "RecordUploadRunnable";
-    private final ArrayList<byte[]> outgoing;
-    private ArrayList<String> outgoingGuids;
-    private final long byteCount;
-
-    public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
-                                ArrayList<byte[]> outgoing,
-                                ArrayList<String> outgoingGuids,
-                                long byteCount) {
-      Logger.debug(LOG_TAG, "Preparing record upload for " +
-                  outgoing.size() + " records (" +
-                  byteCount + " bytes).");
-      this.outgoing = outgoing;
-      this.outgoingGuids = outgoingGuids;
-      this.byteCount = byteCount;
-    }
-
-    @Override
-    public AuthHeaderProvider getAuthHeaderProvider() {
-      return serverRepository.getAuthHeaderProvider();
-    }
-
-    @Override
-    public String ifUnmodifiedSince() {
-      return null;
+    // If delegate was set, this shouldn't happen.
+    if (uploader == null) {
+      throw new IllegalStateException("Uploader hasn't been initialized");
     }
 
-    @Override
-    public void handleRequestSuccess(SyncStorageResponse response) {
-      Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done.");
-
-      ExtendedJSONObject body;
-      try {
-        body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
-      } catch (Exception e) {
-        Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
-        this.handleRequestError(e);
-        return;
-      }
-
-      // Be defensive when logging timestamp.
-      if (body.containsKey("modified")) {
-        Long modified = body.getTimestamp("modified");
-        if (modified != null) {
-          Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified);
-        } else {
-          Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString());
-        }
-      } else {
-        Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString());
-      }
-
-      try {
-        JSONArray          success = body.getArray("success");
-        if ((success != null) &&
-            (success.size() > 0)) {
-          Logger.trace(LOG_TAG, "Successful records: " + success.toString());
-          for (Object o : success) {
-            try {
-              delegate.onRecordStoreSucceeded((String) o);
-            } catch (ClassCastException e) {
-              Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
-              // Not much to be done.
-            }
-          }
-
-          long normalizedTimestamp = getNormalizedTimestamp(response);
-          Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
-          bumpUploadTimestamp(normalizedTimestamp);
-        }
-        success = null; // Want to GC this ASAP.
-
-        ExtendedJSONObject failed  = body.getObject("failed");
-        if ((failed != null) &&
-            (failed.object.size() > 0)) {
-          Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
-          Exception ex = new Server11RecordPostFailedException();
-          for (String guid : failed.keySet()) {
-            delegate.onRecordStoreFailed(ex, guid);
-          }
-        }
-        failed = null; // Want to GC this ASAP.
-      } catch (UnexpectedJSONException e) {
-        Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
-        // TODO
-        return;
-      }
-      Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled.");
-    }
-
-    @Override
-    public void handleRequestFailure(SyncStorageResponse response) {
-      // TODO: call session.interpretHTTPFailure.
-      this.handleRequestError(new HTTPFailureException(response));
-    }
-
-    @Override
-    public void handleRequestError(final Exception ex) {
-      Logger.warn(LOG_TAG, "Got request error.", ex);
-
-      recordUploadFailed = true;
-      ArrayList<String> failedOutgoingGuids = outgoingGuids;
-      outgoingGuids = null; // Want to GC this ASAP.
-      for (String guid : failedOutgoingGuids) {
-        delegate.onRecordStoreFailed(ex, guid);
-      }
-      return;
-    }
-
-    public class ByteArraysContentProducer implements ContentProducer {
-
-      ArrayList<byte[]> outgoing;
-      public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
-        outgoing = arrays;
-      }
-
-      @Override
-      public void writeTo(OutputStream outstream) throws IOException {
-        int count = outgoing.size();
-        outstream.write(recordsStart);
-        outstream.write(outgoing.get(0));
-        for (int i = 1; i < count; ++i) {
-          outstream.write(recordSeparator);
-          outstream.write(outgoing.get(i));
-        }
-        outstream.write(recordsEnd);
-      }
-    }
-
-    public class ByteArraysEntity extends EntityTemplate {
-      private final long count;
-      public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) {
-        super(new ByteArraysContentProducer(arrays));
-        this.count = totalBytes;
-        this.setContentType("application/json");
-        // charset is set in BaseResource.
-      }
-
-      @Override
-      public long getContentLength() {
-        return count;
-      }
-
-      @Override
-      public boolean isRepeatable() {
-        return true;
-      }
-    }
-
-    public ByteArraysEntity getBodyEntity() {
-      ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
-      return body;
-    }
-
-    @Override
-    public void run() {
-      if (recordUploadFailed) {
-        Logger.info(LOG_TAG, "Previous record upload failed.  Failing all records and not retrying.");
-        Exception ex = new Server11PreviousPostFailedException();
-        for (String guid : outgoingGuids) {
-          delegate.onRecordStoreFailed(ex, guid);
-        }
-        return;
-      }
-
-      if (outgoing == null ||
-          outgoing.size() == 0) {
-        Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");
-        return;
-      }
-
-      URI u = serverRepository.collectionURI();
-      SyncStorageRequest request = new SyncStorageRequest(u);
-
-      request.delegate = this;
-
-      // We don't want the task queue to proceed until this request completes.
-      // Fortunately, BaseResource is currently synchronous.
-      // If that ever changes, you'll need to block here.
-      ByteArraysEntity body = getBodyEntity();
-      request.post(body);
-    }
+    uploader.noMoreRecordsToUpload();
   }
 
   @Override
   public boolean dataAvailable() {
     return serverRepository.updateNeeded(getLastSyncTimestamp());
   }
 }
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
@@ -0,0 +1,165 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.support.annotation.CheckResult;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+
+import org.mozilla.gecko.background.common.log.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.TokenModifiedException;
+import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedChangedUnexpectedly;
+import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedDidNotChange;
+
+/**
+ * Keeps track of token, Last-Modified value and GUIDs of succeeded records.
+ */
+/* @ThreadSafe */
+public class BatchMeta extends BufferSizeTracker {
+    private static final String LOG_TAG = "BatchMeta";
+
+    // Will be set once first payload upload succeeds. We don't expect this to change until we
+    // commit the batch, at which point it must change.
+    /* @GuardedBy("this") */ private Long lastModified;
+
+    // Will be set once first payload upload succeeds. We don't expect this to ever change until
+    // a commit succeeds, at which point this gets set to null.
+    /* @GuardedBy("this") */ private String token;
+
+    /* @GuardedBy("accessLock") */ private boolean isUnlimited = false;
+
+    // Accessed by synchronously running threads.
+    /* @GuardedBy("accessLock") */ private final List<String> successRecordGuids = new ArrayList<>();
+
+    /* @GuardedBy("accessLock") */ private boolean needsCommit = false;
+
+    protected final Long collectionLastModified;
+
+    public BatchMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords, @Nullable Long collectionLastModified) {
+        super(payloadLock, maxBytes, maxRecords);
+        this.collectionLastModified = collectionLastModified;
+    }
+
+    protected void setIsUnlimited(boolean isUnlimited) {
+        synchronized (accessLock) {
+            this.isUnlimited = isUnlimited;
+        }
+    }
+
+    @Override
+    protected boolean canFit(long recordDeltaByteCount) {
+        synchronized (accessLock) {
+            return isUnlimited || super.canFit(recordDeltaByteCount);
+        }
+    }
+
+    @Override
+    @CheckResult
+    protected boolean addAndEstimateIfFull(long recordDeltaByteCount) {
+        synchronized (accessLock) {
+            needsCommit = true;
+            boolean isFull = super.addAndEstimateIfFull(recordDeltaByteCount);
+            return !isUnlimited && isFull;
+        }
+    }
+
+    protected boolean needToCommit() {
+        synchronized (accessLock) {
+            return needsCommit;
+        }
+    }
+
+    protected synchronized String getToken() {
+        return token;
+    }
+
+    protected synchronized void setToken(final String newToken, boolean isCommit) throws TokenModifiedException {
+        // Set token once in a batching mode.
+        // In a non-batching mode, this.token and newToken will be null, and this is a no-op.
+        if (token == null) {
+            token = newToken;
+            return;
+        }
+
+        // Sanity checks.
+        if (isCommit) {
+            // We expect token to be null when commit payload succeeds.
+            if (newToken != null) {
+                throw new TokenModifiedException();
+            } else {
+                token = null;
+            }
+            return;
+        }
+
+        // We expect new token to always equal current token for non-commit payloads.
+        if (!token.equals(newToken)) {
+            throw new TokenModifiedException();
+        }
+    }
+
+    protected synchronized Long getLastModified() {
+        if (lastModified == null) {
+            return collectionLastModified;
+        }
+        return lastModified;
+    }
+
+    protected synchronized void setLastModified(final Long newLastModified, final boolean expectedToChange) throws LastModifiedChangedUnexpectedly, LastModifiedDidNotChange {
+        if (lastModified == null) {
+            lastModified = newLastModified;
+            return;
+        }
+
+        if (!expectedToChange && !lastModified.equals(newLastModified)) {
+            Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
+            throw new LastModifiedChangedUnexpectedly();
+
+        } else if (expectedToChange && lastModified.equals(newLastModified)) {
+            Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
+            throw new LastModifiedDidNotChange();
+
+        } else {
+            lastModified = newLastModified;
+        }
+    }
+
+    protected ArrayList<String> getSuccessRecordGuids() {
+        synchronized (accessLock) {
+            return new ArrayList<>(this.successRecordGuids);
+        }
+    }
+
+    protected void recordSucceeded(final String recordGuid) {
+        // Sanity check.
+        if (recordGuid == null) {
+            throw new IllegalStateException();
+        }
+
+        synchronized (accessLock) {
+            successRecordGuids.add(recordGuid);
+        }
+    }
+
+    @Override
+    protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
+        return isUnlimited || super.canFitRecordByteDelta(byteDelta, recordCount, byteCount);
+    }
+
+    @Override
+    protected void reset() {
+        synchronized (accessLock) {
+            super.reset();
+            token = null;
+            lastModified = null;
+            successRecordGuids.clear();
+            needsCommit = false;
+        }
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -0,0 +1,344 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.net.Uri;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.Server11RecordPostFailedException;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Uploader which implements batching introduced in Sync 1.5.
+ *
+ * Batch vs payload terminology:
+ * - batch is comprised of a series of payloads, which are all committed at the same time.
+ * -- identified via a "batch token", which is returned after first payload for the batch has been uploaded.
+ * - payload is a collection of records which are uploaded together. Associated with a batch.
+ * -- last payload, identified via commit=true, commits the batch.
+ *
+ * Limits for how many records can fit into a payload and into a batch are defined in the passed-in
+ * InfoConfiguration object.
+ *
+ * If we can't fit everything we'd like to upload into one batch (according to max-total-* limits),
+ * then we commit that batch, and start a new one. There are no explicit limits on total number of
+ * batches we might use, although at some point we'll start to run into storage limit errors from the API.
+ *
+ * Once we go past using one batch this uploader is no longer "atomic". Partial state is exposed
+ * to other clients after our first batch is committed and before our last batch is committed.
+ * However, our per-batch limits are high, X-I-U-S mechanics help protect downloading clients
+ * (as long as they implement X-I-U-S) with 412 error codes in case of interleaving upload and download,
+ * and most mobile clients will not be uploading large-enough amounts of data (especially structured
+ * data, such as bookmarks).
+ *
+ * Last-Modified header returned with the first batch payload POST success is maintained for a batch,
+ * to guard against concurrent-modification errors (different uploader commits before we're done).
+ *
+ * Non-batching mode notes:
+ * We also support Sync servers which don't enable batching for uploads. In this case, we respect
+ * payload limits for individual uploads, and every upload is considered a commit. Batching limits
+ * do not apply, and batch token is irrelevant.
+ * We do keep track of Last-Modified and send along X-I-U-S with our uploads, to protect against
+ * concurrent modifications by other clients.
+ */
+public class BatchingUploader {
+    private static final String LOG_TAG = "BatchingUploader";
+
+    private final Uri collectionUri;
+
+    private volatile boolean recordUploadFailed = false;
+
+    private final BatchMeta batchMeta;
+    private final Payload payload;
+
+    // Accessed by synchronously running threads, OK to not synchronize and just make it volatile.
+    private volatile Boolean inBatchingMode;
+
+    // Used to ensure we have thread-safe access to the following:
+    // - byte and record counts in both Payload and BatchMeta objects
+    // - buffers in the Payload object
+    private final Object payloadLock = new Object();
+
+    protected Executor workQueue;
+    protected final RepositorySessionStoreDelegate sessionStoreDelegate;
+    protected final Server11RepositorySession repositorySession;
+
+    protected AtomicLong uploadTimestamp = new AtomicLong(0);
+
+    protected static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
+    protected static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
+
+    // Sanity check. RECORD_SEPARATOR and RECORDS_START are assumed to be of the same length.
+    static {
+        if (RecordUploadRunnable.RECORD_SEPARATOR.length != RecordUploadRunnable.RECORDS_START.length) {
+            throw new IllegalStateException("Separator and start tokens must be of the same length");
+        }
+    }
+
+    public BatchingUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
+        this.repositorySession = repositorySession;
+        this.workQueue = workQueue;
+        this.sessionStoreDelegate = sessionStoreDelegate;
+        this.collectionUri = Uri.parse(repositorySession.getServerRepository().collectionURI().toString());
+
+        InfoConfiguration config = repositorySession.getServerRepository().getInfoConfiguration();
+        this.batchMeta = new BatchMeta(
+                payloadLock, config.maxTotalBytes, config.maxTotalRecords,
+                repositorySession.getServerRepository().getCollectionLastModified()
+        );
+        this.payload = new Payload(payloadLock, config.maxPostBytes, config.maxPostRecords);
+    }
+
+    public void process(final Record record) {
+        final String guid = record.guid;
+        final byte[] recordBytes = record.toJSONBytes();
+        final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
+
+        Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
+
+        // We can't upload individual records which exceed our payload byte limit.
+        if ((recordDeltaByteCount + PER_PAYLOAD_OVERHEAD_BYTE_COUNT) > payload.maxBytes) {
+            sessionStoreDelegate.onRecordStoreFailed(new RecordTooLargeToUpload(), guid);
+            return;
+        }
+
+        synchronized (payloadLock) {
+            final boolean canFitRecordIntoBatch = batchMeta.canFit(recordDeltaByteCount);
+            final boolean canFitRecordIntoPayload = payload.canFit(recordDeltaByteCount);
+
+            // Record fits!
+            if (canFitRecordIntoBatch && canFitRecordIntoPayload) {
+                Logger.debug(LOG_TAG, "Record fits into the current batch and payload");
+                addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
+
+            // Payload won't fit the record.
+            } else if (canFitRecordIntoBatch) {
+                Logger.debug(LOG_TAG, "Current payload won't fit incoming record, uploading payload.");
+                flush(false, false);
+
+                Logger.debug(LOG_TAG, "Recording the incoming record into a new payload");
+
+                // Keep track of the overflow record.
+                addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
+
+            // Batch won't fit the record.
+            } else {
+                Logger.debug(LOG_TAG, "Current batch won't fit incoming record, committing batch.");
+                flush(true, false);
+
+                Logger.debug(LOG_TAG, "Recording the incoming record into a new batch");
+                batchMeta.reset();
+
+                // Keep track of the overflow record.
+                addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
+            }
+        }
+    }
+
+    // Convenience function used from the process method; caller must hold a payloadLock.
+    private void addAndFlushIfNecessary(long byteCount, byte[] recordBytes, String guid) {
+        boolean isPayloadFull = payload.addAndEstimateIfFull(byteCount, recordBytes, guid);
+        boolean isBatchFull = batchMeta.addAndEstimateIfFull(byteCount);
+
+        // Preemptive commit batch or upload a payload if they're estimated to be full.
+        if (isBatchFull) {
+            flush(true, false);
+            batchMeta.reset();
+        } else if (isPayloadFull) {
+            flush(false, false);
+        }
+    }
+
+    public void noMoreRecordsToUpload() {
+        Logger.debug(LOG_TAG, "Received 'no more records to upload' signal.");
+
+        // Run this after the last payload succeeds, so that we know for sure if we're in a batching
+        // mode and need to commit with a potentially empty payload.
+        workQueue.execute(new Runnable() {
+            @Override
+            public void run() {
+                commitIfNecessaryAfterLastPayload();
+            }
+        });
+    }
+
+    @VisibleForTesting
+    protected void commitIfNecessaryAfterLastPayload() {
+        // Must be called after last payload upload finishes.
+        // NB: lock on payloadLock, not on the payload object itself — every other reader/writer
+        // of the payload buffers and counters (process, addAndFlushIfNecessary, flush, and the
+        // BufferSizeTracker internals) guards them with payloadLock.
+        synchronized (payloadLock) {
+            // If we have any pending records in the Payload, flush them!
+            if (!payload.isEmpty()) {
+                flush(true, true);
+            // If we have an empty payload but need to commit the batch in the batching mode, flush!
+            } else if (batchMeta.needToCommit() && Boolean.TRUE.equals(inBatchingMode)) {
+                flush(true, true);
+            // Otherwise, we're done.
+            } else {
+                finished(uploadTimestamp);
+            }
+        }
+    }
+
+    /**
+     * We've been told by our upload delegate that a payload succeeded.
+     * Depending on the type of payload and batch mode status, inform our delegate of progress.
+     *
+     * @param response success response to our payload POST (commit or not)
+     * @param isCommit was this a commit upload?
+     * @param isLastPayload was this a very last payload we'll upload?
+     */
+    public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
+        // Sanity check.
+        if (inBatchingMode == null) {
+            throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
+        }
+
+        // We consider records to have been committed if we're not in a batching mode or this was a commit.
+        // If records have been committed, notify our store delegate.
+        if (!inBatchingMode || isCommit) {
+            for (String guid : batchMeta.getSuccessRecordGuids()) {
+                sessionStoreDelegate.onRecordStoreSucceeded(guid);
+            }
+        }
+
+        // If this was our very last commit, we're done storing records.
+        // Get Last-Modified timestamp from the response, and pass it upstream.
+        if (isLastPayload) {
+            finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
+        }
+    }
+
+    public void lastPayloadFailed() {
+        finished(uploadTimestamp);
+    }
+
+    private void finished(long lastModifiedTimestamp) {
+        bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
+        finished(uploadTimestamp);
+    }
+
+    private void finished(AtomicLong lastModifiedTimestamp) {
+        repositorySession.storeDone(lastModifiedTimestamp.get());
+    }
+
+    public BatchMeta getCurrentBatch() {
+        return batchMeta;
+    }
+
+    public void setInBatchingMode(boolean inBatchingMode) {
+        this.inBatchingMode = inBatchingMode;
+
+        // If we know for sure that we're not in a batching mode,
+        // consider our batch to be of unlimited size.
+        this.batchMeta.setIsUnlimited(!inBatchingMode);
+    }
+
+    public Boolean getInBatchingMode() {
+        return inBatchingMode;
+    }
+
+    public void setLastModified(final Long lastModified, final boolean isCommit) throws BatchingUploaderException {
+        // Sanity check.
+        if (inBatchingMode == null) {
+            throw new IllegalStateException("Can't process Last-Modified before we know we're in a batching mode.");
+        }
+
+        // In non-batching mode, every time we receive a Last-Modified timestamp, we expect it to change
+        // since records are "committed" (become visible to other clients) on every payload.
+        // In batching mode, we only expect Last-Modified to change when we commit a batch.
+        batchMeta.setLastModified(lastModified, isCommit || !inBatchingMode);
+    }
+
+    public void recordSucceeded(final String recordGuid) {
+        Logger.debug(LOG_TAG, "Record store succeeded: " + recordGuid);
+        batchMeta.recordSucceeded(recordGuid);
+    }
+
+    public void recordFailed(final String recordGuid) {
+        recordFailed(new Server11RecordPostFailedException(), recordGuid);
+    }
+
+    public void recordFailed(final Exception e, final String recordGuid) {
+        Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
+        recordUploadFailed = true;
+        sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
+    }
+
+    public Server11RepositorySession getRepositorySession() {
+        return repositorySession;
+    }
+
+    private static void bumpTimestampTo(final AtomicLong current, long newValue) {
+        while (true) {
+            long existing = current.get();
+            if (existing > newValue) {
+                return;
+            }
+            if (current.compareAndSet(existing, newValue)) {
+                return;
+            }
+        }
+    }
+
+    private void flush(final boolean isCommit, final boolean isLastPayload) {
+        final ArrayList<byte[]> outgoing;
+        final ArrayList<String> outgoingGuids;
+        final long byteCount;
+
+        // Even though payload object itself is thread-safe, we want to ensure we get these altogether
+        // as a "unit". Another approach would be to create a wrapper object for these values, but this works.
+        synchronized (payloadLock) {
+            outgoing = payload.getRecordsBuffer();
+            outgoingGuids = payload.getRecordGuidsBuffer();
+            byteCount = payload.getByteCount();
+        }
+
+        workQueue.execute(new RecordUploadRunnable(
+                new BatchingAtomicUploaderMayUploadProvider(),
+                collectionUri,
+                batchMeta,
+                new PayloadUploadDelegate(this, outgoingGuids, isCommit, isLastPayload),
+                outgoing,
+                byteCount,
+                isCommit
+        ));
+
+        payload.reset();
+    }
+
+    private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
+        public boolean mayUpload() {
+            return !recordUploadFailed;
+        }
+    }
+
+    public static class BatchingUploaderException extends Exception {
+        private static final long serialVersionUID = 1L;
+    }
+    public static class RecordTooLargeToUpload extends BatchingUploaderException {
+        private static final long serialVersionUID = 1L;
+    }
+    public static class LastModifiedDidNotChange extends BatchingUploaderException {
+        private static final long serialVersionUID = 1L;
+    }
+    public static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
+        private static final long serialVersionUID = 1L;
+    }
+    public static class TokenModifiedException extends BatchingUploaderException {
+        private static final long serialVersionUID = 1L;
+    };
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BufferSizeTracker.java
@@ -0,0 +1,103 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.support.annotation.CallSuper;
+import android.support.annotation.CheckResult;
+
+/**
+ * Implements functionality shared by BatchMeta and Payload objects, namely:
+ * - keeping track of byte and record counts
+ * - incrementing those counts when records are added
+ * - checking if a record can fit
+ */
+/* @ThreadSafe */
+public abstract class BufferSizeTracker {
+    protected final Object accessLock;
+
+    /* @GuardedBy("accessLock") */ private long byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
+    /* @GuardedBy("accessLock") */ private long recordCount = 0;
+    /* @GuardedBy("accessLock") */ protected Long smallestRecordByteCount;
+
+    protected final long maxBytes;
+    protected final long maxRecords;
+
+    public BufferSizeTracker(Object accessLock, long maxBytes, long maxRecords) {
+        this.accessLock = accessLock;
+        this.maxBytes = maxBytes;
+        this.maxRecords = maxRecords;
+    }
+
+    @CallSuper
+    protected boolean canFit(long recordDeltaByteCount) {
+        synchronized (accessLock) {
+            return canFitRecordByteDelta(recordDeltaByteCount, recordCount, byteCount);
+        }
+    }
+
+    protected boolean isEmpty() {
+        synchronized (accessLock) {
+            return recordCount == 0;
+        }
+    }
+
+    /**
+     * Adds a record and returns a boolean indicating whether batch is estimated to be full afterwards.
+     */
+    @CheckResult
+    protected boolean addAndEstimateIfFull(long recordDeltaByteCount) {
+        synchronized (accessLock) {
+            // Sanity check. Calling this method when buffer won't fit the record is an error.
+            if (!canFitRecordByteDelta(recordDeltaByteCount, recordCount, byteCount)) {
+                throw new IllegalStateException("Buffer size exceeded");
+            }
+
+            byteCount += recordDeltaByteCount;
+            recordCount += 1;
+
+            if (smallestRecordByteCount == null || smallestRecordByteCount > recordDeltaByteCount) {
+                smallestRecordByteCount = recordDeltaByteCount;
+            }
+
+            // See if we're full or nearly full after adding a record.
+            // We're halving smallestRecordByteCount because we're erring
+            // on the side of "can hopefully fit". We're trying to upload as soon as we know we
+            // should, but we also need to be mindful of minimizing total number of uploads we make.
+            return !canFitRecordByteDelta(smallestRecordByteCount / 2, recordCount, byteCount);
+        }
+    }
+
+    protected long getByteCount() {
+        synchronized (accessLock) {
+            // Ensure we account for payload overhead twice when the batch is empty.
+            // Payload overhead is either RECORDS_START ("[") or RECORDS_END ("]"),
+            // and for an empty payload we need account for both ("[]").
+            if (recordCount == 0) {
+                return byteCount + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
+            }
+            return byteCount;
+        }
+    }
+
+    protected long getRecordCount() {
+        synchronized (accessLock) {
+            return recordCount;
+        }
+    }
+
+    @CallSuper
+    protected void reset() {
+        synchronized (accessLock) {
+            byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
+            recordCount = 0;
+        }
+    }
+
+    @CallSuper
+    protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
+        return recordCount < maxRecords
+                && (byteCount + byteDelta) <= maxBytes;
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/MayUploadProvider.java
@@ -0,0 +1,9 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+public interface MayUploadProvider {
+    boolean mayUpload();
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/Payload.java
@@ -0,0 +1,66 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.support.annotation.CheckResult;
+
+import java.util.ArrayList;
+
+/**
+ * Owns per-payload record byte and recordGuid buffers.
+ */
+/* @ThreadSafe */
+public class Payload extends BufferSizeTracker {
+    // Data of outbound records.
+    /* @GuardedBy("accessLock") */ private final ArrayList<byte[]> recordsBuffer = new ArrayList<>();
+
+    // GUIDs of outbound records. Used to fail entire payloads.
+    /* @GuardedBy("accessLock") */ private final ArrayList<String> recordGuidsBuffer = new ArrayList<>();
+
+    public Payload(Object payloadLock, long maxBytes, long maxRecords) {
+        super(payloadLock, maxBytes, maxRecords);
+    }
+
+    @Override
+    protected boolean addAndEstimateIfFull(long recordDelta) {
+        throw new UnsupportedOperationException();
+    }
+
+    @CheckResult
+    protected boolean addAndEstimateIfFull(long recordDelta, byte[] recordBytes, String guid) {
+        synchronized (accessLock) {
+            recordsBuffer.add(recordBytes);
+            recordGuidsBuffer.add(guid);
+            return super.addAndEstimateIfFull(recordDelta);
+        }
+    }
+
+    @Override
+    protected void reset() {
+        synchronized (accessLock) {
+            super.reset();
+            recordsBuffer.clear();
+            recordGuidsBuffer.clear();
+        }
+    }
+
+    protected ArrayList<byte[]> getRecordsBuffer() {
+        synchronized (accessLock) {
+            return new ArrayList<>(recordsBuffer);
+        }
+    }
+
+    protected ArrayList<String> getRecordGuidsBuffer() {
+        synchronized (accessLock) {
+            return new ArrayList<>(recordGuidsBuffer);
+        }
+    }
+
+    protected boolean isEmpty() {
+        synchronized (accessLock) {
+            return recordsBuffer.isEmpty();
+        }
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
@@ -0,0 +1,185 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import org.json.simple.JSONArray;
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.HTTPFailureException;
+import org.mozilla.gecko.sync.NonArrayJSONException;
+import org.mozilla.gecko.sync.NonObjectJSONException;
+import org.mozilla.gecko.sync.Utils;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+
+import java.util.ArrayList;
+
+public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
+    private static final String LOG_TAG = "PayloadUploadDelegate";
+
+    private static final String KEY_BATCH = "batch";
+
+    private final BatchingUploader uploader;
+    // GUIDs of records posted in this payload; nulled out in handleRequestError
+    // once every record has been failed, to allow earlier GC.
+    private ArrayList<String> postedRecordGuids;
+    // Whether this payload is a batch commit (commit=true was appended to the POST URI).
+    private final boolean isCommit;
+    // Whether this is the final payload of the session's upload.
+    private final boolean isLastPayload;
+
+    public PayloadUploadDelegate(BatchingUploader uploader, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
+        this.uploader = uploader;
+        this.postedRecordGuids = postedRecordGuids;
+        this.isCommit = isCommit;
+        this.isLastPayload = isLastPayload;
+    }
+
+    @Override
+    public AuthHeaderProvider getAuthHeaderProvider() {
+        return uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider();
+    }
+
+    // Value for the X-If-Unmodified-Since header: the batch's last known
+    // Last-Modified timestamp rendered as decimal seconds, or null if we have
+    // not yet observed one (in which case no X-I-U-S header is sent).
+    @Override
+    public String ifUnmodifiedSince() {
+        final Long lastModified = uploader.getCurrentBatch().getLastModified();
+        if (lastModified == null) {
+            return null;
+        }
+        return Utils.millisecondsToDecimalSecondsString(lastModified);
+    }
+
+    // Processes a successful POST: validates the response, latches batching
+    // mode, records the batch token and Last-Modified, then marks individual
+    // records succeeded/failed from the response body.
+    @Override
+    public void handleRequestSuccess(final SyncStorageResponse response) {
+        // First, do some sanity checking.
+        if (response.getStatusCode() != 200 && response.getStatusCode() != 202) {
+            handleRequestError(
+                new IllegalStateException("handleRequestSuccess received a non-200/202 response: " + response.getStatusCode())
+            );
+            return;
+        }
+
+        // We always expect to see a Last-Modified header. It's returned with every success response.
+        if (!response.httpResponse().containsHeader(SyncResponse.X_LAST_MODIFIED)) {
+            handleRequestError(
+                    new IllegalStateException("Response did not have a Last-Modified header")
+            );
+            return;
+        }
+
+        // We expect to be able to parse the response as a JSON object.
+        final ExtendedJSONObject body;
+        try {
+            body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
+        } catch (Exception e) {
+            Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
+            this.handleRequestError(e);
+            return;
+        }
+
+        // If we got a 200, it could be either a non-batching result, or a batch commit.
+        // - if we're in a batching mode, we expect this to be a commit.
+        // If we got a 202, we expect there to be a token present in the response
+        if (response.getStatusCode() == 200 && uploader.getCurrentBatch().getToken() != null) {
+            if (uploader.getInBatchingMode() && !isCommit) {
+                handleRequestError(
+                        new IllegalStateException("Got 200 OK in batching mode, but this was not a commit payload")
+                );
+                return;
+            }
+        } else if (response.getStatusCode() == 202) {
+            if (!body.containsKey(KEY_BATCH)) {
+                handleRequestError(
+                        new IllegalStateException("Batch response did not have a batch ID")
+                );
+                return;
+            }
+        }
+
+        // With sanity checks out of the way, can now safely say if we're in a batching mode or not.
+        // We only do this once per session.
+        if (uploader.getInBatchingMode() == null) {
+            uploader.setInBatchingMode(body.containsKey(KEY_BATCH));
+        }
+
+        // Tell current batch about the token we've received.
+        // Throws if token changed after being set once, or if we got a non-null token after a commit.
+        try {
+            uploader.getCurrentBatch().setToken(body.getString(KEY_BATCH), isCommit);
+        } catch (BatchingUploader.BatchingUploaderException e) {
+            handleRequestError(e);
+            return;
+        }
+
+        // Will throw if Last-Modified changed when it shouldn't have.
+        try {
+            uploader.setLastModified(
+                    response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED),
+                    isCommit);
+        } catch (BatchingUploader.BatchingUploaderException e) {
+            handleRequestError(e);
+            return;
+        }
+
+        // All looks good up to this point, let's process success and failed arrays.
+        JSONArray success;
+        try {
+            success = body.getArray("success");
+        } catch (NonArrayJSONException e) {
+            handleRequestError(e);
+            return;
+        }
+
+        if (success != null && !success.isEmpty()) {
+            Logger.trace(LOG_TAG, "Successful records: " + success.toString());
+            for (Object o : success) {
+                try {
+                    uploader.recordSucceeded((String) o);
+                } catch (ClassCastException e) {
+                    Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
+                    // Not much to be done.
+                }
+            }
+        }
+        // GC
+        success = null;
+
+        ExtendedJSONObject failed;
+        try {
+            failed = body.getObject("failed");
+        } catch (NonObjectJSONException e) {
+            handleRequestError(e);
+            return;
+        }
+
+        if (failed != null && !failed.object.isEmpty()) {
+            Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
+            for (String guid : failed.keySet()) {
+                uploader.recordFailed(guid);
+            }
+        }
+        // GC
+        failed = null;
+
+        // And we're done! Let uploader finish up.
+        uploader.payloadSucceeded(response, isCommit, isLastPayload);
+    }
+
+    // Non-2xx HTTP responses are funneled through the generic error path.
+    @Override
+    public void handleRequestFailure(final SyncStorageResponse response) {
+        this.handleRequestError(new HTTPFailureException(response));
+    }
+
+    // Fails every record posted in this payload with the given exception and,
+    // if this was the last payload, tells the uploader so it can finish up.
+    @Override
+    public void handleRequestError(Exception e) {
+        for (String guid : postedRecordGuids) {
+            uploader.recordFailed(e, guid);
+        }
+        // GC
+        postedRecordGuids = null;
+
+        if (isLastPayload) {
+            uploader.lastPayloadFailed();
+        }
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnable.java
@@ -0,0 +1,177 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.net.Uri;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
+import org.mozilla.gecko.sync.net.SyncStorageRequest;
+import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import ch.boye.httpclientandroidlib.entity.ContentProducer;
+import ch.boye.httpclientandroidlib.entity.EntityTemplate;
+
+/**
+ * Responsible for creating and posting a <code>SyncStorageRequest</code> request object.
+ */
+public class RecordUploadRunnable implements Runnable {
+    // NOTE(review): conventionally this would be `private static final`; it is
+    // a public per-instance field here — confirm nothing outside relies on it.
+    public final String LOG_TAG = "RecordUploadRunnable";
+
+    // JSON-array framing tokens for the POST body: "[", ",", "]".
+    public final static byte[] RECORDS_START = "[".getBytes(StandardCharsets.UTF_8);
+    public final static byte[] RECORD_SEPARATOR = ",".getBytes(StandardCharsets.UTF_8);
+    public final static byte[] RECORDS_END = "]".getBytes(StandardCharsets.UTF_8);
+
+    // Query parameters used to drive server-side batching.
+    private static final String QUERY_PARAM_BATCH = "batch";
+    private static final String QUERY_PARAM_TRUE = "true";
+    private static final String QUERY_PARAM_BATCH_COMMIT = "commit";
+
+    private final MayUploadProvider mayUploadProvider;
+    private final SyncStorageRequestDelegate uploadDelegate;
+
+    // Serialized records to post, and their precomputed total byte count
+    // (including array framing; verified in ByteArraysEntity).
+    private final ArrayList<byte[]> outgoing;
+    private final long byteCount;
+
+    // Used to construct POST URI during run().
+    @VisibleForTesting
+    public final boolean isCommit;
+    private final Uri collectionUri;
+    private final BatchMeta batchMeta;
+
+    public RecordUploadRunnable(MayUploadProvider mayUploadProvider,
+                                Uri collectionUri,
+                                BatchMeta batchMeta,
+                                SyncStorageRequestDelegate uploadDelegate,
+                                ArrayList<byte[]> outgoing,
+                                long byteCount,
+                                boolean isCommit) {
+        this.mayUploadProvider = mayUploadProvider;
+        this.uploadDelegate = uploadDelegate;
+        this.outgoing = outgoing;
+        this.byteCount = byteCount;
+        this.batchMeta = batchMeta;
+        this.collectionUri = collectionUri;
+        this.isCommit = isCommit;
+    }
+
+    // Streams the record byte arrays as a single JSON array:
+    // "[" record0 "," record1 ... "]".
+    public static class ByteArraysContentProducer implements ContentProducer {
+        ArrayList<byte[]> outgoing;
+        public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
+            outgoing = arrays;
+        }
+
+        @Override
+        public void writeTo(OutputStream outstream) throws IOException {
+            int count = outgoing.size();
+            outstream.write(RECORDS_START);
+            if (count > 0) {
+                outstream.write(outgoing.get(0));
+                for (int i = 1; i < count; ++i) {
+                    outstream.write(RECORD_SEPARATOR);
+                    outstream.write(outgoing.get(i));
+                }
+            }
+            outstream.write(RECORDS_END);
+        }
+
+        // Computes the exact number of bytes writeTo() will produce for the
+        // given records; used to sanity-check the caller-supplied byte count.
+        public static long outgoingBytesCount(ArrayList<byte[]> outgoing) {
+            final long numberOfRecords = outgoing.size();
+
+            // Account for start and end tokens.
+            long count = RECORDS_START.length + RECORDS_END.length;
+
+            // Account for all the records.
+            for (int i = 0; i < numberOfRecords; i++) {
+                count += outgoing.get(i).length;
+            }
+
+            // Account for a separator between the records.
+            // There's one less separator than there are records.
+            if (numberOfRecords > 1) {
+                count += RECORD_SEPARATOR.length * (numberOfRecords - 1);
+            }
+
+            return count;
+        }
+    }
+
+    // HTTP entity with a known content length, backed by ByteArraysContentProducer.
+    public static class ByteArraysEntity extends EntityTemplate {
+        private final long count;
+        public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) {
+            super(new ByteArraysContentProducer(arrays));
+            this.count = totalBytes;
+            this.setContentType("application/json");
+            // charset is set in BaseResource.
+
+            // Sanity check our byte counts.
+            long realByteCount = ByteArraysContentProducer.outgoingBytesCount(arrays);
+            if (realByteCount != totalBytes) {
+                throw new IllegalStateException("Mismatched byte counts. Received " + totalBytes + " while real byte count is " + realByteCount);
+            }
+        }
+
+        @Override
+        public long getContentLength() {
+            return count;
+        }
+
+        // The content producer can re-write the body from the retained byte
+        // arrays, so the request may be safely retried.
+        @Override
+        public boolean isRepeatable() {
+            return true;
+        }
+    }
+
+    // Aborts (failing all records via the delegate) if the uploader says we may
+    // not proceed; otherwise builds the POST URI and performs the request.
+    @Override
+    public void run() {
+        if (!mayUploadProvider.mayUpload()) {
+            Logger.info(LOG_TAG, "Told not to proceed by the uploader. Cancelling upload, failing records.");
+            uploadDelegate.handleRequestError(new Server11PreviousPostFailedException());
+            return;
+        }
+
+        Logger.trace(LOG_TAG, "Running upload task. Outgoing records: " + outgoing.size());
+
+        // We don't want the task queue to proceed until this request completes.
+        // Fortunately, BaseResource is currently synchronous.
+        // If that ever changes, you'll need to block here.
+
+        final URI postURI = buildPostURI(isCommit, batchMeta, collectionUri);
+        final SyncStorageRequest request = new SyncStorageRequest(postURI);
+        request.delegate = uploadDelegate;
+
+        ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
+        request.post(body);
+    }
+
+    // Appends batching query parameters to the collection URI: batch=<token>
+    // once a token is known, batch=true otherwise (presumably ignored by
+    // servers without batching support — confirm against the storage API),
+    // plus commit=true for commit payloads.
+    @VisibleForTesting
+    public static URI buildPostURI(boolean isCommit, BatchMeta batchMeta, Uri collectionUri) {
+        final Uri.Builder uriBuilder = collectionUri.buildUpon();
+        final String batchToken = batchMeta.getToken();
+
+        if (batchToken != null) {
+            uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH, batchToken);
+        } else {
+            uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH, QUERY_PARAM_TRUE);
+        }
+
+        if (isCommit) {
+            uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH_COMMIT, QUERY_PARAM_TRUE);
+        }
+
+        try {
+            return new URI(uriBuilder.build().toString());
+        } catch (URISyntaxException e) {
+            throw new IllegalStateException("Failed to construct a collection URI", e);
+        }
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -45,16 +45,17 @@ public class AndroidBrowserBookmarksServ
     AuthHeaderProvider authHeaderProvider = session.getAuthHeaderProvider();
     final JSONRecordFetcher countsFetcher = new JSONRecordFetcher(session.config.infoCollectionCountsURL(), authHeaderProvider);
     String collection = getCollection();
     return new SafeConstrainedServer11Repository(
         collection,
         session.config.storageURL(),
         session.getAuthHeaderProvider(),
         session.config.infoCollections,
+        session.config.infoConfiguration,
         BOOKMARKS_REQUEST_LIMIT,
         BOOKMARKS_SORT,
         countsFetcher);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new AndroidBrowserBookmarksRepository();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -45,16 +45,17 @@ public class AndroidBrowserHistoryServer
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
                                              collection,
                                              session.config.storageURL(),
                                              session.getAuthHeaderProvider(),
                                              session.config.infoCollections,
+                                             session.config.infoConfiguration,
                                              HISTORY_REQUEST_LIMIT,
                                              HISTORY_SORT);
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new HistoryRecordFactory();
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -40,16 +40,17 @@ public class FormHistoryServerSyncStage 
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
         collection,
         session.config.storageURL(),
         session.getAuthHeaderProvider(),
         session.config.infoCollections,
+        session.config.infoConfiguration,
         FORM_HISTORY_REQUEST_LIMIT,
         FORM_HISTORY_SORT);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new FormHistoryRepositorySession.FormHistoryRepository();
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
@@ -3,16 +3,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.InfoCounts;
 import org.mozilla.gecko.sync.JSONRecordFetcher;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 
@@ -32,21 +33,22 @@ public class SafeConstrainedServer11Repo
 
   // This can be lazily evaluated if we need it.
   private final JSONRecordFetcher countFetcher;
 
   public SafeConstrainedServer11Repository(String collection,
                                            String storageURL,
                                            AuthHeaderProvider authHeaderProvider,
                                            InfoCollections infoCollections,
+                                           InfoConfiguration infoConfiguration,
                                            long limit,
                                            String sort,
                                            JSONRecordFetcher countFetcher)
     throws URISyntaxException {
-    super(collection, storageURL, authHeaderProvider, infoCollections, limit, sort);
+    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration, limit, sort);
     if (countFetcher == null) {
       throw new IllegalArgumentException("countFetcher must not be null");
     }
     this.countFetcher = countFetcher;
   }
 
   @Override
   public void createSession(RepositorySessionCreationDelegate delegate,
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -140,17 +140,18 @@ public abstract class ServerSyncStage ex
   protected abstract RecordFactory getRecordFactory();
 
   // Override this in subclasses.
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new Server11Repository(collection,
                                   session.config.storageURL(),
                                   session.getAuthHeaderProvider(),
-                                  session.config.infoCollections);
+                                  session.config.infoCollections,
+                                  session.config.infoConfiguration);
   }
 
   /**
    * Return a Crypto5Middleware-wrapped Server11Repository.
    *
    * @throws NoCollectionKeysSetException
    * @throws URISyntaxException
    */
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
@@ -3,44 +3,46 @@
 
 package org.mozilla.android.sync.net.test;
 
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.repositories.Server11Repository;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 
 @RunWith(TestRunner.class)
 public class TestServer11Repository {
 
   private static final String COLLECTION = "bookmarks";
   private static final String COLLECTION_URL = "http://foo.com/1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage";
 
   protected final InfoCollections infoCollections = new InfoCollections();
+  protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
 
   public static void assertQueryEquals(String expected, URI u) {
     Assert.assertEquals(expected, u.getRawQuery());
   }
 
   @SuppressWarnings("static-method")
   @Test
   public void testCollectionURIFull() throws URISyntaxException {
-    Server11Repository r = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections);
+    Server11Repository r = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections, infoConfiguration);
     assertQueryEquals("full=1&newer=5000.000",              r.collectionURI(true,  5000000L, -1,    null, null));
     assertQueryEquals("newer=1230.000",                     r.collectionURI(false, 1230000L, -1,    null, null));
     assertQueryEquals("newer=5000.000&limit=10",            r.collectionURI(false, 5000000L, 10,    null, null));
     assertQueryEquals("full=1&newer=5000.000&sort=index",   r.collectionURI(true,  5000000L,  0, "index", null));
     assertQueryEquals("full=1&ids=123,abc",                 r.collectionURI(true,       -1L, -1,    null, "123,abc"));
   }
 
   @Test
   public void testCollectionURI() throws URISyntaxException {
-    Server11Repository noTrailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections);
-    Server11Repository trailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL + "/", null, infoCollections);
+    Server11Repository noTrailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections, infoConfiguration);
+    Server11Repository trailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
     Assert.assertEquals("http://foo.com/1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
     Assert.assertEquals("http://foo.com/1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
@@ -1,53 +1,46 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 package org.mozilla.android.sync.test;
 
-import ch.boye.httpclientandroidlib.HttpEntity;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.android.sync.test.SynchronizerHelpers.TrackingWBORepository;
 import org.mozilla.android.sync.test.helpers.BaseTestStorageRequestDelegate;
 import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper;
 import org.mozilla.android.sync.test.helpers.MockServer;
-import org.mozilla.gecko.background.testhelpers.MockRecord;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.background.testhelpers.WaitHelper;
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.JSONRecordFetcher;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider;
-import org.mozilla.gecko.sync.net.SyncStorageRecordRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.FetchFailedException;
-import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server11Repository;
-import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
 import org.mozilla.gecko.sync.repositories.StoreFailedException;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
-import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.stage.SafeConstrainedServer11Repository;
 import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
 import org.mozilla.gecko.sync.synchronizer.Synchronizer;
 import org.simpleframework.http.ContentType;
 import org.simpleframework.http.Request;
 import org.simpleframework.http.Response;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(TestRunner.class)
 public class TestServer11RepositorySession {
@@ -65,104 +58,67 @@ public class TestServer11RepositorySessi
       System.out.println("Content-Type:" + contentType);
       super.handle(request, response, 200, "{success:[]}");
     }
   }
 
   private static final int    TEST_PORT   = HTTPServerTestHelper.getTestPort();
   private static final String TEST_SERVER = "http://localhost:" + TEST_PORT + "/";
   static final String LOCAL_BASE_URL      = TEST_SERVER + "1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/";
-  static final String LOCAL_REQUEST_URL   = LOCAL_BASE_URL + "storage/bookmarks";
   static final String LOCAL_INFO_BASE_URL = LOCAL_BASE_URL + "info/";
   static final String LOCAL_COUNTS_URL    = LOCAL_INFO_BASE_URL + "collection_counts";
 
   // Corresponds to rnewman+atest1@mozilla.com, local.
   static final String TEST_USERNAME          = "n6ec3u5bee3tixzp2asys7bs6fve4jfw";
   static final String TEST_PASSWORD          = "passowrd";
   static final String SYNC_KEY          = "eh7ppnb82iwr5kt3z3uyi5vr44";
 
   public final AuthHeaderProvider authHeaderProvider = new BasicAuthHeaderProvider(TEST_USERNAME, TEST_PASSWORD);
   protected final InfoCollections infoCollections = new InfoCollections();
+  protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
 
   // Few-second timeout so that our longer operations don't time out and cause spurious error-handling results.
   private static final int SHORT_TIMEOUT = 10000;
 
   public AuthHeaderProvider getAuthHeaderProvider() {
     return new BasicAuthHeaderProvider(TEST_USERNAME, TEST_PASSWORD);
   }
 
   private HTTPServerTestHelper data     = new HTTPServerTestHelper();
 
-  public class MockServer11RepositorySession extends Server11RepositorySession {
-    public MockServer11RepositorySession(Repository repository) {
-      super(repository);
-    }
-
-    public RecordUploadRunnable getRecordUploadRunnable() {
-      // TODO: implement upload delegate in the class, too!
-      return new RecordUploadRunnable(null, recordsBuffer, recordGuidsBuffer, byteCount);
-    }
-
-    public void enqueueRecord(Record r) {
-      super.enqueue(r);
-    }
-
-    public HttpEntity getEntity() {
-      return this.getRecordUploadRunnable().getBodyEntity();
-    }
-  }
-
   public class TestSyncStorageRequestDelegate extends
   BaseTestStorageRequestDelegate {
     public TestSyncStorageRequestDelegate(String username, String password) {
       super(username, password);
     }
 
     @Override
     public void handleRequestSuccess(SyncStorageResponse res) {
       assertTrue(res.wasSuccessful());
       assertTrue(res.httpResponse().containsHeader("X-Weave-Timestamp"));
       BaseResource.consumeEntity(res);
       data.stopHTTPServer();
     }
   }
 
-  @Test
-  public void test() throws URISyntaxException {
-
-    BaseResource.rewriteLocalhost = false;
-    data.startHTTPServer(new POSTMockServer());
-
-    MockServer11RepositorySession session = new MockServer11RepositorySession(
-        null);
-    session.enqueueRecord(new MockRecord(Utils.generateGuid(), null, 0, false));
-    session.enqueueRecord(new MockRecord(Utils.generateGuid(), null, 0, false));
-
-    URI uri = new URI(LOCAL_REQUEST_URL);
-    SyncStorageRecordRequest r = new SyncStorageRecordRequest(uri);
-    TestSyncStorageRequestDelegate delegate = new TestSyncStorageRequestDelegate(TEST_USERNAME, TEST_PASSWORD);
-    r.delegate = delegate;
-    r.post(session.getEntity());
-  }
-
   @SuppressWarnings("static-method")
   protected TrackingWBORepository getLocal(int numRecords) {
     final TrackingWBORepository local = new TrackingWBORepository();
     for (int i = 0; i < numRecords; i++) {
       BookmarkRecord outbound = new BookmarkRecord("outboundFail" + i, "bookmarks", 1, false);
       local.wbos.put(outbound.guid, outbound);
     }
     return local;
   }
 
   protected Exception doSynchronize(MockServer server) throws Exception {
     final String COLLECTION = "test";
 
     final TrackingWBORepository local = getLocal(100);
-    final Server11Repository remote = new Server11Repository(COLLECTION, getCollectionURL(COLLECTION), authHeaderProvider, infoCollections);
+    final Server11Repository remote = new Server11Repository(COLLECTION, getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration);
     KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY);
     Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey);
     cryptoRepo.recordFactory = new BookmarkRecordFactory();
 
     final Synchronizer synchronizer = new ServerLocalSynchronizer();
     synchronizer.repositoryA = cryptoRepo;
     synchronizer.repositoryB = local;
 
@@ -229,16 +185,17 @@ public class TestServer11RepositorySessi
       }
     };
     final JSONRecordFetcher countsFetcher = new JSONRecordFetcher(LOCAL_COUNTS_URL, getAuthHeaderProvider());
     String collection = "bookmarks";
     final SafeConstrainedServer11Repository remote = new SafeConstrainedServer11Repository(collection,
         getCollectionURL(collection),
         getAuthHeaderProvider(),
         infoCollections,
+        infoConfiguration,
         5000, "sortindex", countsFetcher);
 
     data.startHTTPServer(server);
     final AtomicBoolean out = new AtomicBoolean(false);
 
     // Verify that shouldSkip returns true due to a fetch of too large counts,
     // rather than due to a timeout failure waiting to fetch counts.
     try {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/MockRecord.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/MockRecord.java
@@ -1,20 +1,30 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 package org.mozilla.gecko.background.testhelpers;
 
 import org.mozilla.gecko.sync.ExtendedJSONObject;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
+import java.util.Random;
+
 public class MockRecord extends Record {
-
+  private final int payloadByteCount;
   public MockRecord(String guid, String collection, long lastModified, boolean deleted) {
     super(guid, collection, lastModified, deleted);
+    // Payload used to be "foo", so let's not stray too far.
+    // Perhaps some tests "depend" on that payload size.
+    payloadByteCount = 3;
+  }
+
+  public MockRecord(String guid, String collection, long lastModified, boolean deleted, int payloadByteCount) {
+    super(guid, collection, lastModified, deleted);
+    this.payloadByteCount = payloadByteCount;
   }
 
   @Override
   protected void populatePayload(ExtendedJSONObject payload) {
   }
 
   @Override
   protected void initFromPayload(ExtendedJSONObject payload) {
@@ -24,11 +34,18 @@ public class MockRecord extends Record {
   public Record copyWithIDs(String guid, long androidID) {
     MockRecord r = new MockRecord(guid, this.collection, this.lastModified, this.deleted);
     r.androidID = androidID;
     return r;
   }
 
   @Override
   public String toJSONString() {
-    return "{\"id\":\"" + guid + "\", \"payload\": \"foo\"}";
+    // Build up a randomish payload string based on the length we were asked for.
+    final Random random = new Random();
+    final char[] payloadChars = new char[payloadByteCount];
+    for (int i = 0; i < payloadByteCount; i++) {
+      payloadChars[i] = (char) (random.nextInt(26) + 'a');
+    }
+    final String payloadString = new String(payloadChars);
+    return "{\"id\":\"" + guid + "\", \"payload\": \"" + payloadString + "\"}";
   }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
@@ -6,16 +6,17 @@ package org.mozilla.gecko.sync.repositor
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper;
 import org.mozilla.android.sync.test.helpers.MockServer;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.background.testhelpers.WaitHelper;
 import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.JSONRecordFetcher;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 import org.mozilla.gecko.sync.stage.SafeConstrainedServer11Repository;
 import org.simpleframework.http.Request;
 import org.simpleframework.http.Response;
 
@@ -30,16 +31,17 @@ public class TestSafeConstrainedServer11
   private static final String  TEST_USERNAME  = "c6o7dvmr2c4ud2fyv6woz2u4zi22bcyd";
   private static final String  TEST_BASE_PATH = "/1.1/" + TEST_USERNAME + "/";
 
   public AuthHeaderProvider getAuthHeaderProvider() {
     return null;
   }
 
   protected final InfoCollections infoCollections = new InfoCollections();
+  protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
 
   private class CountsMockServer extends MockServer {
     public final AtomicInteger count = new AtomicInteger(0);
     public final AtomicBoolean error = new AtomicBoolean(false);
 
     @Override
     public void handle(Request request, Response response) {
       final String path = request.getPath().getPath();
@@ -80,17 +82,17 @@ public class TestSafeConstrainedServer11
     try {
       String countsURL = TEST_SERVER + TEST_BASE_PATH + "info/collection_counts";
       JSONRecordFetcher countFetcher = new JSONRecordFetcher(countsURL, getAuthHeaderProvider());
       String sort = "sortindex";
       String collection = "rotary";
 
       final int TEST_LIMIT = 1000;
       final SafeConstrainedServer11Repository repo = new SafeConstrainedServer11Repository(
-          collection, getCollectionURL(collection), null, infoCollections,
+          collection, getCollectionURL(collection), null, infoCollections, infoConfiguration,
           TEST_LIMIT, sort, countFetcher);
 
       final AtomicBoolean shouldSkipLots = new AtomicBoolean(false);
       final AtomicBoolean shouldSkipFew = new AtomicBoolean(true);
       final AtomicBoolean shouldSkip503 = new AtomicBoolean (false);
 
       WaitHelper.getTestWaiter().performWait(2000, new Runnable() {
         @Override
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
@@ -0,0 +1,282 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class BatchMetaTest {
+    private BatchMeta batchMeta;
+    private long byteLimit = 1024;
+    private long recordLimit = 5;
+    private Object lock = new Object();
+    private Long collectionLastModified = 123L;
+
+    @Before
+    public void setUp() throws Exception {
+        batchMeta = new BatchMeta(lock, byteLimit, recordLimit, collectionLastModified);
+    }
+
+    @Test
+    public void testConstructor() {
+        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
+
+        BatchMeta otherBatchMeta = new BatchMeta(lock, byteLimit, recordLimit, null);
+        assertNull(otherBatchMeta.collectionLastModified);
+    }
+
+    @Test
+    public void testGetLastModified() {
+        // Defaults to collection L-M
+        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+
+        try {
+            batchMeta.setLastModified(333L, true);
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
+
+        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
+    }
+
+    @Test
+    public void testSetLastModified() {
+        assertEquals(batchMeta.getLastModified(), collectionLastModified);
+
+        try {
+            batchMeta.setLastModified(123L, true);
+            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+            fail("Should not check for modifications on first L-M set");
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {
+            fail("Should not check for modifications on first L-M set");
+        }
+
+        // Now the same, but passing in 'false' for "expecting to change".
+        batchMeta.reset();
+        assertEquals(batchMeta.getLastModified(), collectionLastModified);
+
+        try {
+            batchMeta.setLastModified(123L, false);
+            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+            fail("Should not check for modifications on first L-M set");
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {
+            fail("Should not check for modifications on first L-M set");
+        }
+
+        // Test that we can't modify L-M when we're not expecting to
+        try {
+            batchMeta.setLastModified(333L, false); fail("Expected LastModifiedChangedUnexpectedly to be thrown");
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+            assertTrue("Must throw when L-M changes unexpectedly", true);
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {
+            fail("Not expecting did-not-change throw");
+        }
+        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+
+        // Test that we can modify L-M when we're expecting to
+        try {
+            batchMeta.setLastModified(333L, true);
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+            fail("Not expecting changed-unexpectedly throw");
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {
+            fail("Not expecting did-not-change throw");
+        }
+        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
+
+        // Test that we catch L-M modifications that expect to change but actually don't
+        try {
+            batchMeta.setLastModified(333L, true);
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+            fail("Not expecting changed-unexpectedly throw");
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {
+            assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
+        }
+        assertEquals(batchMeta.getLastModified(), Long.valueOf(333));
+    }
+
+    @Test
+    public void testSetToken() {
+        assertNull(batchMeta.getToken());
+
+        try {
+            batchMeta.setToken("MTIzNA", false);
+        } catch (BatchingUploader.TokenModifiedException e) {
+            fail("Should be able to set token for the first time");
+        }
+        assertEquals("MTIzNA", batchMeta.getToken());
+
+        try {
+            batchMeta.setToken("XYCvNA", false); fail("Expected TokenModifiedException to be thrown");
+        } catch (BatchingUploader.TokenModifiedException e) {
+            assertTrue("Should not be able to modify a token", true);
+        }
+        assertEquals("MTIzNA", batchMeta.getToken());
+
+        try {
+            batchMeta.setToken("XYCvNA", true); fail("Expected TokenModifiedException to be thrown");
+        } catch (BatchingUploader.TokenModifiedException e) {
+            assertTrue("Should catch non-null tokens during onCommit sets", true);
+        }
+        assertEquals("MTIzNA", batchMeta.getToken());
+
+        try {
+            batchMeta.setToken(null, true);
+        } catch (BatchingUploader.TokenModifiedException e) {
+            fail("Should be able to set token to null during onCommit set");
+        }
+        assertNull(batchMeta.getToken());
+    }
+
+    @Test
+    public void testRecordSucceeded() {
+        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
+
+        batchMeta.recordSucceeded("guid1");
+
+        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
+        assertTrue(batchMeta.getSuccessRecordGuids().contains("guid1"));
+
+        try {
+            batchMeta.recordSucceeded(null);
+            fail();
+        } catch (IllegalStateException e) {
+            assertTrue("Should not be able to 'succeed' a null guid", true);
+        }
+    }
+
+    @Test
+    public void testByteLimits() {
+        assertTrue(batchMeta.canFit(0));
+
+        // Should just fit
+        assertTrue(batchMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+
+        // Can't fit a record due to payload overhead.
+        assertFalse(batchMeta.canFit(byteLimit));
+
+        assertFalse(batchMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertFalse(batchMeta.canFit(byteLimit * 1000));
+
+        long recordDelta = byteLimit / 2;
+        assertFalse(batchMeta.addAndEstimateIfFull(recordDelta));
+
+        // Record delta shouldn't fit due to payload overhead.
+        assertFalse(batchMeta.canFit(recordDelta));
+    }
+
+    @Test
+    public void testCountLimits() {
+        // Our record limit is 5, let's add 4.
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+
+        // 5th record still fits in
+        assertTrue(batchMeta.canFit(1));
+
+        // Add the 5th record
+        assertTrue(batchMeta.addAndEstimateIfFull(1));
+
+        // 6th record won't fit
+        assertFalse(batchMeta.canFit(1));
+    }
+
+    @Test
+    public void testNeedCommit() {
+        assertFalse(batchMeta.needToCommit());
+
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+
+        assertTrue(batchMeta.needToCommit());
+
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+
+        assertTrue(batchMeta.needToCommit());
+
+        batchMeta.reset();
+
+        assertFalse(batchMeta.needToCommit());
+    }
+
+    @Test
+    public void testAdd() {
+        // Ensure we account for payload overhead twice when the batch is empty.
+        // Payload overhead is either RECORDS_START or RECORDS_END, and for an empty payload
+        // we need both.
+        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
+        assertTrue(batchMeta.getRecordCount() == 0);
+
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+
+        assertTrue(batchMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertTrue(batchMeta.getRecordCount() == 1);
+
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+
+        assertTrue(batchMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertTrue(batchMeta.getRecordCount() == 4);
+
+        assertTrue(batchMeta.addAndEstimateIfFull(1));
+
+        try {
+            assertTrue(batchMeta.addAndEstimateIfFull(1));
+            fail("BatchMeta should not let us insert records that won't fit");
+        } catch (IllegalStateException e) {
+            assertTrue(true);
+        }
+    }
+
+    @Test
+    public void testReset() {
+        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
+        assertTrue(batchMeta.getRecordCount() == 0);
+        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
+
+        // Shouldn't throw even if already empty
+        batchMeta.reset();
+        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
+        assertTrue(batchMeta.getRecordCount() == 0);
+        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
+
+        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        batchMeta.recordSucceeded("guid1");
+        try {
+            batchMeta.setToken("MTIzNA", false);
+        } catch (BatchingUploader.TokenModifiedException e) {}
+        try {
+            batchMeta.setLastModified(333L, true);
+        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
+        assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
+        assertEquals("MTIzNA", batchMeta.getToken());
+        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
+
+        batchMeta.reset();
+
+        // Counts must be reset
+        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
+        assertTrue(batchMeta.getRecordCount() == 0);
+        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
+
+        // Collection L-M shouldn't change
+        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
+
+        // Token must be reset
+        assertNull(batchMeta.getToken());
+
+        // L-M must be reverted to collection L-M
+        assertEquals(batchMeta.getLastModified(), collectionLastModified);
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -0,0 +1,441 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.support.annotation.NonNull;
+
+import static org.junit.Assert.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.MockRecord;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.Utils;
+import org.mozilla.gecko.sync.repositories.Server11Repository;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+
+import java.net.URISyntaxException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+@RunWith(TestRunner.class)
+public class BatchingUploaderTest {
+    class MockExecutorService implements Executor {
+        public int totalPayloads = 0;
+        public int commitPayloads = 0;
+
+        @Override
+        public void execute(@NonNull Runnable command) {
+            ++totalPayloads;
+            if (((RecordUploadRunnable) command).isCommit) {
+                ++commitPayloads;
+            }
+        }
+    }
+
+    class MockStoreDelegate implements RepositorySessionStoreDelegate {
+        public int storeFailed = 0;
+        public int storeSucceeded = 0;
+        public int storeCompleted = 0;
+
+        @Override
+        public void onRecordStoreFailed(Exception ex, String recordGuid) {
+            ++storeFailed;
+        }
+
+        @Override
+        public void onRecordStoreSucceeded(String guid) {
+            ++storeSucceeded;
+        }
+
+        @Override
+        public void onStoreCompleted(long storeEnd) {
+            ++storeCompleted;
+        }
+
+        @Override
+        public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
+            return null;
+        }
+    }
+
+    private Executor workQueue;
+    private RepositorySessionStoreDelegate storeDelegate;
+
+    @Before
+    public void setUp() throws Exception {
+        workQueue = new MockExecutorService();
+        storeDelegate = new MockStoreDelegate();
+    }
+
+    @Test
+    public void testProcessEvenPayloadBatch() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        // 1st
+        uploader.process(record);
+        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 2nd -> payload full
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 3rd
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 4th -> batch & payload full
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 5th
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 6th -> payload full
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 7th
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 8th -> batch & payload full
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+        // 9th
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+        // 10th -> payload full
+        uploader.process(record);
+        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+        // 11th
+        uploader.process(record);
+        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+        // 12th -> batch & payload full
+        uploader.process(record);
+        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
+        // 13th
+        uploader.process(record);
+        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testProcessUnevenPayloadBatch() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 5);
+
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        // 1st
+        uploader.process(record);
+        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 2nd -> payload full
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 3rd
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 4th -> payload full
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 5th -> batch full
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 6th -> starts new batch
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 7th -> payload full
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 8th
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 9th -> payload full
+        uploader.process(record);
+        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+        // 10th -> batch full
+        uploader.process(record);
+        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+        // 11th -> starts new batch
+        uploader.process(record);
+        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNonBatchingOptimization() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        // 1st
+        uploader.process(record);
+        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 2nd
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 3rd
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+        // 4th
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // 5th
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // And now we tell uploader that batching isn't supported.
+        // It shouldn't bother with batches from now on, just payloads.
+        uploader.setInBatchingMode(false);
+
+        // 6th
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // 7th
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // 8th
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // 9th
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        // 10th
+        uploader.process(record);
+        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testPreemptiveUploadByteCounts() {
+        // While processing a record, if we know for sure that another one won't fit,
+        // we upload the payload.
+        BatchingUploader uploader = makeConstrainedUploader(3, 6);
+
+        // Payload byte max: 1024; batch byte max: 4096
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false, 400);
+
+        uploader.process(record);
+        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+
+        // After 2nd record, byte count is at 800+overhead. Our payload max is 1024, so it's unlikely
+        // we can fit another record at this pace. Expect payload to be uploaded.
+        uploader.process(record);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+
+        // After this record, we'll have less than 124 bytes of room left in the payload. Expect upload.
+        record = new MockRecord(Utils.generateGuid(), null, 0, false, 970);
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+
+        uploader.process(record);
+        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+
+        // At this point our byte count for the batch is at 3600+overhead;
+        // since we have just 496 bytes left in the batch, it's unlikely we'll fit another record.
+        // Expect a batch commit
+        uploader.process(record);
+        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testRandomPayloadSizesBatching() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        final Random random = new Random();
+        for (int i = 0; i < 15000; i++) {
+            uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
+        }
+    }
+
+    @Test
+    public void testRandomPayloadSizesNonBatching() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        final Random random = new Random();
+        uploader.setInBatchingMode(false);
+        for (int i = 0; i < 15000; i++) {
+            uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
+        }
+    }
+
+    @Test
+    public void testRandomPayloadSizesNonBatchingDelayed() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        final Random random = new Random();
+        // Delay telling uploader that batching isn't supported.
+        // Randomize how many records we wait for.
+        final int delay = random.nextInt(20);
+        for (int i = 0; i < 15000; i++) {
+            if (delay == i) {
+                uploader.setInBatchingMode(false);
+            }
+            uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
+        }
+    }
+
+    @Test
+    public void testNoMoreRecordsAfterPayloadPost() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        // Process two records (payload limit is also two, batch is four),
+        // and ensure that 'no more records' commits.
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.setInBatchingMode(true);
+        uploader.commitIfNecessaryAfterLastPayload();
+        // One will be a payload post, the other one is batch commit (empty payload)
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNoMoreRecordsAfterPayloadPostWithOneRecordLeft() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        // Process two records (payload limit is also two, batch is four),
+        // and ensure that 'no more records' commits.
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.commitIfNecessaryAfterLastPayload();
+        // One will be a payload post, the other one is batch commit (one record payload)
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNoMoreRecordsNoOp() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        uploader.commitIfNecessaryAfterLastPayload();
+        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNoMoreRecordsNoOpAfterCommit() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.process(record);
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+
+        uploader.commitIfNecessaryAfterLastPayload();
+        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNoMoreRecordsEvenNonBatching() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        // Process two records (payload limit is also two, batch is four),
+        // set non-batching mode, and ensure that 'no more records' doesn't commit.
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        uploader.process(record);
+        uploader.process(record);
+        uploader.setInBatchingMode(false);
+        uploader.commitIfNecessaryAfterLastPayload();
+        // One will be a payload post, the other one is batch commit (one record payload)
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    @Test
+    public void testNoMoreRecordsIncompletePayload() {
+        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+
+        // We have one record (payload limit is 2), and "no-more-records" signal should commit it.
+        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+        uploader.process(record);
+
+        uploader.commitIfNecessaryAfterLastPayload();
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+    }
+
+    private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords) {
+        Server11RepositorySession server11RepositorySession = new Server11RepositorySession(
+                makeCountConstrainedRepository(maxPostRecords, maxTotalRecords)
+        );
+        server11RepositorySession.setStoreDelegate(storeDelegate);
+        return new BatchingUploader(server11RepositorySession, workQueue, storeDelegate);
+    }
+
+    private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords) {
+        return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords);
+    }
+
+    private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords) {
+        ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
+        infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, maxTotalBytes);
+        infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
+        infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
+        infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, maxPostBytes);
+        infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, maxRequestBytes);
+
+        InfoConfiguration infoConfiguration = new InfoConfiguration(infoConfigurationJSON);
+
+        try {
+            return new Server11Repository(
+                    "dummyCollection",
+                    "http://dummy.url/",
+                    null,
+                    new InfoCollections(),
+                    infoConfiguration
+            );
+        } catch (URISyntaxException e) {
+            // Won't throw, and this won't happen.
+            return null;
+        }
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadTest.java
@@ -0,0 +1,137 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class PayloadTest { // Unit tests for Payload's byte and record-count accounting.
+    private Payload payload;
+    private final long byteLimit = 1024;
+    private final long recordLimit = 5;
+    private final Object lock = new Object();
+
+    @Before
+    public void setUp() throws Exception {
+        payload = new Payload(lock, byteLimit, recordLimit);
+    }
+
+    @Test
+    public void testByteLimits() {
+        assertTrue(payload.canFit(0));
+
+        // Should just fit
+        assertTrue(payload.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+
+        // Can't fit a record due to payload overhead.
+        assertFalse(payload.canFit(byteLimit));
+
+        assertFalse(payload.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertFalse(payload.canFit(byteLimit * 1000));
+
+        long recordDelta = byteLimit / 2;
+        assertFalse(payload.addAndEstimateIfFull(recordDelta, new byte[0], null));
+
+        // Record delta shouldn't fit due to payload overhead.
+        assertFalse(payload.canFit(recordDelta));
+    }
+
+    @Test
+    public void testCountLimits() {
+        byte[] bytes = new byte[0];
+
+        // Our record limit is 5, let's add 4.
+        assertFalse(payload.addAndEstimateIfFull(1, bytes, null));
+        assertFalse(payload.addAndEstimateIfFull(1, bytes, null));
+        assertFalse(payload.addAndEstimateIfFull(1, bytes, null));
+        assertFalse(payload.addAndEstimateIfFull(1, bytes, null));
+
+        // 5th record still fits in
+        assertTrue(payload.canFit(1));
+
+        // Add the 5th record; "true" means the payload is now estimated full.
+        assertTrue(payload.addAndEstimateIfFull(1, bytes, null));
+
+        // 6th record won't fit
+        assertFalse(payload.canFit(1));
+    }
+
+    @Test
+    public void testAdd() {
+        assertEquals(2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(0, payload.getRecordCount());
+        assertTrue(payload.isEmpty());
+        assertTrue(payload.getRecordsBuffer().isEmpty());
+        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
+
+        try {
+            payload.addAndEstimateIfFull(1024);
+            fail("Simple add is not supported");
+        } catch (UnsupportedOperationException e) {
+            // Expected: only the three-argument add is supported.
+        }
+
+        byte[] recordBytes1 = new byte[100];
+        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid1"));
+
+        assertEquals(1, payload.getRecordsBuffer().size());
+        assertEquals(1, payload.getRecordGuidsBuffer().size());
+        assertTrue(payload.getRecordGuidsBuffer().contains("guid1"));
+        assertTrue(payload.getRecordsBuffer().contains(recordBytes1));
+
+        assertEquals(1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(1, payload.getRecordCount());
+
+        assertFalse(payload.isEmpty());
+
+        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid2"));
+        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid3"));
+        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid4"));
+
+        assertEquals(4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(4, payload.getRecordCount());
+
+        assertTrue(payload.addAndEstimateIfFull(1, recordBytes1, "guid5"));
+
+        try {
+            payload.addAndEstimateIfFull(1, recordBytes1, "guid6");
+            fail("Payload should not let us insert records that won't fit");
+        } catch (IllegalStateException e) {
+            // Expected: the payload is already full.
+        }
+    }
+
+    @Test
+    public void testReset() {
+        assertEquals(2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(0, payload.getRecordCount());
+        assertTrue(payload.getRecordsBuffer().isEmpty());
+        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
+        assertTrue(payload.isEmpty());
+
+        // Shouldn't throw even if already empty
+        payload.reset();
+        assertEquals(2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(0, payload.getRecordCount());
+        assertTrue(payload.getRecordsBuffer().isEmpty());
+        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
+        assertTrue(payload.isEmpty());
+
+        byte[] recordBytes1 = new byte[100];
+        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid1"));
+        assertFalse(payload.isEmpty());
+        payload.reset();
+
+        assertEquals(2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT, payload.getByteCount());
+        assertEquals(0, payload.getRecordCount());
+        assertTrue(payload.getRecordsBuffer().isEmpty());
+        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
+        assertTrue(payload.isEmpty());
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -0,0 +1,404 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.HTTPFailureException;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.NonObjectJSONException;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.Server11Repository;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+
+import ch.boye.httpclientandroidlib.HttpResponse;
+import ch.boye.httpclientandroidlib.ProtocolVersion;
+import ch.boye.httpclientandroidlib.entity.BasicHttpEntity;
+import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
+import ch.boye.httpclientandroidlib.message.BasicStatusLine;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class PayloadUploadDelegateTest { // Exercises PayloadUploadDelegate response handling via a recording MockUploader.
+    private BatchingUploader batchingUploader;
+
+    class MockUploader extends BatchingUploader { // Records every delegate callback instead of doing real work.
+        public final ArrayList<String> successRecords = new ArrayList<>();
+        public final HashMap<String, Exception> failedRecords = new HashMap<>();
+        public boolean didLastPayloadFail = false;
+
+        public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
+        public int commitPayloadsSucceeded = 0;
+        public int lastPayloadsSucceeded = 0;
+
+        public MockUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
+            super(repositorySession, workQueue, sessionStoreDelegate);
+        }
+
+        @Override
+        public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
+            successResponses.add(response);
+            if (isCommit) {
+                ++commitPayloadsSucceeded;
+            }
+            if (isLastPayload) {
+                ++lastPayloadsSucceeded;
+            }
+        }
+
+        @Override
+        public void recordSucceeded(final String recordGuid) {
+            successRecords.add(recordGuid);
+        }
+
+        @Override
+        public void recordFailed(final String recordGuid) {
+            recordFailed(new Exception(), recordGuid);
+        }
+
+        @Override
+        public void recordFailed(final Exception e, final String recordGuid) {
+            failedRecords.put(recordGuid, e);
+        }
+
+        @Override
+        public void lastPayloadFailed() {
+            didLastPayloadFail = true;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        Server11Repository server11Repository = new Server11Repository(
+                "dummyCollection",
+                "http://dummy.url/",
+                null,
+                new InfoCollections(),
+                new InfoConfiguration()
+        );
+        batchingUploader = new MockUploader( // workQueue and store delegate are not exercised by these tests.
+                new Server11RepositorySession(server11Repository),
+                null,
+                null
+        );
+    }
+
+    @Test
+    public void testHandleRequestSuccessNonSuccess() {
+        ArrayList<String> postedGuids = new ArrayList<>(2);
+        postedGuids.add("testGuid1");
+        postedGuids.add("testGuid2");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+
+        // Test that non-2* responses aren't processed
+        payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(404, null, null));
+        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccessNoHeaders() {
+        ArrayList<String> postedGuids = new ArrayList<>(2);
+        postedGuids.add("testGuid1");
+        postedGuids.add("testGuid2");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+
+        // Test that responses without X-Last-Modified header aren't processed
+        payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, null, null));
+        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccessBadBody() {
+        ArrayList<String> postedGuids = new ArrayList<>(2);
+        postedGuids.add("testGuid1");
+        postedGuids.add("testGuid2");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, true);
+
+        // Test that we catch json processing errors
+        payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "non json body", "123"));
+        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
+        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail); // Last payload: a body parse failure must also fail the payload itself.
+        assertEquals(NonObjectJSONException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+        assertEquals(NonObjectJSONException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccess202NoToken() {
+        ArrayList<String> postedGuids = new ArrayList<>(1);
+        postedGuids.add("testGuid1");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, true);
+
+        // Test that we catch absent tokens in 202 responses
+        payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(202, "{\"success\": []}", "123"));
+        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccessBad200() {
+        ArrayList<String> postedGuids = new ArrayList<>(1);
+        postedGuids.add("testGuid1");
+
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+
+        // Test that if in batching mode and saw the token, 200 must be a response to a commit
+        try {
+            batchingUploader.getCurrentBatch().setToken("MTIzNA", true);
+        } catch (BatchingUploader.BatchingUploaderException e) {} // Ignored: a failure here would surface via the assertions below.
+        batchingUploader.setInBatchingMode(true);
+
+        // not a commit, so should fail
+        payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "{\"success\": []}", "123"));
+        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(IllegalStateException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccessNonBatchingFailedLM() {
+        ArrayList<String> postedGuids = new ArrayList<>(1); // Capacity hint only; three guids follow.
+        postedGuids.add("guid1");
+        postedGuids.add("guid2");
+        postedGuids.add("guid3");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"]}", "123"));
+        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
+        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
+        assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+
+        // These should fail, because we're returning a non-changed L-M in a non-batching mode
+        postedGuids.add("guid4");
+        postedGuids.add("guid6");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid4\", 5, \"guid6\"]}", "123"));
+        assertEquals(5, ((MockUploader) batchingUploader).failedRecords.size()); // All five posted guids fail; successes above remain at 3.
+        assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
+        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
+        assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertEquals(BatchingUploader.LastModifiedDidNotChange.class,
+                ((MockUploader) batchingUploader).failedRecords.get("guid4").getClass());
+    }
+
+    @Test
+    public void testHandleRequestSuccessNonBatching() {
+        ArrayList<String> postedGuids = new ArrayList<>();
+        postedGuids.add("guid1");
+        postedGuids.add("guid2");
+        postedGuids.add("guid3");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123"));
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid4");
+        postedGuids.add("guid5");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid4\", \"guid5\"], \"failed\": {}}", "333")); // L-M advances per response in non-batching mode.
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid6");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "444"));
+
+        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(6, ((MockUploader) batchingUploader).successRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(3, ((MockUploader) batchingUploader).successResponses.size());
+        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
+        assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertFalse(batchingUploader.getInBatchingMode());
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid7");
+        postedGuids.add("guid8");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid8\"], \"failed\": {\"guid7\": \"reason\"}}", "555"));
+        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size()); // Per-record failure reported by the server, not a payload failure.
+        assertTrue(((MockUploader) batchingUploader).failedRecords.containsKey("guid7"));
+        assertEquals(7, ((MockUploader) batchingUploader).successRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
+        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
+        assertEquals(2, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertFalse(batchingUploader.getInBatchingMode());
+    }
+
+    @Test
+    public void testHandleRequestSuccessBatching() {
+        ArrayList<String> postedGuids = new ArrayList<>();
+        postedGuids.add("guid1");
+        postedGuids.add("guid2");
+        postedGuids.add("guid3");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123")); // 202 with a "batch" token switches us into batching mode.
+
+        assertTrue(batchingUploader.getInBatchingMode());
+        assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid4");
+        postedGuids.add("guid5");
+        postedGuids.add("guid6");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, false, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid4\", \"guid5\", \"guid6\"], \"failed\": {}}", "123"));
+
+        assertTrue(batchingUploader.getInBatchingMode());
+        assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid7");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, true, false);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "222")); // NOTE(review): response lists "guid6" while "guid7" was posted — presumably intentional; confirm.
+
+        // Even though everything indicates we're not in a batching mode, we were, so test that
+        // we don't reset the flag.
+        assertTrue(batchingUploader.getInBatchingMode());
+        assertNull(batchingUploader.getCurrentBatch().getToken()); // A commit response clears the current batch's token.
+
+        postedGuids = new ArrayList<>();
+        postedGuids.add("guid8");
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, postedGuids, true, true);
+        payloadUploadDelegate.handleRequestSuccess(
+                makeSyncStorageResponse(200, "{\"success\": [\"guid7\"], \"failed\": {}}", "333"));
+
+        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(8, ((MockUploader) batchingUploader).successRecords.size());
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
+        assertEquals(2, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
+        assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertTrue(batchingUploader.getInBatchingMode());
+    }
+
+    @Test
+    public void testHandleRequestError() {
+        ArrayList<String> postedGuids = new ArrayList<>(3);
+        postedGuids.add("testGuid1");
+        postedGuids.add("testGuid2");
+        postedGuids.add("testGuid3");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
+
+        IllegalStateException e = new IllegalStateException();
+        payloadUploadDelegate.handleRequestError(e);
+
+        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid1")); // The original exception instance is propagated per record.
+        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid2"));
+        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid3"));
+        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+
+        payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate.handleRequestError(e);
+        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
+        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail); // Errors on the last payload also fail the payload itself.
+    }
+
+    @Test
+    public void testHandleRequestFailure() {
+        ArrayList<String> postedGuids = new ArrayList<>(3);
+        postedGuids.add("testGuid1");
+        postedGuids.add("testGuid2");
+        postedGuids.add("testGuid3");
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
+
+        final HttpResponse response = new BasicHttpResponse(
+                new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 503, "Illegal method/protocol"));
+        payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
+        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(HTTPFailureException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+        assertEquals(HTTPFailureException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+        assertEquals(HTTPFailureException.class,
+                ((MockUploader) batchingUploader).failedRecords.get("testGuid3").getClass());
+
+        payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
+        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
+        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
+    }
+
+    @Test
+    public void testIfUnmodifiedSince() {
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                batchingUploader, new ArrayList<String>(), false, false);
+
+        assertNull(payloadUploadDelegate.ifUnmodifiedSince()); // No Last-Modified known yet: header must be omitted.
+
+        try {
+            batchingUploader.getCurrentBatch().setLastModified(1471645412480L, true);
+        } catch (BatchingUploader.BatchingUploaderException e) {} // Ignored: a failure here would surface via the assertion below.
+
+        assertEquals("1471645412.480", payloadUploadDelegate.ifUnmodifiedSince()); // Millis rendered as seconds with three decimal places.
+    }
+
+    private SyncStorageResponse makeSyncStorageResponse(int code, String body, String lastModified) { // Fabricates a storage response with optional body and X-Last-Modified header.
+        BasicHttpResponse response = new BasicHttpResponse(
+                new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), code, null));
+
+        if (body != null) {
+            BasicHttpEntity entity = new BasicHttpEntity();
+            entity.setContent(new ByteArrayInputStream(body.getBytes()));
+            response.setEntity(entity);
+        }
+
+        if (lastModified != null) {
+            response.addHeader(SyncResponse.X_LAST_MODIFIED, lastModified);
+        }
+        return new SyncStorageResponse(response);
+    }
+}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnableTest.java
@@ -0,0 +1,38 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.net.Uri;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class RecordUploadRunnableTest { // Verifies the batch/commit query parameters appended to the upload URI.
+    @Test
+    public void testBuildPostURI() throws Exception {
+        BatchMeta batchMeta = new BatchMeta(new Object(), 1, 1, null);
+        URI postURI = RecordUploadRunnable.buildPostURI(
+                false, batchMeta, Uri.parse("http://example.com/"));
+        assertEquals("http://example.com/?batch=true", postURI.toString()); // No token yet: "batch=true" asks the server to start a batch.
+
+        postURI = RecordUploadRunnable.buildPostURI(
+                true, batchMeta, Uri.parse("http://example.com/"));
+        assertEquals("http://example.com/?batch=true&commit=true", postURI.toString()); // First argument toggles the commit parameter.
+
+        batchMeta.setToken("MTIzNA", false);
+        postURI = RecordUploadRunnable.buildPostURI(
+                false, batchMeta, Uri.parse("http://example.com/"));
+        assertEquals("http://example.com/?batch=MTIzNA", postURI.toString()); // Once a token is known it replaces the "true" sentinel.
+
+        postURI = RecordUploadRunnable.buildPostURI(
+                true, batchMeta, Uri.parse("http://example.com/"));
+        assertEquals("http://example.com/?batch=MTIzNA&commit=true", postURI.toString());
+    }
+}
\ No newline at end of file