Bug 1291821 - WIP first pass on 'green light' checkpoints draft
authorGrisha Kruglov <gkruglov@mozilla.com>
Mon, 03 Oct 2016 17:48:50 -0700
changeset 420291 c6da049063db054f2c169740d74d3888a4c27804
parent 420290 a18971736611ef65277bac41c883dc40e3f4bc99
child 532783 0d60e377cec2254a02cace8b5a8ffa42f97907df
push id31166
push userbmo:gkruglov@mozilla.com
push dateTue, 04 Oct 2016 00:49:33 +0000
bugs1291821
milestone52.0a1
Bug 1291821 - WIP first pass on 'green light' checkpoints MozReview-Commit-ID: GCSsEHGAvGy
mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/ConsistencyChecker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/NoOpConsistencyChecker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
@@ -55,16 +55,17 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
   private static final String LOG_TAG = FxAccountSyncAdapter.class.getSimpleName();
 
   public static final int NOTIFICATION_ID = LOG_TAG.hashCode();
 
   // Tracks the last seen storage hostname for backoff purposes.
   private static final String PREF_BACKOFF_STORAGE_HOST = "backoffStorageHost";
@@ -226,26 +227,26 @@ public class FxAccountSyncAdapter extend
     if (forced) {
       Logger.info(LOG_TAG, "Forced sync (" + kind + "): overruling remaining backoff of " + delay + "ms.");
     } else {
       Logger.info(LOG_TAG, "Not syncing (" + kind + "): must wait another " + delay + "ms.");
     }
     return forced;
   }
 
