Bug 1408710 - Serialize RecordsChannel r=rnewman draft
authorGrigory Kruglov <gkruglov@mozilla.com>
Mon, 26 Feb 2018 15:12:34 -0500
changeset 760194 62f5f7940bb8db9a18704edfd0b9cb38eb410b71
parent 760193 5f4092c2629414fc4f9051d8f8f0c113f1d6910d
child 760195 e7443ffabde02059008e6c833ee52c45e206ec81
push id100565
push userbmo:gkruglov@mozilla.com
push dateMon, 26 Feb 2018 23:36:11 +0000
reviewersrnewman
bugs1408710
milestone60.0a1
Bug 1408710 - Serialize RecordsChannel r=rnewman This patch does two things: - serializes flow of records through the RecordsChannel - simplifies the batching logic The two are connected: rather than queuing records in ConcurrentLinkedQueue, we now buffer downloaded records in an ArrayList, and deliver them to the receiving repository all at once. Doing this work right at the channel level lets us kill off the buffering middleware. An addition of a NonBufferingSyncStage lets individual SyncStages use a RecordsChannel which doesn't perform any kind of buffering. Prior, stages did this by wrapping their receiving repositories in the buffering middleware. The main goal is to speed up the flow of records, keep within the same memory footprint and do some simplification in the process. This patch explicitly does not address the delegated nature of fetch and store, which is now largely irrelevant. MozReview-Commit-ID: J2afmgr1Td1
mobile/android/base/java/org/mozilla/gecko/db/BrowserProvider.java
mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/db/TestBookmarks.java
mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/TestStoreTracking.java
mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/StoreTrackingRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/BookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/HistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/NonBufferingServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/RecentHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingRecordsChannel.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizer.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizerSession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/Synchronizer.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
mobile/android/services/src/test/java/org/mozilla/android/sync/test/SynchronizerHelpers.java
mobile/android/services/src/test/java/org/mozilla/android/sync/test/TestRecordsChannel.java
mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
--- a/mobile/android/base/java/org/mozilla/gecko/db/BrowserProvider.java
+++ b/mobile/android/base/java/org/mozilla/gecko/db/BrowserProvider.java
@@ -2959,17 +2959,17 @@ public class BrowserProvider extends Sha
 
                     // NB:
                     // Constraint exception will be thrown if we try to insert a visit violating
                     // unique(guid, date) constraint. We don't expect to do that, but our incoming
                     // data might not be clean - either due to duplicate entries in the sync data,
                     // or, less likely, due to record reconciliation bugs at the RepositorySession
                     // level.
                     } catch (SQLiteConstraintException e) {
-                        Log.w(LOGTAG, "Unexpected constraint exception while inserting a visit", e);
+                        // Don't log this, it'll just cause memory churn.
                     }
                 }
                 if (inserted != valueSet.length) {
                     Log.w(LOGTAG, "Failed to insert some of the visits. " +
                             "Expected: " + valueSet.length + ", actual: " + inserted);
                 }
                 totalInserted += inserted;
             }
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -643,22 +643,17 @@ public class TestBookmarks extends Andro
       public void onFetchedRecord(Record record) {
         fetchedGUIDs.add(record.guid);
       }
 
       @Override
       public void onFetchCompleted() {
         finishAndNotify(session);
       }
-
-      @Override
-      public void onBatchCompleted() {
-
-      }
-    };
+     };
     session.fetchModified(fetchDelegate);
 
     return fetchedGUIDs;
   }
 
   private BookmarkRecord fetchGUID(BookmarksRepository repo,
                                   final String guid) throws SyncException {
     Logger.info(LOG_TAG, "Fetching for " + guid);
@@ -672,21 +667,16 @@ public class TestBookmarks extends Andro
       public void onFetchedRecord(Record record) {
         fetchedRecords.add(record);
       }
 
       @Override
       public void onFetchCompleted() {
         finishAndNotify(session);
       }
-
-      @Override
-      public void onBatchCompleted() {
-
-      }
     };
 
     performWait(new Runnable() {
       @Override
       public void run() {
         try {
           session.fetch(new String[]{guid}, fetchDelegate);
         } catch (InactiveSessionException e) {
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -101,28 +101,18 @@ public class TestStoreTracking extends A
                                                     RepositorySessionBundle bundle) {
                         performNotify();
                       }
                     });
                   } catch (InactiveSessionException e) {
                     performNotify(e);
                   }
                 }
-
-                @Override
-                public void onBatchCompleted() {
-
-                }
               });
             }
-
-            @Override
-            public void onBatchCompleted() {
-
-            }
           });
         } catch (InactiveSessionException e) {
           performNotify(e);
         }
       }
 
       @Override
       public void onStoreFailed(Exception e) {
@@ -177,20 +167,16 @@ public class TestStoreTracking extends A
                 public void onFinishFailed(Exception ex) {
                   performNotify(ex);
                 }
               });
             } catch (InactiveSessionException e) {
               performNotify(e);
             }
           }
-
-          @Override
-          public void onBatchCompleted() {
-          }
         });
       }
     };
 
     performWait(doFetch);
   }
 
   /**
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
@@ -94,17 +94,12 @@ public class DefaultFetchDelegate extend
   }
 
   @Override
   public void onFetchCompleted() {
     Logger.debug(LOG_TAG, "onFetchCompleted. Doing nothing.");
   }
 
   @Override
-  public void onBatchCompleted() {
-    Logger.debug(LOG_TAG, "onBatchCompleted. Doing nothing.");
-  }
-
-  @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(final ExecutorService executor) {
     return new DeferredRepositorySessionFetchRecordsDelegate(this, executor);
   }
 }
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware;
-
-import android.content.Context;
-
-import org.mozilla.gecko.sync.SessionCreateException;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-
-/**
- * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
- * a sync deadline, buffer storage implementation and a consistency checker implementation.
- *
- * @author grisha
- */
-public class BufferingMiddlewareRepository extends Repository {
-    private final long syncDeadline;
-    private final Repository inner;
-    private final BufferStorage bufferStorage;
-
-    public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
-        this.syncDeadline = syncDeadline;
-        this.inner = wrappedRepository;
-        this.bufferStorage = bufferStore;
-    }
-
-    @Override
-    public RepositorySession createSession(Context context) throws SessionCreateException {
-        final RepositorySession innerSession = this.inner.createSession(context);
-        return new BufferingMiddlewareRepositorySession(innerSession, this, syncDeadline, bufferStorage);
-    }
-}
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware;
-
-import android.os.SystemClock;
-import android.support.annotation.VisibleForTesting;
-
-import org.mozilla.gecko.sync.SyncDeadlineReachedException;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.repositories.InactiveSessionException;
-import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.Collection;
-
-/**
- * Buffering middleware which is intended to wrap local RepositorySessions.
- *
- * Configure it:
- *  - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
- *
- *  Fetch is pass-through, store is buffered.
- *
- * @author grisha
- */
-/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
-    private final BufferStorage bufferStorage;
-    private final long syncDeadlineMillis;
-
-    /* package-local */ BufferingMiddlewareRepositorySession(
-            RepositorySession repositorySession, BufferingMiddlewareRepository repository,
-            long syncDeadlineMillis, BufferStorage bufferStorage) {
-        super(repositorySession, repository);
-        this.syncDeadlineMillis = syncDeadlineMillis;
-        this.bufferStorage = bufferStorage;
-    }
-
-    @Override
-    public void fetchModified(RepositorySessionFetchRecordsDelegate delegate) {
-        this.inner.fetchModified(delegate);
-    }
-
-    @Override
-    public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
-        this.inner.fetch(guids, delegate);
-    }
-
-    @Override
-    public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
-        this.inner.fetchAll(delegate);
-    }
-
-    /**
-     * Will be called when this repository is acting as a `source`, and a flow of records into `sink`
-     * was completed. That is, we've uploaded merged records to the server, so now is a good time
-     * to clean up our buffer for this repository.
-     */
-    @Override
-    public void performCleanup() {
-        bufferStorage.clear();
-        inner.performCleanup();
-    }
-
-    @Override
-    public void store(Record record) throws NoStoreDelegateException {
-        bufferStorage.addOrReplace(record);
-    }
-
-    /**
-     * When source fails to provide all records, we need to decide what to do with the buffer.
-     * We might fail because of a network partition, or because of a concurrent modification of a
-     * collection, or because we ran out of time fetching records, or some other reason.
-     *
-     * Either way we do not clear the buffer in any error scenario, but rather
-     * allow it to be re-filled, replacing existing records with their newer versions if necessary.
-     *
-     * If a collection has been modified, affected records' last-modified timestamps will be bumped,
-     * and we will receive those records during the next sync. If we already have them in our buffer,
-     * we replace our now-old copy. Otherwise, they are new records and we just append them.
-     *
-     * Incoming records are mapped to existing ones via GUIDs.
-     */
-    @Override
-    public void storeIncomplete() {
-        bufferStorage.flush();
-    }
-
-    @Override
-    public void storeFlush() {
-        bufferStorage.flush();
-    }
-
-    @Override
-    public void storeDone() {
-        bufferStorage.flush();
-
-        // Determine if we have enough time to merge the buffer data.
-        // If we don't have enough time now, we keep our buffer and try again later.
-        if (!mayProceedToMergeBuffer()) {
-            super.abort();
-            storeDelegate.deferredStoreDelegate(storeWorkQueue).onStoreFailed(new SyncDeadlineReachedException());
-            return;
-        }
-
-        doMergeBuffer();
-    }
-
-    @VisibleForTesting
-    /* package-local */ void doMergeBuffer() {
-        final Collection<Record> bufferData = bufferStorage.all();
-
-        // Trivial case of an empty buffer.
-        if (bufferData.isEmpty()) {
-            super.storeDone();
-            return;
-        }
-
-        // Let session handle actual storing of records as it pleases.
-        // See Bug 1332094 which is concerned with allowing merge to proceed transactionally.
-        try {
-            for (Record record : bufferData) {
-                this.inner.store(record);
-            }
-        } catch (NoStoreDelegateException e) {
-            // At this point we should have a store delegate set on the session, so this won't happen.
-        }
-
-        // Let session know that there are no more records to store.
-        super.storeDone();
-    }
-
-    /**
-     * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
-     * Clean up after ourselves, if there's anything to clean up.
-     */
-    @Override
-    public void abort() {
-        bufferStorage.flush();
-        super.abort();
-    }
-
-    @Override
-    public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
-        inner.setStoreDelegate(delegate);
-        this.storeDelegate = delegate;
-    }
-
-    private boolean mayProceedToMergeBuffer() {
-        // If our buffer storage is not persistent, disallowing merging after buffer has been filled
-        // means throwing away records only to re-download them later.
-        // In this case allow merge to proceed even if we're past the deadline.
-        if (!bufferStorage.isPersistent()) {
-            return true;
-        }
-
-        // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
-        // let's do a simple thing for now and say that we may proceed if we have couple of minutes
-        // of runtime left. That surely is enough, right?
-        final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
-        return timeLeftMillis > 1000 * 60 * 2;
-    }
-}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
@@ -110,21 +110,16 @@ public class Crypto5MiddlewareRepository
     }
 
     @Override
     public void onFetchCompleted() {
       next.onFetchCompleted();
     }
 
     @Override
