--- a/mobile/android/base/java/org/mozilla/gecko/db/BrowserProvider.java
+++ b/mobile/android/base/java/org/mozilla/gecko/db/BrowserProvider.java
@@ -2959,17 +2959,17 @@ public class BrowserProvider extends Sha
// NB:
// Constraint exception will be thrown if we try to insert a visit violating
// unique(guid, date) constraint. We don't expect to do that, but our incoming
// data might not be clean - either due to duplicate entries in the sync data,
// or, less likely, due to record reconciliation bugs at the RepositorySession
// level.
} catch (SQLiteConstraintException e) {
- Log.w(LOGTAG, "Unexpected constraint exception while inserting a visit", e);
+ // Don't log this, it'll just cause memory churn.
}
}
if (inserted != valueSet.length) {
Log.w(LOGTAG, "Failed to insert some of the visits. " +
"Expected: " + valueSet.length + ", actual: " + inserted);
}
totalInserted += inserted;
}
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -643,22 +643,17 @@ public class TestBookmarks extends Andro
public void onFetchedRecord(Record record) {
fetchedGUIDs.add(record.guid);
}
@Override
public void onFetchCompleted() {
finishAndNotify(session);
}
-
- @Override
- public void onBatchCompleted() {
-
- }
- };
+ };
session.fetchModified(fetchDelegate);
return fetchedGUIDs;
}
private BookmarkRecord fetchGUID(BookmarksRepository repo,
final String guid) throws SyncException {
Logger.info(LOG_TAG, "Fetching for " + guid);
@@ -672,21 +667,16 @@ public class TestBookmarks extends Andro
public void onFetchedRecord(Record record) {
fetchedRecords.add(record);
}
@Override
public void onFetchCompleted() {
finishAndNotify(session);
}
-
- @Override
- public void onBatchCompleted() {
-
- }
};
performWait(new Runnable() {
@Override
public void run() {
try {
session.fetch(new String[]{guid}, fetchDelegate);
} catch (InactiveSessionException e) {
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -101,28 +101,18 @@ public class TestStoreTracking extends A
RepositorySessionBundle bundle) {
performNotify();
}
});
} catch (InactiveSessionException e) {
performNotify(e);
}
}
-
- @Override
- public void onBatchCompleted() {
-
- }
});
}
-
- @Override
- public void onBatchCompleted() {
-
- }
});
} catch (InactiveSessionException e) {
performNotify(e);
}
}
@Override
public void onStoreFailed(Exception e) {
@@ -177,20 +167,16 @@ public class TestStoreTracking extends A
public void onFinishFailed(Exception ex) {
performNotify(ex);
}
});
} catch (InactiveSessionException e) {
performNotify(e);
}
}
-
- @Override
- public void onBatchCompleted() {
- }
});
}
};
performWait(doFetch);
}
/**
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
@@ -94,17 +94,12 @@ public class DefaultFetchDelegate extend
}
@Override
public void onFetchCompleted() {
Logger.debug(LOG_TAG, "onFetchCompleted. Doing nothing.");
}
@Override
- public void onBatchCompleted() {
- Logger.debug(LOG_TAG, "onBatchCompleted. Doing nothing.");
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(final ExecutorService executor) {
return new DeferredRepositorySessionFetchRecordsDelegate(this, executor);
}
}
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware;
-
-import android.content.Context;
-
-import org.mozilla.gecko.sync.SessionCreateException;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-
-/**
- * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
- * a sync deadline, buffer storage implementation and a consistency checker implementation.
- *
- * @author grisha
- */
-public class BufferingMiddlewareRepository extends Repository {
- private final long syncDeadline;
- private final Repository inner;
- private final BufferStorage bufferStorage;
-
- public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
- this.syncDeadline = syncDeadline;
- this.inner = wrappedRepository;
- this.bufferStorage = bufferStore;
- }
-
- @Override
- public RepositorySession createSession(Context context) throws SessionCreateException {
- final RepositorySession innerSession = this.inner.createSession(context);
- return new BufferingMiddlewareRepositorySession(innerSession, this, syncDeadline, bufferStorage);
- }
-}
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware;
-
-import android.os.SystemClock;
-import android.support.annotation.VisibleForTesting;
-
-import org.mozilla.gecko.sync.SyncDeadlineReachedException;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.repositories.InactiveSessionException;
-import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.Collection;
-
-/**
- * Buffering middleware which is intended to wrap local RepositorySessions.
- *
- * Configure it:
- * - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
- *
- * Fetch is pass-through, store is buffered.
- *
- * @author grisha
- */
-/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
- private final BufferStorage bufferStorage;
- private final long syncDeadlineMillis;
-
- /* package-local */ BufferingMiddlewareRepositorySession(
- RepositorySession repositorySession, BufferingMiddlewareRepository repository,
- long syncDeadlineMillis, BufferStorage bufferStorage) {
- super(repositorySession, repository);
- this.syncDeadlineMillis = syncDeadlineMillis;
- this.bufferStorage = bufferStorage;
- }
-
- @Override
- public void fetchModified(RepositorySessionFetchRecordsDelegate delegate) {
- this.inner.fetchModified(delegate);
- }
-
- @Override
- public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
- this.inner.fetch(guids, delegate);
- }
-
- @Override
- public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
- this.inner.fetchAll(delegate);
- }
-
- /**
- * Will be called when this repository is acting as a `source`, and a flow of records into `sink`
- * was completed. That is, we've uploaded merged records to the server, so now is a good time
- * to clean up our buffer for this repository.
- */
- @Override
- public void performCleanup() {
- bufferStorage.clear();
- inner.performCleanup();
- }
-
- @Override
- public void store(Record record) throws NoStoreDelegateException {
- bufferStorage.addOrReplace(record);
- }
-
- /**
- * When source fails to provide all records, we need to decide what to do with the buffer.
- * We might fail because of a network partition, or because of a concurrent modification of a
- * collection, or because we ran out of time fetching records, or some other reason.
- *
- * Either way we do not clear the buffer in any error scenario, but rather
- * allow it to be re-filled, replacing existing records with their newer versions if necessary.
- *
- * If a collection has been modified, affected records' last-modified timestamps will be bumped,
- * and we will receive those records during the next sync. If we already have them in our buffer,
- * we replace our now-old copy. Otherwise, they are new records and we just append them.
- *
- * Incoming records are mapped to existing ones via GUIDs.
- */
- @Override
- public void storeIncomplete() {
- bufferStorage.flush();
- }
-
- @Override
- public void storeFlush() {
- bufferStorage.flush();
- }
-
- @Override
- public void storeDone() {
- bufferStorage.flush();
-
- // Determine if we have enough time to merge the buffer data.
- // If we don't have enough time now, we keep our buffer and try again later.
- if (!mayProceedToMergeBuffer()) {
- super.abort();
- storeDelegate.deferredStoreDelegate(storeWorkQueue).onStoreFailed(new SyncDeadlineReachedException());
- return;
- }
-
- doMergeBuffer();
- }
-
- @VisibleForTesting
- /* package-local */ void doMergeBuffer() {
- final Collection<Record> bufferData = bufferStorage.all();
-
- // Trivial case of an empty buffer.
- if (bufferData.isEmpty()) {
- super.storeDone();
- return;
- }
-
- // Let session handle actual storing of records as it pleases.
- // See Bug 1332094 which is concerned with allowing merge to proceed transactionally.
- try {
- for (Record record : bufferData) {
- this.inner.store(record);
- }
- } catch (NoStoreDelegateException e) {
- // At this point we should have a store delegate set on the session, so this won't happen.
- }
-
- // Let session know that there are no more records to store.
- super.storeDone();
- }
-
- /**
- * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
- * Clean up after ourselves, if there's anything to clean up.
- */
- @Override
- public void abort() {
- bufferStorage.flush();
- super.abort();
- }
-
- @Override
- public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
- inner.setStoreDelegate(delegate);
- this.storeDelegate = delegate;
- }
-
- private boolean mayProceedToMergeBuffer() {
- // If our buffer storage is not persistent, disallowing merging after buffer has been filled
- // means throwing away records only to re-download them later.
- // In this case allow merge to proceed even if we're past the deadline.
- if (!bufferStorage.isPersistent()) {
- return true;
- }
-
- // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
- // let's do a simple thing for now and say that we may proceed if we have couple of minutes
- // of runtime left. That surely is enough, right?
- final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
- return timeLeftMillis > 1000 * 60 * 2;
- }
-}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
@@ -110,21 +110,16 @@ public class Crypto5MiddlewareRepository
}
@Override
public void onFetchCompleted() {
next.onFetchCompleted();
}
@Override
- public void onBatchCompleted() {
- next.onBatchCompleted();
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
// Synchronously perform *our* work, passing through appropriately.
RepositorySessionFetchRecordsDelegate deferredNext = next.deferredFetchDelegate(executor);
return new DecryptingTransformingFetchDelegate(deferredNext, keyBundle, recordFactory);
}
}
private DecryptingTransformingFetchDelegate makeUnwrappingDelegate(RepositorySessionFetchRecordsDelegate inner) {
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware.storage;
-
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.Collection;
-
-/**
- * A contract between BufferingMiddleware and specific storage implementations.
- *
- * @author grisha
- */
-public interface BufferStorage {
- // Returns all of the records currently present in the buffer.
- Collection<Record> all();
-
- // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
- // what's already present in the storage layer.
- // NB: For a database-backed storage, "replace" happens at a transaction level.
- void addOrReplace(Record record);
-
- // For database-backed implementations, commits any records that came in up to this point.
- void flush();
-
- void clear();
-
- boolean isPersistent();
-}
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.middleware.storage;
-
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A trivial, memory-backed, transient implementation of a BufferStorage.
- * Its intended use is to buffer syncing of small collections.
- * Thread-safe.
- *
- * @author grisha
- */
-public class MemoryBufferStorage implements BufferStorage {
- private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
-
- @Override
- public boolean isPersistent() {
- return false;
- }
-
- @Override
- public Collection<Record> all() {
- synchronized (recordBuffer) {
- return new ArrayList<>(recordBuffer.values());
- }
- }
-
- @Override
- public void addOrReplace(Record record) {
- recordBuffer.put(record.guid, record);
- }
-
- @Override
- public void flush() {
- // This is a no-op; flush intended for database-backed stores.
- }
-
- @Override
- public void clear() {
- recordBuffer.clear();
- }
-}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -179,23 +179,16 @@ public abstract class RepositorySession
setLastStoreTimestamp(end);
storeDelegate.onStoreCompleted();
}
};
storeWorkQueue.execute(command);
}
/**
- * Indicates that a number of records have been stored, more are still to come but after some time,
- * and now would be a good time to flush records and perform any other similar operations.
- */
- public void storeFlush() {
- }
-
- /**
* Indicates that a flow of records have been completed.
*/
public void performCleanup() {
}
public abstract void wipe(RepositorySessionWipeDelegate delegate);
/**
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/StoreTrackingRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/StoreTrackingRepositorySession.java
@@ -56,19 +56,16 @@ public abstract class StoreTrackingRepos
return;
}
for (String guid : guids) {
this.storeTracker.untrackStoredForExclusion(guid);
}
}
public void trackRecord(Record record) {
-
- Logger.debug(LOG_TAG, "Tracking record " + record.guid +
- " (" + record.lastModified + ") to avoid re-upload.");
// Future: we care about the timestamp…
trackGUID(record.guid);
}
protected void untrackRecord(Record record) {
Logger.debug(LOG_TAG, "Un-tracking record " + record.guid + ".");
untrackGUID(record.guid);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
@@ -147,21 +147,16 @@ public class VersioningDelegateHelper {
}
@Override
public void onFetchCompleted() {
this.inner.onFetchCompleted();
}
@Override
- public void onBatchCompleted() {
- this.inner.onBatchCompleted();
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return this.inner.deferredFetchDelegate(executor);
}
}
private class VersionedStoreDelegate implements RepositorySessionStoreDelegate {
private final RepositorySessionStoreDelegate inner;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
@@ -149,19 +149,16 @@ public class BookmarksValidationReposito
@Override
public void onFetchCompleted() {
validateForTelemetry();
delegate.onFetchCompleted();
}
@Override
- public void onBatchCompleted() {}
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return null;
}
});
}
@Override
public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
@@ -42,25 +42,15 @@ public class DeferredRepositorySessionFe
@Override
public void run() {
inner.onFetchCompleted();
}
});
}
@Override
- public void onBatchCompleted() {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- inner.onBatchCompleted();
- }
- });
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService newExecutor) {
if (newExecutor == executor) {
return this;
}
throw new IllegalArgumentException("Can't re-defer this delegate.");
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
@@ -9,25 +9,13 @@ import java.util.concurrent.ExecutorServ
import org.mozilla.gecko.sync.repositories.domain.Record;
public interface RepositorySessionFetchRecordsDelegate {
void onFetchFailed(Exception ex);
void onFetchedRecord(Record record);
/**
* Called when all records in this fetch have been returned.
- *
- * @param fetchEnd
- * A millisecond-resolution timestamp indicating the *remote* timestamp
- * at the end of the range of records. Usually this is the timestamp at
- * which the request was received.
- * E.g., the (normalized) value of the X-Weave-Timestamp header.
*/
void onFetchCompleted();
- /**
- * Called when a number of records have been returned but more are still expected to come,
- * possibly after a certain pause.
- */
- void onBatchCompleted();
-
RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -242,29 +242,16 @@ public class BatchingDownloader {
Logger.warn(LOG_TAG, "Failed to update resume context while processing a batch.");
}
} else {
if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
}
}
- // We need to make another batching request!
- // Let the delegate know that a batch fetch just completed before we proceed.
- // Beware that while this operation will run after every call to onFetchedRecord returned,
- // it's not guaranteed that the 'sink' session actually processed all of the fetched records.
- // See Bug https://bugzilla.mozilla.org/show_bug.cgi?id=1351673#c28 for details.
- runTaskOnQueue(new Runnable() {
- @Override
- public void run() {
- Logger.debug(LOG_TAG, "Running onBatchCompleted.");
- fetchRecordsDelegate.onBatchCompleted();
- }
- });
-
// Should we proceed, however? Do we have enough time?
if (!mayProceedWithBatching(fetchDeadline)) {
this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException());
return;
}
// Create and execute new batch request.
try {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/BookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/BookmarksServerSyncStage.java
@@ -4,18 +4,16 @@
package org.mozilla.gecko.sync.stage;
import android.net.Uri;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.MetaGlobalException;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.BrowserContractHelpers;
import org.mozilla.gecko.sync.repositories.android.BookmarksRepository;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
@@ -84,21 +82,17 @@ public class BookmarksServerSyncStage ex
getRepositoryStateProvider(),
false,
true
);
}
@Override
protected Repository getLocalRepository() {
- return new BufferingMiddlewareRepository(
- session.getSyncDeadline(),
- new MemoryBufferStorage(),
- new BookmarksRepository()
- );
+ return new BookmarksRepository();
}
@Override
protected RecordFactory getRecordFactory() {
return new BookmarkRecordFactory();
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -1,16 +1,14 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FennecTabsRepository;
import org.mozilla.gecko.sync.repositories.domain.TabsRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class FennecTabsServerSyncStage extends ServerSyncStage {
private static final String COLLECTION = "tabs";
@@ -27,20 +25,16 @@ public class FennecTabsServerSyncStage e
@Override
public Integer getStorageVersion() {
return VersionConstants.TABS_ENGINE_VERSION;
}
@Override
protected Repository getLocalRepository() {
- return new BufferingMiddlewareRepository(
- session.getSyncDeadline(),
- new MemoryBufferStorage(),
- new FennecTabsRepository(session.getClientsDelegate())
- );
+ return new FennecTabsRepository(session.getClientsDelegate());
}
@Override
protected RecordFactory getRecordFactory() {
return new TabsRecordFactory();
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -2,20 +2,17 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.CryptoRecord;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
-import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class FormHistoryServerSyncStage extends ServerSyncStage {
@@ -78,21 +75,17 @@ public class FormHistoryServerSyncStage
getRepositoryStateProvider(),
false,
false
);
}
@Override
protected Repository getLocalRepository() {
- return new BufferingMiddlewareRepository(
- session.getSyncDeadline(),
- new MemoryBufferStorage(),
- new FormHistoryRepositorySession.FormHistoryRepository()
- );
+ return new FormHistoryRepositorySession.FormHistoryRepository();
}
public static final class FormHistoryRecordFactory extends RecordFactory {
@Override
public Record createRecord(Record record) {
FormHistoryRecord r = new FormHistoryRecord();
r.initFromEnvelope((CryptoRecord) record);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/HistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/HistoryServerSyncStage.java
@@ -11,17 +11,25 @@ import org.mozilla.gecko.sync.repositori
import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.HistoryRepository;
import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
-public class HistoryServerSyncStage extends ServerSyncStage {
+/**
+ * History records are not buffered.
+ We buy little from buffering it from data integrity point of view, but we gain a lot by
+ not buffering:
+ - roughly half the memory footprint, important when dealing with lots of history
+ - ability to resume downloads, since records are stored as they're fetched, and we can
+ -- maintain a "high watermark"
+ */
+public class HistoryServerSyncStage extends NonBufferingServerSyncStage {
protected static final String LOG_TAG = "HistoryStage";
// Eventually this kind of sync stage will be data-driven,
// and all this hard-coding can go away.
private static final String HISTORY_SORT = "oldest";
private static final long HISTORY_BATCH_LIMIT = 500;
@Override
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/NonBufferingServerSyncStage.java
@@ -0,0 +1,15 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.stage;
+
+import org.mozilla.gecko.sync.synchronizer.NonBufferingSynchronizer;
+import org.mozilla.gecko.sync.synchronizer.Synchronizer;
+
+public abstract class NonBufferingServerSyncStage extends ServerSyncStage {
+ @Override
+ protected Synchronizer getSynchronizer() {
+ return new NonBufferingSynchronizer();
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -1,16 +1,14 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.PasswordsRepositorySession;
import org.mozilla.gecko.sync.repositories.domain.PasswordRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class PasswordsServerSyncStage extends ServerSyncStage {
@Override
@@ -25,20 +23,16 @@ public class PasswordsServerSyncStage ex
@Override
public Integer getStorageVersion() {
return VersionConstants.PASSWORDS_ENGINE_VERSION;
}
@Override
protected Repository getLocalRepository() {
- return new BufferingMiddlewareRepository(
- session.getSyncDeadline(),
- new MemoryBufferStorage(),
- new PasswordsRepositorySession.PasswordsRepository()
- );
+ return new PasswordsRepositorySession.PasswordsRepository();
}
@Override
protected RecordFactory getRecordFactory() {
return new PasswordRecordFactory();
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/RecentHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/RecentHistoryServerSyncStage.java
@@ -2,18 +2,16 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.SynchronizerConfiguration;
-import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.HistoryRepository;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -69,21 +67,17 @@ public class RecentHistoryServerSyncStag
*/
@Override
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Disabled;
}
@Override
protected Repository getLocalRepository() {
- return new BufferingMiddlewareRepository(
- session.getSyncDeadline(),
- new MemoryBufferStorage(),
- new HistoryRepository()
- );
+ return new HistoryRepository();
}
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
return new ConfigurableServer15Repository(
getCollection(),
session.getSyncDeadline(),
session.config.storageURL(),
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -53,18 +53,18 @@ import java.util.concurrent.ExecutorServ
*
* @author rnewman
*
*/
public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage implements SynchronizerDelegate {
protected static final String LOG_TAG = "ServerSyncStage";
- protected long stageStartTimestamp = -1;
- protected long stageCompleteTimestamp = -1;
+ private long stageStartTimestamp = -1;
+ private long stageCompleteTimestamp = -1;
/**
* Poor-man's boolean typing.
* These enums are used to configure {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
*/
public enum HighWaterMark {
Enabled,
Disabled
@@ -118,31 +118,31 @@ public abstract class ServerSyncStage ex
* to be updated.
*
* @param enabledInMetaGlobal
* boolean of engine sync state in meta/global
* @throws MetaGlobalException
* if engine sync state has been changed in Sync Settings, with new
* engine sync state.
*/
- protected void checkAndUpdateUserSelectedEngines(boolean enabledInMetaGlobal) throws MetaGlobalException {
+ private void checkAndUpdateUserSelectedEngines(boolean enabledInMetaGlobal) throws MetaGlobalException {
Map<String, Boolean> selectedEngines = session.config.userSelectedEngines;
String thisEngine = this.getEngineName();
if (selectedEngines != null && selectedEngines.containsKey(thisEngine)) {
boolean enabledInSelection = selectedEngines.get(thisEngine);
if (enabledInMetaGlobal != enabledInSelection) {
// Engine enable state has been changed by the user.
Logger.debug(LOG_TAG, "Engine state has been changed by user. Throwing exception.");
throw new MetaGlobalException.MetaGlobalEngineStateChangedException(enabledInSelection);
}
}
}
- protected EngineSettings getEngineSettings() throws NonObjectJSONException, IOException {
+ private EngineSettings getEngineSettings() throws NonObjectJSONException, IOException {
Integer version = getStorageVersion();
if (version == null) {
Logger.warn(LOG_TAG, "null storage version for " + this + "; using version 0.");
version = 0;
}
SynchronizerConfiguration config = this.getConfig();
if (config == null) {
@@ -209,39 +209,43 @@ public abstract class ServerSyncStage ex
cryptoRepo.recordFactory = getRecordFactory();
return cryptoRepo;
}
protected String bundlePrefix() {
return this.getCollection() + ".";
}
- protected String statePreferencesPrefix() {
+ /* package-private */ String statePreferencesPrefix() {
return this.getCollection() + ".state.";
}
protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
}
- protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
+ private void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
synchronizerConfiguration.persist(session.config.getBranch(bundlePrefix()));
}
- public Synchronizer getConfiguredSynchronizer(GlobalSession session) throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException {
+ private Synchronizer getConfiguredSynchronizer() throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException {
Repository remote = wrappedServerRepo();
- Synchronizer synchronizer = new Synchronizer();
+ Synchronizer synchronizer = getSynchronizer();
synchronizer.repositoryA = remote;
synchronizer.repositoryB = this.getLocalRepository();
synchronizer.load(getConfig());
return synchronizer;
}
+ protected Synchronizer getSynchronizer() {
+ return new Synchronizer();
+ }
+
/**
* Reset timestamps and any repository state.
*/
@Override
protected void resetLocal() {
resetLocalWithSyncID(null);
if (!getRepositoryStateProvider().resetAndCommit()) {
// At the very least, we can log this.
@@ -254,17 +258,17 @@ public abstract class ServerSyncStage ex
Logger.warn(LOG_TAG, "Failed to reset repository state");
}
}
/**
* Reset timestamps and possibly set syncID.
* @param syncID if non-null, new syncID to persist.
*/
- protected void resetLocalWithSyncID(String syncID) {
+ private void resetLocalWithSyncID(String syncID) {
// Clear both timestamps.
SynchronizerConfiguration config;
try {
config = this.getConfig();
} catch (Exception e) {
Logger.warn(LOG_TAG, "Unable to reset " + this + ": fetching config failed.", e);
return;
}
@@ -276,18 +280,18 @@ public abstract class ServerSyncStage ex
config.localBundle.setTimestamp(0L);
config.remoteBundle.setTimestamp(0L);
persistConfig(config);
Logger.info(LOG_TAG, "Reset timestamps for " + this);
}
// Not thread-safe. Use with caution.
private static final class WipeWaiter {
- public boolean sessionSucceeded = true;
- public boolean wipeSucceeded = true;
+ /* package-private */ boolean sessionSucceeded = true;
+ /* package-private */ boolean wipeSucceeded = true;
public Exception error;
public void notify(Exception e, boolean sessionSucceeded) {
this.sessionSucceeded = sessionSucceeded;
this.wipeSucceeded = false;
this.error = e;
this.notify();
}
@@ -405,17 +409,17 @@ public abstract class ServerSyncStage ex
}
Logger.info(LOG_TAG, "Wiping stage complete.");
}
/**
* Asynchronously wipe collection on server.
*/
- protected void wipeServer(final AuthHeaderProvider authHeaderProvider, final WipeServerDelegate wipeDelegate) {
+ private void wipeServer(final AuthHeaderProvider authHeaderProvider, final WipeServerDelegate wipeDelegate) {
SyncStorageRequest request;
try {
request = new SyncStorageRequest(session.config.collectionURI(getCollection()));
} catch (URISyntaxException ex) {
Logger.warn(LOG_TAG, "Invalid URI in wipeServer.");
wipeDelegate.onWipeFailed(ex);
return;
@@ -459,17 +463,17 @@ public abstract class ServerSyncStage ex
request.delete();
}
/**
* Synchronously wipe the server.
* <p>
* Logs and re-throws an exception on failure.
*/
- public void wipeServer(final GlobalSession session) throws Exception {
+ private void wipeServer(final GlobalSession session) throws Exception {
this.session = session;
final WipeWaiter monitor = new WipeWaiter();
final Runnable doWipe = new Runnable() {
@Override
public void run() {
wipeServer(session.getAuthHeaderProvider(), new WipeServerDelegate() {
@@ -575,17 +579,17 @@ public abstract class ServerSyncStage ex
}
} catch (MetaGlobalException e) {
session.abort(e, "Inappropriate meta/global; refusing to execute " + name + " stage.");
return;
}
Synchronizer synchronizer;
try {
- synchronizer = this.getConfiguredSynchronizer(session);
+ synchronizer = this.getConfiguredSynchronizer();
} catch (NoCollectionKeysSetException e) {
session.abort(e, "No CollectionKeys.");
return;
} catch (URISyntaxException e) {
session.abort(e, "Invalid URI syntax for server repository.");
return;
} catch (NonObjectJSONException | IOException e) {
session.abort(e, "Invalid persisted JSON for config.");
@@ -597,17 +601,17 @@ public abstract class ServerSyncStage ex
Logger.debug(LOG_TAG, "Reached end of execute.");
}
/**
* Express the duration taken by this stage as a String, like "0.56 seconds".
*
* @return formatted string.
*/
- protected String getStageDurationString() {
+ private String getStageDurationString() {
return Utils.formatDuration(stageStartTimestamp, stageCompleteTimestamp);
}
/**
* We synced this engine! Persist timestamps and advance the session.
*
* @param synchronizer the <code>Synchronizer</code> that succeeded.
*/
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.synchronizer;
-
-import org.mozilla.gecko.background.common.log.Logger;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-/**
- * Consume records from a queue inside a RecordsChannel, as fast as we can.
- * TODO: rewrite this in terms of an ExecutorService and a CompletionService.
- * See Bug 713483.
- *
- * @author rnewman
- *
- */
-class ConcurrentRecordConsumer extends RecordConsumer {
- private static final String LOG_TAG = "CRecordConsumer";
-
- /**
- * When this is true and all records have been processed, the consumer
- * will notify its delegate.
- */
- protected boolean allRecordsQueued = false;
- private long counter = 0;
-
- public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
- this.delegate = delegate;
- }
-
- private final Object monitor = new Object();
- @Override
- public void doNotify() {
- synchronized (monitor) {
- monitor.notify();
- }
- }
-
- @Override
- public void queueFilled() {
- Logger.debug(LOG_TAG, "Queue filled.");
- synchronized (monitor) {
- this.allRecordsQueued = true;
- monitor.notify();
- }
- }
-
- @Override
- public void halt() {
- synchronized (monitor) {
- this.stopImmediately = true;
- monitor.notify();
- }
- }
-
- private final Object countMonitor = new Object();
- @Override
- public void stored() {
- Logger.trace(LOG_TAG, "Record stored. Notifying.");
- synchronized (countMonitor) {
- counter++;
- }
- }
-
- private void consumerIsDone() {
- Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
- if (allRecordsQueued) {
- delegate.consumerIsDoneFull();
- } else {
- delegate.consumerIsDonePartial();
- }
- }
-
- @Override
- public void run() {
- Record record;
-
- while (true) {
- // The queue is concurrent-safe.
- while ((record = delegate.getQueue().poll()) != null) {
- synchronized (monitor) {
- Logger.trace(LOG_TAG, "run() took monitor.");
- if (stopImmediately) {
- Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
- delegate.getQueue().clear();
- Logger.debug(LOG_TAG, "Notifying consumer.");
- consumerIsDone();
- return;
- }
- Logger.debug(LOG_TAG, "run() dropped monitor.");
- }
-
- Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
- try {
- delegate.store(record);
- } catch (Exception e) {
- // TODO: Bug 709371: track records that failed to apply.
- Logger.error(LOG_TAG, "Caught error in store.", e);
- }
- Logger.trace(LOG_TAG, "Done with record.");
- }
- synchronized (monitor) {
- Logger.trace(LOG_TAG, "run() took monitor.");
-
- if (allRecordsQueued) {
- Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
- consumerIsDone();
- return;
- }
- if (stopImmediately) {
- Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
- consumerIsDone();
- return;
- }
- try {
- Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
- monitor.wait(10000);
- } catch (InterruptedException e) {
- // TODO
- }
- Logger.trace(LOG_TAG, "run() dropped monitor.");
- }
- }
- }
-}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingRecordsChannel.java
@@ -0,0 +1,59 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+/**
+ * Same as a regular RecordsChannel, except records aren't buffered and are stored when encountered.
+ */
+public class NonBufferingRecordsChannel extends RecordsChannel {
+ private static final String LOG_TAG = "NonBufferingRecordsChannel";
+
+ public NonBufferingRecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
+ super(source, sink, delegate);
+ }
+
+ @Override
+ public void onFetchedRecord(Record record) {
+ // Don't bother trying to store if we already failed.
+ if (fetchFailed.get()) {
+ return;
+ }
+
+ fetchedCount.incrementAndGet();
+ storeAttemptedCount.incrementAndGet();
+
+ try {
+ sink.store(record);
+ } catch (NoStoreDelegateException e) {
+ // Must not happen, bail out.
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void onFetchFailed(Exception ex) {
+ // Let non-buffered sessions clean-up their internal state.
+ sink.storeIncomplete();
+ super.onFetchFailed(ex);
+ }
+
+ @Override
+ public void onFetchCompleted() {
+ // If we already failed, the flow has been finished via onFetchFailed,
+ // yet our delegatee might have kept going.
+ if (fetchFailed.get()) {
+ return;
+ }
+
+ // Now we wait for onStoreComplete
+ Logger.trace(LOG_TAG, "onFetchCompleted. Calling storeDone.");
+ sink.storeDone();
+ }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizer.java
@@ -0,0 +1,12 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+public class NonBufferingSynchronizer extends Synchronizer {
+ @Override
+ protected SynchronizerSession newSynchronizerSession() {
+ return new NonBufferingSynchronizerSession(this, this);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/NonBufferingSynchronizerSession.java
@@ -0,0 +1,18 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+
+public class NonBufferingSynchronizerSession extends SynchronizerSession {
+ /* package-private */ NonBufferingSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
+ super(synchronizer, delegate);
+ }
+
+ @Override
+ protected RecordsChannel getRecordsChannel(RepositorySession sink, RepositorySession source, RecordsChannelDelegate delegate) {
+ return new NonBufferingRecordsChannel(sink, source, delegate);
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -3,34 +3,34 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.ArrayList;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.SyncException;
-import org.mozilla.gecko.sync.ThreadPool;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
/**
- * Pulls records from `source`, applying them to `sink`.
+ * Pulls records from `source`, buffering them and then applying them to `sink` when
+ * all records have been pulled.
* Notifies its delegate of errors and completion.
*
* All stores (initiated by a fetch) must have been completed before storeDone
* is invoked on the sink. This is to avoid the existing stored items being
* considered as the total set, with onStoreCompleted being called when they're
* done:
*
* store(A) store(B)
@@ -42,128 +42,101 @@ import org.mozilla.gecko.sync.repositori
* Storing of C complete.
* We're done! Call onStoreCompleted.
* store(B) finishes... uh oh.
*
* In other words, storeDone must be gated on the synchronous invocation of every store.
*
* Similarly, we require that every store callback have returned before onStoreCompleted is invoked.
*
- * This whole set of guarantees should be achievable thusly:
- *
- * * The fetch process must run in a single thread, and invoke store()
- * synchronously. After processing every incoming record, storeDone is called,
- * setting a flag.
- * If the fetch cannot be implicitly queued, it must be explicitly queued.
- * In this implementation, we assume that fetch callbacks are strictly ordered in this way.
- *
- * * The store process must be (implicitly or explicitly) queued. When the
- * queue empties, the consumer checks the storeDone flag. If it's set, and the
- * queue is exhausted, invoke onStoreCompleted.
- *
* RecordsChannel exists to enforce this ordering of operations.
*
* @author rnewman
*
*/
public class RecordsChannel implements
RepositorySessionFetchRecordsDelegate,
- RepositorySessionStoreDelegate,
- RecordsConsumerDelegate {
+ RepositorySessionStoreDelegate {
private static final String LOG_TAG = "RecordsChannel";
public RepositorySession source;
- private RepositorySession sink;
+ /* package-private */ RepositorySession sink;
private final RecordsChannelDelegate delegate;
private volatile ReflowIsNecessaryException reflowException;
- private final AtomicInteger fetchedCount = new AtomicInteger();
- private final AtomicInteger fetchFailedCount = new AtomicInteger();
+ /* package-private */ final AtomicInteger fetchedCount = new AtomicInteger(0);
+ final AtomicBoolean fetchFailed = new AtomicBoolean(false);
+ private final AtomicBoolean storeFailed = new AtomicBoolean(false);
+
+ private ArrayList<Record> toProcess = new ArrayList<>();
// Expected value relationships:
// attempted = accepted + failed
// reconciled <= accepted <= attempted
// reconciled = accepted - `new`, where `new` is inferred.
- private final AtomicInteger storeAttemptedCount = new AtomicInteger();
- private final AtomicInteger storeAcceptedCount = new AtomicInteger();
- private final AtomicInteger storeFailedCount = new AtomicInteger();
- private final AtomicInteger storeReconciledCount = new AtomicInteger();
+ // TODO these likely don't need to be Atomic, see Bug 1441281.
+ final AtomicInteger storeAttemptedCount = new AtomicInteger(0);
+ private final AtomicInteger storeAcceptedCount = new AtomicInteger(0);
+ private final AtomicInteger storeFailedCount = new AtomicInteger(0);
+ private final AtomicInteger storeReconciledCount = new AtomicInteger(0);
private final StoreBatchTracker storeTracker = new StoreBatchTracker();
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
- this.source = source;
- this.sink = sink;
- this.delegate = delegate;
- }
-
- /*
- * We push fetched records into a queue.
- * A separate thread is waiting for us to notify it of work to do.
- * When we tell it to stop, it'll stop. We do that when the fetch
- * is completed.
- * When it stops, we tell the sink that there are no more records,
- * and wait for the sink to tell us that storing is done.
- * Then we notify our delegate of completion.
- */
- private RecordConsumer consumer;
- private volatile boolean waitingForQueueDone = false;
- private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
-
- @Override
- public ConcurrentLinkedQueue<Record> getQueue() {
- return toProcess;
+ this.source = source;
+ this.sink = sink;
+ this.delegate = delegate;
}
protected boolean isReady() {
return source.isActive() && sink.isActive();
}
/**
* Get the number of records fetched so far.
*
* @return number of fetches.
*/
- public int getFetchCount() {
+ /* package-private */ int getFetchCount() {
return fetchedCount.get();
}
/**
* Get the number of fetch failures recorded so far.
*
* @return number of fetch failures.
*/
- public int getFetchFailureCount() {
- return fetchFailedCount.get();
+ public boolean didFetchFail() {
+ return fetchFailed.get();
}
/**
* Get the number of store attempts (successful or not) so far.
*
* @return number of stores attempted.
*/
public int getStoreAttemptedCount() {
return storeAttemptedCount.get();
}
- public int getStoreAcceptedCount() {
+ /* package-private */ int getStoreAcceptedCount() {
return storeAcceptedCount.get();
}
/**
* Get the number of store failures recorded so far.
*
* @return number of store failures.
*/
public int getStoreFailureCount() {
return storeFailedCount.get();
}
- public int getStoreReconciledCount() {
+ /* package-private */ int getStoreReconciledCount() {
return storeReconciledCount.get();
}
/**
* Start records flowing through the channel.
*/
public void flow() {
if (!isReady()) {
@@ -177,27 +150,17 @@ public class RecordsChannel implements
if (!source.dataAvailable()) {
Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
this.delegate.onFlowCompleted(this);
return;
}
sink.setStoreDelegate(this);
- fetchedCount.set(0);
- fetchFailedCount.set(0);
- storeAttemptedCount.set(0);
- storeAcceptedCount.set(0);
- storeFailedCount.set(0);
- storeReconciledCount.set(0);
storeTracker.reset();
- // Start a consumer thread.
- this.consumer = new ConcurrentRecordConsumer(this);
- ThreadPool.run(this.consumer);
- waitingForQueueDone = true;
source.fetchModified(this);
}
/**
* Begin both sessions, invoking flow() when done.
* @throws InvalidSessionTransitionException
*/
public void beginAndFlow() throws InvalidSessionTransitionException {
@@ -209,131 +172,124 @@ public class RecordsChannel implements
} catch (SyncException e) {
delegate.onFlowBeginFailed(this, e);
return;
}
this.flow();
}
- @Override
- public void store(Record record) {
- storeAttemptedCount.incrementAndGet();
- storeTracker.onRecordStoreAttempted();
- try {
- sink.store(record);
- } catch (NoStoreDelegateException e) {
- Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
- delegate.onFlowStoreFailed(this, e, record.guid);
- }
- }
-
/* package-local */ ArrayList<StoreBatchTracker.Batch> getStoreBatches() {
return this.storeTracker.getStoreBatches();
}
@Override
public void onFetchFailed(Exception ex) {
Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
- fetchFailedCount.incrementAndGet();
+ if (fetchFailed.getAndSet(true)) {
+ return;
+ }
+
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
+
delegate.onFlowFetchFailed(this, ex);
- // Sink will be informed once consumer finishes.
- this.consumer.halt();
+
+ // We haven't tried storing anything yet, so fine to short-circuit around storeDone.
+ delegate.onFlowCompleted(this);
}
@Override
public void onFetchedRecord(Record record) {
- fetchedCount.incrementAndGet();
+ // Don't bother if we've already failed; we'll just ignore these records later on.
+ if (fetchFailed.get()) {
+ return;
+ }
this.toProcess.add(record);
- this.consumer.doNotify();
}
@Override
public void onFetchCompleted() {
- Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
- this.consumer.queueFilled();
- }
+ if (fetchFailed.get()) {
+ return;
+ }
+
+ fetchedCount.set(toProcess.size());
+
+ Logger.info(LOG_TAG, "onFetchCompleted. Fetched " + fetchedCount.get() + " records. Storing...");
- @Override
- public void onBatchCompleted() {
- this.sink.storeFlush();
+ try {
+ for (Record record : toProcess) {
+ storeAttemptedCount.incrementAndGet();
+ storeTracker.onRecordStoreAttempted();
+ sink.store(record);
+ }
+ } catch (NoStoreDelegateException e) {
+ // Must not happen, bail out.
+ throw new IllegalStateException(e);
+ }
+
+ // Allow this buffer to be reclaimed.
+ toProcess = null;
+
+ // Now we wait for onStoreComplete
+ Logger.trace(LOG_TAG, "onFetchCompleted. Calling storeDone.");
+ sink.storeDone();
}
// Sent for "store" batches.
@Override
public void onBatchCommitted() {
storeTracker.onBatchFinished();
}
@Override
public void onRecordStoreFailed(Exception ex, String recordGuid) {
Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
storeFailedCount.incrementAndGet();
storeTracker.onRecordStoreFailed();
- this.consumer.stored();
delegate.onFlowStoreFailed(this, ex, recordGuid);
- // TODO: abort?
}
@Override
public void onRecordStoreSucceeded(String guid) {
- Logger.trace(LOG_TAG, "Stored record with guid " + guid);
storeAcceptedCount.incrementAndGet();
storeTracker.onRecordStoreSucceeded();
- this.consumer.stored();
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
- Logger.trace(LOG_TAG, "Reconciled record with guid " + guid);
storeReconciledCount.incrementAndGet();
}
@Override
- public void consumerIsDoneFull() {
- Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
- if (waitingForQueueDone) {
- waitingForQueueDone = false;
-
- // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
- this.sink.storeDone();
- }
- }
-
- @Override
- public void consumerIsDonePartial() {
- Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
- if (waitingForQueueDone) {
- waitingForQueueDone = false;
-
- // Let sink clean up or flush records if necessary.
- this.sink.storeIncomplete();
-
- delegate.onFlowCompleted(this);
- }
- }
-
- @Override
public void onStoreCompleted() {
- Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted.");
+ Logger.info(LOG_TAG, "Performing source cleanup.");
// Source might have used caches used to facilitate flow of records, so now is a good
// time to clean up. Particularly pertinent for buffered sources.
// Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
// locally and results of the merge have been uploaded to the server successfully.
this.source.performCleanup();
+
+ if (storeFailed.get()) {
+ return;
+ }
+
+ Logger.info(LOG_TAG, "onStoreCompleted. Attempted to store " + storeAttemptedCount.get() + " records; Store accepted " + storeAcceptedCount.get() + ", reconciled " + storeReconciledCount.get() + ", failed " + storeFailedCount.get());
delegate.onFlowCompleted(this);
-
}
@Override
public void onStoreFailed(Exception ex) {
- Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
+ if (storeFailed.getAndSet(true)) {
+ return;
+ }
+
+ Logger.info(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
storeTracker.onBatchFailed();
// NB: consumer might or might not be running at this point. There are two cases here:
// 1) If we're storing records remotely, we might fail due to a 412.
@@ -342,23 +298,16 @@ public class RecordsChannel implements
// that we're done with this flow. Based on the reflow exception, it'll determine what to do.
// 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
// -- we might hit a deadline only prior to attempting to merge records,
// -- at which point consumer would have finished already, and storeDone was called.
// Action: consumer state is known (done), so we can ignore it safely and inform our delegate
// that we're done.
- // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
- // we don't need them (case 1).
- waitingForQueueDone = false;
-
- // If consumer is still going at it, tell it to stop.
- this.consumer.halt();
-
delegate.onFlowStoreFailed(this, ex, null);
delegate.onFlowCompleted(this);
}
@Override
public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
return new DeferredRepositorySessionStoreDelegate(this, executor);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/Synchronizer.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/Synchronizer.java
@@ -25,19 +25,19 @@ import android.content.Context;
* store, or session error while synchronizing.
*
* After synchronizing, call `save` to get back a SynchronizerConfiguration with
* updated bundle information.
*/
public class Synchronizer implements SynchronizerSessionDelegate {
public static final String LOG_TAG = "SyncDelSDelegate";
- protected String configSyncID; // Used to pass syncID from load() back into save().
+ private String configSyncID; // Used to pass syncID from load() back into save().
- protected SynchronizerDelegate synchronizerDelegate;
+ private SynchronizerDelegate synchronizerDelegate;
protected SynchronizerSession session = null;
public SynchronizerSession getSynchronizerSession() {
return session;
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
@@ -263,17 +263,17 @@ public class SynchronizerSession impleme
}
final SynchronizerSession session = this;
// TODO: failed record handling.
// This is the *second* record channel to flow.
// I, SynchronizerSession, am the delegate for the *second* flow.
- channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
+ channelBToA = getRecordsChannel(this.sessionB, this.sessionA, this);
// This is the delegate for the *first* flow.
RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
@Override
public void onFlowCompleted(RecordsChannel recordsChannel) {
session.onFirstFlowCompleted(recordsChannel);
}
@@ -295,26 +295,30 @@ public class SynchronizerSession impleme
// Currently we're just recording the very last exception which occurred. This is a reasonable
// approach, but ideally we'd want to categorize the exceptions and count them for the purposes
// of better telemetry. See Bug 1362208.
storeFailedCauseException = ex;
}
};
// This is the *first* channel to flow.
- channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
+ channelAToB = getRecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
try {
channelAToB.beginAndFlow();
} catch (InvalidSessionTransitionException e) {
onFlowBeginFailed(channelAToB, e);
}
}
+ protected RecordsChannel getRecordsChannel(RepositorySession sink, RepositorySession source, RecordsChannelDelegate delegate) {
+ return new RecordsChannel(sink, source, delegate);
+ }
+
/**
* Called after the first flow completes.
* <p>
* By default, any fetch and store failures are ignored.
* @param recordsChannel the <code>RecordsChannel</code> (for error testing).
*/
public void onFirstFlowCompleted(RecordsChannel recordsChannel) {
// If a "reflow exception" was thrown, consider this synchronization failed.
@@ -322,19 +326,18 @@ public class SynchronizerSession impleme
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
// Fetch failures always abort.
- int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
- if (numRemoteFetchFailed > 0) {
- final String message = "Got " + numRemoteFetchFailed + " failures fetching remote records!";
+ if (recordsChannel.didFetchFail()) {
+ final String message = "Saw failures fetching remote records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures fetching remote records.");
// Local store failures are ignored.
int numLocalStoreFailed = recordsChannel.getStoreFailureCount();
@@ -369,19 +372,18 @@ public class SynchronizerSession impleme
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
// Fetch failures always abort.
- int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
- if (numLocalFetchFailed > 0) {
- final String message = "Got " + numLocalFetchFailed + " failures fetching local records!";
+ if (recordsChannel.didFetchFail()) {
+ final String message = "Saw failures fetching local records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures fetching local records.");
// Remote store failures abort!
int numRemoteStoreFailed = recordsChannel.getStoreFailureCount();
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/SynchronizerHelpers.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/SynchronizerHelpers.java
@@ -77,21 +77,16 @@ public class SynchronizerHelpers {
}
@Override
public void onFetchCompleted() {
delegate.onFetchCompleted();
}
@Override
- public void onBatchCompleted() {
-
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return this;
}
});
}
};
}
}
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -13,23 +13,25 @@ import org.mozilla.gecko.background.test
import org.mozilla.gecko.background.testhelpers.WaitHelper;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
+import org.mozilla.gecko.sync.synchronizer.NonBufferingRecordsChannel;
import org.mozilla.gecko.sync.synchronizer.RecordsChannel;
import org.mozilla.gecko.sync.synchronizer.RecordsChannelDelegate;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(TestRunner.class)
public class TestRecordsChannel {
private WBORepository sourceRepository;
private RepositorySession sourceSession;
@@ -37,29 +39,27 @@ public class TestRecordsChannel {
private RepositorySession sinkSession;
private RecordsChannelDelegate rcDelegate;
private AtomicInteger numFlowFetchFailed;
private AtomicInteger numFlowStoreFailed;
private AtomicInteger numFlowCompleted;
private AtomicBoolean flowBeginFailed;
- private AtomicBoolean flowFinishFailed;
private volatile RecordsChannel recordsChannel;
private volatile Exception fetchException;
private volatile Exception storeException;
@Before
public void setUp() throws Exception {
numFlowFetchFailed = new AtomicInteger(0);
numFlowStoreFailed = new AtomicInteger(0);
numFlowCompleted = new AtomicInteger(0);
flowBeginFailed = new AtomicBoolean(false);
- flowFinishFailed = new AtomicBoolean(false);
// Repositories and sessions will be set/created by tests.
sourceRepository = null;
sourceSession = null;
sinkRepository = null;
sinkSession = null;
rcDelegate = new RecordsChannelDelegate() {
@@ -107,21 +107,25 @@ public class TestRecordsChannel {
};
}
private void createSessions() {
sourceSession = sourceRepository.createSession(null);
sinkSession = sinkRepository.createSession(null);
}
- public void doFlow() throws Exception {
+ public void doFlow(boolean nonBuffering) throws Exception {
createSessions();
assertNotNull(sourceSession);
assertNotNull(sinkSession);
- recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate);
+ if (nonBuffering) {
+ recordsChannel = new NonBufferingRecordsChannel(sourceSession, sinkSession, rcDelegate);
+ } else {
+ recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate);
+ }
WaitHelper.getTestWaiter().performWait(new Runnable() {
@Override
public void run() {
try {
recordsChannel.beginAndFlow();
} catch (InvalidSessionTransitionException e) {
WaitHelper.getTestWaiter().performNotify(e);
}
@@ -168,146 +172,286 @@ public class TestRecordsChannel {
}
return repo;
}
@Test
public void testSuccess() throws Exception {
sourceRepository = full();
sinkRepository = empty();
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(0, numFlowStoreFailed.get());
assertEquals(sourceRepository.wbos, sinkRepository.wbos);
- assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertFalse(recordsChannel.didFetchFail());
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertEquals(6, recordsChannel.getStoreAttemptedCount());
+ }
+
+ @Test
+ public void testSuccessNB() throws Exception {
+ sourceRepository = full();
+ sinkRepository = empty();
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(0, numFlowStoreFailed.get());
+ assertEquals(sourceRepository.wbos, sinkRepository.wbos);
+ assertFalse(recordsChannel.didFetchFail());
assertEquals(0, recordsChannel.getStoreFailureCount());
assertEquals(6, recordsChannel.getStoreAttemptedCount());
}
@Test
public void testFetchFail() throws Exception {
sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
sinkRepository = empty();
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
assertTrue(sinkRepository.wbos.size() < 6);
- assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertTrue(recordsChannel.didFetchFail());
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreAttemptedCount() < 6);
+ }
+
+ @Test
+ public void testFetchFailNB() throws Exception {
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
+ sinkRepository = empty();
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertTrue(numFlowFetchFailed.get() > 0);
+ assertEquals(0, numFlowStoreFailed.get());
+ assertTrue(sinkRepository.wbos.size() < 6);
+ assertTrue(recordsChannel.didFetchFail());
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreAttemptedCount() < 6);
}
@Test
public void testStoreFetchFailedCollectionModified() throws Exception {
sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
sinkRepository = empty();
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
assertTrue(sinkRepository.wbos.size() < 6);
- assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertTrue(recordsChannel.didFetchFail());
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
+
+ assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+ }
+
+ @Test
+ public void testStoreFetchFailedCollectionModifiedNB() throws Exception {
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+ sinkRepository = empty();
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertTrue(numFlowFetchFailed.get() > 0);
+ assertEquals(0, numFlowStoreFailed.get());
+ assertTrue(sinkRepository.wbos.size() < 6);
+
+ assertTrue(recordsChannel.didFetchFail());
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
}
@Test
public void testStoreFetchFailedDeadline() throws Exception {
sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
sinkRepository = empty();
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
assertTrue(sinkRepository.wbos.size() < 6);
- assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertTrue(recordsChannel.didFetchFail());
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
+
+ assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(SyncDeadlineReachedException.class, ex.getClass());
+ }
+
+ @Test
+ public void testStoreFetchFailedDeadlineNB() throws Exception {
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
+ sinkRepository = empty();
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertTrue(numFlowFetchFailed.get() > 0);
+ assertEquals(0, numFlowStoreFailed.get());
+ assertTrue(sinkRepository.wbos.size() < 6);
+
+ assertTrue(recordsChannel.didFetchFail());
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(SyncDeadlineReachedException.class, ex.getClass());
}
@Test
public void testStoreSerialFail() throws Exception {
sourceRepository = full();
sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
SynchronizerHelpers.FailMode.STORE);
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(1, numFlowStoreFailed.get());
// We will fail to store one of the records but expect flow to continue.
assertEquals(5, sinkRepository.wbos.size());
- assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertFalse(recordsChannel.didFetchFail());
+ assertEquals(1, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+ }
+
+ @Test
+ public void testStoreSerialFailNB() throws Exception {
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+ SynchronizerHelpers.FailMode.STORE);
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(1, numFlowStoreFailed.get());
+ // We will fail to store one of the records but expect flow to continue.
+ assertEquals(5, sinkRepository.wbos.size());
+
+ assertFalse(recordsChannel.didFetchFail());
assertEquals(1, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
}
@Test
public void testStoreSerialFailCollectionModified() throws Exception {
sourceRepository = full();
sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(1, numFlowStoreFailed.get());
// One of the records will fail, at which point we'll stop flowing them.
final int sunkenRecords = sinkRepository.wbos.size();
assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
- assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertFalse(recordsChannel.didFetchFail());
+ // RecordChannel's storeFail count is only incremented for failures of individual records.
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+
+ assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+ }
+
+ @Test
+ public void testStoreSerialFailCollectionModifiedNB() throws Exception {
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+ SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(1, numFlowStoreFailed.get());
+ // One of the records will fail, at which point we'll stop flowing them.
+ final int sunkenRecords = sinkRepository.wbos.size();
+ assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
+
+ assertFalse(recordsChannel.didFetchFail());
// RecordChannel's storeFail count is only incremented for failures of individual records.
assertEquals(0, recordsChannel.getStoreFailureCount());
assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
}
@Test
public void testStoreBatchesFail() throws Exception {
sourceRepository = full();
sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
- assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertFalse(recordsChannel.didFetchFail());
+ assertEquals(3, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+ }
+
+ @Test
+ public void testStoreBatchesFailNB() throws Exception {
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
+ assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
+
+ assertFalse(recordsChannel.didFetchFail());
assertEquals(3, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
}
-
@Test
public void testStoreOneBigBatchFail() throws Exception {
sourceRepository = full();
sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
- doFlow();
+ doFlow(false);
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
- assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertFalse(recordsChannel.didFetchFail());
+ assertEquals(6, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
+ }
+
+ @Test
+ public void testStoreOneBigBatchFailNB() throws Exception {
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
+ doFlow(true);
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
+ assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
+
+ assertFalse(recordsChannel.didFetchFail());
assertEquals(6, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
}
}
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
@@ -33,17 +33,12 @@ public class ExpectSuccessRepositorySess
@Override
public void onFetchCompleted() {
log("Fetch completed.");
performNotify();
}
@Override
- public void onBatchCompleted() {
- log("Batch completed.");
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return this;
}
}
deleted file mode 100644
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/* Any copyright is dedicated to the Public Domain.
- http://creativecommons.org/publicdomain/zero/1.0/ */
-
-package org.mozilla.gecko.sync.middleware;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mozilla.gecko.background.testhelpers.MockRecord;
-import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
-import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
-import org.mozilla.gecko.sync.repositories.Repository;
-import org.mozilla.gecko.sync.repositories.RepositorySession;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-public class BufferingMiddlewareRepositorySessionTest {
- private RepositorySession innerRepositorySession;
- private BufferingMiddlewareRepositorySession bufferingSession;
- private BufferingMiddlewareRepositorySession bufferingSessionMocked;
- private BufferStorage bufferStorage;
- private BufferStorage bufferStorageMocked;
-
- @Before
- public void setUp() throws Exception {
- BufferingMiddlewareRepository bufferingRepository;
- Repository innerRepositoy;
-
- innerRepositoy = mock(Repository.class);
- innerRepositorySession = mock(RepositorySession.class);
- bufferingRepository = new BufferingMiddlewareRepository(
- 0L,
- new MemoryBufferStorage(),
- innerRepositoy
- );
-
- bufferStorage = new MemoryBufferStorage();
- bufferStorageMocked = mock(MemoryBufferStorage.class);
-
- bufferingSession = new BufferingMiddlewareRepositorySession(
- innerRepositorySession, bufferingRepository, 0L,
- bufferStorage);
-
- bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
- innerRepositorySession, bufferingRepository, 0L,
- bufferStorageMocked);
- }
-
- @Test
- public void store() throws Exception {
- assertEquals(0, bufferStorage.all().size());
-
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
- assertEquals(1, bufferStorage.all().size());
-
- MockRecord record1 = new MockRecord("guid2", null, 1, false);
- bufferingSession.store(record1);
- assertEquals(2, bufferStorage.all().size());
-
- // record2 must replace record.
- MockRecord record2 = new MockRecord("guid1", null, 2, false);
- bufferingSession.store(record2);
- assertEquals(2, bufferStorage.all().size());
-
- // Ensure inner session doesn't see incoming records.
- verify(innerRepositorySession, never()).store(record);
- verify(innerRepositorySession, never()).store(record1);
- verify(innerRepositorySession, never()).store(record2);
- }
-
- @Test
- public void storeDone() throws Exception {
- // Trivial case, no records to merge.
- bufferingSession.doMergeBuffer();
- verify(innerRepositorySession, times(1)).storeDone();
- verify(innerRepositorySession, never()).store(any(Record.class));
-
- // Reset call counters.
- reset(innerRepositorySession);
-
- // Now store some records.
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
-
- MockRecord record2 = new MockRecord("guid2", null, 13, false);
- bufferingSession.store(record2);
-
- MockRecord record3 = new MockRecord("guid3", null, 5, false);
- bufferingSession.store(record3);
-
- // NB: same guid as above.
- MockRecord record4 = new MockRecord("guid3", null, -1, false);
- bufferingSession.store(record4);
-
- // Done storing.
- bufferingSession.doMergeBuffer();
-
- // Ensure all records where stored in the wrapped session.
- verify(innerRepositorySession, times(1)).store(record);
- verify(innerRepositorySession, times(1)).store(record2);
- verify(innerRepositorySession, times(1)).store(record4);
-
- // Ensure storeDone was called on the wrapped session.
- verify(innerRepositorySession, times(1)).storeDone();
-
- // Ensure buffer wasn't cleared on the wrapped session.
- assertEquals(3, bufferStorage.all().size());
- }
-
- @Test
- public void storeFlush() throws Exception {
- verify(bufferStorageMocked, times(0)).flush();
- bufferingSessionMocked.storeFlush();
- verify(bufferStorageMocked, times(1)).flush();
- }
-
- @Test
- public void performCleanup() throws Exception {
- // Baseline.
- assertEquals(0, bufferStorage.all().size());
-
- // Test that we can call cleanup with an empty buffer storage.
- bufferingSession.performCleanup();
- assertEquals(0, bufferStorage.all().size());
-
- // Store a couple of records.
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
-
- MockRecord record2 = new MockRecord("guid2", null, 13, false);
- bufferingSession.store(record2);
-
- // Confirm it worked.
- assertEquals(2, bufferStorage.all().size());
-
- // Test that buffer storage is cleaned up.
- bufferingSession.performCleanup();
- assertEquals(0, bufferStorage.all().size());
- }
-
- @Test
- public void abort() throws Exception {
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
-
- MockRecord record2 = new MockRecord("guid2", null, 13, false);
- bufferingSession.store(record2);
-
- MockRecord record3 = new MockRecord("guid3", null, 5, false);
- bufferingSession.store(record3);
-
- // NB: same guid as above.
- MockRecord record4 = new MockRecord("guid3", null, -1, false);
- bufferingSession.store(record4);
-
- bufferingSession.abort();
-
- // Verify number of records didn't change.
- // Abort shouldn't clear the buffer.
- assertEquals(3, bufferStorage.all().size());
- }
-
- @Test
- public void setStoreDelegate() throws Exception {
- RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
- bufferingSession.setStoreDelegate(delegate);
- verify(innerRepositorySession).setStoreDelegate(delegate);
- }
-}
\ No newline at end of file
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -98,21 +98,16 @@ public class BatchingDownloaderDelegateT
}
@Override
public void onFetchCompleted() {
}
@Override
- public void onBatchCompleted() {
-
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return null;
}
}
@Before
public void setUp() throws Exception {
repositorySession = new Server15RepositorySession(new Server15Repository(
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -118,17 +118,16 @@ public class BatchingDownloaderTest {
return super.resetAndCommit();
}
}
static class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
public boolean isFailure;
public boolean isFetched;
public boolean isSuccess;
- public int batchesCompleted;
public Exception ex;
public Record record;
@Override
public void onFetchFailed(Exception ex) {
this.isFailure = true;
this.ex = ex;
}
@@ -140,21 +139,16 @@ public class BatchingDownloaderTest {
}
@Override
public void onFetchCompleted() {
this.isSuccess = true;
}
@Override
- public void onBatchCompleted() {
- this.batchesCompleted += 1;
- }
-
- @Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return null;
}
}
static class MockRequest extends SyncStorageCollectionRequest {
MockRequest(URI uri) {
@@ -291,17 +285,16 @@ public class BatchingDownloaderTest {
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
// NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
// Expect no offset to be persisted.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
@@ -319,17 +312,16 @@ public class BatchingDownloaderTest {
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
// We don't care about the offset in a single batch mode.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
public void testBatching() throws Exception {
@@ -344,62 +336,58 @@ public class BatchingDownloaderTest {
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals("25", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context set.
ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
assertEquals(1, repositoryStateProvider.getCommitCount());
// The next batch, we still have an offset token and has not exceed the total limit.
performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals("50", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context updated.
ensureOffsetContextIs(repositoryStateProvider, "50", "oldest", 1L);
assertEquals(2, repositoryStateProvider.getCommitCount());
// The next batch, we still have an offset token and has not exceed the total limit.
performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals("75", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context updated.
ensureOffsetContextIs(repositoryStateProvider, "75", "oldest", 1L);
assertEquals(3, repositoryStateProvider.getCommitCount());
// No more offset token, so we complete batching.
performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
- assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context cleared since we finished batching, and committed.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(4, repositoryStateProvider.getCommitCount());
}
@Test
public void testFailureLMChangedMultiBatch() throws Exception {