-  protected void syncWithAssertion(final String audience,
-                                   final String assertion,
+  protected void syncWithAssertion(final String assertion,
                                    final URI tokenServerEndpointURI,
                                    final BackoffHandler tokenBackoffHandler,
                                    final SharedPreferences sharedPrefs,
                                    final KeyBundle syncKeyBundle,
                                    final String clientState,
                                    final SessionCallback callback,
                                    final Bundle extras,
-                                   final AndroidFxAccount fxAccount) {
+                                   final AndroidFxAccount fxAccount,
+                                   final long syncDeadline) {
     final TokenServerClientDelegate delegate = new TokenServerClientDelegate() {
       private boolean didReceiveBackoff = false;
 
       @Override
       public String getUserAgent() {
         return FxAccountConstants.USER_AGENT;
       }
 
@@ -316,17 +317,17 @@ public class FxAccountSyncAdapter extend
           final Context context = getContext();
           final SyncConfiguration syncConfig = new SyncConfiguration(token.uid, authHeaderProvider, sharedPrefs, syncKeyBundle);
 
           Collection<String> knownStageNames = SyncConfiguration.validEngineNames();
           syncConfig.stagesToSync = Utils.getStagesToSyncFromBundle(knownStageNames, extras);
           syncConfig.setClusterURL(storageServerURI);
 
           globalSession = new GlobalSession(syncConfig, callback, context, clientsDataDelegate);
-          globalSession.start();
+          globalSession.start(syncDeadline);
         } catch (Exception e) {
           callback.handleError(globalSession, e);
           return;
         }
       }
 
       @Override
       public void handleFailure(TokenServerException e) {
@@ -381,16 +382,19 @@ public class FxAccountSyncAdapter extend
   @Override
   public void onPerformSync(final Account account, final Bundle extras, final String authority, ContentProviderClient provider, final SyncResult syncResult) {
     Logger.setThreadLogTag(FxAccountConstants.GLOBAL_LOG_TAG);
     Logger.resetLogging();
 
     final Context context = getContext();
     final AndroidFxAccount fxAccount = new AndroidFxAccount(context, account);
 
+    // Sync can't take longer than 30 minutes.
+    final long syncDeadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(30);
+
     Logger.info(LOG_TAG, "Syncing FxAccount" +
         " account named like " + Utils.obfuscateEmail(account.name) +
         " for authority " + authority +
         " with instance " + this + ".");
 
     Logger.info(LOG_TAG, "Account last synced at: " + fxAccount.getLastSyncedTimestamp());
 
     if (FxAccountUtils.LOG_PERSONAL_INFORMATION) {
@@ -531,17 +535,19 @@ public class FxAccountSyncAdapter extend
               Logger.info(LOG_TAG, "Not syncing (token server).");
               syncDelegate.postponeSync(tokenBackoffHandler.delayMilliseconds());
               return;
             }
 
             final SessionCallback sessionCallback = new SessionCallback(syncDelegate, schedulePolicy);
             final KeyBundle syncKeyBundle = married.getSyncKeyBundle();
             final String clientState = married.getClientState();
-            syncWithAssertion(audience, assertion, tokenServerEndpointURI, tokenBackoffHandler, sharedPrefs, syncKeyBundle, clientState, sessionCallback, extras, fxAccount);
+            syncWithAssertion(
+                    assertion, tokenServerEndpointURI, tokenBackoffHandler, sharedPrefs,
+                    syncKeyBundle, clientState, sessionCallback, extras, fxAccount, syncDeadline);
 
             // Register the device if necessary (asynchronous, in another thread)
             if (fxAccount.getDeviceRegistrationVersion() != FxAccountDeviceRegistrator.DEVICE_REGISTRATION_VERSION
                 || TextUtils.isEmpty(fxAccount.getDeviceId())) {
               FxAccountDeviceRegistrator.register(context);
             }
 
             // Force fetch the profile avatar information. (asynchronous, in another thread)
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
@@ -69,16 +69,18 @@ public class GlobalSession implements Ht
 
   protected Map<Stage, GlobalSyncStage> stages;
   public Stage currentState = Stage.idle;
 
   public final GlobalSessionCallback callback;
   protected final Context context;
   protected final ClientsDataDelegate clientsDelegate;
 
+  private long syncDeadline;
+
   /**
    * Map from engine name to new settings for an updated meta/global record.
    * Engines to remove will have <code>null</code> EngineSettings.
    */
   public final Map<String, EngineSettings> enginesToUpdate = new HashMap<String, EngineSettings>();
 
    /*
    * Key accessors.
@@ -229,16 +231,20 @@ public class GlobalSession implements Ht
         out.add(stage);
       } catch (NoSuchStageException e) {
         Logger.warn(LOG_TAG, "Unable to find stage with name " + name);
       }
     }
     return out;
   }
 
+  public long getSyncDeadline() {
+    return syncDeadline;
+  }
+
   /**
    * Advance and loop around the stages of a sync.
    * @param current
    * @return
    *        The next stage to execute.
    */
   public static Stage nextStage(Stage current) {
     int index = current.ordinal() + 1;
@@ -288,35 +294,40 @@ public class GlobalSession implements Ht
    * <ul>
    * <li>Verifying that any backoffs/minimum next sync requests are respected.</li>
    * <li>Ensuring that the device is online.</li>
    * <li>Ensuring that dependencies are ready.</li>
    * </ul>
    *
    * @throws AlreadySyncingException
    */
-  public void start() throws AlreadySyncingException {
+  public void start(final long syncDeadline) throws AlreadySyncingException {
     if (this.currentState != GlobalSyncStage.Stage.idle) {
       throw new AlreadySyncingException(this.currentState);
     }
+
+    // Make the deadline value available to stages via its getter.
+    this.syncDeadline = syncDeadline;
+
     installAsHttpResponseObserver(); // Uninstalled by completeSync or abort.
     this.advance();
   }
 
   /**
    * Stop this sync and start again.
    * @throws AlreadySyncingException
    */
   protected void restart() throws AlreadySyncingException {
     this.currentState = GlobalSyncStage.Stage.idle;
     if (callback.shouldBackOffStorage()) {
       this.callback.handleAborted(this, "Told to back off.");
       return;
     }
-    this.start();
+    // Restart with the same deadline as before.
+    this.start(syncDeadline);
   }
 
   /**
    * We're finished (aborted or succeeded): release resources.
    */
   protected void cleanUp() {
     uninstallAsHttpResponseObserver();
     this.stages = null;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
@@ -11,16 +11,17 @@ import org.mozilla.gecko.sync.middleware
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 
 /**
  * @author grisha
  */
 public class BufferingMiddlewareRepository extends MiddlewareRepository {
+    private final long syncDeadline;
     private final Repository inner;
     private final BufferStorage bufferStorage;
     private final ConsistencyChecker consistencyChecker;
 
     public class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate {
         private final BufferingMiddlewareRepository repository;
         private final RepositorySessionCreationDelegate outerDelegate;
 
@@ -32,22 +33,23 @@ public class BufferingMiddlewareReposito
         @Override
         public void onSessionCreateFailed(Exception ex) {
             this.outerDelegate.onSessionCreateFailed(ex);
         }
 
         @Override
         public void onSessionCreated(RepositorySession session) {
             outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession(
-                    session, this.repository, bufferStorage, consistencyChecker
+                    session, this.repository, syncDeadline, bufferStorage, consistencyChecker
             ));
         }
     }
 
-    public BufferingMiddlewareRepository(ConsistencyChecker consistencyChecker, BufferStorage bufferStore, Repository wrappedRepository) {
+    public BufferingMiddlewareRepository(long syncDeadline, ConsistencyChecker consistencyChecker, BufferStorage bufferStore, Repository wrappedRepository) {
+        this.syncDeadline = syncDeadline;
         this.inner = wrappedRepository;
         this.bufferStorage = bufferStore;
         this.consistencyChecker = consistencyChecker;
     }
 
     @Override
     public void createSession(RepositorySessionCreationDelegate delegate, Context context) {
         this.inner.createSession(
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -25,21 +25,23 @@ import java.util.List;
  *
  *  Fetch is pass-through, store is buffered.
  *
  * @author grisha
  */
 public class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
     private final BufferStorage bufferStorage;
     private final ConsistencyChecker consistencyChecker;
+    private final long syncDeadline;
 
     public BufferingMiddlewareRepositorySession(
             RepositorySession repositorySession, MiddlewareRepository repository,
-            BufferStorage bufferStorage, ConsistencyChecker consistencyChecker) {
+            long syncDeadline, BufferStorage bufferStorage, ConsistencyChecker consistencyChecker) {
         super(repositorySession, repository);
+        this.syncDeadline = syncDeadline;
         this.bufferStorage = bufferStorage;
         this.consistencyChecker = consistencyChecker;
     }
 
     @Override
     public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) {
         this.inner.fetchSince(timestamp, delegate);
     }
@@ -69,29 +71,32 @@ public class BufferingMiddlewareReposito
         bufferStorage.flush();
     }
 
     @Override
     public void storeDone(final long end) {
         // Now that records stopped flowing, persist them.
         bufferStorage.flush();
 
-        // Ensure that our buffered records are sane.
-        // What to do in case of problems? Inform the delegate? Try to fix? Ignore inconsistencies?
-        // Could this be a pipeline? E.g.:
-        // records -> consistency check -> fix -> store
-        // But: At this point we still don't have a "common ancestor" and can't do a three way merge.
-        final List<Record> records = bufferStorage.all();
-        if (!consistencyChecker.isValid(records)) {
-            abort();
+        final List<Record> unprocessedBuffer = bufferStorage.all();
+
+        // Determine if we have enough time to perform consistency checks on the buffered data and
+        // then store it. If we don't have enough time now, we keep our buffer and try again later.
+        // We don't store results of a buffer consistency check anywhere, so we can't treat it
+        // separately from storage.
+        if (!mayProceedToPersistBuffer(syncDeadline, unprocessedBuffer.size())) {
+            super.abort();
             return;
         }
 
+        // Process our buffer into a set of records we're happy to persist.
+        final List<Record> processedBuffer = consistencyChecker.processRecords(unprocessedBuffer);
+
         // Flush our buffer to the wrapped local repository. Data goes live!
-        for (Record record : records) {
+        for (Record record : processedBuffer) {
             try {
                 this.inner.store(record);
             } catch (NoStoreDelegateException e) {
                 // At this point we should have a delegate, so this won't happen.
             }
         }
 
         // Clean up the buffer, we don't need it anymore.
@@ -133,9 +138,18 @@ public class BufferingMiddlewareReposito
     @Override
     public long getLastSyncTimestamp() {
         if (bufferStorage.isEmpty()) {
             return super.getLastSyncTimestamp();
         }
 
         return bufferStorage.lastSyncedTimestamp();
     }
+
+    private static boolean mayProceedToPersistBuffer(long syncDeadline, int numberOfRecords) {
+        // TODO we want to know if we'll have enough time to store the buffer. For now, this includes
+        // first running consistency checks against the buffer, and then storing it, which consists of
+        // reconciling records against local data (which consists of bunch of SELECTs and comparisons),
+        // as well as actual INSERT/UPDATE queries.
+        // This is likely to be a function of record type and record count.
+        return true;
+    }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/ConsistencyChecker.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/ConsistencyChecker.java
@@ -7,10 +7,10 @@ package org.mozilla.gecko.sync.middlewar
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.util.List;
 
 /**
  * @author grisha
  */
 public interface ConsistencyChecker {
-    boolean isValid(List<Record> records);
+    List<Record> processRecords(List<Record> records);
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/NoOpConsistencyChecker.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/checkers/NoOpConsistencyChecker.java
@@ -8,12 +8,12 @@ import org.mozilla.gecko.sync.repositori
 
 import java.util.List;
 
 /**
  * @author grisha
  */
 public class NoOpConsistencyChecker implements ConsistencyChecker {
     @Override
-    public boolean isValid(List<Record> records) {
-        return true;
+    public List<Record> processRecords(List<Record> records) {
+        return records;
     }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
@@ -17,23 +17,23 @@ import org.mozilla.gecko.sync.net.AuthHe
  *
  */
 public class ConstrainedServer11Repository extends Server11Repository {
 
   private final String sort;
   private final long batchLimit;
   private final long totalLimit;
 
-  public ConstrainedServer11Repository(String collection, String storageURL,
+  public ConstrainedServer11Repository(String collection, long syncDeadline, String storageURL,
                                        AuthHeaderProvider authHeaderProvider,
                                        InfoCollections infoCollections,
                                        InfoConfiguration infoConfiguration,
                                        long batchLimit, long totalLimit, String sort)
           throws URISyntaxException {
-    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration);
+    super(collection, syncDeadline, storageURL, authHeaderProvider, infoCollections, infoConfiguration);
     this.batchLimit = batchLimit;
     this.totalLimit = totalLimit;
     this.sort  = sort;
   }
 
   @Override
   public String getDefaultSort() {
     return sort;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
@@ -24,38 +24,40 @@ import android.support.annotation.Nullab
  *
  * @author rnewman
  */
 public class Server11Repository extends Repository {
   protected String collection;
   protected URI collectionURI;
   protected final AuthHeaderProvider authHeaderProvider;
   protected final InfoCollections infoCollections;
+  protected final long syncDeadline;
 
   private final InfoConfiguration infoConfiguration;
 
   /**
    * Construct a new repository that fetches and stores against the Sync 1.1. API.
    *
    * @param collection name.
    * @param storageURL full URL to storage endpoint.
    * @param authHeaderProvider to use in requests; may be null.
    * @param infoCollections instance; must not be null.
    * @throws URISyntaxException
    */
-  public Server11Repository(@NonNull String collection, @NonNull String storageURL, AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections, @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
+  public Server11Repository(@NonNull String collection, long syncDeadline, @NonNull String storageURL, AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections, @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
     if (collection == null) {
       throw new IllegalArgumentException("collection must not be null");
     }
     if (storageURL == null) {
       throw new IllegalArgumentException("storageURL must not be null");
     }
     if (infoCollections == null) {
       throw new IllegalArgumentException("infoCollections must not be null");
     }
+    this.syncDeadline = syncDeadline;
     this.collection = collection;
     this.collectionURI = new URI(storageURL + (storageURL.endsWith("/") ? collection : "/" + collection));
     this.authHeaderProvider = authHeaderProvider;
     this.infoCollections = infoCollections;
     this.infoConfiguration = infoConfiguration;
   }
 
   @Override
@@ -128,16 +130,20 @@ public class Server11Repository extends 
   public AuthHeaderProvider getAuthHeaderProvider() {
     return authHeaderProvider;
   }
 
   public boolean updateNeeded(long lastSyncTimestamp) {
     return infoCollections.updateNeeded(collection, lastSyncTimestamp);
   }
 
+  public long getSyncDeadline() {
+    return syncDeadline;
+  }
+
   @Nullable
   public Long getCollectionLastModified() {
     return infoCollections.getTimestamp(collection);
   }
 
   public InfoConfiguration getInfoConfiguration() {
     return infoConfiguration;
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -16,19 +16,21 @@ import org.mozilla.gecko.sync.net.SyncSt
 import org.mozilla.gecko.sync.repositories.Server11Repository;
 import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /**
  * Batching Downloader, which implements batching protocol as supported by Sync 1.5.
  *
  * Downloader's batching behaviour is configured via two parameters, obtained from the repository:
  * - Per-batch limit, which specified how many records may be fetched in an individual GET request.
  * - Total limit, which controls number of batch GET requests we will make.
@@ -57,16 +59,18 @@ public class BatchingDownloader {
     private final Server11RepositorySession repositorySession;
     private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
     // Used to track outstanding requests, so that we can abort them as needed.
     @VisibleForTesting
     protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
     /* @GuardedBy("this") */ private String lastModified;
     /* @GuardedBy("this") */ private long numRecords = 0;
 
+    private final List<Long> batchDurations = Collections.synchronizedList(new ArrayList<Long>());
+
     public BatchingDownloader(final Server11Repository repository, final Server11RepositorySession repositorySession) {
         this.repository = repository;
         this.repositorySession = repositorySession;
     }
 
     @VisibleForTesting
     protected static String flattenIDs(String[] guids) {
         // Consider using Utils.toDelimitedString if and when the signature changes
@@ -95,17 +99,17 @@ public class BatchingDownloader {
                                     SyncStorageCollectionRequest request,
                                     RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
             throws URISyntaxException, UnsupportedEncodingException {
         if (batchLimit > repository.getDefaultTotalLimit()) {
             throw new IllegalArgumentException("Batch limit should not be greater than total limit");
         }
 
         request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
-                newer, batchLimit, full, sort, ids);
+                newer, batchLimit, full, sort, ids, System.currentTimeMillis());
         this.pending.add(request);
         request.get();
     }
 
     @VisibleForTesting
     @Nullable
     protected String encodeParam(String param) throws UnsupportedEncodingException {
         if (param != null) {
@@ -161,27 +165,30 @@ public class BatchingDownloader {
 
     public Server11Repository getServerRepository() {
         return this.repository;
     }
 
     public void onFetchCompleted(SyncStorageResponse response,
                                  final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                  final SyncStorageCollectionRequest request, long newer,
-                                 long limit, boolean full, String sort, String ids) {
+                                 long limit, boolean full, String sort, String ids, long requestStartTime) {
         removeRequestFromPending(request);
 
         // When we process our first request, we get back a X-Last-Modified header indicating when collection was modified last.
         // We pass it to the server with every subsequent request (if we need to make more) as the X-If-Unmodified-Since header,
         // and server is supposed to ensure that this pre-condition is met, and fail our request with a 412 error code otherwise.
         // So, if all of this happens, these checks should never fail.
         // However, we also track this header in client side, and can defensively validate against it here as well.
         final String currentLastModifiedTimestamp = response.lastModified();
         Logger.debug(LOG_TAG, "Last modified timestamp " + currentLastModifiedTimestamp);
 
+        // Record how long this batch fetch took, including time to store records locally.
+        batchDurations.add(System.currentTimeMillis() - requestStartTime);
+
         // Sanity check. We also did a null check in delegate before passing it into here.
         if (currentLastModifiedTimestamp == null) {
             this.abort(
                     fetchRecordsDelegate,
                     new IllegalStateException("Last modified timestamp is missing")
             );
             return;
         }
@@ -223,24 +230,30 @@ public class BatchingDownloader {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
                 }
             });
             return;
         }
 
         // We need to make another batching request!
-        // Let the delegate know that a batch just completed before we proceed.
+        // Let the delegate know that a batch fetch just completed before we proceed.
         this.workTracker.incrementOutstanding();
         try {
             fetchRecordsDelegate.onBatchCompleted();
         } finally {
             this.workTracker.decrementOutstanding();
         }
 
+        // Should we proceed, however? Do we have enough time?
+        if (!mayProceedWithBatching(new ArrayList<>(batchDurations), repository.getSyncDeadline())) {
+            this.abort(fetchRecordsDelegate, new Exception("Not enough time to complete next batch"));
+            return;
+        }
+
         // Create and execute new batch request.
         try {
             final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
                     limit, full, sort, ids, offset);
             this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
         } catch (final URISyntaxException | UnsupportedEncodingException e) {
             this.workTracker.delayWorkItem(new Runnable() {
                 @Override
@@ -307,9 +320,21 @@ public class BatchingDownloader {
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                 delegate.onFetchFailed(exception);
             }
         });
     }
+
+    private static boolean mayProceedWithBatching(final List<Long> durations, long deadline) {
+        final double TIME_LEFT_BUFFER_FACTOR = 1.5;
+        long sum = 0;
+        for (long duration : durations) {
+            sum += duration;
+        }
+        final long averageDuration = sum / durations.size();
+        final long timeLeft = deadline - System.currentTimeMillis();
+
+        return timeLeft > averageDuration * TIME_LEFT_BUFFER_FACTOR;
+    }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
@@ -17,38 +17,41 @@ import org.mozilla.gecko.sync.repositori
 import java.util.ConcurrentModificationException;
 
 /**
  * Delegate that gets passed into fetch methods to handle server response from fetch.
  */
 public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate {
     public static final String LOG_TAG = "BatchingDownloaderDelegate";
 
-    private BatchingDownloader downloader;
-    private RepositorySessionFetchRecordsDelegate fetchRecordsDelegate;
-    public SyncStorageCollectionRequest request;
+    private final BatchingDownloader downloader;
+    private final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate;
+    public final SyncStorageCollectionRequest request;
     // Used to pass back to BatchDownloader to start another fetch with these parameters if needed.
-    private long newer;
-    private long batchLimit;
-    private boolean full;
-    private String sort;
-    private String ids;
+    private final long newer;
+    private final long batchLimit;
+    private final boolean full;
+    private final String sort;
+    private final String ids;
+    private final long requestStartTime;
 
     public BatchingDownloaderDelegate(final BatchingDownloader downloader,
                                       final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                       final SyncStorageCollectionRequest request, long newer,
-                                      long batchLimit, boolean full, String sort, String ids) {
+                                      long batchLimit, boolean full, String sort, String ids,
+                                      long requestStartTime) {
         this.downloader = downloader;
         this.fetchRecordsDelegate = fetchRecordsDelegate;
         this.request = request;
         this.newer = newer;
         this.batchLimit = batchLimit;
         this.full = full;
         this.sort = sort;
         this.ids = ids;
+        this.requestStartTime = requestStartTime;
     }
 
     @Override
     public AuthHeaderProvider getAuthHeaderProvider() {
         return this.downloader.getServerRepository().getAuthHeaderProvider();
     }
 
     @Override
@@ -56,17 +59,17 @@ public class BatchingDownloaderDelegate 
         return this.downloader.getLastModified();
     }
 
     @Override
     public void handleRequestSuccess(SyncStorageResponse response) {
         Logger.debug(LOG_TAG, "Fetch done.");
         if (response.lastModified() != null) {
             this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
-                    this.newer, this.batchLimit, this.full, this.sort, this.ids);
+                    this.newer, this.batchLimit, this.full, this.sort, this.ids, this.requestStartTime);
             return;
         }
         this.downloader.onFetchFailed(
                 new IllegalStateException("Missing last modified header from response"),
                 this.fetchRecordsDelegate,
                 this.request);
     }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -47,30 +47,32 @@ public class AndroidBrowserBookmarksServ
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     // If this is a first sync, we need to check server counts to make sure that we aren't
     // going to screw up. SafeConstrainedServer11Repository does this. See Bug 814331.
     AuthHeaderProvider authHeaderProvider = session.getAuthHeaderProvider();
     final JSONRecordFetcher countsFetcher = new JSONRecordFetcher(session.config.infoCollectionCountsURL(), authHeaderProvider);
     String collection = getCollection();
     return new SafeConstrainedServer11Repository(
-        collection,
-        session.config.storageURL(),
-        session.getAuthHeaderProvider(),
-        session.config.infoCollections,
-        session.config.infoConfiguration,
-        BOOKMARKS_BATCH_LIMIT,
-        BOOKMARKS_TOTAL_LIMIT,
-        BOOKMARKS_SORT,
-        countsFetcher);
+            collection,
+            session.getSyncDeadline(),
+            session.config.storageURL(),
+            session.getAuthHeaderProvider(),
+            session.config.infoCollections,
+            session.config.infoConfiguration,
+            BOOKMARKS_BATCH_LIMIT,
+            BOOKMARKS_TOTAL_LIMIT,
+            BOOKMARKS_SORT,
+            countsFetcher);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
             new NoOpConsistencyChecker(),
             new MemoryBufferStorage(),
             new AndroidBrowserBookmarksRepository()
     );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -20,18 +20,18 @@ import org.mozilla.gecko.sync.repositori
 public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "HistoryStage";
 
   // Eventually this kind of sync stage will be data-driven,
   // and all this hard-coding can go away.
   private static final String HISTORY_SORT          = "oldest";
   // Sanity limit. Batch and total limit are the same for now, and will be adjusted
   // once buffer and high water mark are in place. See Bug 730142.
-  private static final long HISTORY_BATCH_LIMIT = 250;
-  private static final long HISTORY_TOTAL_LIMIT = 250;
+  private static final long HISTORY_BATCH_LIMIT = 500;
+  private static final long HISTORY_TOTAL_LIMIT = 10000;
 
   @Override
   protected String getCollection() {
     return "history";
   }
 
   @Override
   protected String getEngineName() {
@@ -40,36 +40,37 @@ public class AndroidBrowserHistoryServer
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.HISTORY_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    // TODO this needs a green-light function and a good story around it.
     return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
             new NoOpConsistencyChecker(),
             new MemoryBufferStorage(),
             new AndroidBrowserHistoryRepository()
     );
   }
 
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
-                                             collection,
-                                             session.config.storageURL(),
-                                             session.getAuthHeaderProvider(),
-                                             session.config.infoCollections,
-                                             session.config.infoConfiguration,
-                                             HISTORY_BATCH_LIMIT,
-                                             HISTORY_TOTAL_LIMIT,
-                                             HISTORY_SORT);
+            collection,
+            session.getSyncDeadline(),
+            session.config.storageURL(),
+            session.getAuthHeaderProvider(),
+            session.config.infoCollections,
+            session.config.infoConfiguration,
+            HISTORY_BATCH_LIMIT,
+            HISTORY_TOTAL_LIMIT,
+            HISTORY_SORT);
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new HistoryRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -29,16 +29,17 @@ public class FennecTabsServerSyncStage e
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.TABS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
             new NoOpConsistencyChecker(),
             new MemoryBufferStorage(),
             new FennecTabsRepository(session.getClientsDelegate())
     );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -42,29 +42,31 @@ public class FormHistoryServerSyncStage 
   public Integer getStorageVersion() {
     return VersionConstants.FORMS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
-        collection,
-        session.config.storageURL(),
-        session.getAuthHeaderProvider(),
-        session.config.infoCollections,
-        session.config.infoConfiguration,
-        FORM_HISTORY_BATCH_LIMIT,
-        FORM_HISTORY_TOTAL_LIMIT,
-        FORM_HISTORY_SORT);
+            collection,
+            session.getSyncDeadline(),
+            session.config.storageURL(),
+            session.getAuthHeaderProvider(),
+            session.config.infoCollections,
+            session.config.infoConfiguration,
+            FORM_HISTORY_BATCH_LIMIT,
+            FORM_HISTORY_TOTAL_LIMIT,
+            FORM_HISTORY_SORT);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
             new NoOpConsistencyChecker(),
             new MemoryBufferStorage(),
             new FormHistoryRepositorySession.FormHistoryRepository()
     );
   }
 
   public class FormHistoryRecordFactory extends RecordFactory {
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -27,16 +27,17 @@ public class PasswordsServerSyncStage ex
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.PASSWORDS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
             new NoOpConsistencyChecker(),
             new MemoryBufferStorage(),
             new PasswordsRepositorySession.PasswordsRepository()
     );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
@@ -30,26 +30,27 @@ import android.content.Context;
  * "First sync" means that our sync timestamp is not greater than zero.
  */
 public class SafeConstrainedServer11Repository extends ConstrainedServer11Repository {
 
   // This can be lazily evaluated if we need it.
   private final JSONRecordFetcher countFetcher;
 
   public SafeConstrainedServer11Repository(String collection,
+                                           long syncDeadline,
                                            String storageURL,
                                            AuthHeaderProvider authHeaderProvider,
                                            InfoCollections infoCollections,
                                            InfoConfiguration infoConfiguration,
                                            long batchLimit,
                                            long totalLimit,
                                            String sort,
                                            JSONRecordFetcher countFetcher)
     throws URISyntaxException {
-    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration,
+    super(collection, syncDeadline, storageURL, authHeaderProvider, infoCollections, infoConfiguration,
             batchLimit, totalLimit, sort);
     if (countFetcher == null) {
       throw new IllegalArgumentException("countFetcher must not be null");
     }
     this.countFetcher = countFetcher;
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -138,16 +138,17 @@ public abstract class ServerSyncStage ex
   protected abstract String getEngineName();
   protected abstract Repository getLocalRepository();
   protected abstract RecordFactory getRecordFactory();
 
   // Override this in subclasses.
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new Server11Repository(collection,
+                                  session.getSyncDeadline(),
                                   session.config.storageURL(),
                                   session.getAuthHeaderProvider(),
                                   session.config.infoCollections,
                                   session.config.infoConfiguration);
   }
 
   /**
    * Return a Crypto5Middleware-wrapped Server11Repository.