-    public void onBatchCompleted() {
-      next.onBatchCompleted();
-    }
-
-    @Override
     public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
       // Synchronously perform *our* work, passing through appropriately.
       RepositorySessionFetchRecordsDelegate deferredNext = next.deferredFetchDelegate(executor);
       return new DecryptingTransformingFetchDelegate(deferredNext, keyBundle, recordFactory);
     }
   }
 
   private DecryptingTransformingFetchDelegate makeUnwrappingDelegate(RepositorySessionFetchRecordsDelegate inner) {
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware.storage;
-
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.Collection;
-
-/**
- * A contract between BufferingMiddleware and specific storage implementations.
- *
- * @author grisha
- */
-public interface BufferStorage {
-    // Returns all of the records currently present in the buffer.
-    Collection<Record> all();
-
-    // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
-    // what's already present in the storage layer.
-    // NB: For a database-backed storage, "replace" happens at a transaction level.
-    void addOrReplace(Record record);
-
-    // For database-backed implementations, commits any records that came in up to this point.
-    void flush();
-
-    void clear();
-
-    boolean isPersistent();
-}
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware.storage;
-
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A trivial, memory-backed, transient implementation of a BufferStorage.
- * Its intended use is to buffer syncing of small collections.
- * Thread-safe.
- *
- * @author grisha
- */
-public class MemoryBufferStorage implements BufferStorage {
-    private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
-
-    @Override
-    public boolean isPersistent() {
-        return false;
-    }
-
-    @Override
-    public Collection<Record> all() {
-        synchronized (recordBuffer) {
-            return new ArrayList<>(recordBuffer.values());
-        }
-    }
-
-    @Override
-    public void addOrReplace(Record record) {
-        recordBuffer.put(record.guid, record);
-    }
-
-    @Override
-    public void flush() {
-        // This is a no-op; flush intended for database-backed stores.
-    }
-
-    @Override
-    public void clear() {
-        recordBuffer.clear();
-    }
-}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -179,23 +179,16 @@ public abstract class RepositorySession 
         setLastStoreTimestamp(end);
         storeDelegate.onStoreCompleted();
       }
     };
     storeWorkQueue.execute(command);
   }
 
   /**
-   * Indicates that a number of records have been stored, more are still to come but after some time,
-   * and now would be a good time to flush records and perform any other similar operations.
-   */
-  public void storeFlush() {
-  }
-
-  /**
    * Indicates that a flow of records have been completed.
    */
   public void performCleanup() {
   }
 
   public abstract void wipe(RepositorySessionWipeDelegate delegate);
 
   /**
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/StoreTrackingRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/StoreTrackingRepositorySession.java
@@ -56,19 +56,16 @@ public abstract class StoreTrackingRepos
       return;
     }
     for (String guid : guids) {
       this.storeTracker.untrackStoredForExclusion(guid);
     }
   }
 
   public void trackRecord(Record record) {
-
-    Logger.debug(LOG_TAG, "Tracking record " + record.guid +
-                           " (" + record.lastModified + ") to avoid re-upload.");
     // Future: we care about the timestamp…
     trackGUID(record.guid);
   }
 
   protected void untrackRecord(Record record) {
     Logger.debug(LOG_TAG, "Un-tracking record " + record.guid + ".");
     untrackGUID(record.guid);
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
@@ -147,21 +147,16 @@ public class VersioningDelegateHelper {
         }
 
         @Override
         public void onFetchCompleted() {
             this.inner.onFetchCompleted();
         }
 
         @Override
-        public void onBatchCompleted() {
-            this.inner.onBatchCompleted();
-        }
-
-        @Override
         public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
             return this.inner.deferredFetchDelegate(executor);
         }
     }
 
     private class VersionedStoreDelegate implements RepositorySessionStoreDelegate {
         private final RepositorySessionStoreDelegate inner;
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
@@ -149,19 +149,16 @@ public class BookmarksValidationReposito
 
                 @Override
                 public void onFetchCompleted() {
                     validateForTelemetry();
                     delegate.onFetchCompleted();
                 }
 
                 @Override
-                public void onBatchCompleted() {}
-
-                @Override
                 public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
                     return null;
                 }
             });
         }
 
         @Override
         public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
@@ -42,25 +42,15 @@ public class DeferredRepositorySessionFe
       @Override
       public void run() {
         inner.onFetchCompleted();
       }
     });
   }
 
   @Override
-  public void onBatchCompleted() {
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        inner.onBatchCompleted();
-      }
-    });
-  }
-
-  @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService newExecutor) {
     if (newExecutor == executor) {
       return this;
     }
     throw new IllegalArgumentException("Can't re-defer this delegate.");
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
@@ -9,25 +9,13 @@ import java.util.concurrent.ExecutorServ
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 public interface RepositorySessionFetchRecordsDelegate {
   void onFetchFailed(Exception ex);
   void onFetchedRecord(Record record);
 
   /**
    * Called when all records in this fetch have been returned.
-   *
-   * @param fetchEnd
-   *        A millisecond-resolution timestamp indicating the *remote* timestamp
-   *        at the end of the range of records. Usually this is the timestamp at
-   *        which the request was received.
-   *        E.g., the (normalized) value of the X-Weave-Timestamp header.
    */
   void onFetchCompleted();
 
-  /**
-   * Called when a number of records have been returned but more are still expected to come,
-   * possibly after a certain pause.
-   */
-  void onBatchCompleted();
-
   RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor);
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -242,29 +242,16 @@ public class BatchingDownloader {
                 Logger.warn(LOG_TAG, "Failed to update resume context while processing a batch.");
             }
         } else {
             if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
                 Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
             }
         }
 
