--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -861,16 +861,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'fxa/SyncStatusListener.java',
'push/autopush/AutopushClient.java',
'push/autopush/AutopushClientException.java',
'push/RegisterUserAgentResponse.java',
'push/SubscribeChannelResponse.java',
'sync/AlreadySyncingException.java',
'sync/BackoffHandler.java',
'sync/BadRequiredFieldJSONException.java',
+ 'sync/CollectionConcurrentModificationException.java',
'sync/CollectionKeys.java',
'sync/CommandProcessor.java',
'sync/CommandRunner.java',
'sync/CredentialException.java',
'sync/crypto/CryptoException.java',
'sync/crypto/CryptoInfo.java',
'sync/crypto/HKDF.java',
'sync/crypto/HMACVerificationException.java',
@@ -939,16 +940,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'sync/net/WBORequestDelegate.java',
'sync/NoCollectionKeysSetException.java',
'sync/NodeAuthenticationException.java',
'sync/NonArrayJSONException.java',
'sync/NonObjectJSONException.java',
'sync/NullClusterURLException.java',
'sync/PersistedMetaGlobal.java',
'sync/PrefsBackoffHandler.java',
+ 'sync/ReflowIsNecessaryException.java',
'sync/repositories/android/AndroidBrowserBookmarksDataAccessor.java',
'sync/repositories/android/AndroidBrowserBookmarksRepository.java',
'sync/repositories/android/AndroidBrowserBookmarksRepositorySession.java',
'sync/repositories/android/AndroidBrowserHistoryDataAccessor.java',
'sync/repositories/android/AndroidBrowserHistoryRepository.java',
'sync/repositories/android/AndroidBrowserHistoryRepositorySession.java',
'sync/repositories/android/AndroidBrowserRepository.java',
'sync/repositories/android/AndroidBrowserRepositoryDataAccessor.java',
@@ -1055,16 +1057,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'sync/stage/PasswordsServerSyncStage.java',
'sync/stage/ServerSyncStage.java',
'sync/stage/SyncClientsEngineStage.java',
'sync/stage/UploadMetaGlobalStage.java',
'sync/Sync11Configuration.java',
'sync/SyncConfiguration.java',
'sync/SyncConfigurationException.java',
'sync/SyncConstants.java',
+ 'sync/SyncDeadlineReachedException.java',
'sync/SyncException.java',
'sync/synchronizer/ConcurrentRecordConsumer.java',
'sync/synchronizer/RecordConsumer.java',
'sync/synchronizer/RecordsChannel.java',
'sync/synchronizer/RecordsChannelDelegate.java',
'sync/synchronizer/RecordsConsumerDelegate.java',
'sync/synchronizer/ServerLocalSynchronizer.java',
'sync/synchronizer/ServerLocalSynchronizerSession.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
@@ -48,19 +48,20 @@ import org.mozilla.gecko.sync.stage.Glob
import org.mozilla.gecko.sync.telemetry.TelemetryContract;
import org.mozilla.gecko.tokenserver.TokenServerClient;
import org.mozilla.gecko.tokenserver.TokenServerClientDelegate;
import org.mozilla.gecko.tokenserver.TokenServerException;
import org.mozilla.gecko.tokenserver.TokenServerToken;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
private static final String LOG_TAG = FxAccountSyncAdapter.class.getSimpleName();
@@ -118,18 +119,25 @@ public class FxAccountSyncAdapter extend
super.postponeSync(millis);
}
@Override
public void rejectSync() {
super.rejectSync();
}
+ /* package-local */ void requestFollowUpSync(String stage) {
+ this.stageNamesForFollowUpSync.add(stage);
+ }
+
protected final Collection<String> stageNamesToSync;
+ // Keeps track of incomplete stages during this sync that need to be re-synced once we're done.
+ private final List<String> stageNamesForFollowUpSync = Collections.synchronizedList(new ArrayList<String>());
+
public SyncDelegate(BlockingQueue<Result> latch, SyncResult syncResult, AndroidFxAccount fxAccount, Collection<String> stageNamesToSync) {
super(latch, syncResult);
this.stageNamesToSync = Collections.unmodifiableCollection(stageNamesToSync);
}
public Collection<String> getStageNamesToSync() {
return this.stageNamesToSync;
}
@@ -178,16 +186,25 @@ public class FxAccountSyncAdapter extend
"Firefox Account informMigrated called, but it's not yet possible to migrate. " +
"Ignoring even though something is terribly wrong.");
}
@Override
public void handleStageCompleted(Stage currentState, GlobalSession globalSession) {
}
+ /**
+ * Schedule an incomplete stage for a follow-up sync.
+ */
+ @Override
+ public void handleIncompleteStage(Stage currentState,
+ GlobalSession globalSession) {
+ syncDelegate.requestFollowUpSync(currentState.getRepositoryName());
+ }
+
@Override
public void handleSuccess(GlobalSession globalSession) {
Logger.info(LOG_TAG, "Global session succeeded.");
// Get the number of clients, so we can schedule the sync interval accordingly.
try {
int otherClientsCount = globalSession.getClientsDelegate().getClientsCount();
Logger.debug(LOG_TAG, "" + otherClientsCount + " other client(s).");
@@ -569,12 +586,29 @@ public class FxAccountSyncAdapter extend
latch.take();
} catch (Exception e) {
Logger.error(LOG_TAG, "Got error syncing.", e);
syncDelegate.handleError(e);
} finally {
fxAccount.releaseSharedAccountStateLock();
}
- Logger.info(LOG_TAG, "Syncing done.");
+ // If there are any incomplete stages, request a follow-up sync. Otherwise, we're done.
+ // Incomplete stage is:
+ // - one that hit a 412 error during either upload or download of data, indicating that
+ // its collection has been modified remotely, or
+ // - one that hit a sync deadline
+ final String[] stagesToSyncAgain;
+ synchronized (syncDelegate.stageNamesForFollowUpSync) {
+ stagesToSyncAgain = syncDelegate.stageNamesForFollowUpSync.toArray(
+ new String[syncDelegate.stageNamesForFollowUpSync.size()]
+ );
+ }
+
+ if (stagesToSyncAgain.length > 0) {
+ Logger.info(LOG_TAG, "Syncing done. Requesting an immediate follow-up sync.");
+ fxAccount.requestImmediateSync(stagesToSyncAgain, null);
+ } else {
+ Logger.info(LOG_TAG, "Syncing done.");
+ }
lastSyncRealtimeMillis = SystemClock.elapsedRealtime();
}
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java
@@ -0,0 +1,15 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Thrown when a collection has been modified by another client while we were either
+ * downloading from it or uploading to it.
+ *
+ * @author grisha
+ */
+public class CollectionConcurrentModificationException extends ReflowIsNecessaryException {
+ private static final long serialVersionUID = 2701457832508838524L;
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
@@ -474,16 +474,21 @@ public class GlobalSession implements Ht
// e is null, or we aborted for a non-HTTP reason; okay to upload new meta/global record.
if (this.hasUpdatedMetaGlobal()) {
this.uploadUpdatedMetaGlobal(); // Only logs errors; does not call abort.
}
}
this.callback.handleError(this, e);
}
+ public void handleIncompleteStage() {
+ // Let our delegate know that current stage is incomplete and needs to be synced again.
+ callback.handleIncompleteStage(this.currentState, this);
+ }
+
public void handleHTTPError(SyncStorageResponse response, String reason) {
// TODO: handling of 50x (backoff), 401 (node reassignment or auth error).
// Fall back to aborting.
Logger.warn(LOG_TAG, "Aborting sync due to HTTP " + response.getStatusCode());
this.interpretHTTPFailure(response.httpResponse());
this.abort(new HTTPFailureException(response), reason);
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java
@@ -0,0 +1,21 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Used by SynchronizerSession to indicate that reflow of a stage is necessary.
+ * To reflow a stage is to request that it is synced again. Depending on the stage and its current
+ * state (last-synced timestamp, resume context, high-water-mark) we might resume, or sync from a
+ * high-water-mark if allowed, or sync regularly from last-synced timestamp.
+ * A re-sync of a stage is no different from a regular sync of the same stage.
+ *
+ * Stages which complete only partially due to hitting a concurrent collection modification error or
+ * hitting a sync deadline should be re-synced as soon as possible.
+ *
+ * @author grisha
+ */
+public class ReflowIsNecessaryException extends Exception {
+ private static final long serialVersionUID = -2614772437814638768L;
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java
@@ -0,0 +1,14 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Thrown when we've hit a self-imposed sync deadline, and decided not to proceed.
+ *
+ * @author grisha
+ */
+public class SyncDeadlineReachedException extends ReflowIsNecessaryException {
+ private static final long serialVersionUID = 2305367921350245484L;
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java
@@ -33,16 +33,17 @@ public interface GlobalSessionCallback {
* This account should stop syncing immediately, and arrange to delete itself.
*/
void informMigrated(GlobalSession session);
void handleAborted(GlobalSession globalSession, String reason);
void handleError(GlobalSession globalSession, Exception ex);
void handleSuccess(GlobalSession globalSession);
void handleStageCompleted(Stage currentState, GlobalSession globalSession);
+ void handleIncompleteStage(Stage currentState, GlobalSession globalSession);
/**
* Called when a {@link GlobalSession} wants to know if it should continue
* to make storage requests.
*
* @return false if the session should make no further requests.
*/
boolean shouldBackOffStorage();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -2,16 +2,17 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.middleware;
import android.os.SystemClock;
import android.support.annotation.VisibleForTesting;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
@@ -30,18 +31,16 @@ import java.util.concurrent.Executors;
* @author grisha
*/
/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
private final BufferStorage bufferStorage;
private final long syncDeadlineMillis;
private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
- private volatile boolean storeMarkedIncomplete = false;
-
/* package-local */ BufferingMiddlewareRepositorySession(
RepositorySession repositorySession, MiddlewareRepository repository,
long syncDeadlineMillis, BufferStorage bufferStorage) {
super(repositorySession, repository);
this.syncDeadlineMillis = syncDeadlineMillis;
this.bufferStorage = bufferStorage;
}
@@ -70,97 +69,85 @@ import java.util.concurrent.Executors;
bufferStorage.clear();
}
@Override
public void store(Record record) throws NoStoreDelegateException {
bufferStorage.addOrReplace(record);
}
+ /**
+ * When source fails to provide all records, we need to decide what to do with the buffer.
+ * We might fail because of a network partition, or because of a concurrent modification of a
+ * collection, or because we ran out of time fetching records, or some other reason.
+ *
+ * Either way we do not clear the buffer in any error scenario, but rather
+ * allow it to be re-filled, replacing existing records with their newer versions if necessary.
+ *
+ * If a collection has been modified, affected records' last-modified timestamps will be bumped,
+ * and we will receive those records during the next sync. If we already have them in our buffer,
+ * we replace our now-old copy. Otherwise, they are new records and we just append them.
+ *
+ * Incoming records are mapped to existing ones via GUIDs.
+ */
@Override
public void storeIncomplete() {
- storeMarkedIncomplete = true;
+ bufferStorage.flush();
}
@Override
public void storeDone() {
storeDone(System.currentTimeMillis());
}
@Override
public void storeFlush() {
bufferStorage.flush();
}
@Override
public void storeDone(final long end) {
- doStoreDonePrepare();
+ bufferStorage.flush();
- // Determine if we have enough time to perform consistency checks on the buffered data and
- // then store it. If we don't have enough time now, we keep our buffer and try again later.
- // We don't store results of a buffer consistency check anywhere, so we can't treat it
- // separately from storage.
- if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
+ // Determine if we have enough time to merge the buffer data.
+ // If we don't have enough time now, we keep our buffer and try again later.
+ if (!mayProceedToMergeBuffer()) {
super.abort();
- storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
+ storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreFailed(new SyncDeadlineReachedException());
return;
}
- // Separate actual merge, so that it may be tested without involving system clock.
- doStoreDone(end);
+ doMergeBuffer(end);
}
@VisibleForTesting
- public void doStoreDonePrepare() {
- // Now that records stopped flowing, persist them.
- bufferStorage.flush();
- }
-
- @VisibleForTesting
- public void doStoreDone(final long end) {
- final Collection<Record> buffer = bufferStorage.all();
+ /* package-local */ void doMergeBuffer(long end) {
+ final Collection<Record> bufferData = bufferStorage.all();
// Trivial case of an empty buffer.
- if (buffer.isEmpty()) {
+ if (bufferData.isEmpty()) {
super.storeDone(end);
return;
}
- // Flush our buffer to the wrapped local repository. Data goes live!
+ // Let session handle actual storing of records as it pleases.
+ // See Bug 1332094 which is concerned with allowing merge to proceed transactionally.
try {
- for (Record record : buffer) {
+ for (Record record : bufferData) {
this.inner.store(record);
}
} catch (NoStoreDelegateException e) {
- // At this point we should have a delegate, so this won't happen.
+ // At this point we should have a store delegate set on the session, so this won't happen.
}
- // And, we're done!
+ // Let session know that there are no more records to store.
super.storeDone(end);
}
/**
- * When source fails to provide more records, we need to decide what to do with the buffer.
- * We might fail because of a network partition, or because of a concurrent modification of a
- * collection. Either way we do not clear the buffer in a general case. If a collection has been
- * modified, affected records' last-modified timestamps will be bumped, and we will receive those
- * records during the next sync. If we already have them in our buffer, we replace our now-old
- * copy. Otherwise, they are new records and we just append them.
- *
- * We depend on GUIDs to be a primary key for incoming records.
- *
- * @param e indicates reason of failure.
- */
- @Override
- public void sourceFailed(Exception e) {
- bufferStorage.flush();
- super.sourceFailed(e);
- }
-
- /**
* Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
* Clean up after ourselves, if there's anything to clean up.
*/
@Override
public void abort() {
bufferStorage.flush();
super.abort();
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -19,17 +19,17 @@ import org.mozilla.gecko.sync.repositori
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
/**
* A <code>RepositorySession</code> is created and used thusly:
*
*<ul>
* <li>Construct, with a reference to its parent {@link Repository}, by calling
- * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li>
+ * {@link Repository#createSession(org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate, android.content.Context)}.</li>
* <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
* <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
* is an appropriate place to initialize expensive resources.</li>
* <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and
* {@link #store(Record)}.</li>
* <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing
* the current bundle.</li>
*</ul>
@@ -154,24 +154,16 @@ public abstract class RepositorySession
/**
* Indicates that a number of records have been stored, more are still to come but after some time,
* and now would be a good time to flush records and perform any other similar operations.
*/
public void storeFlush() {
}
/**
- * During flow of records, indicates that source failed.
- *
- * @param e indicates reason of failure.
- */
- public void sourceFailed(Exception e) {
- }
-
- /**
* Indicates that a flow of records have been completed.
*/
public void performCleanup() {
}
public abstract void wipe(RepositorySessionWipeDelegate delegate);
/**
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -49,9 +49,19 @@ public class DeferredRepositorySessionSt
public void onStoreCompleted(final long storeEnd) {
executor.execute(new Runnable() {
@Override
public void run() {
inner.onStoreCompleted(storeEnd);
}
});
}
+
+ @Override
+ public void onStoreFailed(final Exception e) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ inner.onStoreFailed(e);
+ }
+ });
+ }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -9,15 +9,16 @@ import java.util.concurrent.ExecutorServ
/**
* These methods *must* be invoked asynchronously. Use deferredStoreDelegate if you
* need help doing this.
*
* @author rnewman
*
*/
public interface RepositorySessionStoreDelegate {
- public void onRecordStoreFailed(Exception ex, String recordGuid);
+ void onRecordStoreFailed(Exception ex, String recordGuid);
// Called with a GUID when store has succeeded.
- public void onRecordStoreSucceeded(String guid);
- public void onStoreCompleted(long storeEnd);
- public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
+ void onRecordStoreSucceeded(String guid);
+ void onStoreCompleted(long storeEnd);
+ void onStoreFailed(Exception e);
+ RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -5,31 +5,32 @@
package org.mozilla.gecko.sync.repositories.downloaders;
import android.net.Uri;
import android.os.SystemClock;
import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.DelayedWorkTracker;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
-import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Batching Downloader implements batching protocol as supported by Sync 1.5.
*
* Downloader's batching behaviour is configured via two parameters, obtained from the repository:
@@ -180,19 +181,19 @@ public class BatchingDownloader {
this.lastModified = currentLastModifiedTimestamp;
}
lastModifiedChanged = !this.lastModified.equals(currentLastModifiedTimestamp);
}
// We expected server to fail our request with 412 in case of concurrent modifications, so
// this is unexpected. However, let's treat this case just as if we received a 412.
if (lastModifiedChanged) {
- this.abort(
+ this.handleFetchFailed(
fetchRecordsDelegate,
- new ConcurrentModificationException("Last-modified timestamp has changed unexpectedly")
+ new CollectionConcurrentModificationException()
);
return;
}
// If we can (or must) stop batching at this point, let the delegate know that we're all done!
final String offset = response.weaveOffset();
if (offset == null || !allowMultipleBatches) {
final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
@@ -217,17 +218,17 @@ public class BatchingDownloader {
public void run() {
Logger.debug(LOG_TAG, "Running onBatchCompleted.");
fetchRecordsDelegate.onBatchCompleted();
}
});
// Should we proceed, however? Do we have enough time?
if (!mayProceedWithBatching(fetchDeadline)) {
- this.abort(fetchRecordsDelegate, new Exception("Not enough time to complete next batch"));
+ this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException());
return;
}
// Create and execute new batch request.
try {
final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
limit, full, sort, ids, offset);
this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
@@ -237,20 +238,26 @@ public class BatchingDownloader {
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
fetchRecordsDelegate.onFetchFailed(e);
}
});
}
}
- public void onFetchFailed(final Exception ex,
- final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
- final SyncStorageCollectionRequest request) {
- removeRequestFromPending(request);
+ private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+ final Exception ex) {
+ handleFetchFailed(fetchRecordsDelegate, ex, null);
+ }
+
+ /* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+ final Exception ex,
+ @Nullable final SyncStorageCollectionRequest request) {
+ this.removeRequestFromPending(request);
+ this.abortRequests();
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onFetchFailed.");
fetchRecordsDelegate.onFetchFailed(ex);
}
});
}
@@ -286,28 +293,16 @@ public class BatchingDownloader {
}
}
@Nullable
protected synchronized String getLastModified() {
return this.lastModified;
}
- private void abort(final RepositorySessionFetchRecordsDelegate delegate, final Exception exception) {
- Logger.error(LOG_TAG, exception.getMessage());
- this.abortRequests();
- this.workTracker.delayWorkItem(new Runnable() {
- @Override
- public void run() {
- Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
- delegate.onFetchFailed(exception);
- }
- });
- }
-
private static boolean mayProceedWithBatching(long deadline) {
// For simplicity, allow batching to proceed if there's at least a minute left for the sync.
// This should be enough to fetch and process records in the batch.
final long timeLeft = deadline - SystemClock.elapsedRealtime();
return timeLeft > TimeUnit.MINUTES.toMillis(1);
}
@VisibleForTesting
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
@@ -1,27 +1,26 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.downloaders;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.crypto.KeyBundle;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
-import java.util.ConcurrentModificationException;
-
/**
* Delegate that gets passed into fetch methods to handle server response from fetch.
*/
public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate {
public static final String LOG_TAG = "BatchingDownloaderDelegate";
private final BatchingDownloader downloader;
private final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate;
@@ -55,47 +54,48 @@ public class BatchingDownloaderDelegate
@Override
public String ifUnmodifiedSince() {
return this.downloader.getLastModified();
}
@Override
public void handleRequestSuccess(SyncStorageResponse response) {
Logger.debug(LOG_TAG, "Fetch done.");
- if (response.lastModified() != null) {
- this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
- this.newer, this.batchLimit, this.full, this.sort, this.ids);
+
+ // Sanity check.
+ if (response.lastModified() == null) {
+ this.downloader.handleFetchFailed(
+ this.fetchRecordsDelegate,
+ new IllegalStateException("Missing last modified header from response"),
+ this.request
+ );
return;
}
- this.downloader.onFetchFailed(
- new IllegalStateException("Missing last modified header from response"),
- this.fetchRecordsDelegate,
- this.request);
+
+ this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
+ this.newer, this.batchLimit, this.full, this.sort, this.ids);
}
@Override
public void handleRequestFailure(SyncStorageResponse response) {
Logger.warn(LOG_TAG, "Got a non-success response.");
- // Handle concurrent modification errors separately. We will need to signal upwards that
- // this happened, in case stage buffer will want to clean up.
+ // Handle concurrent modification errors separately.
+ final Exception ex;
if (response.getStatusCode() == 412) {
- this.downloader.onFetchFailed(
- new ConcurrentModificationException(),
- this.fetchRecordsDelegate,
- this.request
- );
+ ex = new CollectionConcurrentModificationException();
} else {
- this.handleRequestError(new HTTPFailureException(response));
+ ex = new HTTPFailureException(response);
}
+ this.handleRequestError(ex);
}
@Override
public void handleRequestError(final Exception ex) {
Logger.warn(LOG_TAG, "Got request error.", ex);
- this.downloader.onFetchFailed(ex, this.fetchRecordsDelegate, this.request);
+ this.downloader.handleFetchFailed(this.fetchRecordsDelegate, ex, this.request);
}
@Override
public void handleWBO(CryptoRecord record) {
this.downloader.onFetchedRecord(record, this.fetchRecordsDelegate);
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -3,24 +3,27 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.uploaders;
import android.net.Uri;
import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.Server15PreviousPostFailedException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
* Uploader which implements batching introduced in Sync 1.5.
*
* Batch vs payload terminology:
* - batch is comprised of a series of payloads, which are all committed at the same time.
* -- identified via a "batch token", which is returned after first payload for the batch has been uploaded.
@@ -65,16 +68,17 @@ public class BatchingUploader {
// Sanity check. RECORD_SEPARATOR and RECORD_START are assumed to be of the same length.
static {
if (RecordUploadRunnable.RECORD_SEPARATOR.length != RecordUploadRunnable.RECORDS_START.length) {
throw new IllegalStateException("Separator and start tokens must be of the same length");
}
}
// Accessed by the record consumer thread pool.
+ private final ExecutorService executor;
// Will be re-created, so mark it as volatile.
private volatile Payload payload;
// Accessed by both the record consumer thread pool and the network worker thread(s).
/* package-local */ final Uri collectionUri;
/* package-local */ final RepositorySessionStoreDelegate sessionStoreDelegate;
/* package-local */ @VisibleForTesting final PayloadDispatcher payloadDispatcher;
/* package-local */ final AuthHeaderProvider authHeaderProvider;
@@ -83,36 +87,53 @@ public class BatchingUploader {
private volatile UploaderMeta uploaderMeta;
// Used to ensure we have thread-safe access to the following:
// - byte and record counts in both Payload and BatchMeta objects
// - buffers in the Payload object
private final Object payloadLock = new Object();
public BatchingUploader(
- final RepositorySession repositorySession, final Executor workQueue,
+ final RepositorySession repositorySession, final ExecutorService workQueue,
final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
final AuthHeaderProvider authHeaderProvider) {
this.repositorySession = repositorySession;
this.sessionStoreDelegate = sessionStoreDelegate;
this.collectionUri = baseCollectionUri;
this.authHeaderProvider = authHeaderProvider;
this.uploaderMeta = new UploaderMeta(
payloadLock, infoConfiguration.maxTotalBytes, infoConfiguration.maxTotalRecords);
this.payload = new Payload(
payloadLock, infoConfiguration.maxPostBytes, infoConfiguration.maxPostRecords);
- this.payloadDispatcher = new PayloadDispatcher(workQueue, this, localCollectionLastModified);
+ this.payloadDispatcher = createPayloadDispatcher(workQueue, localCollectionLastModified);
+
+ this.executor = workQueue;
}
// Called concurrently from the threads running off of a record consumer thread pool.
public void process(final Record record) {
final String guid = record.guid;
+
+ // If store failed entirely, just bail out. We've already told our delegate that we failed.
+ if (payloadDispatcher.storeFailed) {
+ return;
+ }
+
+ // If a record or a payload failed, we won't let subsequent requests proceed.'
+ // This means that we may bail much earlier.
+ if (payloadDispatcher.recordUploadFailed) {
+ sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
+ new Server15PreviousPostFailedException(), guid
+ );
+ return;
+ }
+
final byte[] recordBytes = record.toJSONBytes();
final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
// We can't upload individual records which exceed our payload byte limit.
if ((recordDeltaByteCount + PER_PAYLOAD_OVERHEAD_BYTE_COUNT) > payload.maxBytes) {
sessionStoreDelegate.onRecordStoreFailed(new RecordTooLargeToUpload(), guid);
@@ -213,17 +234,25 @@ public class BatchingUploader {
payloadDispatcher.queue(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload);
if (isCommit && !isLastPayload) {
uploaderMeta = uploaderMeta.nextUploaderMeta();
}
}
- /* package-local */ static class BatchingUploaderException extends Exception {
+ /**
+ * Allows tests to define their own PayloadDispatcher.
+ */
+ @VisibleForTesting
+ PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
+ return new PayloadDispatcher(workQueue, this, localCollectionLastModified);
+ }
+
+ public static class BatchingUploaderException extends Exception {
private static final long serialVersionUID = 1L;
}
/* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException {
private static final long serialVersionUID = 1L;
}
/* package-local */ static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
private static final long serialVersionUID = 1L;
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -3,16 +3,17 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.uploaders;
import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.Server15RecordPostFailedException;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
@@ -25,22 +26,25 @@ import java.util.concurrent.atomic.Atomi
class PayloadDispatcher {
private static final String LOG_TAG = "PayloadDispatcher";
// All payload runnables share the same whiteboard.
// It's accessed directly by the runnables; tests also make use of this direct access.
volatile BatchMeta batchWhiteboard;
private final AtomicLong uploadTimestamp = new AtomicLong(0);
- // Accessed from different threads sequentially running on the 'executor'.
- private volatile boolean recordUploadFailed = false;
-
private final Executor executor;
private final BatchingUploader uploader;
+ // For both of these flags:
+ // Written from sequentially running thread(s) on the SingleThreadExecutor `executor`.
+ // Read by many threads running concurrently on the records consumer thread pool.
+ volatile boolean recordUploadFailed = false;
+ volatile boolean storeFailed = false;
+
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
// Initially we don't know if we're in a batching mode.
this.batchWhiteboard = new BatchMeta(initialLastModified, null);
this.uploader = uploader;
this.executor = executor;
}
void queue(
@@ -48,31 +52,17 @@ class PayloadDispatcher {
final ArrayList<String> outgoingGuids,
final long byteCount,
final boolean isCommit, final boolean isLastPayload) {
// Note that `executor` is expected to be a SingleThreadExecutor.
executor.execute(new BatchContextRunnable(isCommit) {
@Override
public void run() {
- new RecordUploadRunnable(
- new BatchingAtomicUploaderMayUploadProvider(),
- uploader.collectionUri,
- batchWhiteboard.getToken(),
- new PayloadUploadDelegate(
- uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider(),
- PayloadDispatcher.this,
- outgoingGuids,
- isCommit,
- isLastPayload
- ),
- outgoing,
- byteCount,
- isCommit
- ).run();
+ createRecordUploadRunnable(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload).run();
}
});
}
void setInBatchingMode(boolean inBatchingMode) {
batchWhiteboard.setInBatchingMode(inBatchingMode);
uploader.setUnlimitedMode(!inBatchingMode);
}
@@ -137,16 +127,22 @@ class PayloadDispatcher {
}
void recordFailed(final Exception e, final String recordGuid) {
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
recordUploadFailed = true;
uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
}
+ void concurrentModificationDetected() {
+ recordUploadFailed = true;
+ storeFailed = true;
+ uploader.sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException());
+ }
+
void prepareForNextBatch() {
batchWhiteboard = batchWhiteboard.nextBatchMeta();
}
private static void bumpTimestampTo(final AtomicLong current, long newValue) {
while (true) {
long existing = current.get();
if (existing > newValue) {
@@ -154,16 +150,41 @@ class PayloadDispatcher {
}
if (current.compareAndSet(existing, newValue)) {
return;
}
}
}
/**
+ * Allows tests to define their own RecordUploadRunnable.
+ */
+ @VisibleForTesting
+ Runnable createRecordUploadRunnable(final ArrayList<byte[]> outgoing,
+ final ArrayList<String> outgoingGuids,
+ final long byteCount,
+ final boolean isCommit, final boolean isLastPayload) {
+ return new RecordUploadRunnable(
+ new BatchingAtomicUploaderMayUploadProvider(),
+ uploader.collectionUri,
+ batchWhiteboard.getToken(),
+ new PayloadUploadDelegate(
+ uploader.authHeaderProvider,
+ PayloadDispatcher.this,
+ outgoingGuids,
+ isCommit,
+ isLastPayload
+ ),
+ outgoing,
+ byteCount,
+ isCommit
+ );
+ }
+
+ /**
* Allows tests to easily peek into the flow of upload tasks.
*/
@VisibleForTesting
abstract static class BatchContextRunnable implements Runnable {
boolean isCommit;
BatchContextRunnable(boolean isCommit) {
this.isCommit = isCommit;
@@ -171,14 +192,15 @@ class PayloadDispatcher {
}
/**
* Allows tests to tell apart non-payload runnables going through the executor.
*/
@VisibleForTesting
abstract static class NonPayloadContextRunnable implements Runnable {}
+ // Instances of this class must be accessed from threads running on the `executor`.
private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
public boolean mayUpload() {
return !recordUploadFailed;
}
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
@@ -179,17 +179,21 @@ class PayloadUploadDelegate implements S
if (isCommit && !isLastPayload) {
dispatcher.prepareForNextBatch();
}
}
@Override
public void handleRequestFailure(final SyncStorageResponse response) {
- this.handleRequestError(new HTTPFailureException(response));
+ if (response.getStatusCode() == 412) {
+ dispatcher.concurrentModificationDetected();
+ } else {
+ this.handleRequestError(new HTTPFailureException(response));
+ }
}
@Override
public void handleRequestError(Exception e) {
for (String guid : postedRecordGuids) {
dispatcher.recordFailed(e, guid);
}
// GC
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -9,16 +9,17 @@ import android.os.SystemClock;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.EngineSettings;
import org.mozilla.gecko.sync.GlobalSession;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.NoCollectionKeysSetException;
import org.mozilla.gecko.sync.NonObjectJSONException;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.SynchronizerConfiguration;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.crypto.KeyBundle;
import org.mozilla.gecko.sync.delegates.WipeServerDelegate;
import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.BaseResource;
import org.mozilla.gecko.sync.net.SyncStorageRequest;
@@ -617,13 +618,19 @@ public abstract class ServerSyncStage ex
if (response.retryAfterInSeconds() > 0) {
session.handleHTTPError(response, reason); // Calls session.abort().
return;
} else {
session.interpretHTTPFailure(response.httpResponse()); // Does not call session.abort().
}
}
+ // Let global session know that this stage is not complete (due to a 412 or hitting a deadline).
+ // This stage will be re-synced once current sync is complete.
+ if (lastException instanceof ReflowIsNecessaryException) {
+ session.handleIncompleteStage();
+ }
+
Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() +
"). Timestamps not persisted.");
session.advance();
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
@@ -60,17 +60,21 @@ class ConcurrentRecordConsumer extends R
Logger.trace(LOG_TAG, "Record stored. Notifying.");
synchronized (countMonitor) {
counter++;
}
}
private void consumerIsDone() {
Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
- delegate.consumerIsDone(allRecordsQueued);
+ if (allRecordsQueued) {
+ delegate.consumerIsDoneFull();
+ } else {
+ delegate.consumerIsDonePartial();
+ }
}
@Override
public void run() {
Record record;
while (true) {
// The queue is concurrent-safe.
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -1,19 +1,23 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.ThreadPool;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
@@ -67,16 +71,18 @@ public class RecordsChannel implements
RepositorySessionBeginDelegate {
private static final String LOG_TAG = "RecordsChannel";
public RepositorySession source;
public RepositorySession sink;
private final RecordsChannelDelegate delegate;
private long fetchEnd = -1;
+ private volatile ReflowIsNecessaryException reflowException;
+
protected final AtomicInteger numFetched = new AtomicInteger();
protected final AtomicInteger numFetchFailed = new AtomicInteger();
protected final AtomicInteger numStored = new AtomicInteger();
protected final AtomicInteger numStoreFailed = new AtomicInteger();
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
this.source = source;
this.sink = sink;
@@ -88,17 +94,17 @@ public class RecordsChannel implements
* A separate thread is waiting for us to notify it of work to do.
* When we tell it to stop, it'll stop. We do that when the fetch
* is completed.
* When it stops, we tell the sink that there are no more records,
* and wait for the sink to tell us that storing is done.
* Then we notify our delegate of completion.
*/
private RecordConsumer consumer;
- private boolean waitingForQueueDone = false;
+ private volatile boolean waitingForQueueDone = false;
private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
@Override
public ConcurrentLinkedQueue<Record> getQueue() {
return toProcess;
}
protected boolean isReady() {
@@ -199,19 +205,22 @@ public class RecordsChannel implements
} catch (NoStoreDelegateException e) {
Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
delegate.onFlowStoreFailed(this, e, record.guid);
}
}
@Override
public void onFetchFailed(Exception ex) {
- Logger.warn(LOG_TAG, "onFetchFailed. Informing sink, calling for immediate stop.", ex);
- sink.sourceFailed(ex);
+ Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
numFetchFailed.incrementAndGet();
+ if (ex instanceof ReflowIsNecessaryException) {
+ setReflowException((ReflowIsNecessaryException) ex);
+ }
+ // Sink will be informed once consumer finishes.
this.consumer.halt();
delegate.onFlowFetchFailed(this, ex);
}
@Override
public void onFetchedRecord(Record record) {
numFetched.incrementAndGet();
this.toProcess.add(record);
@@ -241,38 +250,80 @@ public class RecordsChannel implements
}
@Override
public void onRecordStoreSucceeded(String guid) {
Logger.trace(LOG_TAG, "Stored record with guid " + guid);
this.consumer.stored();
}
+ @Override
+ public void consumerIsDoneFull() {
+ Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
+ if (waitingForQueueDone) {
+ waitingForQueueDone = false;
+
+ // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
+ this.sink.storeDone();
+ }
+ }
@Override
- public void consumerIsDone(boolean allRecordsQueued) {
- Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone);
+ public void consumerIsDonePartial() {
+ Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
if (waitingForQueueDone) {
waitingForQueueDone = false;
- if (!allRecordsQueued) {
- this.sink.storeIncomplete();
- }
- this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
+
+ // Let sink clean up or flush records if necessary.
+ this.sink.storeIncomplete();
+
+ delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
}
}
@Override
public void onStoreCompleted(long storeEnd) {
Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
"Fetch end is " + fetchEnd + ", store end is " + storeEnd);
// Source might have used caches used to facilitate flow of records, so now is a good
// time to clean up. Particularly pertinent for buffered sources.
+ // Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
+ // locally and results of the merge have been uploaded to the server successfully.
this.source.performCleanup();
- // TODO: synchronize on consumer callback?
delegate.onFlowCompleted(this, fetchEnd, storeEnd);
+
+ }
+
+ @Override
+ public void onStoreFailed(Exception ex) {
+ Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
+ if (ex instanceof ReflowIsNecessaryException) {
+ setReflowException((ReflowIsNecessaryException) ex);
+ }
+
+ // NB: consumer might or might not be running at this point. There are two cases here:
+ // 1) If we're storing records remotely, we might fail due to a 412.
+ // -- we might hit 412 at any point, so consumer might be in either state.
+ // Action: ignore consumer state, we have nothing else to do other to inform our delegate
+ // that we're done with this flow. Based on the reflow exception, it'll determine what to do.
+
+ // 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
+ // -- we might hit a deadline only prior to attempting to merge records,
+ // -- at which point consumer would have finished already, and storeDone was called.
+ // Action: consumer state is known (done), so we can ignore it safely and inform our delegate
+ // that we're done.
+
+ // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
+ // we don't need them (case 1).
+ waitingForQueueDone = false;
+
+ // If consumer is still going at it, tell it to stop.
+ this.consumer.halt();
+
+ delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
}
@Override
public void onBeginFailed(Exception ex) {
delegate.onFlowBeginFailed(this, ex);
}
@Override
@@ -305,9 +356,22 @@ public class RecordsChannel implements
return new DeferredRepositorySessionBeginDelegate(this, executor);
}
@Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
// Lie outright. We know that all of our fetch methods are safe.
return this;
}
+
+ @Nullable
+ /* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
+ return reflowException;
+ }
+
+ private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
+ // It is a mistake to set reflow exception multiple times.
+ if (reflowException != null) {
+ throw new IllegalStateException("Reflow exception already set: " + reflowException);
+ }
+ reflowException = e;
+ }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java
@@ -4,20 +4,24 @@
package org.mozilla.gecko.sync.synchronizer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.mozilla.gecko.sync.repositories.domain.Record;
interface RecordsConsumerDelegate {
- public abstract ConcurrentLinkedQueue<Record> getQueue();
+ ConcurrentLinkedQueue<Record> getQueue();
/**
* Called when no more items will be processed.
- * If forced is true, the consumer is terminating because it was told to halt;
- * not all items will necessarily have been processed.
- * If forced is false, the consumer has invoked store and received an onStoreCompleted callback.
- * @param forced
+ * Indicates that all items have been processed.
*/
- public abstract void consumerIsDone(boolean forced);
- public abstract void store(Record record);
+ void consumerIsDoneFull();
+
+ /**
+ * Called when no more items will be processed.
+ * Indicates that only some of the items have been processed.
+ */
+ void consumerIsDonePartial();
+
+ void store(Record record);
}
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
@@ -1,15 +1,16 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.StoreFailedException;
/**
* A <code>SynchronizerSession</code> designed to be used between a remote
* server and a local repository.
* <p>
* Handles failure cases as follows (in the order they will occur during a sync):
@@ -24,16 +25,25 @@ public class ServerLocalSynchronizerSess
protected static final String LOG_TAG = "ServLocSynchronizerSess";
public ServerLocalSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
super(synchronizer, delegate);
}
@Override
public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ // If a "reflow exception" was thrown, consider this synchronization failed.
+ final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
+ if (reflowException != null) {
+ final String message = "Reflow is necessary: " + reflowException;
+ Logger.warn(LOG_TAG, message + " Aborting session.");
+ delegate.onSynchronizeFailed(this, reflowException, message);
+ return;
+ }
+
// Fetch failures always abort.
int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
if (numRemoteFetchFailed > 0) {
final String message = "Got " + numRemoteFetchFailed + " failures fetching remote records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
@@ -48,16 +58,25 @@ public class ServerLocalSynchronizerSess
Logger.trace(LOG_TAG, "No failures storing local records.");
}
super.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
}
@Override
public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ // If a "reflow exception" was thrown, consider this synchronization failed.
+ final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
+ if (reflowException != null) {
+ final String message = "Reflow is necessary: " + reflowException;
+ Logger.warn(LOG_TAG, message + " Aborting session.");
+ delegate.onSynchronizeFailed(this, reflowException, message);
+ return;
+ }
+
// Fetch failures always abort.
int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
if (numLocalFetchFailed > 0) {
final String message = "Got " + numLocalFetchFailed + " failures fetching local records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
return;
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -741,16 +741,21 @@ public class TestBookmarks extends Andro
}
}
finishAndNotify(session);
}
@Override
public void onRecordStoreSucceeded(String guid) {
}
+
+ @Override
+ public void onStoreFailed(Exception e) {
+
+ }
};
session.setStoreDelegate(storeDelegate);
for (BookmarkRecord record : records) {
try {
session.store(record);
} catch (NoStoreDelegateException e) {
// Never happens.
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -116,16 +116,21 @@ public class TestStoreTracking extends A
public void onBatchCompleted() {
}
});
} catch (InactiveSessionException e) {
performNotify(e);
}
}
+
+ @Override
+ public void onStoreFailed(Exception e) {
+
+ }
};
session.setStoreDelegate(storeDelegate);
try {
Logger.debug(getName(), "Storing...");
session.store(record);
session.storeDone();
} catch (NoStoreDelegateException e) {
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -6,30 +6,35 @@ package org.mozilla.gecko.background.syn
import java.util.concurrent.ExecutorService;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
public class DefaultStoreDelegate extends DefaultDelegate implements RepositorySessionStoreDelegate {
@Override
public void onRecordStoreFailed(Exception ex, String guid) {
- performNotify("Store failed", ex);
+ performNotify("Record store failed", ex);
}
@Override
public void onRecordStoreSucceeded(String guid) {
performNotify("DefaultStoreDelegate used", null);
}
@Override
public void onStoreCompleted(long storeEnd) {
performNotify("DefaultStoreDelegate used", null);
}
@Override
+ public void onStoreFailed(Exception ex) {
+ performNotify("Store failed", ex);
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
final RepositorySessionStoreDelegate self = this;
return new RepositorySessionStoreDelegate() {
@Override
public void onRecordStoreSucceeded(final String guid) {
executor.execute(new Runnable() {
@Override
@@ -55,16 +60,21 @@ public class DefaultStoreDelegate extend
@Override
public void run() {
self.onStoreCompleted(storeEnd);
}
});
}
@Override
+ public void onStoreFailed(Exception e) {
+
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
if (newExecutor == executor) {
return this;
}
throw new IllegalArgumentException("Can't re-defer this delegate.");
}
};
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
@@ -24,16 +24,22 @@ public class DefaultGlobalSessionCallbac
public void informUpgradeRequiredResponse(GlobalSession session) {
}
@Override
public void informMigrated(GlobalSession globalSession) {
}
@Override
+ public void handleIncompleteStage(Stage currentState,
+ GlobalSession globalSession) {
+
+ }
+
+ @Override
public void handleAborted(GlobalSession globalSession, String reason) {
}
@Override
public void handleError(GlobalSession globalSession, Exception ex) {
}
@Override
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
@@ -1,16 +1,18 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.android.sync.test;
import android.content.Context;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.background.testhelpers.WBORepository;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.StoreFailedException;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
@@ -18,32 +20,60 @@ import org.mozilla.gecko.sync.repositori
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
public class SynchronizerHelpers {
public static final String FAIL_SENTINEL = "Fail";
+ enum FailMode {
+ COLLECTION_MODIFIED,
+ DEADLINE_REACHED,
+ FETCH,
+ STORE
+ }
+
+ private static Exception getFailException(FailMode failMode) {
+ switch (failMode) {
+ case COLLECTION_MODIFIED:
+ return new CollectionConcurrentModificationException();
+ case DEADLINE_REACHED:
+ return new SyncDeadlineReachedException();
+ case FETCH:
+ return new FetchFailedException();
+ case STORE:
+ return new StoreFailedException();
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
/**
* Store one at a time, failing if the guid contains FAIL_SENTINEL.
*/
public static class FailFetchWBORepository extends WBORepository {
+ private final FailMode failMode;
+
+ public FailFetchWBORepository(FailMode failMode) {
+ this.failMode = failMode;
+ }
+
@Override
public void createSession(RepositorySessionCreationDelegate delegate,
Context context) {
delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this) {
@Override
public void fetchSince(long timestamp,
final RepositorySessionFetchRecordsDelegate delegate) {
super.fetchSince(timestamp, new RepositorySessionFetchRecordsDelegate() {
@Override
public void onFetchedRecord(Record record) {
if (record.guid.contains(FAIL_SENTINEL)) {
- delegate.onFetchFailed(new FetchFailedException());
+ delegate.onFetchFailed(getFailException(failMode));
} else {
delegate.onFetchedRecord(record);
}
}
@Override
public void onFetchFailed(Exception ex) {
delegate.onFetchFailed(ex);
@@ -68,27 +98,38 @@ public class SynchronizerHelpers {
});
}
}
/**
* Store one at a time, failing if the guid contains FAIL_SENTINEL.
*/
public static class SerialFailStoreWBORepository extends WBORepository {
+ private final FailMode failMode;
+
+ public SerialFailStoreWBORepository(FailMode failMode) {
+ this.failMode = failMode;
+ }
+
@Override
public void createSession(RepositorySessionCreationDelegate delegate,
Context context) {
delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this) {
@Override
public void store(final Record record) throws NoStoreDelegateException {
if (storeDelegate == null) {
throw new NoStoreDelegateException();
}
if (record.guid.contains(FAIL_SENTINEL)) {
- storeDelegate.onRecordStoreFailed(new StoreFailedException(), record.guid);
+ Exception ex = getFailException(failMode);
+ if (ex instanceof CollectionConcurrentModificationException) {
+ storeDelegate.onStoreFailed(ex);
+ } else {
+ storeDelegate.onRecordStoreFailed(ex, record.guid);
+ }
} else {
super.store(record);
}
}
});
}
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -1,108 +1,101 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.android.sync.test;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.android.sync.test.SynchronizerHelpers.FailFetchWBORepository;
import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionCreationDelegate;
import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionFinishDelegate;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.background.testhelpers.WBORepository;
import org.mozilla.gecko.background.testhelpers.WaitHelper;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
-import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
import org.mozilla.gecko.sync.synchronizer.RecordsChannel;
import org.mozilla.gecko.sync.synchronizer.RecordsChannelDelegate;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(TestRunner.class)
public class TestRecordsChannel {
- protected WBORepository remote;
- protected WBORepository local;
+ private WBORepository sourceRepository;
+ private RepositorySession sourceSession;
+ private WBORepository sinkRepository;
+ private RepositorySession sinkSession;
- protected RepositorySession source;
- protected RepositorySession sink;
- protected RecordsChannelDelegate rcDelegate;
-
- protected AtomicInteger numFlowFetchFailed;
- protected AtomicInteger numFlowStoreFailed;
- protected AtomicInteger numFlowCompleted;
- protected AtomicBoolean flowBeginFailed;
- protected AtomicBoolean flowFinishFailed;
+ private RecordsChannelDelegate rcDelegate;
- public void doFlow(final Repository remote, final Repository local) throws Exception {
- WaitHelper.getTestWaiter().performWait(new Runnable() {
- @Override
- public void run() {
- remote.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
- @Override
- public void onSessionCreated(RepositorySession session) {
- source = session;
- local.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
- @Override
- public void onSessionCreated(RepositorySession session) {
- sink = session;
- WaitHelper.getTestWaiter().performNotify();
- }
- }, null);
- }
- }, null);
- }
- });
+ private AtomicInteger numFlowFetchFailed;
+ private AtomicInteger numFlowStoreFailed;
+ private AtomicInteger numFlowCompleted;
+ private AtomicBoolean flowBeginFailed;
+ private AtomicBoolean flowFinishFailed;
- assertNotNull(source);
- assertNotNull(sink);
+ private volatile RecordsChannel recordsChannel;
+ private volatile Exception fetchException;
+ private volatile Exception storeException;
+ @Before
+ public void setUp() throws Exception {
numFlowFetchFailed = new AtomicInteger(0);
numFlowStoreFailed = new AtomicInteger(0);
numFlowCompleted = new AtomicInteger(0);
flowBeginFailed = new AtomicBoolean(false);
flowFinishFailed = new AtomicBoolean(false);
+ // Repositories and sessions will be set/created by tests.
+ sourceRepository = null;
+ sourceSession = null;
+ sinkRepository = null;
+ sinkSession = null;
+
rcDelegate = new RecordsChannelDelegate() {
@Override
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
numFlowFetchFailed.incrementAndGet();
+ fetchException = ex;
}
@Override
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
numFlowStoreFailed.incrementAndGet();
+ storeException = ex;
}
@Override
public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
flowFinishFailed.set(true);
WaitHelper.getTestWaiter().performNotify();
}
@Override
public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
numFlowCompleted.incrementAndGet();
try {
- sink.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
+ sinkSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
try {
- source.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
+ sourceSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
performNotify();
}
});
} catch (InactiveSessionException e) {
WaitHelper.getTestWaiter().performNotify(e);
}
@@ -114,116 +107,230 @@ public class TestRecordsChannel {
}
@Override
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
flowBeginFailed.set(true);
WaitHelper.getTestWaiter().performNotify();
}
};
+ }
- final RecordsChannel rc = new RecordsChannel(source, sink, rcDelegate);
+ private void createSessions() {
+ WaitHelper.getTestWaiter().performWait(new Runnable() {
+ @Override
+ public void run() {
+ sourceRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
+ @Override
+ public void onSessionCreated(RepositorySession session) {
+ sourceSession = session;
+ sinkRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
+ @Override
+ public void onSessionCreated(RepositorySession session) {
+ sinkSession = session;
+ WaitHelper.getTestWaiter().performNotify();
+ }
+ }, null);
+ }
+ }, null);
+ }
+ });
+ }
+
+ public void doFlow() throws Exception {
+ createSessions();
+ assertNotNull(sourceSession);
+ assertNotNull(sinkSession);
+ recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate);
WaitHelper.getTestWaiter().performWait(new Runnable() {
@Override
public void run() {
try {
- rc.beginAndFlow();
+ recordsChannel.beginAndFlow();
} catch (InvalidSessionTransitionException e) {
WaitHelper.getTestWaiter().performNotify(e);
}
}
});
}
+ // NB: records in WBORepository are stored in a HashMap, so don't assume an order.
public static final BookmarkRecord[] inbounds = new BookmarkRecord[] {
new BookmarkRecord("inboundSucc1", "bookmarks", 1, false),
new BookmarkRecord("inboundSucc2", "bookmarks", 1, false),
new BookmarkRecord("inboundFail1", "bookmarks", 1, false),
new BookmarkRecord("inboundSucc3", "bookmarks", 1, false),
new BookmarkRecord("inboundSucc4", "bookmarks", 1, false),
new BookmarkRecord("inboundFail2", "bookmarks", 1, false),
};
public static final BookmarkRecord[] outbounds = new BookmarkRecord[] {
new BookmarkRecord("outboundSucc1", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc2", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc3", "bookmarks", 1, false),
+ new BookmarkRecord("outboundFail6", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc4", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc5", "bookmarks", 1, false),
- new BookmarkRecord("outboundFail6", "bookmarks", 1, false),
};
protected WBORepository empty() {
WBORepository repo = new SynchronizerHelpers.TrackingWBORepository();
return repo;
}
protected WBORepository full() {
WBORepository repo = new SynchronizerHelpers.TrackingWBORepository();
for (BookmarkRecord outbound : outbounds) {
repo.wbos.put(outbound.guid, outbound);
}
return repo;
}
- protected WBORepository failingFetch() {
- WBORepository repo = new FailFetchWBORepository();
+ protected WBORepository failingFetch(SynchronizerHelpers.FailMode failMode) {
+ WBORepository repo = new FailFetchWBORepository(failMode);
+
for (BookmarkRecord outbound : outbounds) {
repo.wbos.put(outbound.guid, outbound);
}
return repo;
}
@Test
public void testSuccess() throws Exception {
- WBORepository source = full();
- WBORepository sink = empty();
- doFlow(source, sink);
+ sourceRepository = full();
+ sinkRepository = empty();
+ doFlow();
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(0, numFlowStoreFailed.get());
- assertEquals(source.wbos, sink.wbos);
+ assertEquals(sourceRepository.wbos, sinkRepository.wbos);
+ assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertEquals(6, recordsChannel.getStoreCount());
}
@Test
public void testFetchFail() throws Exception {
- WBORepository source = failingFetch();
- WBORepository sink = empty();
- doFlow(source, sink);
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
+ sinkRepository = empty();
+ doFlow();
+ assertEquals(1, numFlowCompleted.get());
+ assertTrue(numFlowFetchFailed.get() > 0);
+ assertEquals(0, numFlowStoreFailed.get());
+ assertTrue(sinkRepository.wbos.size() < 6);
+ assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreCount() < 6);
+ }
+
+ @Test
+ public void testStoreFetchFailedCollectionModified() throws Exception {
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+ sinkRepository = empty();
+ doFlow();
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
- assertTrue(sink.wbos.size() < 6);
+ assertTrue(sinkRepository.wbos.size() < 6);
+
+ assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+
+ assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+ }
+
+ @Test
+ public void testStoreFetchFailedDeadline() throws Exception {
+ sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
+ sinkRepository = empty();
+ doFlow();
+ assertEquals(1, numFlowCompleted.get());
+ assertTrue(numFlowFetchFailed.get() > 0);
+ assertEquals(0, numFlowStoreFailed.get());
+ assertTrue(sinkRepository.wbos.size() < 6);
+
+ assertTrue(recordsChannel.getFetchFailureCount() > 0);
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+ assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+
+ assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(SyncDeadlineReachedException.class, ex.getClass());
}
@Test
public void testStoreSerialFail() throws Exception {
- WBORepository source = full();
- WBORepository sink = new SynchronizerHelpers.SerialFailStoreWBORepository();
- doFlow(source, sink);
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+ SynchronizerHelpers.FailMode.STORE);
+ doFlow();
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(1, numFlowStoreFailed.get());
- assertEquals(5, sink.wbos.size());
+ // We will fail to store one of the records but expect flow to continue.
+ assertEquals(5, sinkRepository.wbos.size());
+
+ assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertEquals(1, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
+ }
+
+ @Test
+ public void testStoreSerialFailCollectionModified() throws Exception {
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+ SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+ doFlow();
+ assertEquals(1, numFlowCompleted.get());
+ assertEquals(0, numFlowFetchFailed.get());
+ assertEquals(1, numFlowStoreFailed.get());
+ // One of the records will fail, at which point we'll stop flowing them.
+ final int sunkenRecords = sinkRepository.wbos.size();
+ assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
+
+ assertEquals(0, recordsChannel.getFetchFailureCount());
+ // RecordChannel's storeFail count is only incremented for failures of individual records.
+ assertEquals(0, recordsChannel.getStoreFailureCount());
+
+ assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
+ final Exception ex = recordsChannel.getReflowException();
+ assertNotNull(ex);
+ assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
}
@Test
public void testStoreBatchesFail() throws Exception {
- WBORepository source = full();
- WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
- doFlow(source, sink);
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
+ doFlow();
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
- assertEquals(3, sink.wbos.size()); // One batch succeeds.
+ assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
+
+ assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertEquals(3, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
}
@Test
public void testStoreOneBigBatchFail() throws Exception {
- WBORepository source = full();
- WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
- doFlow(source, sink);
+ sourceRepository = full();
+ sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
+ doFlow();
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
- assertEquals(0, sink.wbos.size()); // No batches succeed.
+ assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
+
+ assertEquals(0, recordsChannel.getFetchFailureCount());
+ assertEquals(6, recordsChannel.getStoreFailureCount());
+ // Number of store attempts.
+ assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
}
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java
@@ -7,26 +7,28 @@ import android.os.SystemClock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.android.sync.test.SynchronizerHelpers.TrackingWBORepository;
import org.mozilla.android.sync.test.helpers.BaseTestStorageRequestDelegate;
import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper;
import org.mozilla.android.sync.test.helpers.MockServer;
import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.crypto.KeyBundle;
import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.BaseResource;
import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.FetchFailedException;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.StoreFailedException;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
import org.mozilla.gecko.sync.synchronizer.Synchronizer;
import org.simpleframework.http.ContentType;
import org.simpleframework.http.Request;
@@ -107,17 +109,18 @@ public class TestServer15RepositorySessi
}
protected Exception doSynchronize(MockServer server) throws Exception {
final String COLLECTION = "test";
final TrackingWBORepository local = getLocal(100);
final Server15Repository remote = new Server15Repository(
COLLECTION, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
- getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration);
+ getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration,
+ new NonPersistentRepositoryStateProvider());
KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY);
Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey);
cryptoRepo.recordFactory = new BookmarkRecordFactory();
final Synchronizer synchronizer = new ServerLocalSynchronizer();
synchronizer.repositoryA = cryptoRepo;
synchronizer.repositoryB = local;
@@ -138,16 +141,24 @@ public class TestServer15RepositorySessi
public void testFetchFailure() throws Exception {
MockServer server = new MockServer(404, "error");
Exception e = doSynchronize(server);
assertNotNull(e);
assertEquals(FetchFailedException.class, e.getClass());
}
@Test
+ public void testFetch412Failure() throws Exception {
+ MockServer server = new MockServer(412, "error");
+ Exception e = doSynchronize(server);
+ assertNotNull(e);
+ assertEquals(CollectionConcurrentModificationException.class, e.getClass());
+ }
+
+ @Test
public void testStorePostSuccessWithFailingRecords() throws Exception {
MockServer server = new MockServer(200, "{ modified: \" + " + Utils.millisecondsToDecimalSeconds(System.currentTimeMillis()) + ", " +
"success: []," +
"failed: { outboundFail2: [] } }");
Exception e = doSynchronize(server);
assertNotNull(e);
assertEquals(StoreFailedException.class, e.getClass());
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java
@@ -102,47 +102,47 @@ public class TestServerLocalSynchronizer
assertEquals(12, local.wbos.size());
assertEquals(12, remote.wbos.size());
}
@Test
public void testLocalFetchErrors() {
WBORepository remote = new TrackingWBORepository();
- WBORepository local = new FailFetchWBORepository();
+ WBORepository local = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
Synchronizer synchronizer = getSynchronizer(remote, local);
Exception e = doSynchronize(synchronizer);
assertNotNull(e);
assertEquals(FetchFailedException.class, e.getClass());
// Neither session gets finished successfully, so all records are dropped.
assertEquals(6, local.wbos.size());
assertEquals(6, remote.wbos.size());
}
@Test
public void testRemoteFetchErrors() {
- WBORepository remote = new FailFetchWBORepository();
+ WBORepository remote = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
WBORepository local = new TrackingWBORepository();
Synchronizer synchronizer = getSynchronizer(remote, local);
Exception e = doSynchronize(synchronizer);
assertNotNull(e);
assertEquals(FetchFailedException.class, e.getClass());
// Neither session gets finished successfully, so all records are dropped.
assertEquals(6, local.wbos.size());
assertEquals(6, remote.wbos.size());
}
@Test
public void testLocalSerialStoreErrorsAreIgnored() {
WBORepository remote = new TrackingWBORepository();
- WBORepository local = new SerialFailStoreWBORepository();
+ WBORepository local = new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.FETCH);
Synchronizer synchronizer = getSynchronizer(remote, local);
assertNull(doSynchronize(synchronizer));
assertEquals(9, local.wbos.size());
assertEquals(12, remote.wbos.size());
}
@@ -153,17 +153,17 @@ public class TestServerLocalSynchronizer
Synchronizer synchronizer = getSynchronizer(new TrackingWBORepository(), new BatchFailStoreWBORepository(BATCH_SIZE));
Exception e = doSynchronize(synchronizer);
assertNull(e);
}
@Test
public void testRemoteSerialStoreErrorsAreNotIgnored() throws Exception {
- Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(), new TrackingWBORepository()); // Tracking so we don't send incoming records back.
+ Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.STORE), new TrackingWBORepository()); // Tracking so we don't send incoming records back.
Exception e = doSynchronize(synchronizer);
assertNotNull(e);
assertEquals(StoreFailedException.class, e.getClass());
}
@Test
public void testRemoteBatchStoreErrorsAreNotIgnoredManyBatches() throws Exception {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -28,12 +28,18 @@ public class ExpectSuccessRepositorySess
}
@Override
public void onStoreCompleted(long storeEnd) {
log("Record store completed at " + storeEnd);
}
@Override
+ public void onStoreFailed(Exception e) {
+ log("Store failed.", e);
+ performNotify(new AssertionFailedError("onStoreFailed: store should not have failed."));
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this;
}
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java
@@ -52,16 +52,22 @@ public class MockGlobalSessionCallback i
@Override
public void handleError(GlobalSession globalSession, Exception ex) {
this.calledError = true;
this.calledErrorException = ex;
this.testWaiter().performNotify();
}
@Override
+ public void handleIncompleteStage(Stage currentState,
+ GlobalSession globalSession) {
+
+ }
+
+ @Override
public void handleStageCompleted(Stage currentState,
GlobalSession globalSession) {
stageCounter--;
}
@Override
public void requestBackoff(long backoff) {
this.calledRequestBackoff = true;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
@@ -40,12 +40,18 @@ public class DefaultGlobalSessionCallbac
}
@Override
public void handleStageCompleted(Stage currentState,
GlobalSession globalSession) {
}
@Override
+ public void handleIncompleteStage(Stage currentState,
+ GlobalSession globalSession) {
+
+ }
+
+ @Override
public boolean shouldBackOffStorage() {
return false;
}
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -76,23 +76,18 @@ public class BufferingMiddlewareReposito
// Ensure inner session doesn't see incoming records.
verify(innerRepositorySession, never()).store(record);
verify(innerRepositorySession, never()).store(record1);
verify(innerRepositorySession, never()).store(record2);
}
@Test
public void storeDone() throws Exception {
- // Verify that storage's flush is called.
- verify(bufferStorageMocked, times(0)).flush();
- bufferingSessionMocked.doStoreDonePrepare();
- verify(bufferStorageMocked, times(1)).flush();
-
// Trivial case, no records to merge.
- bufferingSession.doStoreDone(123L);
+ bufferingSession.doMergeBuffer(123L);
verify(innerRepositorySession, times(1)).storeDone(123L);
verify(innerRepositorySession, never()).store(any(Record.class));
// Reset call counters.
reset(innerRepositorySession);
// Now store some records.
MockRecord record = new MockRecord("guid1", null, 1, false);
@@ -104,17 +99,17 @@ public class BufferingMiddlewareReposito
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
// NB: same guid as above.
MockRecord record4 = new MockRecord("guid3", null, -1, false);
bufferingSession.store(record4);
// Done storing.
- bufferingSession.doStoreDone(123L);
+ bufferingSession.doMergeBuffer(123L);
// Ensure all records where stored in the wrapped session.
verify(innerRepositorySession, times(1)).store(record);
verify(innerRepositorySession, times(1)).store(record2);
verify(innerRepositorySession, times(1)).store(record4);
// Ensure storeDone was called on the wrapped session.
verify(innerRepositorySession, times(1)).storeDone(123L);
@@ -150,42 +145,16 @@ public class BufferingMiddlewareReposito
assertEquals(2, bufferStorage.all().size());
// Test that buffer storage is cleaned up.
bufferingSession.performCleanup();
assertEquals(0, bufferStorage.all().size());
}
@Test
- public void sourceFailed() throws Exception {
- // Source failes before any records have been stored.
- bufferingSession.sourceFailed(new Exception());
- assertEquals(0, bufferStorage.all().size());
-
- // Store some records now.
- MockRecord record = new MockRecord("guid1", null, 1, false);
- bufferingSession.store(record);
-
- MockRecord record2 = new MockRecord("guid2", null, 13, false);
- bufferingSession.store(record2);
-
- MockRecord record3 = new MockRecord("guid3", null, 5, false);
- bufferingSession.store(record3);
-
- // Verify that buffer is intact after source fails.
- bufferingSession.sourceFailed(new Exception());
- assertEquals(3, bufferStorage.all().size());
-
- // Verify that buffer is flushed after source fails.
- verify(bufferStorageMocked, times(0)).flush();
- bufferingSessionMocked.sourceFailed(new Exception());
- verify(bufferStorageMocked, times(1)).flush();
- }
-
- @Test
public void abort() throws Exception {
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
MockRecord record3 = new MockRecord("guid3", null, 5, false);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -6,16 +6,17 @@ package org.mozilla.gecko.sync.repositor
import android.net.Uri;
import android.os.SystemClock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.RepositorySession;
@@ -61,18 +62,18 @@ public class BatchingDownloaderDelegateT
public void onFetchCompleted(SyncStorageResponse response,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final SyncStorageCollectionRequest request,
long l, long newerTimestamp, boolean full, String sort, String ids) {
this.isSuccess = true;
}
@Override
- public void onFetchFailed(final Exception ex,
- final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+ public void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+ final Exception ex,
final SyncStorageCollectionRequest request) {
this.isFailure = true;
this.ex = ex;
}
@Override
public void onFetchedRecord(CryptoRecord record,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
@@ -169,16 +170,28 @@ public class BatchingDownloaderDelegateT
downloaderDelegate.handleRequestFailure(response);
assertTrue(mockDownloader.isFailure);
assertEquals(HTTPFailureException.class, mockDownloader.ex.getClass());
assertFalse(mockDownloader.isSuccess);
assertFalse(mockDownloader.isFetched);
}
@Test
+ public void testFailure412() throws Exception {
+ BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+ new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+ SyncStorageResponse response = makeSyncStorageResponse(412, null);
+ downloaderDelegate.handleRequestFailure(response);
+ assertTrue(mockDownloader.isFailure);
+ assertEquals(CollectionConcurrentModificationException.class, mockDownloader.ex.getClass());
+ assertFalse(mockDownloader.isSuccess);
+ assertFalse(mockDownloader.isFetched);
+ }
+
+ @Test
public void testFailureRequestError() throws Exception {
BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
downloaderDelegate.handleRequestError(new ClientProtocolException());
assertTrue(mockDownloader.isFailure);
assertEquals(ClientProtocolException.class, mockDownloader.ex.getClass());
assertFalse(mockDownloader.isSuccess);
assertFalse(mockDownloader.isFetched);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -348,17 +348,17 @@ public class BatchingDownloaderTest {
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertTrue(sessionFetchRecordsDelegate.isFailure);
}
@Test
public void testFailureException() throws Exception {
Exception ex = new IllegalStateException();
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
- mockDownloader.onFetchFailed(ex, sessionFetchRecordsDelegate, request);
+ mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertTrue(sessionFetchRecordsDelegate.isFailure);
assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
assertNull(sessionFetchRecordsDelegate.record);
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -13,45 +13,119 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.MockRecord;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
@RunWith(TestRunner.class)
public class BatchingUploaderTest {
- class MockExecutorService implements Executor {
+ class MockExecutorService implements ExecutorService {
int totalPayloads = 0;
int commitPayloads = 0;
@Override
public void execute(@NonNull Runnable command) {
if (command instanceof PayloadDispatcher.NonPayloadContextRunnable) {
command.run();
return;
}
++totalPayloads;
if (((PayloadDispatcher.BatchContextRunnable) command).isCommit) {
++commitPayloads;
}
command.run();
}
+
+ @Override
+ public void shutdown() {}
+
+ @NonNull
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return false;
+ }
+
+ @NonNull
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return null;
+ }
+
+ @NonNull
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return null;
+ }
+
+ @NonNull
+ @Override
+ public Future<?> submit(Runnable task) {
+ return null;
+ }
+
+ @NonNull
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return null;
+ }
+
+ @NonNull
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return null;
+ }
+
+ @NonNull
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
}
class MockStoreDelegate implements RepositorySessionStoreDelegate {
int storeFailed = 0;
int storeSucceeded = 0;
int storeCompleted = 0;
@Override
@@ -65,22 +139,27 @@ public class BatchingUploaderTest {
}
@Override
public void onStoreCompleted(long storeEnd) {
++storeCompleted;
}
@Override
+ public void onStoreFailed(Exception e) {
+
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return null;
}
}
- private Executor workQueue;
+ private ExecutorService workQueue;
private RepositorySessionStoreDelegate storeDelegate;
@Before
public void setUp() throws Exception {
workQueue = new MockExecutorService();
storeDelegate = new MockStoreDelegate();
}
@@ -437,21 +516,51 @@ public class BatchingUploaderTest {
infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, 1024L);
infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, 1024L);
Server15RepositorySession server15RepositorySession = new Server15RepositorySession(
makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync)
);
server15RepositorySession.setStoreDelegate(storeDelegate);
- return new BatchingUploader(
- server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, null,
+ return new MockUploader(
+ server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, 0L,
new InfoConfiguration(infoConfigurationJSON), null);
}
+ class MockPayloadDispatcher extends PayloadDispatcher {
+ MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader, Long lastModified) {
+ super(workQueue, uploader, lastModified);
+ }
+
+ @Override
+ Runnable createRecordUploadRunnable(ArrayList<byte[]> outgoing, ArrayList<String> outgoingGuids, long byteCount, boolean isCommit, boolean isLastPayload) {
+ // No-op runnable. We don't want this to actually do any work for these tests.
+ return new Runnable() {
+ @Override
+ public void run() {}
+ };
+ }
+ }
+
+ class MockUploader extends BatchingUploader {
+ MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue,
+ final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
+ final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
+ final AuthHeaderProvider authHeaderProvider) {
+ super(repositorySession, workQueue, sessionStoreDelegate, baseCollectionUri,
+ localCollectionLastModified, infoConfiguration, authHeaderProvider);
+ }
+
+ @Override
+ PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
+ return new MockPayloadDispatcher(workQueue, this, localCollectionLastModified);
+ }
+ }
+
private Server15Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync);
}
private Server15Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords, boolean firstSync) {
ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, maxTotalBytes);
infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -1,43 +1,53 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.gecko.sync.repositories.uploaders;
+import android.net.Uri;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.HTTPFailureException;
+import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import ch.boye.httpclientandroidlib.HttpResponse;
import ch.boye.httpclientandroidlib.ProtocolVersion;
import ch.boye.httpclientandroidlib.entity.BasicHttpEntity;
import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
import ch.boye.httpclientandroidlib.message.BasicStatusLine;
import static org.junit.Assert.*;
@RunWith(TestRunner.class)
public class PayloadUploadDelegateTest {
private PayloadDispatcher payloadDispatcher;
private AuthHeaderProvider authHeaderProvider;
+ private RepositorySessionStoreDelegate sessionStoreDelegate;
+
class MockPayloadDispatcher extends PayloadDispatcher {
public final HashMap<String, Exception> failedRecords = new HashMap<>();
public boolean didLastPayloadFail = false;
public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
public int commitPayloadsSucceeded = 0;
public int lastPayloadsSucceeded = 0;
@@ -61,30 +71,64 @@ public class PayloadUploadDelegateTest {
@Override
public void recordFailed(final String recordGuid) {
recordFailed(new Exception(), recordGuid);
}
@Override
public void recordFailed(final Exception e, final String recordGuid) {
+ recordUploadFailed = true;
failedRecords.put(recordGuid, e);
}
@Override
public void lastPayloadFailed() {
didLastPayloadFail = true;
}
}
+ class MockRepositorySessionStoreDelegate implements RepositorySessionStoreDelegate {
+ Exception storeFailedException;
+ ArrayList<String> succeededGuids = new ArrayList<>();
+ HashMap<String, Exception> failedGuids = new HashMap<>();
+
+ @Override
+ public void onRecordStoreFailed(Exception ex, String recordGuid) {
+ failedGuids.put(recordGuid, ex);
+ }
+
+ @Override
+ public void onRecordStoreSucceeded(String guid) {
+ succeededGuids.add(guid);
+ }
+
+ @Override
+ public void onStoreCompleted(long storeEnd) {
+
+ }
+
+ @Override
+ public void onStoreFailed(Exception e) {
+ storeFailedException = e;
+ }
+
+ @Override
+ public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
+ return null;
+ }
+ }
+
@Before
public void setUp() throws Exception {
+ sessionStoreDelegate = new MockRepositorySessionStoreDelegate();
+
payloadDispatcher = new MockPayloadDispatcher(
null,
- mock(BatchingUploader.class)
+ new BatchingUploader(mock(RepositorySession.class), null, sessionStoreDelegate, Uri.parse("https://example.com"), 0L, mock(InfoConfiguration.class), mock(AuthHeaderProvider.class))
);
authHeaderProvider = mock(AuthHeaderProvider.class);
}
@Test
public void testHandleRequestSuccessNonSuccess() {
ArrayList<String> postedGuids = new ArrayList<>(2);
@@ -363,16 +407,36 @@ public class PayloadUploadDelegateTest {
assertEquals(HTTPFailureException.class,
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3").getClass());
payloadUploadDelegate = new PayloadUploadDelegate(
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+ assertTrue(payloadDispatcher.recordUploadFailed);
+ }
+
+ @Test
+ public void testHandleRequestFailure412() {
+ ArrayList<String> postedGuids = new ArrayList<>(3);
+ postedGuids.add("testGuid1");
+ postedGuids.add("testGuid2");
+ postedGuids.add("testGuid3");
+ PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(authHeaderProvider, payloadDispatcher, postedGuids, false, false);
+
+ final HttpResponse response = new BasicHttpResponse(
+ new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 412, "Precondition failed"));
+ payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
+
+ assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+ assertTrue(payloadDispatcher.recordUploadFailed);
+ assertTrue(payloadDispatcher.storeFailed);
+
+ assertTrue(((MockRepositorySessionStoreDelegate) sessionStoreDelegate).storeFailedException instanceof CollectionConcurrentModificationException);
}
@Test
public void testIfUnmodifiedSinceNoLM() {
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);
assertNull(payloadUploadDelegate.ifUnmodifiedSince());