--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -992,37 +992,41 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'sync/repositories/domain/PasswordRecord.java',
'sync/repositories/domain/PasswordRecordFactory.java',
'sync/repositories/domain/Record.java',
'sync/repositories/domain/RecordParseException.java',
'sync/repositories/domain/TabsRecord.java',
'sync/repositories/domain/TabsRecordFactory.java',
'sync/repositories/domain/VersionConstants.java',
'sync/repositories/downloaders/BatchingDownloader.java',
+ 'sync/repositories/downloaders/BatchingDownloaderController.java',
'sync/repositories/downloaders/BatchingDownloaderDelegate.java',
'sync/repositories/FetchFailedException.java',
'sync/repositories/HashSetStoreTracker.java',
'sync/repositories/HistoryRepository.java',
'sync/repositories/IdentityRecordFactory.java',
'sync/repositories/InactiveSessionException.java',
'sync/repositories/InvalidBookmarkTypeException.java',
'sync/repositories/InvalidRequestException.java',
'sync/repositories/InvalidSessionTransitionException.java',
'sync/repositories/MultipleRecordsForGuidException.java',
'sync/repositories/NoContentProviderException.java',
'sync/repositories/NoGuidForIdException.java',
+ 'sync/repositories/NonPersistentRepositoryStateProvider.java',
'sync/repositories/NoStoreDelegateException.java',
'sync/repositories/NullCursorException.java',
'sync/repositories/ParentNotFoundException.java',
+ 'sync/repositories/PersistentRepositoryStateProvider.java',
'sync/repositories/ProfileDatabaseException.java',
'sync/repositories/RecordFactory.java',
'sync/repositories/RecordFilter.java',
'sync/repositories/Repository.java',
'sync/repositories/RepositorySession.java',
'sync/repositories/RepositorySessionBundle.java',
+ 'sync/repositories/RepositoryStateProvider.java',
'sync/repositories/Server15Repository.java',
'sync/repositories/Server15RepositorySession.java',
'sync/repositories/StoreFailedException.java',
'sync/repositories/StoreTracker.java',
'sync/repositories/StoreTrackingRepositorySession.java',
'sync/repositories/uploaders/BatchingUploader.java',
'sync/repositories/uploaders/BatchMeta.java',
'sync/repositories/uploaders/BufferSizeTracker.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -153,21 +153,16 @@ import java.util.concurrent.Executors;
}
@Override
public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
inner.setStoreDelegate(delegate);
this.storeDelegate = delegate;
}
- @Override
- public long getHighWaterMarkTimestamp() {
- return bufferStorage.latestModifiedTimestamp();
- }
-
private boolean mayProceedToMergeBuffer() {
// If our buffer storage is not persistent, disallowing merging after buffer has been filled
// means throwing away records only to re-download them later.
// In this case allow merge to proceed even if we're past the deadline.
if (!bufferStorage.isPersistent()) {
return true;
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -22,14 +22,10 @@ public interface BufferStorage {
// NB: For a database-backed storage, "replace" happens at a transaction level.
void addOrReplace(Record record);
// For database-backed implementations, commits any records that came in up to this point.
void flush();
void clear();
- // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
- // a sync.
- long latestModifiedTimestamp();
-
boolean isPersistent();
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -43,28 +43,9 @@ public class MemoryBufferStorage impleme
public void flush() {
// This is a no-op; flush intended for database-backed stores.
}
@Override
public void clear() {
recordBuffer.clear();
}
-
- @Override
- public long latestModifiedTimestamp() {
- long lastModified = 0;
-
- synchronized (recordBuffer) {
- if (recordBuffer.size() == 0) {
- return lastModified;
- }
-
- for (Record record : recordBuffer.values()) {
- if (record.lastModified > lastModified) {
- lastModified = record.lastModified;
- }
- }
- }
-
- return lastModified;
- }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
@@ -4,59 +4,81 @@
package org.mozilla.gecko.sync.repositories;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.stage.ServerSyncStage;
/**
- * A kind of Server15Repository that supports explicit setting of per-batch fetch limit,
- * batching mode (single batch vs multi-batch), and a sort order.
+ * A kind of Server15Repository that supports explicit setting of:
+ * - per-batch fetch limit
+ * - batching mode (single batch vs multi-batch)
+ * - sort order
+ * - repository state provider (persistent vs non-persistent)
+ * - whereas use of high-water-mark is allowed
*
* @author rnewman
*
*/
public class ConfigurableServer15Repository extends Server15Repository {
private final String sortOrder;
private final long batchLimit;
- private final boolean allowMultipleBatches;
+ private final ServerSyncStage.MultipleBatches multipleBatches;
+ private final ServerSyncStage.HighWaterMark highWaterMark;
public ConfigurableServer15Repository(
String collection,
long syncDeadline,
String storageURL,
AuthHeaderProvider authHeaderProvider,
InfoCollections infoCollections,
InfoConfiguration infoConfiguration,
long batchLimit,
String sort,
- boolean allowMultipleBatches) throws URISyntaxException {
+ ServerSyncStage.MultipleBatches multipleBatches,
+ ServerSyncStage.HighWaterMark highWaterMark,
+ RepositoryStateProvider stateProvider) throws URISyntaxException {
super(
collection,
syncDeadline,
storageURL,
authHeaderProvider,
infoCollections,
- infoConfiguration
+ infoConfiguration,
+ stateProvider
);
this.batchLimit = batchLimit;
this.sortOrder = sort;
- this.allowMultipleBatches = allowMultipleBatches;
+ this.multipleBatches = multipleBatches;
+ this.highWaterMark = highWaterMark;
+
+ // Sanity check: let's ensure we're configured correctly. At this point in time, it doesn't make
+ // sense to use H.W.M. with a non-persistent state provider. This might change if we start retrying
+ // during a download in case of 412s.
+ if (!stateProvider.isPersistent() && highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled)) {
+ throw new IllegalArgumentException("Can not use H.W.M. with NonPersistentRepositoryStateProvider");
+ }
}
@Override
public String getSortOrder() {
return sortOrder;
}
@Override
public Long getBatchLimit() {
return batchLimit;
}
@Override
public boolean getAllowMultipleBatches() {
- return allowMultipleBatches;
+ return multipleBatches.equals(ServerSyncStage.MultipleBatches.Enabled);
+ }
+
+ @Override
+ public boolean getAllowHighWaterMark() {
+ return highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled);
}
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/NonPersistentRepositoryStateProvider.java
@@ -0,0 +1,76 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-persistent implementation of a repository state provider.
+ *
+ * Just like in the persistent implementation, changes to values are visible only after a commit.
+ *
+ * @author grisha
+ */
+public class NonPersistentRepositoryStateProvider implements RepositoryStateProvider {
+ // We'll have at least OFFSET and H.W.M. values set.
+ private final int INITIAL_CAPACITY = 2;
+ private final Map<String, Object> nonCommittedValuesMap = Collections.synchronizedMap(
+ new HashMap<String, Object>(INITIAL_CAPACITY)
+ );
+
+ // NB: Any changes are made by creating a new map instead of altering an existing one.
+ private volatile Map<String, Object> committedValuesMap = new HashMap<>(INITIAL_CAPACITY);
+
+ @Override
+ public boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ public boolean commit() {
+ committedValuesMap = new HashMap<>(nonCommittedValuesMap);
+ return true;
+ }
+
+ @Override
+ public NonPersistentRepositoryStateProvider clear(String key) {
+ nonCommittedValuesMap.remove(key);
+ return this;
+ }
+
+ @Override
+ public NonPersistentRepositoryStateProvider setString(String key, String value) {
+ nonCommittedValuesMap.put(key, value);
+ return this;
+ }
+
+ @Nullable
+ @Override
+ public String getString(String key) {
+ return (String) committedValuesMap.get(key);
+ }
+
+ @Override
+ public NonPersistentRepositoryStateProvider setLong(String key, Long value) {
+ nonCommittedValuesMap.put(key, value);
+ return this;
+ }
+
+ @Nullable
+ @Override
+ public Long getLong(String key) {
+ return (Long) committedValuesMap.get(key);
+ }
+
+ @Override
+ public boolean resetAndCommit() {
+ nonCommittedValuesMap.clear();
+ return commit();
+ }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/PersistentRepositoryStateProvider.java
@@ -0,0 +1,83 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import org.mozilla.gecko.background.common.PrefsBranch;
+
+/**
+ * Simple persistent implementation of a repository state provider.
+ * Uses provided PrefsBranch object in order to persist values.
+ *
+ * Values must be committed before they become visible via getters.
+ * It is a caller's responsibility to perform a commit.
+ *
+ * @author grisha
+ */
+public class PersistentRepositoryStateProvider implements RepositoryStateProvider {
+ private final PrefsBranch prefs;
+
+ private final PrefsBranch.Editor editor;
+
+ public PersistentRepositoryStateProvider(PrefsBranch prefs) {
+ this.prefs = prefs;
+ // NB: It is a caller's responsibility to commit any changes it performs via setters.
+ this.editor = prefs.edit();
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return true;
+ }
+
+ @Override
+ public boolean commit() {
+ return this.editor.commit();
+ }
+
+ @Override
+ public PersistentRepositoryStateProvider clear(String key) {
+ this.editor.remove(key);
+ return this;
+ }
+
+ @Override
+ public PersistentRepositoryStateProvider setString(String key, String value) {
+ this.editor.putString(key, value);
+ return this;
+ }
+
+ @Nullable
+ @Override
+ public String getString(String key) {
+ return this.prefs.getString(key, null);
+ }
+
+ @Override
+ public PersistentRepositoryStateProvider setLong(String key, Long value) {
+ this.editor.putLong(key, value);
+ return this;
+ }
+
+ @Nullable
+ @Override
+ public Long getLong(String key) {
+ if (!this.prefs.contains(key)) {
+ return null;
+ }
+ return this.prefs.getLong(key, 0);
+ }
+
+ @Override
+ public boolean resetAndCommit() {
+ return this.editor
+ .remove(KEY_HIGH_WATER_MARK)
+ .remove(KEY_OFFSET)
+ .remove(KEY_OFFSET_ORDER)
+ .remove(KEY_OFFSET_SINCE)
+ .commit();
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,21 +71,16 @@ public abstract class RepositorySession
// The time that the last sync on this collection completed, in milliseconds since epoch.
private long lastSyncTimestamp = 0;
public long getLastSyncTimestamp() {
return lastSyncTimestamp;
}
- // Override this in the buffering wrappers.
- public long getHighWaterMarkTimestamp() {
- return 0;
- }
-
public static long now() {
return System.currentTimeMillis();
}
public RepositorySession(Repository repository) {
this.repository = repository;
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositoryStateProvider.java
@@ -0,0 +1,47 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.CheckResult;
+import android.support.annotation.Nullable;
+
+/**
+ * Interface describing a repository state provider.
+ * Repository's state might consist of a number of key-value pairs.
+ *
+ * Currently there are two types of implementations: persistent and non-persistent state.
+ * Persistent state survives between syncs, and is currently used by the BatchingDownloader to
+ * resume downloads in case of interruptions. Non-persistent state is used when resuming downloads
+ * is not possible.
+ *
+ * In order to safely use a persistent state provider for resuming downloads, a sync stage must match
+ * the following criteria:
+ * - records are downloaded with sort=oldest
+ * - records must be downloaded into a persistent buffer, or applied to live storage
+ *
+ * @author grisha
+ */
+public interface RepositoryStateProvider {
+ String KEY_HIGH_WATER_MARK = "highWaterMark";
+ String KEY_OFFSET = "offset";
+ String KEY_OFFSET_SINCE = "offsetSince";
+ String KEY_OFFSET_ORDER = "offsetOrder";
+
+ boolean isPersistent();
+
+ @CheckResult
+ boolean commit();
+
+ RepositoryStateProvider clear(String key);
+
+ RepositoryStateProvider setString(String key, String value);
+ @Nullable String getString(String key);
+
+ RepositoryStateProvider setLong(String key, Long value);
+ @Nullable Long getLong(String key);
+
+ @CheckResult
+ boolean resetAndCommit();
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
@@ -26,16 +26,18 @@ public class Server15Repository extends
public final AuthHeaderProvider authHeaderProvider;
private final long syncDeadlineMillis;
/* package-local */ final URI collectionURI;
protected final String collection;
protected final InfoCollections infoCollections;
+ protected RepositoryStateProvider stateProvider;
+
private final InfoConfiguration infoConfiguration;
private final static String DEFAULT_SORT_ORDER = "oldest";
private final static long DEFAULT_BATCH_LIMIT = 100;
/**
* Construct a new repository that fetches and stores against the Sync 1.5 API.
*
* @param collection name.
@@ -45,32 +47,34 @@ public class Server15Repository extends
* @throws URISyntaxException
*/
public Server15Repository(
@NonNull String collection,
long syncDeadlineMillis,
@NonNull String storageURL,
AuthHeaderProvider authHeaderProvider,
@NonNull InfoCollections infoCollections,
- @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
+ @NonNull InfoConfiguration infoConfiguration,
+ @NonNull RepositoryStateProvider stateProvider) throws URISyntaxException {
if (collection == null) {
throw new IllegalArgumentException("collection must not be null");
}
if (storageURL == null) {
throw new IllegalArgumentException("storageURL must not be null");
}
if (infoCollections == null) {
throw new IllegalArgumentException("infoCollections must not be null");
}
this.collection = collection;
this.syncDeadlineMillis = syncDeadlineMillis;
this.collectionURI = new URI(storageURL + (storageURL.endsWith("/") ? collection : "/" + collection));
this.authHeaderProvider = authHeaderProvider;
this.infoCollections = infoCollections;
this.infoConfiguration = infoConfiguration;
+ this.stateProvider = stateProvider;
}
@Override
public void createSession(RepositorySessionCreationDelegate delegate,
Context context) {
delegate.onSessionCreated(new Server15RepositorySession(this));
}
@@ -98,16 +102,20 @@ public class Server15Repository extends
public Long getBatchLimit() {
return DEFAULT_BATCH_LIMIT;
}
public boolean getAllowMultipleBatches() {
return true;
}
+ public boolean getAllowHighWaterMark() {
+ return false;
+ }
+
/**
* A point in time by which this repository's session must complete fetch and store operations.
* Particularly pertinent for batching downloads performed by the session (should we fetch
* another batch?) and buffered repositories (do we have enough time to merge what we've downloaded?).
*/
public long getSyncDeadline() {
return syncDeadlineMillis;
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
@@ -8,29 +8,37 @@ import android.net.Uri;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloader;
+import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloaderController;
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
public class Server15RepositorySession extends RepositorySession {
public static final String LOG_TAG = "Server15RepositorySession";
protected final Server15Repository serverRepository;
private BatchingUploader uploader;
private final BatchingDownloader downloader;
public Server15RepositorySession(Repository repository) {
super(repository);
this.serverRepository = (Server15Repository) repository;
- this.downloader = initializeDownloader(this);
+ this.downloader = new BatchingDownloader(
+ this.serverRepository.authHeaderProvider,
+ Uri.parse(this.serverRepository.collectionURI().toString()),
+ this.serverRepository.getSyncDeadline(),
+ this.serverRepository.getAllowMultipleBatches(),
+ this.serverRepository.getAllowHighWaterMark(),
+ this.serverRepository.stateProvider,
+ this);
}
@Override
public void setStoreDelegate(RepositorySessionStoreDelegate storeDelegate) {
super.setStoreDelegate(storeDelegate);
// Now that we have the delegate, we can initialize our uploader.
this.uploader = new BatchingUploader(
@@ -44,17 +52,19 @@ public class Server15RepositorySession e
RepositorySessionGuidsSinceDelegate delegate) {
// TODO Auto-generated method stub
}
@Override
public void fetchSince(long sinceTimestamp,
RepositorySessionFetchRecordsDelegate delegate) {
- this.downloader.fetchSince(
+ BatchingDownloaderController.resumeFetchSinceIfPossible(
+ this.downloader,
+ this.serverRepository.stateProvider,
delegate,
sinceTimestamp,
serverRepository.getBatchLimit(),
serverRepository.getSortOrder()
);
}
@Override
@@ -98,22 +108,37 @@ public class Server15RepositorySession e
// If delegate was set, this shouldn't happen.
if (uploader == null) {
throw new IllegalStateException("Uploader haven't been initialized");
}
uploader.noMoreRecordsToUpload();
}
+ /**
+ * @return Repository's high-water-mark if it's available, its use is allowed by the repository,
+ * repository is set to fetch oldest-first, and it's greater than collection's last-synced timestamp.
+ * Otherwise, returns repository's last-synced timestamp.
+ */
+ @Override
+ public long getLastSyncTimestamp() {
+ if (!serverRepository.getAllowHighWaterMark() || !serverRepository.getSortOrder().equals("oldest")) {
+ return super.getLastSyncTimestamp();
+ }
+
+ final Long highWaterMark = serverRepository.stateProvider.getLong(
+ RepositoryStateProvider.KEY_HIGH_WATER_MARK);
+
+ // After a successful sync we expect that last-synced timestamp for a collection will be greater
+ // than the high-water-mark. High-water-mark is mostly useful in case of resuming a sync,
+ // and if we're resuming we did not bump our last-sync timestamps during the previous sync.
+ if (highWaterMark == null || super.getLastSyncTimestamp() > highWaterMark) {
+ return super.getLastSyncTimestamp();
+ }
+
+ return highWaterMark;
+ }
+
@Override
public boolean dataAvailable() {
return serverRepository.updateNeeded(getLastSyncTimestamp());
}
-
- protected static BatchingDownloader initializeDownloader(final Server15RepositorySession serverRepositorySession) {
- return new BatchingDownloader(
- serverRepositorySession.serverRepository.authHeaderProvider,
- Uri.parse(serverRepositorySession.serverRepository.collectionURI().toString()),
- serverRepositorySession.serverRepository.getSyncDeadline(),
- serverRepositorySession.serverRepository.getAllowMultipleBatches(),
- serverRepositorySession);
- }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
@@ -186,16 +186,38 @@ public class AndroidBrowserHistoryReposi
}
}
@Override
public void storeDone() {
storeDone(System.currentTimeMillis());
}
+ /**
+ * We need to flush our internal buffer of records in case of any interruptions of record flow
+ * from our "source". Downloader might be maintaining a "high-water-mark" based on the records
+ * it tried to store, so it's pertinent that all of the records that were queued for storage
+ * are eventually persisted.
+ */
+ @Override
+ public void storeIncomplete() {
+ storeWorkQueue.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (recordsBufferMonitor) {
+ try {
+ flushNewRecords();
+ } catch (Exception e) {
+ Logger.warn(LOG_TAG, "Error flushing records to database.", e);
+ }
+ }
+ }
+ });
+ }
+
@Override
public void storeDone(final long end) {
storeWorkQueue.execute(new Runnable() {
@Override
public void run() {
synchronized (recordsBufferMonitor) {
try {
flushNewRecords();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -15,16 +15,17 @@ import org.mozilla.gecko.sync.CryptoReco
import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -37,17 +38,17 @@ import java.util.concurrent.TimeUnit;
* - Per-batch limit, which specified how many records may be fetched in an individual GET request.
* - allowMultipleBatches, which determines if downloader is allowed to perform more than one fetch.
*
* Batching is implemented via specifying a 'limit' GET parameter, and looking for an 'offset' token
* in the response. If offset token is present, this indicates that there are more records than what
* we've received so far, and we perform an additional fetch, if we're allowed to do so by our
* configuration. Batching stops when offset token is no longer present (indicating that we're done).
*
- * If we are not allowed to perform multiple batches, we consider batching to be succesfully complete
+ * If we are not allowed to perform multiple batches, we consider batching to be successfully completed
* after fist fetch request succeeds. Similarly, a trivial case of collection having less records than
* the batch limit will also successfully complete in one fetch.
*
* In between batches, we maintain a Last-Modified timestamp, based off the value returned in the header
* of the first response. Every response will have a Last-Modified header, indicating when the collection
* was modified last. We pass along this header in our subsequent requests in a X-If-Unmodified-Since
* header. Server will ensure that our collection did not change while we are batching, if it did it will
* fail our fetch with a 412 error. Additionally, we perform the same checks locally.
@@ -56,35 +57,42 @@ public class BatchingDownloader {
public static final String LOG_TAG = "BatchingDownloader";
private static final String DEFAULT_SORT_ORDER = "index";
private final RepositorySession repositorySession;
private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
private final Uri baseCollectionUri;
private final long fetchDeadline;
private final boolean allowMultipleBatches;
+ private final boolean keepTrackOfHighWaterMark;
+
+ private RepositoryStateProvider stateProvider;
/* package-local */ final AuthHeaderProvider authHeaderProvider;
// Used to track outstanding requests, so that we can abort them as needed.
@VisibleForTesting
protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
/* @GuardedBy("this") */ private String lastModified;
public BatchingDownloader(
AuthHeaderProvider authHeaderProvider,
Uri baseCollectionUri,
long fetchDeadline,
boolean allowMultipleBatches,
+ boolean keepTrackOfHighWaterMark,
+ RepositoryStateProvider stateProvider,
RepositorySession repositorySession) {
this.repositorySession = repositorySession;
this.authHeaderProvider = authHeaderProvider;
this.baseCollectionUri = baseCollectionUri;
this.allowMultipleBatches = allowMultipleBatches;
+ this.keepTrackOfHighWaterMark = keepTrackOfHighWaterMark;
this.fetchDeadline = fetchDeadline;
+ this.stateProvider = stateProvider;
}
@VisibleForTesting
protected static String flattenIDs(String[] guids) {
// Consider using Utils.toDelimitedString if and when the signature changes
// to Collection<String> guids.
if (guids.length == 0) {
return "";
@@ -125,21 +133,16 @@ public class BatchingDownloader {
String offset)
throws URISyntaxException, UnsupportedEncodingException {
final URI collectionURI = buildCollectionURI(baseCollectionUri, full, newer, batchLimit, sort, ids, offset);
Logger.debug(LOG_TAG, collectionURI.toString());
return new SyncStorageCollectionRequest(collectionURI);
}
- public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder) {
- this.fetchSince(fetchRecordsDelegate, timestamp, batchLimit, sortOrder, null);
- }
-
- @VisibleForTesting
public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder, String offset) {
try {
SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
batchLimit, true, sortOrder, null, offset);
this.fetchWithParameters(timestamp, batchLimit, true, sortOrder, null, request, fetchRecordsDelegate);
} catch (URISyntaxException | UnsupportedEncodingException e) {
fetchRecordsDelegate.onFetchFailed(e);
}
@@ -194,26 +197,49 @@ public class BatchingDownloader {
}
// If we can (or must) stop batching at this point, let the delegate know that we're all done!
final String offset = response.weaveOffset();
if (offset == null || !allowMultipleBatches) {
final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
+ // This isn't great, but shouldn't be too problematic - but do see notes below.
+ // Failing to reset a resume context after we're done with batching means that on next
+ // sync we'll erroneously try to resume downloading. If resume proceeds, we will fetch
+ // from an older timestamp, but offset by the amount of records we've fetched prior.
+ // Since we're diligent about setting a X-I-U-S header, any remote collection changes
+ // will be caught and we'll receive a 412.
+ if (!BatchingDownloaderController.resetResumeContextAndCommit(this.stateProvider)) {
+ Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
+ }
+
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
}
});
return;
}
+ // This is unfortunate, but largely just means that in case we need to resume later on, it
+ // either won't be possible (and we'll fetch w/o resuming), or won't be as efficient (i.e.
+ // we'll download more records than necessary).
+ if (BatchingDownloaderController.isResumeContextSet(this.stateProvider)) {
+ if (!BatchingDownloaderController.updateResumeContextAndCommit(this.stateProvider, offset)) {
+ Logger.warn(LOG_TAG, "Failed to update resume context while processing a batch.");
+ }
+ } else {
+ if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
+ Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
+ }
+ }
+
// We need to make another batching request!
// Let the delegate know that a batch fetch just completed before we proceed.
// This operation needs to run after every call to onFetchedRecord for this batch has been
// processed, hence the delayWorkItem call.
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onBatchCompleted.");
@@ -228,16 +254,19 @@ public class BatchingDownloader {
}
// Create and execute new batch request.
try {
final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
limit, full, sort, ids, offset);
this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
} catch (final URISyntaxException | UnsupportedEncodingException e) {
+ if (!this.stateProvider.commit()) {
+ Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
+ }
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
fetchRecordsDelegate.onFetchFailed(e);
}
});
}
@@ -248,30 +277,54 @@ public class BatchingDownloader {
handleFetchFailed(fetchRecordsDelegate, ex, null);
}
/* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final Exception ex,
@Nullable final SyncStorageCollectionRequest request) {
this.removeRequestFromPending(request);
this.abortRequests();
+
+ // Resume context is not discarded if we failed because of reaching our deadline. In this case,
+ // we keep it allowing us to resume our download exactly where we left off.
+ // Discard resume context for all other failures: 412 (concurrent modification), HTTP errors, ...
+ if (!(ex instanceof SyncDeadlineReachedException)) {
+ // Failing to reset context means that we will try to resume once we re-sync current stage.
+ // This won't affect X-I-U-S logic in case of 412 (it's set separately from resume context),
+ // and same notes apply after failing to reset context in onFetchCompleted (see above).
+ if (!BatchingDownloaderController.resetResumeContextAndCommit(stateProvider)) {
+ Logger.warn(LOG_TAG, "Failed to reset resume context while processing a non-deadline exception");
+ }
+ } else {
+ // Failing to commit the context here means that we didn't commit the latest high-water-mark,
+ // and won't be as efficient once we re-sync. That is, we might download more records than necessary.
+ if (!this.stateProvider.commit()) {
+ Logger.warn(LOG_TAG, "Failed to commit resume context while processing a deadline exception");
+ }
+ }
+
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onFetchFailed.");
fetchRecordsDelegate.onFetchFailed(ex);
}
});
}
public void onFetchedRecord(CryptoRecord record,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
this.workTracker.incrementOutstanding();
+
try {
fetchRecordsDelegate.onFetchedRecord(record);
+ // NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
+ if (this.keepTrackOfHighWaterMark) {
+ this.stateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, record.lastModified);
+ }
} catch (Exception ex) {
Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
throw new RuntimeException(ex);
} finally {
this.workTracker.decrementOutstanding();
}
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderController.java
@@ -0,0 +1,118 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import android.support.annotation.CheckResult;
+import android.util.Log;
+
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+
+/**
+ * Encapsulates logic for resuming batching downloads.
+ *
+ * It's possible to resume a batching download if we have an offset token and the context in which
+ * we obtained the offset token did not change. Namely, we ensure that `since` and `order` parameters
+ * remain the same if offset is being used. See Bug 1330839 for a discussion on this.
+ *
+ * @author grisha
+ */
+public class BatchingDownloaderController {
+ private final static String LOG_TAG = "BatchingDownloaderCtrl";
+
+ private BatchingDownloaderController() {}
+
+ private static class ResumeContext {
+ private final String offset;
+ private final Long since;
+ private final String order;
+
+ private ResumeContext(String offset, Long since, String order) {
+ this.offset = offset;
+ this.since = since;
+ this.order = order;
+ }
+ }
+
+ private static ResumeContext getResumeContext(RepositoryStateProvider stateProvider, Long since, String order) {
+ // Build a "default" context around passed-in values if no context is available.
+ if (!isResumeContextSet(stateProvider)) {
+ return new ResumeContext(null, since, order);
+ }
+
+ final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
+ final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
+ final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
+
+ // If context is still valid, we can use it!
+ if (order.equals(offsetOrder)) {
+ return new ResumeContext(offset, offsetSince, offsetOrder);
+ }
+
+ // Build a "default" context around passed-in values.
+ return new ResumeContext(null, since, order);
+ }
+
+ /**
+ * Resumes a fetch if there is an offset present, and offset's context matches provided values.
+ * Otherwise, performs a regular fetch.
+ */
+ public static void resumeFetchSinceIfPossible(
+ BatchingDownloader downloader,
+ RepositoryStateProvider stateProvider,
+ RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+ long since, long limit, String order) {
+ ResumeContext resumeContext = getResumeContext(stateProvider, since, order);
+
+ downloader.fetchSince(
+ fetchRecordsDelegate,
+ resumeContext.since,
+ limit,
+ resumeContext.order,
+ resumeContext.offset
+ );
+ }
+
+ @CheckResult
+ /* package-local */ static boolean setInitialResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset, long since, String order) {
+ if (isResumeContextSet(stateProvider)) {
+ throw new IllegalStateException("Not allowed to set resume context more than once. Use update instead.");
+ }
+
+ return stateProvider
+ .setString(RepositoryStateProvider.KEY_OFFSET, offset)
+ .setLong(RepositoryStateProvider.KEY_OFFSET_SINCE, since)
+ .setString(RepositoryStateProvider.KEY_OFFSET_ORDER, order)
+ .commit();
+ }
+
+ @CheckResult
+ /* package-local */ static boolean updateResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset) {
+ if (!isResumeContextSet(stateProvider)) {
+ throw new IllegalStateException("Tried to update resume context before it was set.");
+ }
+
+ return stateProvider
+ .setString(RepositoryStateProvider.KEY_OFFSET, offset)
+ .commit();
+ }
+
+ @CheckResult
+ /* package-local */ static boolean resetResumeContextAndCommit(RepositoryStateProvider stateProvider) {
+ return stateProvider
+ .clear(RepositoryStateProvider.KEY_OFFSET)
+ .clear(RepositoryStateProvider.KEY_OFFSET_SINCE)
+ .clear(RepositoryStateProvider.KEY_OFFSET_ORDER)
+ .commit();
+ }
+
+ /*package-local */ static boolean isResumeContextSet(RepositoryStateProvider stateProvider) {
+ final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
+ final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
+ final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
+
+ return offset != null && offsetSince != null && offsetOrder != null;
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -5,16 +5,17 @@
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
protected static final String LOG_TAG = "BookmarksStage";
@@ -34,28 +35,52 @@ public class AndroidBrowserBookmarksServ
return "bookmarks";
}
@Override
public Integer getStorageVersion() {
return VersionConstants.BOOKMARKS_ENGINE_VERSION;
}
+ /**
+ * We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
+ * Once this stage is using a persistent buffer, this should change. See Bug 1318515.
+ *
+ * @return HighWaterMark.Disabled
+ */
+ @Override
+ protected HighWaterMark getAllowedToUseHighWaterMark() {
+ return HighWaterMark.Disabled;
+ }
+
+ /**
+ * Full batching is allowed, because we want all of the records.
+ *
+ * @return MultipleBatches.Enabled
+ */
+ @Override
+ protected MultipleBatches getAllowedMultipleBatches() {
+ return MultipleBatches.Enabled;
+ }
+
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
return new ConfigurableServer15Repository(
getCollection(),
session.getSyncDeadline(),
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
session.config.infoConfiguration,
BOOKMARKS_BATCH_LIMIT,
BOOKMARKS_SORT,
- true /* allow multiple batches */);
+ getAllowedMultipleBatches(),
+ getAllowedToUseHighWaterMark(),
+ getRepositoryStateProvider()
+ );
}
@Override
protected Repository getLocalRepository() {
return new BufferingMiddlewareRepository(
session.getSyncDeadline(),
new MemoryBufferStorage(),
new AndroidBrowserBookmarksRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -3,18 +3,20 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
protected static final String LOG_TAG = "HistoryStage";
// Eventually this kind of sync stage will be data-driven,
@@ -37,28 +39,67 @@ public class AndroidBrowserHistoryServer
return VersionConstants.HISTORY_ENGINE_VERSION;
}
@Override
protected Repository getLocalRepository() {
return new AndroidBrowserHistoryRepository();
}
+ /**
+ * We use a persistent state provider for this stage, because it lets us resume interrupted
+ * syncs more efficiently.
+ * We are able to do this because we match criteria described in {@link RepositoryStateProvider}.
+ *
+ * @return Persistent repository state provider.
+ */
+ @Override
+ protected RepositoryStateProvider getRepositoryStateProvider() {
+ return new PersistentRepositoryStateProvider(
+ session.config.getBranch(statePreferencesPrefix())
+ );
+ }
+
+ /**
+ * We're downloading records oldest-first directly into live storage, forgoing any buffering other
+ * than AndroidBrowserHistoryRepository's internal records queue. These conditions allow us to use
+ * high-water-mark to resume downloads in case of interruptions.
+ *
+ * @return HighWaterMark.Enabled
+ */
+ @Override
+ protected HighWaterMark getAllowedToUseHighWaterMark() {
+ return HighWaterMark.Enabled;
+ }
+
+ /**
+ * Full batching is allowed, because we want all of the records.
+ *
+ * @return MultipleBatches.Enabled
+ */
+ @Override
+ protected MultipleBatches getAllowedMultipleBatches() {
+ return MultipleBatches.Enabled;
+ }
+
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
return new ConfigurableServer15Repository(
getCollection(),
session.getSyncDeadline(),
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
session.config.infoConfiguration,
HISTORY_BATCH_LIMIT,
HISTORY_SORT,
- true /* allow multiple batches */);
+ getAllowedMultipleBatches(),
+ getAllowedToUseHighWaterMark(),
+ getRepositoryStateProvider()
+ );
}
@Override
protected RecordFactory getRecordFactory() {
return new HistoryRecordFactory();
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
@@ -5,45 +5,79 @@
package org.mozilla.gecko.sync.stage;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.SynchronizerConfiguration;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* History sync stage which is limited to just recent history, and will only run if the full history
* sync stage did not complete yet. Its purpose is to give users with a lot of history in their
* profiles a good experience during a large collection sync.
*
* @author grisha
*/
public class AndroidBrowserRecentHistoryServerSyncStage extends AndroidBrowserHistoryServerSyncStage {
protected static final String LOG_TAG = "RecentHistoryStage";
- // TODO: Bug 1316110 tracks follow up work to make this stage more efficient.
+ // Bug 1316110 tracks follow up work to generalize this stage and make it more efficient.
private static final int HISTORY_BATCH_LIMIT = 50;
// We need a custom configuration bundle name for this stage, because we want to track last-synced
// timestamp for this stage separately from that of a full history sync stage, yet their collection
// names are the same.
private static final String BUNDLE_NAME = "recentHistory.";
private static final String HISTORY_SORT = "newest";
@Override
public String bundlePrefix() {
return BUNDLE_NAME;
}
+ /**
+ * We use a non-persistent state provider for this stage, as it's designed to just run once.
+ *
+ * @return Non-persistent repository state provider.
+ */
+ @Override
+ protected RepositoryStateProvider getRepositoryStateProvider() {
+ return new NonPersistentRepositoryStateProvider();
+ }
+
+ /**
+ * Force download to be limited to a single batch.
+ * We just to want fetch a batch-worth of records for this stage.
+ *
+ * @return MultipleBatches.Disabled
+ */
+ @Override
+ protected MultipleBatches getAllowedMultipleBatches() {
+ return MultipleBatches.Disabled;
+ }
+
+ /**
+ * Right now this stage is designed to run just once, when there's no history data available.
+ *
+ * @return HighWaterMark.Disabled
+ */
+ @Override
+ protected HighWaterMark getAllowedToUseHighWaterMark() {
+ return HighWaterMark.Disabled;
+ }
+
@Override
protected Repository getLocalRepository() {
return new BufferingMiddlewareRepository(
session.getSyncDeadline(),
new MemoryBufferStorage(),
new AndroidBrowserHistoryRepository()
);
}
@@ -54,17 +88,19 @@ public class AndroidBrowserRecentHistory
getCollection(),
session.getSyncDeadline(),
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
session.config.infoConfiguration,
HISTORY_BATCH_LIMIT,
HISTORY_SORT,
- false /* force single batch only */);
+ getAllowedMultipleBatches(),
+ getAllowedToUseHighWaterMark(),
+ getRepositoryStateProvider());
}
/**
* This stage is only enabled if full history session is enabled and did not complete a sync yet.
*/
@Override
public boolean isEnabled() throws MetaGlobalException {
final boolean historyStageEnabled = super.isEnabled();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -5,16 +5,17 @@
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class FormHistoryServerSyncStage extends ServerSyncStage {
@@ -34,29 +35,53 @@ public class FormHistoryServerSyncStage
return "forms";
}
@Override
public Integer getStorageVersion() {
return VersionConstants.FORMS_ENGINE_VERSION;
}
+ /**
+ * We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
+ * Once this stage is using a persistent buffer, this should change.
+ *
+ * @return HighWaterMark.Disabled
+ */
+ @Override
+ protected HighWaterMark getAllowedToUseHighWaterMark() {
+ return HighWaterMark.Disabled;
+ }
+
+ /**
+ * Full batching is allowed, because we want all of the records.
+ *
+ * @return MultipleBatches.Enabled
+ */
+ @Override
+ protected MultipleBatches getAllowedMultipleBatches() {
+ return MultipleBatches.Enabled;
+ }
+
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
String collection = getCollection();
return new ConfigurableServer15Repository(
collection,
session.getSyncDeadline(),
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
session.config.infoConfiguration,
FORM_HISTORY_BATCH_LIMIT,
FORM_HISTORY_SORT,
- true /* allow multiple batches */);
+ getAllowedMultipleBatches(),
+ getAllowedToUseHighWaterMark(),
+ getRepositoryStateProvider()
+ );
}
@Override
protected Repository getLocalRepository() {
return new BufferingMiddlewareRepository(
session.getSyncDeadline(),
new MemoryBufferStorage(),
new FormHistoryRepositorySession.FormHistoryRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -22,20 +22,22 @@ import org.mozilla.gecko.sync.delegates.
import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.BaseResource;
import org.mozilla.gecko.sync.net.SyncStorageRequest;
import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
import org.mozilla.gecko.sync.synchronizer.Synchronizer;
import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
@@ -56,16 +58,30 @@ import java.util.concurrent.ExecutorServ
public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage implements SynchronizerDelegate {
protected static final String LOG_TAG = "ServerSyncStage";
protected long stageStartTimestamp = -1;
protected long stageCompleteTimestamp = -1;
/**
+ * Poor-man's boolean typing.
+ * These enums are used to configure {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+ */
+ public enum HighWaterMark {
+ Enabled,
+ Disabled
+ }
+
+ public enum MultipleBatches {
+ Enabled,
+ Disabled
+ }
+
+ /**
* Override these in your subclasses.
*
* @return true if this stage should be executed.
* @throws MetaGlobalException
*/
protected boolean isEnabled() throws MetaGlobalException {
EngineSettings engineSettings = null;
try {
@@ -136,25 +152,54 @@ public abstract class ServerSyncStage ex
return new EngineSettings(config.syncID, version);
}
protected abstract String getCollection();
protected abstract String getEngineName();
protected abstract Repository getLocalRepository();
protected abstract RecordFactory getRecordFactory();
+ /**
+ * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+ * Override this if you need a persistent repository state provider.
+ *
+ * @return Non-persistent state provider.
+ */
+ protected RepositoryStateProvider getRepositoryStateProvider() {
+ return new NonPersistentRepositoryStateProvider();
+ }
+
+ /**
+ * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+ * Override this if you want to restrict downloader to just a single batch.
+ */
+ protected MultipleBatches getAllowedMultipleBatches() {
+ return MultipleBatches.Enabled;
+ }
+
+ /**
+ * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+ * Override this if you want to allow resuming record downloads from a high-water-mark.
+ * Ensure you're using a {@link org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider}
+ * to persist high-water-mark across syncs.
+ */
+ protected HighWaterMark getAllowedToUseHighWaterMark() {
+ return HighWaterMark.Disabled;
+ }
+
// Override this in subclasses.
protected Repository getRemoteRepository() throws URISyntaxException {
String collection = getCollection();
return new Server15Repository(collection,
session.getSyncDeadline(),
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
- session.config.infoConfiguration);
+ session.config.infoConfiguration,
+ new NonPersistentRepositoryStateProvider());
}
/**
* Return a Crypto5Middleware-wrapped Server15Repository.
*
* @throws NoCollectionKeysSetException
* @throws URISyntaxException
*/
@@ -165,16 +210,20 @@ public abstract class ServerSyncStage ex
cryptoRepo.recordFactory = getRecordFactory();
return cryptoRepo;
}
protected String bundlePrefix() {
return this.getCollection() + ".";
}
+ protected String statePreferencesPrefix() {
+ return this.getCollection() + ".state.";
+ }
+
protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
}
protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
synchronizerConfiguration.persist(session.config.getBranch(bundlePrefix()));
}
@@ -185,21 +234,31 @@ public abstract class ServerSyncStage ex
synchronizer.repositoryA = remote;
synchronizer.repositoryB = this.getLocalRepository();
synchronizer.load(getConfig());
return synchronizer;
}
/**
- * Reset timestamps.
+ * Reset timestamps and any repository state.
*/
@Override
protected void resetLocal() {
resetLocalWithSyncID(null);
+ if (!getRepositoryStateProvider().resetAndCommit()) {
+ // At the very least, we can log this.
+ // Failing to reset at this point means that we'll have lingering state for any stages using a
+ // persistent provider. In certain cases this might negatively affect first sync of this stage
+ // in the future.
+ // Our timestamp resetting code in `persistConfig` is affected by the same problem.
+ // A way to work around this is to further prefix our persisted SharedPreferences with
+ // clientID/syncID, ensuring a very defined scope for any persisted state. See Bug 1332431.
+ Logger.warn(LOG_TAG, "Failed to reset repository state");
+ }
}
/**
* Reset timestamps and possibly set syncID.
* @param syncID if non-null, new syncID to persist.
*/
protected void resetLocalWithSyncID(String syncID) {
// Clear both timestamps.
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -1,16 +1,17 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
+import android.support.annotation.VisibleForTesting;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.ThreadPool;
@@ -171,26 +172,17 @@ public class RecordsChannel implements
numFetched.set(0);
numFetchFailed.set(0);
numStored.set(0);
numStoreFailed.set(0);
// Start a consumer thread.
this.consumer = new ConcurrentRecordConsumer(this);
ThreadPool.run(this.consumer);
waitingForQueueDone = true;
-
- // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
- // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
- // starting from sink's high water mark timestamp.
- // If there was no previous flow (first sync, or data was cleared...), fetch everything.
- // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
- final long highWaterMark = sink.getHighWaterMarkTimestamp();
- final long lastSync = source.getLastSyncTimestamp();
- final long sinceTimestamp = Math.max(highWaterMark, lastSync);
- source.fetchSince(sinceTimestamp, this);
+ source.fetchSince(source.getLastSyncTimestamp(), this);
}
/**
* Begin both sessions, invoking flow() when done.
* @throws InvalidSessionTransitionException
*/
public void beginAndFlow() throws InvalidSessionTransitionException {
Logger.trace(LOG_TAG, "Beginning source.");
@@ -210,19 +202,19 @@ public class RecordsChannel implements
@Override
public void onFetchFailed(Exception ex) {
Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
numFetchFailed.incrementAndGet();
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
+ delegate.onFlowFetchFailed(this, ex);
// Sink will be informed once consumer finishes.
this.consumer.halt();
- delegate.onFlowFetchFailed(this, ex);
}
@Override
public void onFetchedRecord(Record record) {
numFetched.incrementAndGet();
this.toProcess.add(record);
this.consumer.doNotify();
}
@@ -313,16 +305,17 @@ public class RecordsChannel implements
// Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
// we don't need them (case 1).
waitingForQueueDone = false;
// If consumer is still going at it, tell it to stop.
this.consumer.halt();
+ delegate.onFlowStoreFailed(this, ex, null);
delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
}
@Override
public void onBeginFailed(Exception ex) {
delegate.onFlowBeginFailed(this, ex);
}
@@ -358,17 +351,18 @@ public class RecordsChannel implements
@Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
// Lie outright. We know that all of our fetch methods are safe.
return this;
}
@Nullable
- /* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
+ @VisibleForTesting
+ public synchronized ReflowIsNecessaryException getReflowException() {
return reflowException;
}
private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
// It is a mistake to set reflow exception multiple times.
if (reflowException != null) {
throw new IllegalStateException("Reflow exception already set: " + reflowException);
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
@@ -6,36 +6,39 @@ package org.mozilla.android.sync.net.tes
import android.os.SystemClock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
@RunWith(TestRunner.class)
public class TestServer15Repository {
private static final String COLLECTION = "bookmarks";
private static final String COLLECTION_URL = "http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage";
private static final long SYNC_DEADLINE = SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30);
protected final InfoCollections infoCollections = new InfoCollections();
protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
+ protected final RepositoryStateProvider stateProvider = new NonPersistentRepositoryStateProvider();
public static void assertQueryEquals(String expected, URI u) {
Assert.assertEquals(expected, u.getRawQuery());
}
@Test
public void testCollectionURI() throws URISyntaxException {
- Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration);
- Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
+ Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration, stateProvider);
+ Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration, stateProvider);
Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
}
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -60,23 +60,21 @@ public class BufferingMiddlewareReposito
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
assertEquals(1, bufferStorage.all().size());
MockRecord record1 = new MockRecord("guid2", null, 1, false);
bufferingSession.store(record1);
assertEquals(2, bufferStorage.all().size());
- assertEquals(1, bufferStorage.latestModifiedTimestamp());
// record2 must replace record.
MockRecord record2 = new MockRecord("guid1", null, 2, false);
bufferingSession.store(record2);
assertEquals(2, bufferStorage.all().size());
- assertEquals(2, bufferStorage.latestModifiedTimestamp());
// Ensure inner session doesn't see incoming records.
verify(innerRepositorySession, never()).store(record);
verify(innerRepositorySession, never()).store(record1);
verify(innerRepositorySession, never()).store(record2);
}
@Test
@@ -172,32 +170,9 @@ public class BufferingMiddlewareReposito
}
@Test
public void setStoreDelegate() throws Exception {
RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
bufferingSession.setStoreDelegate(delegate);
verify(innerRepositorySession).setStoreDelegate(delegate);
}
-
- @Test
- public void getHighWaterMarkTimestamp() throws Exception {
- // Trivial case, empty buffer.
- assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
-
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
- assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
- MockRecord record3 = new MockRecord("guid3", null, 5, false);
- bufferingSession.store(record3);
- assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
-
- // NB: same guid as above.
- MockRecord record4 = new MockRecord("guid3", null, -1, false);
- bufferingSession.store(record4);
- assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
- MockRecord record2 = new MockRecord("guid2", null, 13, false);
- bufferingSession.store(record2);
- assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
- }
}
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderControllerTest.java
@@ -0,0 +1,91 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class BatchingDownloaderControllerTest {
+ private BatchingDownloaderTest.MockSever15Repository serverRepository;
+ private Server15RepositorySession repositorySession;
+ private BatchingDownloaderTest.MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
+ private BatchingDownloaderTest.MockDownloader mockDownloader;
+ private BatchingDownloaderTest.CountingShadowRepositoryState repositoryStateProvider;
+
+ @Before
+ public void setUp() throws Exception {
+ sessionFetchRecordsDelegate = new BatchingDownloaderTest.MockSessionFetchRecordsDelegate();
+
+ serverRepository = new BatchingDownloaderTest.MockSever15Repository(
+ "dummyCollection", "http://dummy.url/", null,
+ new InfoCollections(), new InfoConfiguration());
+ repositorySession = new Server15RepositorySession(serverRepository);
+ repositoryStateProvider = new BatchingDownloaderTest.CountingShadowRepositoryState();
+ mockDownloader = new BatchingDownloaderTest.MockDownloader(repositorySession, true, true, repositoryStateProvider);
+ }
+
+ @Test
+ public void resumeFetchSinceIfPossible() throws Exception {
+ assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 2L, "oldest"));
+
+ // Test that we'll resume from offset if context is correct.
+ BatchingDownloaderController.resumeFetchSinceIfPossible(
+ mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 25L, "oldest");
+ assertEquals("offset1", mockDownloader.offset);
+ // Ensure we'll use context-provided since value.
+ assertEquals(2L, mockDownloader.newer);
+ assertEquals("oldest", mockDownloader.sort);
+
+ assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+
+ // Test that we won't resume on context mismatch. Ensure that we use new context.
+ BatchingDownloaderController.resumeFetchSinceIfPossible(
+ mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 1L, 25L, "newest");
+ assertEquals(null, mockDownloader.offset);
+ assertEquals(1L, mockDownloader.newer);
+ assertEquals("newest", mockDownloader.sort);
+
+ assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset3"));
+
+ // Test that we may fetch with a different limit and still resume since our context is valid.
+ BatchingDownloaderController.resumeFetchSinceIfPossible(
+ mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 50L, "oldest");
+ assertEquals("offset3", mockDownloader.offset);
+ assertEquals("oldest", mockDownloader.sort);
+ assertEquals(2L, mockDownloader.newer);
+ }
+
+ @Test
+ public void testInitialSetAndUpdateOfContext() throws Exception {
+ assertFalse(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
+
+ // Test that we can't update context which wasn't set yet.
+ try {
+ assertFalse(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+ fail();
+ } catch (IllegalStateException e) {}
+
+ // Test that we can set context and check that it's set.
+ assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
+ assertTrue(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
+
+ // Test that we can't set context after it was already set.
+ try {
+ assertFalse(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
+ fail();
+ } catch (IllegalStateException e) {}
+
+ // Test that we can update context after it was set.
+ assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+ }
+}
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -14,16 +14,17 @@ import org.mozilla.gecko.background.test
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.net.URI;
import java.util.concurrent.ExecutorService;
@@ -49,16 +50,18 @@ public class BatchingDownloaderDelegateT
public Exception ex;
public MockDownloader(RepositorySession repositorySession) {
super(
null,
Uri.EMPTY,
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
true,
+ true,
+ new NonPersistentRepositoryStateProvider(),
repositorySession
);
}
@Override
public void onFetchCompleted(SyncStorageResponse response,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final SyncStorageCollectionRequest request,
@@ -111,28 +114,31 @@ public class BatchingDownloaderDelegateT
@Before
public void setUp() throws Exception {
repositorySession = new Server15RepositorySession(new Server15Repository(
"dummyCollection",
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
DEFAULT_COLLECTION_URL,
null,
new InfoCollections(),
- new InfoConfiguration())
+ new InfoConfiguration(),
+ new NonPersistentRepositoryStateProvider())
);
mockDownloader = new MockDownloader(repositorySession);
}
@Test
public void testIfUnmodifiedSince() throws Exception {
BatchingDownloader downloader = new BatchingDownloader(
null,
Uri.EMPTY,
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
true,
+ true,
+ new NonPersistentRepositoryStateProvider(),
repositorySession
);
RepositorySessionFetchRecordsDelegate delegate = new SimpleSessionFetchRecordsDelegate();
BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(downloader, delegate,
new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
String lastModified = "12345678";
SyncStorageResponse response = makeSyncStorageResponse(200, lastModified);
downloaderDelegate.handleRequestSuccess(response);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -2,62 +2,128 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.downloaders;
import android.net.Uri;
import android.os.SystemClock;
import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import ch.boye.httpclientandroidlib.ProtocolVersion;
import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
import ch.boye.httpclientandroidlib.message.BasicStatusLine;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
@RunWith(TestRunner.class)
public class BatchingDownloaderTest {
private MockSever15Repository serverRepository;
private Server15RepositorySession repositorySession;
private MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
private MockDownloader mockDownloader;
private String DEFAULT_COLLECTION_NAME = "dummyCollection";
- private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
+ private static String DEFAULT_COLLECTION_URL = "http://dummy.url/";
private long DEFAULT_NEWER = 1;
private String DEFAULT_SORT = "oldest";
private String DEFAULT_IDS = "1";
private String DEFAULT_LMHEADER = "12345678";
- class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
+ private CountingShadowRepositoryState repositoryStateProvider;
+
+ // Memory-backed state implementation which keeps a shadow of current values, so that they can be
+ // queried by the tests. Classes under test do not have access to the shadow, and follow regular
+ // non-persistent state semantics (value changes visible only after commit).
+ static class CountingShadowRepositoryState extends NonPersistentRepositoryStateProvider {
+ private AtomicInteger commitCount = new AtomicInteger(0);
+ private final Map<String, Object> shadowMap = Collections.synchronizedMap(new HashMap<String, Object>(2));
+
+ @Override
+ public boolean commit() {
+ commitCount.incrementAndGet();
+ return super.commit();
+ }
+
+ int getCommitCount() {
+ return commitCount.get();
+ }
+
+ @Nullable
+ public Long getShadowedLong(String key) {
+ return (Long) shadowMap.get(key);
+ }
+
+ @Nullable
+ public String getShadowedString(String key) {
+ return (String) shadowMap.get(key);
+ }
+
+ @Override
+ public CountingShadowRepositoryState setLong(String key, Long value) {
+ shadowMap.put(key, value);
+ super.setLong(key, value);
+ return this;
+ }
+
+ @Override
+ public CountingShadowRepositoryState setString(String key, String value) {
+ shadowMap.put(key, value);
+ super.setString(key, value);
+ return this;
+ }
+
+ @Override
+ public CountingShadowRepositoryState clear(String key) {
+ shadowMap.remove(key);
+ super.clear(key);
+ return this;
+ }
+
+ @Override
+ public boolean resetAndCommit() {
+ shadowMap.clear();
+ return super.resetAndCommit();
+ }
+ }
+
+ static class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
public boolean isFailure;
public boolean isFetched;
public boolean isSuccess;
public int batchesCompleted;
public Exception ex;
public Record record;
@Override
@@ -83,39 +149,40 @@ public class BatchingDownloaderTest {
}
@Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return null;
}
}
- class MockRequest extends SyncStorageCollectionRequest {
+ static class MockRequest extends SyncStorageCollectionRequest {
- public MockRequest(URI uri) {
+ MockRequest(URI uri) {
super(uri);
}
@Override
public void get() {
}
}
- class MockDownloader extends BatchingDownloader {
+ static class MockDownloader extends BatchingDownloader {
public long newer;
public long limit;
public boolean full;
public String sort;
public String ids;
public String offset;
public boolean abort;
- public MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches) {
- super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), allowMultipleBatches, repositorySession);
+ MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches, boolean keepTrackOfHighWaterMark, RepositoryStateProvider repositoryStateProvider) {
+ super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+ allowMultipleBatches, keepTrackOfHighWaterMark, repositoryStateProvider, repositorySession);
}
@Override
public void fetchWithParameters(long newer,
long batchLimit,
boolean full,
String sort,
String ids,
@@ -144,45 +211,48 @@ public class BatchingDownloaderTest {
String ids,
String offset)
throws URISyntaxException, UnsupportedEncodingException {
this.offset = offset;
return super.makeSyncStorageCollectionRequest(newer, batchLimit, full, sort, ids, offset);
}
}
- class MockSever15Repository extends Server15Repository {
- public MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
+ static class MockSever15Repository extends Server15Repository {
+ MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections,
@NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
- super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), storageURL, authHeaderProvider, infoCollections, infoConfiguration);
+ super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+ storageURL, authHeaderProvider, infoCollections, infoConfiguration,
+ new NonPersistentRepositoryStateProvider());
}
}
- class MockRepositorySession extends Server15RepositorySession {
+ static class MockRepositorySession extends Server15RepositorySession {
public boolean abort;
- public MockRepositorySession(Repository repository) {
+ MockRepositorySession(Repository repository) {
super(repository);
}
@Override
public void abort() {
this.abort = true;
}
}
@Before
public void setUp() throws Exception {
sessionFetchRecordsDelegate = new MockSessionFetchRecordsDelegate();
serverRepository = new MockSever15Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL, null,
new InfoCollections(), new InfoConfiguration());
repositorySession = new Server15RepositorySession(serverRepository);
- mockDownloader = new MockDownloader(repositorySession, true);
+ repositoryStateProvider = new CountingShadowRepositoryState();
+ mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
}
@Test
public void testFlattenId() {
String[] emptyGuid = new String[]{};
String flatten = BatchingDownloader.flattenIDs(emptyGuid);
assertEquals("", flatten);
@@ -199,122 +269,135 @@ public class BatchingDownloaderTest {
multiGuid[1] = guid1;
multiGuid[2] = guid2;
flatten = BatchingDownloader.flattenIDs(multiGuid);
assertEquals("123456789abc,456789abc,789abc", flatten);
}
@Test
public void testBatchingTrivial() throws Exception {
- MockDownloader mockDownloader = new MockDownloader(repositorySession, true);
+ MockDownloader mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
// Number of records == batch limit.
final long BATCH_LIMIT = 100;
- mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+ mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, "100");
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+ // NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
+ // Expect no offset to be persisted.
+ ensureOffsetContextIsNull(repositoryStateProvider);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
public void testBatchingSingleBatchMode() throws Exception {
- MockDownloader mockDownloader = new MockDownloader(repositorySession, false);
+ MockDownloader mockDownloader = new MockDownloader(repositorySession, false, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
// Number of records > batch limit. But, we're only allowed to make one batch request.
final long BATCH_LIMIT = 100;
- mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+ mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
String offsetHeader = "25";
String recordsHeader = "500";
SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+ // We don't care about the offset in a single batch mode.
+ ensureOffsetContextIsNull(repositoryStateProvider);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
public void testBatching() throws Exception {
final long BATCH_LIMIT = 25;
- mockDownloader = new MockDownloader(repositorySession, true);
+ mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
- mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+ mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
- String offsetHeader = "25";
- String recordsHeader = "25";
- SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
- SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
- mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
- BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+ final String recordsHeader = "25";
+ performOnFetchCompleted("25", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
- assertEquals(offsetHeader, mockDownloader.offset);
+ assertEquals("25", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
+ // Offset context set.
+ ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
+
// The next batch, we still have an offset token and has not exceed the total limit.
- offsetHeader = "50";
- response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
- mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
- BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+ performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
- assertEquals(offsetHeader, mockDownloader.offset);
+ assertEquals("50", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
+ // Offset context updated.
+ ensureOffsetContextIs(repositoryStateProvider, "50", "oldest", 1L);
+ assertEquals(2, repositoryStateProvider.getCommitCount());
+
// The next batch, we still have an offset token and has not exceed the total limit.
- offsetHeader = "75";
- response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
- mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
- BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+ performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
- assertEquals(offsetHeader, mockDownloader.offset);
+ assertEquals("75", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
+ // Offset context updated.
+ ensureOffsetContextIs(repositoryStateProvider, "75", "oldest", 1L);
+ assertEquals(3, repositoryStateProvider.getCommitCount());
+
// No more offset token, so we complete batching.
- response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, recordsHeader);
- mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
- BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+ performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
+
+ // Offset context cleared since we finished batching, and committed.
+ ensureOffsetContextIsNull(repositoryStateProvider);
+ assertEquals(4, repositoryStateProvider.getCommitCount());
}
@Test
public void testFailureLMChangedMultiBatch() throws Exception {
assertNull(mockDownloader.getLastModified());
String lmHeader = "12345678";
String offsetHeader = "100";
@@ -331,57 +414,140 @@ public class BatchingDownloaderTest {
assertTrue(mockDownloader.full);
assertEquals(DEFAULT_SORT, mockDownloader.sort);
assertEquals(DEFAULT_IDS, mockDownloader.ids);
assertEquals(offsetHeader, mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
+ // Offset context set.
+ ensureOffsetContextIs(repositoryStateProvider, "100", "oldest", 1L);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
+
// Last modified header somehow changed.
lmHeader = "10000000";
response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
limit, true, DEFAULT_SORT, DEFAULT_IDS);
assertNotEquals(lmHeader, mockDownloader.getLastModified());
assertTrue(mockDownloader.abort);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertTrue(sessionFetchRecordsDelegate.isFailure);
+
+ // Since we failed due to a remotely modified collection, we expect offset context to be reset.
+ ensureOffsetContextIsNull(repositoryStateProvider);
+ assertEquals(2, repositoryStateProvider.getCommitCount());
}
@Test
public void testFailureException() throws Exception {
Exception ex = new IllegalStateException();
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertTrue(sessionFetchRecordsDelegate.isFailure);
assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
assertNull(sessionFetchRecordsDelegate.record);
}
@Test
+ public void testOffsetNotResetAfterDeadline() throws Exception {
+ final long BATCH_LIMIT = 25;
+ mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+ mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
+
+ SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+ // Offset context set.
+ ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
+
+ Exception ex = new SyncDeadlineReachedException();
+ mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+ // Offset context not reset.
+ ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+ assertEquals(2, repositoryStateProvider.getCommitCount());
+ }
+
+ @Test
+ public void testOffsetResetAfterConcurrentModification() throws Exception {
+ final long BATCH_LIMIT = 25;
+ mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+ mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
+
+ SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+ // Offset context set.
+ ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+ assertEquals(1, repositoryStateProvider.getCommitCount());
+
+ Exception ex = new CollectionConcurrentModificationException();
+ mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+ // Offset is reset.
+ ensureOffsetContextIsNull(repositoryStateProvider);
+ assertEquals(2, repositoryStateProvider.getCommitCount());
+ }
+
+ @Test
public void testFetchRecord() {
CryptoRecord record = new CryptoRecord();
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertTrue(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(record, sessionFetchRecordsDelegate.record);
}
@Test
+ public void testHighWaterMarkTracking() {
+ CryptoRecord record = new CryptoRecord();
+
+ // HWM enabled
+ mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+
+ record.lastModified = 1L;
+ mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+ assertEquals(Long.valueOf(1), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+ record.lastModified = 5L;
+ mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+ assertEquals(Long.valueOf(5), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+ // NB: Currently nothing is preventing HWM from "going down".
+ record.lastModified = 4L;
+ mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+ assertEquals(Long.valueOf(4), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+ // HWM disabled
+ mockDownloader = new MockDownloader(repositorySession, true, false, repositoryStateProvider);
+ assertTrue(repositoryStateProvider.resetAndCommit());
+
+ record.lastModified = 4L;
+ mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+ assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+ record.lastModified = 5L;
+ mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+ assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+ }
+
+ @Test
public void testAbortRequests() {
MockRepositorySession mockRepositorySession = new MockRepositorySession(serverRepository);
- BatchingDownloader downloader = new BatchingDownloader(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), true, mockRepositorySession);
+ BatchingDownloader downloader = new BatchingDownloader(
+ null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+ true, true, new NonPersistentRepositoryStateProvider(), mockRepositorySession);
assertFalse(mockRepositorySession.abort);
downloader.abortRequests();
assertTrue(mockRepositorySession.abort);
}
@Test
public void testBuildCollectionURI() {
try {
@@ -393,16 +559,25 @@ public class BatchingDownloaderTest {
final Uri baseUri = Uri.parse("https://moztest.org/collection/");
assertEquals(baseUri + "?full=1&ids=123%2Cabc&offset=1234", BatchingDownloader.buildCollectionURI(baseUri, true, -1L, -1, null, "123,abc", "1234").toString());
} catch (URISyntaxException e) {
fail();
}
}
+ private SyncStorageCollectionRequest performOnFetchCompleted(String offsetHeader, String recordsHeader, long batchLimit) throws URISyntaxException {
+ SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+ SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+ mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+ batchLimit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+ return request;
+ }
+
private void assertSameParameters(MockDownloader mockDownloader, long limit) {
assertEquals(DEFAULT_NEWER, mockDownloader.newer);
assertEquals(limit, mockDownloader.limit);
assertTrue(mockDownloader.full);
assertEquals(DEFAULT_SORT, mockDownloader.sort);
assertEquals(DEFAULT_IDS, mockDownloader.ids);
}
@@ -419,9 +594,21 @@ public class BatchingDownloaderTest {
}
if (records != null) {
response.addHeader(SyncResponse.X_WEAVE_RECORDS, records);
}
return new SyncStorageResponse(response);
}
+
+ private void ensureOffsetContextIsNull(CountingShadowRepositoryState stateProvider) {
+ assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+ assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
+ assertNull(stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
+ }
+
+ private void ensureOffsetContextIs(CountingShadowRepositoryState stateProvider, String offset, String order, Long since) {
+ assertEquals(offset, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+ assertEquals(since, stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
+ assertEquals(order, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
+ }
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -15,16 +15,17 @@ import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.MockRecord;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -589,17 +590,18 @@ public class BatchingUploaderTest {
try {
return new Server15Repository(
"dummyCollection",
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
"http://dummy.url/",
null,
infoCollections,
- infoConfiguration
+ infoConfiguration,
+ new NonPersistentRepositoryStateProvider()
);
} catch (URISyntaxException e) {
// Won't throw, and this won't happen.
return null;
}
}
static abstract class TestRunnableWithTarget<T> {