-        // We need to make another batching request!
-        // Let the delegate know that a batch fetch just completed before we proceed.
-        // Beware that while this operation will run after every call to onFetchedRecord returned,
-        // it's not guaranteed that the 'sink' session actually processed all of the fetched records.
-        // See Bug https://bugzilla.mozilla.org/show_bug.cgi?id=1351673#c28 for details.
-        runTaskOnQueue(new Runnable() {
-            @Override
-            public void run() {
-                Logger.debug(LOG_TAG, "Running onBatchCompleted.");
-                fetchRecordsDelegate.onBatchCompleted();
-            }
-        });
-
         // Should we proceed, however? Do we have enough time?
         if (!mayProceedWithBatching(fetchDeadline)) {
             this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException());
             return;
         }
 
         // Create and execute new batch request.
         try {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/BookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/BookmarksServerSyncStage.java
@@ -4,18 +4,16 @@
 
 package org.mozilla.gecko.sync.stage;
 
 import android.net.Uri;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.BrowserContractHelpers;
 import org.mozilla.gecko.sync.repositories.android.BookmarksRepository;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
@@ -84,21 +82,17 @@ public class BookmarksServerSyncStage ex
             getRepositoryStateProvider(),
             false,
             true
     );
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new BufferingMiddlewareRepository(
-            session.getSyncDeadline(),
-            new MemoryBufferStorage(),
-            new BookmarksRepository()
-    );
+    return new BookmarksRepository();
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new BookmarkRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -1,16 +1,14 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FennecTabsRepository;
 import org.mozilla.gecko.sync.repositories.domain.TabsRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FennecTabsServerSyncStage extends ServerSyncStage {
   private static final String COLLECTION = "tabs";
@@ -27,20 +25,16 @@ public class FennecTabsServerSyncStage e
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.TABS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new BufferingMiddlewareRepository(
-            session.getSyncDeadline(),
-            new MemoryBufferStorage(),
-            new FennecTabsRepository(session.getClientsDelegate())
-    );
+    return new FennecTabsRepository(session.getClientsDelegate());
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new TabsRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -2,20 +2,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.CryptoRecord;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
-import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FormHistoryServerSyncStage extends ServerSyncStage {
@@ -78,21 +75,17 @@ public class FormHistoryServerSyncStage 
             getRepositoryStateProvider(),
             false,
             false
     );
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new BufferingMiddlewareRepository(
-            session.getSyncDeadline(),
-            new MemoryBufferStorage(),
-            new FormHistoryRepositorySession.FormHistoryRepository()
-    );
+    return new FormHistoryRepositorySession.FormHistoryRepository();
   }
 
   public static final class FormHistoryRecordFactory extends RecordFactory {
 
     @Override
     public Record createRecord(Record record) {
       FormHistoryRecord r = new FormHistoryRecord();
       r.initFromEnvelope((CryptoRecord) record);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/HistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/HistoryServerSyncStage.java
@@ -11,17 +11,25 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.android.HistoryRepository;
 import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
-public class HistoryServerSyncStage extends ServerSyncStage {
+/**
+ *  History records are not buffered.
+    We buy little from buffering it from data integrity point of view, but we gain a lot by
+    not buffering:
+    - roughly half the memory footprint, important when dealing with lots of history
+    - ability to resume downloads, since records are stored as they're fetched, and we can
+    -- maintain a "high watermark"
+ */
+public class HistoryServerSyncStage extends NonBufferingServerSyncStage {
   protected static final String LOG_TAG = "HistoryStage";
 
   // Eventually this kind of sync stage will be data-driven,
   // and all this hard-coding can go away.
   private static final String HISTORY_SORT = "oldest";
   private static final long HISTORY_BATCH_LIMIT = 500;
 
   @Override
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/NonBufferingServerSyncStage.java
@@ -0,0 +1,15 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.stage;
+
+import org.mozilla.gecko.sync.synchronizer.NonBufferingSynchronizer;
+import org.mozilla.gecko.sync.synchronizer.Synchronizer;
+
+public abstract class NonBufferingServerSyncStage extends ServerSyncStage {
+    @Override
+    protected Synchronizer getSynchronizer() {
+        return new NonBufferingSynchronizer();
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -1,16 +1,14 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.PasswordsRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.PasswordRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class PasswordsServerSyncStage extends ServerSyncStage {
   @Override
@@ -25,20 +23,16 @@ public class PasswordsServerSyncStage ex
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.PASSWORDS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new BufferingMiddlewareRepository(
-            session.getSyncDeadline(),
-            new MemoryBufferStorage(),
-            new PasswordsRepositorySession.PasswordsRepository()
-    );
+    return new PasswordsRepositorySession.PasswordsRepository();
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new PasswordRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/RecentHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/RecentHistoryServerSyncStage.java
@@ -2,18 +2,16 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.NonObjectJSONException;
 import org.mozilla.gecko.sync.SynchronizerConfiguration;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
 import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.android.HistoryRepository;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -69,21 +67,17 @@ public class RecentHistoryServerSyncStag
      */
     @Override
     protected HighWaterMark getAllowedToUseHighWaterMark() {
         return HighWaterMark.Disabled;
     }
 
     @Override
     protected Repository getLocalRepository() {
-        return new BufferingMiddlewareRepository(
-                session.getSyncDeadline(),
-                new MemoryBufferStorage(),
-                new HistoryRepository()
-        );
+        return new HistoryRepository();
     }
 
     @Override
     protected Repository getRemoteRepository() throws URISyntaxException {
         return new ConfigurableServer15Repository(
                 getCollection(),
                 session.getSyncDeadline(),
                 session.config.storageURL(),
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -53,18 +53,18 @@ import java.util.concurrent.ExecutorServ
  *
  * @author rnewman
  *
  */
 public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage implements SynchronizerDelegate {
 
   protected static final String LOG_TAG = "ServerSyncStage";
 
-  protected long stageStartTimestamp = -1;
-  protected long stageCompleteTimestamp = -1;
+  private long stageStartTimestamp = -1;
+  private long stageCompleteTimestamp = -1;
 
   /**
    * Poor-man's boolean typing.
    * These enums are used to configure {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
    */
   public enum HighWaterMark {
     Enabled,
     Disabled
@@ -118,31 +118,31 @@ public abstract class ServerSyncStage ex
    * to be updated.
    *
    * @param enabledInMetaGlobal
    *          boolean of engine sync state in meta/global
    * @throws MetaGlobalException
    *           if engine sync state has been changed in Sync Settings, with new
    *           engine sync state.
    */
-  protected void checkAndUpdateUserSelectedEngines(boolean enabledInMetaGlobal) throws MetaGlobalException {
+  private void checkAndUpdateUserSelectedEngines(boolean enabledInMetaGlobal) throws MetaGlobalException {
     Map<String, Boolean> selectedEngines = session.config.userSelectedEngines;
     String thisEngine = this.getEngineName();
 
     if (selectedEngines != null && selectedEngines.containsKey(thisEngine)) {
       boolean enabledInSelection = selectedEngines.get(thisEngine);
       if (enabledInMetaGlobal != enabledInSelection) {
         // Engine enable state has been changed by the user.
         Logger.debug(LOG_TAG, "Engine state has been changed by user. Throwing exception.");
         throw new MetaGlobalException.MetaGlobalEngineStateChangedException(enabledInSelection);
       }
     }
   }
 
-  protected EngineSettings getEngineSettings() throws NonObjectJSONException, IOException {
+  private EngineSettings getEngineSettings() throws NonObjectJSONException, IOException {
     Integer version = getStorageVersion();
     if (version == null) {
       Logger.warn(LOG_TAG, "null storage version for " + this + "; using version 0.");
       version = 0;
     }
 
     SynchronizerConfiguration config = this.getConfig();
     if (config == null) {
@@ -209,39 +209,43 @@ public abstract class ServerSyncStage ex
     cryptoRepo.recordFactory = getRecordFactory();
     return cryptoRepo;
   }
 
   protected String bundlePrefix() {
     return this.getCollection() + ".";
   }
 
-  protected String statePreferencesPrefix() {
+  /* package-private */ String statePreferencesPrefix() {
     return this.getCollection() + ".state.";
   }
 
   protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
     return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
   }
 
-  protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
+  private void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
     synchronizerConfiguration.persist(session.config.getBranch(bundlePrefix()));
   }
 
-  public Synchronizer getConfiguredSynchronizer(GlobalSession session) throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException {
+  private Synchronizer getConfiguredSynchronizer() throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException {
     Repository remote = wrappedServerRepo();
 
-    Synchronizer synchronizer = new Synchronizer();
+    Synchronizer synchronizer = getSynchronizer();
     synchronizer.repositoryA = remote;
     synchronizer.repositoryB = this.getLocalRepository();
     synchronizer.load(getConfig());
 
     return synchronizer;
   }
 
+  protected Synchronizer getSynchronizer() {
+    return new Synchronizer();
+  }
+
   /**
    * Reset timestamps and any repository state.
    */
   @Override
   protected void resetLocal() {
     resetLocalWithSyncID(null);
     if (!getRepositoryStateProvider().resetAndCommit()) {
       // At the very least, we can log this.
@@ -254,17 +258,17 @@ public abstract class ServerSyncStage ex
       Logger.warn(LOG_TAG, "Failed to reset repository state");
     }
   }
 
   /**
    * Reset timestamps and possibly set syncID.
    * @param syncID if non-null, new syncID to persist.
    */
-  protected void resetLocalWithSyncID(String syncID) {
+  private void resetLocalWithSyncID(String syncID) {
     // Clear both timestamps.
     SynchronizerConfiguration config;
     try {
       config = this.getConfig();
     } catch (Exception e) {
       Logger.warn(LOG_TAG, "Unable to reset " + this + ": fetching config failed.", e);
       return;
     }
@@ -276,18 +280,18 @@ public abstract class ServerSyncStage ex
     config.localBundle.setTimestamp(0L);
     config.remoteBundle.setTimestamp(0L);
     persistConfig(config);
     Logger.info(LOG_TAG, "Reset timestamps for " + this);
   }
 
   // Not thread-safe. Use with caution.
   private static final class WipeWaiter {
-    public boolean sessionSucceeded = true;
-    public boolean wipeSucceeded = true;
+    /* package-private */ boolean sessionSucceeded = true;
+    /* package-private */ boolean wipeSucceeded = true;
     public Exception error;
 
     public void notify(Exception e, boolean sessionSucceeded) {
       this.sessionSucceeded = sessionSucceeded;
       this.wipeSucceeded = false;
       this.error = e;
       this.notify();
     }
@@ -405,17 +409,17 @@ public abstract class ServerSyncStage ex
     }
 
     Logger.info(LOG_TAG, "Wiping stage complete.");
   }
 
   /**
    * Asynchronously wipe collection on server.
    */
-  protected void wipeServer(final AuthHeaderProvider authHeaderProvider, final WipeServerDelegate wipeDelegate) {
+  private void wipeServer(final AuthHeaderProvider authHeaderProvider, final WipeServerDelegate wipeDelegate) {
     SyncStorageRequest request;
 
     try {
       request = new SyncStorageRequest(session.config.collectionURI(getCollection()));
     } catch (URISyntaxException ex) {
       Logger.warn(LOG_TAG, "Invalid URI in wipeServer.");
       wipeDelegate.onWipeFailed(ex);
       return;
@@ -459,17 +463,17 @@ public abstract class ServerSyncStage ex
     request.delete();
   }
 
   /**
    * Synchronously wipe the server.
    * <p>
    * Logs and re-throws an exception on failure.
    */
-  public void wipeServer(final GlobalSession session) throws Exception {
+  private void wipeServer(final GlobalSession session) throws Exception {
     this.session = session;
 
     final WipeWaiter monitor = new WipeWaiter();
 
     final Runnable doWipe = new Runnable() {
       @Override
       public void run() {
         wipeServer(session.getAuthHeaderProvider(), new WipeServerDelegate() {
@@ -575,17 +579,17 @@ public abstract class ServerSyncStage ex
       }
     } catch (MetaGlobalException e) {
       session.abort(e, "Inappropriate meta/global; refusing to execute " + name + " stage.");
       return;
     }
 
     Synchronizer synchronizer;
     try {
-      synchronizer = this.getConfiguredSynchronizer(session);
+      synchronizer = this.getConfiguredSynchronizer();
     } catch (NoCollectionKeysSetException e) {
       session.abort(e, "No CollectionKeys.");
       return;
     } catch (URISyntaxException e) {
       session.abort(e, "Invalid URI syntax for server repository.");
       return;
     } catch (NonObjectJSONException | IOException e) {
       session.abort(e, "Invalid persisted JSON for config.");
@@ -597,17 +601,17 @@ public abstract class ServerSyncStage ex
     Logger.debug(LOG_TAG, "Reached end of execute.");
   }
 
   /**
    * Express the duration taken by this stage as a String, like "0.56 seconds".
    *
    * @return formatted string.
    */
-  protected String getStageDurationString() {
+  private String getStageDurationString() {
     return Utils.formatDuration(stageStartTimestamp, stageCompleteTimestamp);
   }
 
   /**
    * We synced this engine!  Persist timestamps and advance the session.
    *
    * @param synchronizer the <code>Synchronizer</code> that succeeded.
    */
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.synchronizer;
-
-import org.mozilla.gecko.background.common.log.Logger;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-/**
- * Consume records from a queue inside a RecordsChannel, as fast as we can.
- * TODO: rewrite this in terms of an ExecutorService and a CompletionService.
- * See Bug 713483.
- *
- * @author rnewman
- *
- */
-class ConcurrentRecordConsumer extends RecordConsumer {
-  private static final String LOG_TAG = "CRecordConsumer";
-
-  /**
-   * When this is true and all records have been processed, the consumer
-   * will notify its delegate.
-   */
-  protected boolean allRecordsQueued = false;
-  private long counter = 0;
-
-  public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
-    this.delegate = delegate;
-  }
-
-  private final Object monitor = new Object();
-  @Override
-  public void doNotify() {
-    synchronized (monitor) {
-      monitor.notify();
-    }
-  }
-
-  @Override
-  public void queueFilled() {
-    Logger.debug(LOG_TAG, "Queue filled.");
-    synchronized (monitor) {
-      this.allRecordsQueued = true;
-      monitor.notify();
-    }
-  }
-
-  @Override
-  public void halt() {
-    synchronized (monitor) {
-      this.stopImmediately = true;
-      monitor.notify();
-    }
-  }
-
-  private final Object countMonitor = new Object();
-  @Override
-  public void stored() {
-    Logger.trace(LOG_TAG, "Record stored. Notifying.");
-    synchronized (countMonitor) {
-      counter++;
-    }
-  }
-
-  private void consumerIsDone() {
-    Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
-    if (allRecordsQueued) {
-      delegate.consumerIsDoneFull();
-    } else {
-      delegate.consumerIsDonePartial();
-    }
-  }
-
-  @Override
-  public void run() {
-    Record record;
-
-    while (true) {
-      // The queue is concurrent-safe.
-      while ((record = delegate.getQueue().poll()) != null) {
-        synchronized (monitor) {
-          Logger.trace(LOG_TAG, "run() took monitor.");
-          if (stopImmediately) {
-            Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
-            delegate.getQueue().clear();
-            Logger.debug(LOG_TAG, "Notifying consumer.");
-            consumerIsDone();
-            return;
-          }
-          Logger.debug(LOG_TAG, "run() dropped monitor.");
-        }
-
-        Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
-        try {
-          delegate.store(record);
-        } catch (Exception e) {
-          // TODO: Bug 709371: track records that failed to apply.
-          Logger.error(LOG_TAG, "Caught error in store.", e);
-        }
-        Logger.trace(LOG_TAG, "Done with record.");
-      }
-      synchronized (monitor) {
-        Logger.trace(LOG_TAG, "run() took monitor.");
-
-        if (allRecordsQueued) {
-          Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
-          consumerIsDone();
-          return;
-        }
-        if (stopImmediately) {
-          Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
-          consumerIsDone();
-          return;
-        }
-        try {
-          Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
-          monitor.wait(10000);
-        } catch (InterruptedException e) {
-          // TODO
-        }
-        Logger.trace(LOG_TAG, "run() dropped monitor.");
-      }
-    }
-  }
-}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingRecordsChannel.java
@@ -0,0 +1,59 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+/**
+ * Same as a regular RecordsChannel, except records aren't buffered and are stored when encountered.
+ */
+public class NonBufferingRecordsChannel extends RecordsChannel {
+    private static final String LOG_TAG = "NonBufferingRecordsChannel";
+
+    public NonBufferingRecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
+        super(source, sink, delegate);
+    }
+
+    @Override
+    public void onFetchedRecord(Record record) {
+        // Don't bother trying to store if we already failed.
+        if (fetchFailed.get()) {
+            return;
+        }
+
+        fetchedCount.incrementAndGet();
+        storeAttemptedCount.incrementAndGet();
+
+        try {
+            sink.store(record);
+        } catch (NoStoreDelegateException e) {
+            // Must not happen, bail out.
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void onFetchFailed(Exception ex) {
+        // Let non-buffered sessions clean-up their internal state.
+        sink.storeIncomplete();
+        super.onFetchFailed(ex);
+    }
+
+    @Override
+    public void onFetchCompleted() {
+        // If we already failed, the flow has been finished via onFetchFailed,
+        // yet our delegatee might have kept going.
+        if (fetchFailed.get()) {
+            return;
+        }
+
+        // Now we wait for onStoreComplete
+        Logger.trace(LOG_TAG, "onFetchCompleted. Calling storeDone.");
+        sink.storeDone();
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizer.java
@@ -0,0 +1,12 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+public class NonBufferingSynchronizer extends Synchronizer {
+    @Override
+    protected SynchronizerSession newSynchronizerSession() {
+        return new NonBufferingSynchronizerSession(this, this);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizerSession.java
@@ -0,0 +1,18 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+
+public class NonBufferingSynchronizerSession extends SynchronizerSession {
+    /* package-private */ NonBufferingSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
+        super(synchronizer, delegate);
+    }
+
+    @Override
+    protected RecordsChannel getRecordsChannel(RepositorySession sink, RepositorySession source, RecordsChannelDelegate delegate) {
+        return new NonBufferingRecordsChannel(sink, source, delegate);
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -3,34 +3,34 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import android.support.annotation.NonNull;
 import android.support.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.SyncException;
-import org.mozilla.gecko.sync.ThreadPool;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 /**
- * Pulls records from `source`, applying them to `sink`.
+ * Pulls records from `source`, buffering them and then applying them to `sink` when
+ * all records have been pulled.
  * Notifies its delegate of errors and completion.
  *
  * All stores (initiated by a fetch) must have been completed before storeDone
  * is invoked on the sink. This is to avoid the existing stored items being
  * considered as the total set, with onStoreCompleted being called when they're
  * done:
  *
  *   store(A) store(B)
@@ -42,128 +42,101 @@ import org.mozilla.gecko.sync.repositori
  *   Storing of C complete.
  *   We're done! Call onStoreCompleted.
  *   store(B) finishes... uh oh.
  *
  * In other words, storeDone must be gated on the synchronous invocation of every store.
  *
  * Similarly, we require that every store callback have returned before onStoreCompleted is invoked.
  *
- * This whole set of guarantees should be achievable thusly:
- *
- * * The fetch process must run in a single thread, and invoke store()
- *   synchronously. After processing every incoming record, storeDone is called,
- *   setting a flag.
- *   If the fetch cannot be implicitly queued, it must be explicitly queued.
- *   In this implementation, we assume that fetch callbacks are strictly ordered in this way.
- *
- * * The store process must be (implicitly or explicitly) queued. When the
- *   queue empties, the consumer checks the storeDone flag. If it's set, and the
- *   queue is exhausted, invoke onStoreCompleted.
- *
  * RecordsChannel exists to enforce this ordering of operations.
  *
  * @author rnewman
  *
  */
 public class RecordsChannel implements
   RepositorySessionFetchRecordsDelegate,
-  RepositorySessionStoreDelegate,
-  RecordsConsumerDelegate {
+  RepositorySessionStoreDelegate {
 
   private static final String LOG_TAG = "RecordsChannel";
   public RepositorySession source;
-  private RepositorySession sink;
+  /* package-private */ RepositorySession sink;
   private final RecordsChannelDelegate delegate;
 
   private volatile ReflowIsNecessaryException reflowException;
 
-  private final AtomicInteger fetchedCount = new AtomicInteger();
-  private final AtomicInteger fetchFailedCount = new AtomicInteger();
+  /* package-private */ final AtomicInteger fetchedCount = new AtomicInteger(0);
+  final AtomicBoolean fetchFailed = new AtomicBoolean(false);
+  private final AtomicBoolean storeFailed = new AtomicBoolean(false);
+
+  private ArrayList<Record> toProcess = new ArrayList<>();
 
   // Expected value relationships:
   // attempted = accepted + failed
   // reconciled <= accepted <= attempted
   // reconciled = accepted - `new`, where `new` is inferred.
-  private final AtomicInteger storeAttemptedCount = new AtomicInteger();
-  private final AtomicInteger storeAcceptedCount = new AtomicInteger();
-  private final AtomicInteger storeFailedCount = new AtomicInteger();
-  private final AtomicInteger storeReconciledCount = new AtomicInteger();
+  // TODO these likely don't need to be Atomic, see Bug 1441281.
+  final AtomicInteger storeAttemptedCount = new AtomicInteger(0);
+  private final AtomicInteger storeAcceptedCount = new AtomicInteger(0);
+  private final AtomicInteger storeFailedCount = new AtomicInteger(0);
+  private final AtomicInteger storeReconciledCount = new AtomicInteger(0);
 
   private final StoreBatchTracker storeTracker = new StoreBatchTracker();
 
   public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
-    this.source    = source;
-    this.sink      = sink;
-    this.delegate  = delegate;
-  }
-
-  /*
-   * We push fetched records into a queue.
-   * A separate thread is waiting for us to notify it of work to do.
-   * When we tell it to stop, it'll stop. We do that when the fetch
-   * is completed.
-   * When it stops, we tell the sink that there are no more records,
-   * and wait for the sink to tell us that storing is done.
-   * Then we notify our delegate of completion.
-   */
-  private RecordConsumer consumer;
-  private volatile boolean waitingForQueueDone = false;
-  private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
-
-  @Override
-  public ConcurrentLinkedQueue<Record> getQueue() {
-    return toProcess;
+    this.source = source;
+    this.sink = sink;
+    this.delegate = delegate;
   }
 
   protected boolean isReady() {
     return source.isActive() && sink.isActive();
   }
 
   /**
    * Get the number of records fetched so far.
    *
    * @return number of fetches.
    */
-  public int getFetchCount() {
+  /* package-private */ int getFetchCount() {
     return fetchedCount.get();
   }
 
   /**
    * Get the number of fetch failures recorded so far.
    *
    * @return number of fetch failures.
    */
-  public int getFetchFailureCount() {
-    return fetchFailedCount.get();
+  public boolean didFetchFail() {
+    return fetchFailed.get();
   }
 
   /**
    * Get the number of store attempts (successful or not) so far.
    *
    * @return number of stores attempted.
    */
   public int getStoreAttemptedCount() {
     return storeAttemptedCount.get();
   }
 
-  public int getStoreAcceptedCount() {
+  /* package-private */ int getStoreAcceptedCount() {
     return storeAcceptedCount.get();
   }
 
   /**
    * Get the number of store failures recorded so far.
    *
    * @return number of store failures.
    */
   public int getStoreFailureCount() {
     return storeFailedCount.get();
   }
 
-  public int getStoreReconciledCount() {
+  /* package-private */ int getStoreReconciledCount() {
     return storeReconciledCount.get();
   }
 
   /**
    * Start records flowing through the channel.
    */
   public void flow() {
     if (!isReady()) {
@@ -177,27 +150,17 @@ public class RecordsChannel implements
 
     if (!source.dataAvailable()) {
       Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
       this.delegate.onFlowCompleted(this);
       return;
     }
 
     sink.setStoreDelegate(this);
-    fetchedCount.set(0);
-    fetchFailedCount.set(0);
-    storeAttemptedCount.set(0);
-    storeAcceptedCount.set(0);
-    storeFailedCount.set(0);
-    storeReconciledCount.set(0);
     storeTracker.reset();
-    // Start a consumer thread.
-    this.consumer = new ConcurrentRecordConsumer(this);
-    ThreadPool.run(this.consumer);
-    waitingForQueueDone = true;
     source.fetchModified(this);
   }
 
   /**
    * Begin both sessions, invoking flow() when done.
    * @throws InvalidSessionTransitionException
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
@@ -209,131 +172,124 @@ public class RecordsChannel implements
     } catch (SyncException e) {
       delegate.onFlowBeginFailed(this, e);
       return;
     }
 
     this.flow();
   }
 
-  @Override
-  public void store(Record record) {
-    storeAttemptedCount.incrementAndGet();
-    storeTracker.onRecordStoreAttempted();
-    try {
-      sink.store(record);
-    } catch (NoStoreDelegateException e) {
-      Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
-      delegate.onFlowStoreFailed(this, e, record.guid);
-    }
-  }
-
   /* package-local */ ArrayList<StoreBatchTracker.Batch> getStoreBatches() {
     return this.storeTracker.getStoreBatches();
   }
 
   @Override
   public void onFetchFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
-    fetchFailedCount.incrementAndGet();
+    if (fetchFailed.getAndSet(true)) {
+      return;
+    }
+
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
+
     delegate.onFlowFetchFailed(this, ex);
-    // Sink will be informed once consumer finishes.
-    this.consumer.halt();
+
+    // We haven't tried storing anything yet, so fine to short-circuit around storeDone.
+    delegate.onFlowCompleted(this);
   }
 
   @Override
   public void onFetchedRecord(Record record) {
-    fetchedCount.incrementAndGet();
+    // Don't bother if we've already failed; we'll just ignore these records later on.
+    if (fetchFailed.get()) {
+      return;
+    }
     this.toProcess.add(record);
-    this.consumer.doNotify();
   }
 
   @Override
   public void onFetchCompleted() {
-    Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
-    this.consumer.queueFilled();
-  }
+    if (fetchFailed.get()) {
+      return;
+    }
+
+    fetchedCount.set(toProcess.size());
+
+    Logger.info(LOG_TAG, "onFetchCompleted. Fetched " + fetchedCount.get() + " records. Storing...");
 
-  @Override
-  public void onBatchCompleted() {
-    this.sink.storeFlush();
+    try {
+      for (Record record : toProcess) {
+        storeAttemptedCount.incrementAndGet();
+        storeTracker.onRecordStoreAttempted();
+        sink.store(record);
+      }
+    } catch (NoStoreDelegateException e) {
+      // Must not happen, bail out.
+      throw new IllegalStateException(e);
+    }
+
+    // Allow this buffer to be reclaimed.
+    toProcess = null;
+
+    // Now we wait for onStoreComplete
+    Logger.trace(LOG_TAG, "onFetchCompleted. Calling storeDone.");
+    sink.storeDone();
   }
 
   // Sent for "store" batches.
   @Override
   public void onBatchCommitted() {
     storeTracker.onBatchFinished();
   }
 
   @Override
   public void onRecordStoreFailed(Exception ex, String recordGuid) {
     Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
     storeFailedCount.incrementAndGet();
     storeTracker.onRecordStoreFailed();
-    this.consumer.stored();
     delegate.onFlowStoreFailed(this, ex, recordGuid);
-    // TODO: abort?
   }
 
   @Override
   public void onRecordStoreSucceeded(String guid) {
-    Logger.trace(LOG_TAG, "Stored record with guid " + guid);
     storeAcceptedCount.incrementAndGet();
     storeTracker.onRecordStoreSucceeded();
-    this.consumer.stored();
   }
 
   @Override
   public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
-    Logger.trace(LOG_TAG, "Reconciled record with guid " + guid);
     storeReconciledCount.incrementAndGet();
   }
 
   @Override
-  public void consumerIsDoneFull() {
-    Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
-    if (waitingForQueueDone) {
-      waitingForQueueDone = false;
-
-      // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
-      this.sink.storeDone();
-    }
-  }
-
-  @Override
-  public void consumerIsDonePartial() {
-    Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
-    if (waitingForQueueDone) {
-      waitingForQueueDone = false;
-
-      // Let sink clean up or flush records if necessary.
-      this.sink.storeIncomplete();
-
-      delegate.onFlowCompleted(this);
-    }
-  }
-
-  @Override
   public void onStoreCompleted() {
-    Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted.");
+    Logger.info(LOG_TAG, "Performing source cleanup.");
     // Source might have used caches used to facilitate flow of records, so now is a good
     // time to clean up. Particularly pertinent for buffered sources.
     // Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
     // locally and results of the merge have been uploaded to the server successfully.
     this.source.performCleanup();
+
+    if (storeFailed.get()) {
+      return;
+    }
+
+    Logger.info(LOG_TAG, "onStoreCompleted. Attempted to store " + storeAttemptedCount.get() + " records; Store accepted " + storeAcceptedCount.get() + ", reconciled " + storeReconciledCount.get() + ", failed " + storeFailedCount.get());
     delegate.onFlowCompleted(this);
-
   }
 
   @Override
   public void onStoreFailed(Exception ex) {
-    Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
+    if (storeFailed.getAndSet(true)) {
+      return;
+    }
+
+    Logger.info(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
 
     storeTracker.onBatchFailed();
 
     // NB: consumer might or might not be running at this point. There are two cases here:
     // 1) If we're storing records remotely, we might fail due to a 412.
@@ -342,23 +298,16 @@ public class RecordsChannel implements
     // that we're done with this flow. Based on the reflow exception, it'll determine what to do.
 
     // 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
     // -- we might hit a deadline only prior to attempting to merge records,
     // -- at which point consumer would have finished already, and storeDone was called.
     // Action: consumer state is known (done), so we can ignore it safely and inform our delegate
     // that we're done.
 
-    // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
-    // we don't need them (case 1).
-    waitingForQueueDone = false;
-
-    // If consumer is still going at it, tell it to stop.
-    this.consumer.halt();
-
     delegate.onFlowStoreFailed(this, ex, null);
     delegate.onFlowCompleted(this);
   }
 
   @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
     return new DeferredRepositorySessionStoreDelegate(this, executor);
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/Synchronizer.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/Synchronizer.java
@@ -25,19 +25,19 @@ import android.content.Context;
  * store, or session error while synchronizing.
  *
  * After synchronizing, call `save` to get back a SynchronizerConfiguration with
  * updated bundle information.
  */
 public class Synchronizer implements SynchronizerSessionDelegate {
   public static final String LOG_TAG = "SyncDelSDelegate";
 
-  protected String configSyncID; // Used to pass syncID from load() back into save().
+  private String configSyncID; // Used to pass syncID from load() back into save().
 
-  protected SynchronizerDelegate synchronizerDelegate;
+  private SynchronizerDelegate synchronizerDelegate;
 
   protected SynchronizerSession session = null;
 
   public SynchronizerSession getSynchronizerSession() {
     return session;
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
@@ -263,17 +263,17 @@ public class SynchronizerSession impleme
     }
 
     final SynchronizerSession session = this;
 
     // TODO: failed record handling.
 
     // This is the *second* record channel to flow.
     // I, SynchronizerSession, am the delegate for the *second* flow.
-    channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
+    channelBToA = getRecordsChannel(this.sessionB, this.sessionA, this);
 
     // This is the delegate for the *first* flow.
     RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
       @Override
       public void onFlowCompleted(RecordsChannel recordsChannel) {
         session.onFirstFlowCompleted(recordsChannel);
       }
 
@@ -295,26 +295,30 @@ public class SynchronizerSession impleme
         // Currently we're just recording the very last exception which occurred. This is a reasonable
         // approach, but ideally we'd want to categorize the exceptions and count them for the purposes
         // of better telemetry. See Bug 1362208.
         storeFailedCauseException = ex;
       }
     };
 
     // This is the *first* channel to flow.
-    channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
+    channelAToB = getRecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
 
     Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
     try {
       channelAToB.beginAndFlow();
     } catch (InvalidSessionTransitionException e) {
       onFlowBeginFailed(channelAToB, e);
     }
   }
 
+  protected RecordsChannel getRecordsChannel(RepositorySession sink, RepositorySession source, RecordsChannelDelegate delegate) {
+    return new RecordsChannel(sink, source, delegate);
+  }
+
   /**
    * Called after the first flow completes.
    * <p>
    * By default, any fetch and store failures are ignored.
    * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
    */
   public void onFirstFlowCompleted(RecordsChannel recordsChannel) {
     // If a "reflow exception" was thrown, consider this synchronization failed.
@@ -322,19 +326,18 @@ public class SynchronizerSession impleme
     if (reflowException != null) {
       final String message = "Reflow is necessary: " + reflowException;
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, reflowException, message);
       return;
     }
 
     // Fetch failures always abort.
-    int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
-    if (numRemoteFetchFailed > 0) {
-      final String message = "Got " + numRemoteFetchFailed + " failures fetching remote records!";
+    if (recordsChannel.didFetchFail()) {
+      final String message = "Saw failures fetching remote records!";
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
       return;
     }
     Logger.trace(LOG_TAG, "No failures fetching remote records.");
 
     // Local store failures are ignored.
     int numLocalStoreFailed = recordsChannel.getStoreFailureCount();
@@ -369,19 +372,18 @@ public class SynchronizerSession impleme
     if (reflowException != null) {
       final String message = "Reflow is necessary: " + reflowException;
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, reflowException, message);
       return;
     }
 
     // Fetch failures always abort.
-    int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
-    if (numLocalFetchFailed > 0) {
-      final String message = "Got " + numLocalFetchFailed + " failures fetching local records!";
+    if (recordsChannel.didFetchFail()) {
+      final String message = "Saw failures fetching local records!";
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
       return;
     }
     Logger.trace(LOG_TAG, "No failures fetching local records.");
 
     // Remote store failures abort!
     int numRemoteStoreFailed = recordsChannel.getStoreFailureCount();
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/SynchronizerHelpers.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/SynchronizerHelpers.java
@@ -77,21 +77,16 @@ public class SynchronizerHelpers {
             }
 
             @Override
             public void onFetchCompleted() {
               delegate.onFetchCompleted();
             }
 
             @Override
-            public void onBatchCompleted() {
-
-            }
-
-            @Override
             public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
               return this;
             }
           });
         }
       };
     }
   }
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -13,23 +13,25 @@ import org.mozilla.gecko.background.test
 import org.mozilla.gecko.background.testhelpers.WaitHelper;
 import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
+import org.mozilla.gecko.sync.synchronizer.NonBufferingRecordsChannel;
 import org.mozilla.gecko.sync.synchronizer.RecordsChannel;
 import org.mozilla.gecko.sync.synchronizer.RecordsChannelDelegate;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(TestRunner.class)
 public class TestRecordsChannel {
 
   private WBORepository sourceRepository;
   private RepositorySession sourceSession;
@@ -37,29 +39,27 @@ public class TestRecordsChannel {
   private RepositorySession sinkSession;
 
   private RecordsChannelDelegate rcDelegate;
 
   private AtomicInteger numFlowFetchFailed;
   private AtomicInteger numFlowStoreFailed;
   private AtomicInteger numFlowCompleted;
   private AtomicBoolean flowBeginFailed;
-  private AtomicBoolean flowFinishFailed;
 
   private volatile RecordsChannel recordsChannel;
   private volatile Exception fetchException;
   private volatile Exception storeException;
 
   @Before
   public void setUp() throws Exception {
     numFlowFetchFailed = new AtomicInteger(0);
     numFlowStoreFailed = new AtomicInteger(0);
     numFlowCompleted = new AtomicInteger(0);
     flowBeginFailed = new AtomicBoolean(false);
-    flowFinishFailed = new AtomicBoolean(false);
 
     // Repositories and sessions will be set/created by tests.
     sourceRepository = null;
     sourceSession = null;
     sinkRepository = null;
     sinkSession = null;
 
     rcDelegate = new RecordsChannelDelegate() {
@@ -107,21 +107,25 @@ public class TestRecordsChannel {
     };
   }
 
   private void createSessions() {
     sourceSession = sourceRepository.createSession(null);
     sinkSession = sinkRepository.createSession(null);
   }
 
-  public void doFlow() throws Exception {
+  public void doFlow(boolean nonBuffering) throws Exception {
     createSessions();
     assertNotNull(sourceSession);
     assertNotNull(sinkSession);
-    recordsChannel = new RecordsChannel(sourceSession,  sinkSession, rcDelegate);
+    if (nonBuffering) {
+      recordsChannel = new NonBufferingRecordsChannel(sourceSession, sinkSession, rcDelegate);
+    } else {
+      recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate);
+    }
     WaitHelper.getTestWaiter().performWait(new Runnable() {
       @Override
       public void run() {
         try {
           recordsChannel.beginAndFlow();
         } catch (InvalidSessionTransitionException e) {
           WaitHelper.getTestWaiter().performNotify(e);
         }
@@ -168,146 +172,286 @@ public class TestRecordsChannel {
     }
     return repo;
   }
 
   @Test
   public void testSuccess() throws Exception {
     sourceRepository = full();
     sinkRepository = empty();
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(0, numFlowStoreFailed.get());
     assertEquals(sourceRepository.wbos, sinkRepository.wbos);
-    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertFalse(recordsChannel.didFetchFail());
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertEquals(6, recordsChannel.getStoreAttemptedCount());
+  }
+
+  @Test
+  public void testSuccessNB() throws Exception {
+    sourceRepository = full();
+    sinkRepository = empty();
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(0, numFlowStoreFailed.get());
+    assertEquals(sourceRepository.wbos, sinkRepository.wbos);
+    assertFalse(recordsChannel.didFetchFail());
     assertEquals(0, recordsChannel.getStoreFailureCount());
     assertEquals(6, recordsChannel.getStoreAttemptedCount());
   }
 
   @Test
   public void testFetchFail() throws Exception {
     sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
     sinkRepository = empty();
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
-    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertTrue(recordsChannel.didFetchFail());
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreAttemptedCount() < 6);
+  }
+
+  @Test
+  public void testFetchFailNB() throws Exception {
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
+    sinkRepository = empty();
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertTrue(numFlowFetchFailed.get() > 0);
+    assertEquals(0, numFlowStoreFailed.get());
+    assertTrue(sinkRepository.wbos.size() < 6);
+    assertTrue(recordsChannel.didFetchFail());
     assertEquals(0, recordsChannel.getStoreFailureCount());
     assertTrue(recordsChannel.getStoreAttemptedCount() < 6);
   }
 
   @Test
   public void testStoreFetchFailedCollectionModified() throws Exception {
     sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
     sinkRepository = empty();
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
 
-    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertTrue(recordsChannel.didFetchFail());
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
+
+    assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+  }
+
+  @Test
+  public void testStoreFetchFailedCollectionModifiedNB() throws Exception {
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+    sinkRepository = empty();
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertTrue(numFlowFetchFailed.get() > 0);
+    assertEquals(0, numFlowStoreFailed.get());
+    assertTrue(sinkRepository.wbos.size() < 6);
+
+    assertTrue(recordsChannel.didFetchFail());
     assertEquals(0, recordsChannel.getStoreFailureCount());
     assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
 
     assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
     final Exception ex = recordsChannel.getReflowException();
     assertNotNull(ex);
     assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
   }
 
   @Test
   public void testStoreFetchFailedDeadline() throws Exception {
     sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
     sinkRepository = empty();
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
 
-    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertTrue(recordsChannel.didFetchFail());
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
+
+    assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(SyncDeadlineReachedException.class, ex.getClass());
+  }
+
+  @Test
+  public void testStoreFetchFailedDeadlineNB() throws Exception {
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
+    sinkRepository = empty();
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertTrue(numFlowFetchFailed.get() > 0);
+    assertEquals(0, numFlowStoreFailed.get());
+    assertTrue(sinkRepository.wbos.size() < 6);
+
+    assertTrue(recordsChannel.didFetchFail());
     assertEquals(0, recordsChannel.getStoreFailureCount());
     assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
 
     assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
     final Exception ex = recordsChannel.getReflowException();
     assertNotNull(ex);
     assertEquals(SyncDeadlineReachedException.class, ex.getClass());
   }
 
   @Test
   public void testStoreSerialFail() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
             SynchronizerHelpers.FailMode.STORE);
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(1, numFlowStoreFailed.get());
     // We will fail to store one of the records but expect flow to continue.
     assertEquals(5, sinkRepository.wbos.size());
 
-    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertFalse(recordsChannel.didFetchFail());
+    assertEquals(1, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+  }
+
+  @Test
+  public void testStoreSerialFailNB() throws Exception {
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+            SynchronizerHelpers.FailMode.STORE);
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(1, numFlowStoreFailed.get());
+    // We will fail to store one of the records but expect flow to continue.
+    assertEquals(5, sinkRepository.wbos.size());
+
+    assertFalse(recordsChannel.didFetchFail());
     assertEquals(1, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
     assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 
   @Test
   public void testStoreSerialFailCollectionModified() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
             SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(1, numFlowStoreFailed.get());
     // One of the records will fail, at which point we'll stop flowing them.
     final int sunkenRecords = sinkRepository.wbos.size();
     assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
 
-    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertFalse(recordsChannel.didFetchFail());
+    // RecordChannel's storeFail count is only incremented for failures of individual records.
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+
+    assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+  }
+
+  @Test
+  public void testStoreSerialFailCollectionModifiedNB() throws Exception {
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+            SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(1, numFlowStoreFailed.get());
+    // One of the records will fail, at which point we'll stop flowing them.
+    final int sunkenRecords = sinkRepository.wbos.size();
+    assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
+
+    assertFalse(recordsChannel.didFetchFail());
     // RecordChannel's storeFail count is only incremented for failures of individual records.
     assertEquals(0, recordsChannel.getStoreFailureCount());
 
     assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
     final Exception ex = recordsChannel.getReflowException();
     assertNotNull(ex);
     assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
   }
 
   @Test
   public void testStoreBatchesFail() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
     assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
 
-    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertFalse(recordsChannel.didFetchFail());
+    assertEquals(3, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+  }
+
+  @Test
+  public void testStoreBatchesFailNB() throws Exception {
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
+    assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
+
+    assertFalse(recordsChannel.didFetchFail());
     assertEquals(3, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
     assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 
-
   @Test
   public void testStoreOneBigBatchFail() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
-    doFlow();
+    doFlow(false);
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
     assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
 
-    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertFalse(recordsChannel.didFetchFail());
+    assertEquals(6, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+  }
+
+  @Test
+  public void testStoreOneBigBatchFailNB() throws Exception {
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
+    doFlow(true);
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
+    assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
+
+    assertFalse(recordsChannel.didFetchFail());
     assertEquals(6, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
     assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 }
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
@@ -33,17 +33,12 @@ public class ExpectSuccessRepositorySess
 
   @Override
   public void onFetchCompleted() {
     log("Fetch completed.");
     performNotify();
   }
 
   @Override
-  public void onBatchCompleted() {
-    log("Batch completed.");
-  }
-
-  @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
     return this;
   }
 }
deleted file mode 100644
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/* Any copyright is dedicated to the Public Domain.
-   http://creativecommons.org/publicdomain/zero/1.0/ */
-
-package org.mozilla.gecko.sync.middleware;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mozilla.gecko.background.testhelpers.MockRecord;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-public class BufferingMiddlewareRepositorySessionTest {
-    private RepositorySession innerRepositorySession;
-    private BufferingMiddlewareRepositorySession bufferingSession;
-    private BufferingMiddlewareRepositorySession bufferingSessionMocked;
-    private BufferStorage bufferStorage;
-    private BufferStorage bufferStorageMocked;
-
-    @Before
-    public void setUp() throws Exception {
-        BufferingMiddlewareRepository bufferingRepository;
-        Repository innerRepositoy;
-
-        innerRepositoy = mock(Repository.class);
-        innerRepositorySession = mock(RepositorySession.class);
-        bufferingRepository = new BufferingMiddlewareRepository(
-                0L,
-                new MemoryBufferStorage(),
-                innerRepositoy
-        );
-
-        bufferStorage = new MemoryBufferStorage();
-        bufferStorageMocked = mock(MemoryBufferStorage.class);
-
-        bufferingSession = new BufferingMiddlewareRepositorySession(
-                innerRepositorySession, bufferingRepository, 0L,
-                bufferStorage);
-
-        bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
-                innerRepositorySession, bufferingRepository, 0L,
-                bufferStorageMocked);
-    }
-
-    @Test
-    public void store() throws Exception {
-        assertEquals(0, bufferStorage.all().size());
-
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-        assertEquals(1, bufferStorage.all().size());
-
-        MockRecord record1 = new MockRecord("guid2", null, 1, false);
-        bufferingSession.store(record1);
-        assertEquals(2, bufferStorage.all().size());
-
-        // record2 must replace record.
-        MockRecord record2 = new MockRecord("guid1", null, 2, false);
-        bufferingSession.store(record2);
-        assertEquals(2, bufferStorage.all().size());
-
-        // Ensure inner session doesn't see incoming records.
-        verify(innerRepositorySession, never()).store(record);
-        verify(innerRepositorySession, never()).store(record1);
-        verify(innerRepositorySession, never()).store(record2);
-    }
-
-    @Test
-    public void storeDone() throws Exception {
-        // Trivial case, no records to merge.
-        bufferingSession.doMergeBuffer();
-        verify(innerRepositorySession, times(1)).storeDone();
-        verify(innerRepositorySession, never()).store(any(Record.class));
-
-        // Reset call counters.
-        reset(innerRepositorySession);
-
-        // Now store some records.
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-
-        MockRecord record3 = new MockRecord("guid3", null, 5, false);
-        bufferingSession.store(record3);
-
-        // NB: same guid as above.
-        MockRecord record4 = new MockRecord("guid3", null, -1, false);
-        bufferingSession.store(record4);
-
-        // Done storing.
-        bufferingSession.doMergeBuffer();
-
-        // Ensure all records where stored in the wrapped session.
-        verify(innerRepositorySession, times(1)).store(record);
-        verify(innerRepositorySession, times(1)).store(record2);
-        verify(innerRepositorySession, times(1)).store(record4);
-
-        // Ensure storeDone was called on the wrapped session.
-        verify(innerRepositorySession, times(1)).storeDone();
-
-        // Ensure buffer wasn't cleared on the wrapped session.
-        assertEquals(3, bufferStorage.all().size());
-    }
-
-    @Test
-    public void storeFlush() throws Exception {
-        verify(bufferStorageMocked, times(0)).flush();
-        bufferingSessionMocked.storeFlush();
-        verify(bufferStorageMocked, times(1)).flush();
-    }
-
-    @Test
-    public void performCleanup() throws Exception {
-        // Baseline.
-        assertEquals(0, bufferStorage.all().size());
-
-        // Test that we can call cleanup with an empty buffer storage.
-        bufferingSession.performCleanup();
-        assertEquals(0, bufferStorage.all().size());
-
-        // Store a couple of records.
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-
-        // Confirm it worked.
-        assertEquals(2, bufferStorage.all().size());
-
-        // Test that buffer storage is cleaned up.
-        bufferingSession.performCleanup();
-        assertEquals(0, bufferStorage.all().size());
-    }
-
-    @Test
-    public void abort() throws Exception {
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-
-        MockRecord record3 = new MockRecord("guid3", null, 5, false);
-        bufferingSession.store(record3);
-
-        // NB: same guid as above.
-        MockRecord record4 = new MockRecord("guid3", null, -1, false);
-        bufferingSession.store(record4);
-
-        bufferingSession.abort();
-
-        // Verify number of records didn't change.
-        // Abort shouldn't clear the buffer.
-        assertEquals(3, bufferStorage.all().size());
-    }
-
-    @Test
-    public void setStoreDelegate() throws Exception {
-        RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
-        bufferingSession.setStoreDelegate(delegate);
-        verify(innerRepositorySession).setStoreDelegate(delegate);
-    }
-}
\ No newline at end of file
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -98,21 +98,16 @@ public class BatchingDownloaderDelegateT
         }
 
         @Override
         public void onFetchCompleted() {
 
         }
 
         @Override
-        public void onBatchCompleted() {
-
-        }
-
-        @Override
         public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
             return null;
         }
     }
 
     @Before
     public void setUp() throws Exception {
         repositorySession = new Server15RepositorySession(new Server15Repository(
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -118,17 +118,16 @@ public class BatchingDownloaderTest {
             return super.resetAndCommit();
         }
     }
 
     static class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
         public boolean isFailure;
         public boolean isFetched;
         public boolean isSuccess;
-        public int batchesCompleted;
         public Exception ex;
         public Record record;
 
         @Override
         public void onFetchFailed(Exception ex) {
             this.isFailure = true;
             this.ex = ex;
         }
@@ -140,21 +139,16 @@ public class BatchingDownloaderTest {
         }
 
         @Override
         public void onFetchCompleted() {
             this.isSuccess = true;
         }
 
         @Override
-        public void onBatchCompleted() {
-            this.batchesCompleted += 1;
-        }
-
-        @Override
         public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
             return null;
         }
     }
 
     static class MockRequest extends SyncStorageCollectionRequest {
 
         MockRequest(URI uri) {
@@ -291,17 +285,16 @@ public class BatchingDownloaderTest {
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
 
         // NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
         // Expect no offset to be persisted.
         ensureOffsetContextIsNull(repositoryStateProvider);
         assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
@@ -319,17 +312,16 @@ public class BatchingDownloaderTest {
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
 
         // We don't care about the offset in a single batch mode.
         ensureOffsetContextIsNull(repositoryStateProvider);
         assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testBatching() throws Exception {
@@ -344,62 +336,58 @@ public class BatchingDownloaderTest {
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
         assertEquals("25", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
 
         // Offset context set.
         ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
         assertEquals(1, repositoryStateProvider.getCommitCount());
 
         // The next batch, we still have an offset token and has not exceed the total limit.
         performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
         assertEquals("50", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
 
         // Offset context updated.
         ensureOffsetContextIs(repositoryStateProvider, "50", "oldest", 1L);
         assertEquals(2, repositoryStateProvider.getCommitCount());
 
         // The next batch, we still have an offset token and has not exceed the total limit.
         performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
         assertEquals("75", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
 
         // Offset context updated.
         ensureOffsetContextIs(repositoryStateProvider, "75", "oldest", 1L);
         assertEquals(3, repositoryStateProvider.getCommitCount());
 
         // No more offset token, so we complete batching.
         performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
-        assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
 
         // Offset context cleared since we finished batching, and committed.
         ensureOffsetContextIsNull(repositoryStateProvider);
         assertEquals(4, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testFailureLMChangedMultiBatch() throws Exception {