--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/Crypto5MiddlewareRepositorySession.java
@@ -105,18 +105,18 @@ public class Crypto5MiddlewareRepository
} catch (Exception e) {
next.onFetchFailed(e);
return;
}
next.onFetchedRecord(transformed);
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
- next.onFetchCompleted(fetchEnd);
+ public void onFetchCompleted() {
+ next.onFetchCompleted();
}
@Override
public void onBatchCompleted() {
next.onBatchCompleted();
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/MiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/MiddlewareRepositorySession.java
@@ -170,9 +170,19 @@ public abstract class MiddlewareReposito
public void unbundle(RepositorySessionBundle bundle) {
inner.unbundle(bundle);
}
@Override
public long getLastSyncTimestamp() {
return inner.getLastSyncTimestamp();
}
+
+ @Override
+ public long getLastFetchTimestamp() {
+ return inner.getLastFetchTimestamp();
+ }
+
+ @Override
+ public long getLastStoreTimestamp() {
+ return inner.getLastStoreTimestamp();
+ }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -1,14 +1,17 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
+import android.support.annotation.Nullable;
+import android.util.Log;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
@@ -63,16 +66,49 @@ public abstract class RepositorySession
* This includes actual store work, and also the consequences of storeDone.
* This provides strict ordering.
*/
protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor();
// The time that the last sync on this collection completed, in milliseconds since epoch.
private long lastSyncTimestamp = 0;
+ // As session progresses, it keeps track of the main points of interaction.
+ // If these timestamps aren't set, that means corresponding operation didn't complete.
+ private volatile Long fetchEnd;
+ private volatile Long storeEnd;
+
+ public void setLastFetchTimestamp(long timestamp) {
+ fetchEnd = timestamp;
+ }
+
+ public void setLastStoreTimestamp(long timestamp) {
+ storeEnd = timestamp;
+ }
+
+ /**
+ * @return timestamp of the last "fetch" from this session, or lastSyncTimestamp if fetch didn't happen.
+ */
+ public long getLastFetchTimestamp() {
+ if (fetchEnd != null) {
+ return fetchEnd;
+ }
+ return lastSyncTimestamp;
+ }
+
+ /**
+ * @return timestamp of the last "store" for this session, or lastSyncTimestamp if store didn't happen.
+ */
+ public long getLastStoreTimestamp() {
+ if (storeEnd != null) {
+ return storeEnd;
+ }
+ return lastSyncTimestamp;
+ }
+
public long getLastSyncTimestamp() {
return lastSyncTimestamp;
}
public static long now() {
return System.currentTimeMillis();
}
@@ -131,22 +167,23 @@ public abstract class RepositorySession
public void storeDone() {
// Our default behavior will be to assume that the Runnable is
// executed as soon as all the stores synchronously finish, so
// our end timestamp can just be… now.
// Sessions may override this behavior if the above assumption is incorrect.
// For example, a session may choose to build up buffers which will need to be flushed in
// storeDone, and so it will need to call onStoreComplete with the end timestamp after those
// operations complete.
+ Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done");
final long end = now();
- Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end);
Runnable command = new Runnable() {
@Override
public void run() {
- storeDelegate.onStoreCompleted(end);
+ setLastStoreTimestamp(end);
+ storeDelegate.onStoreCompleted();
}
};
storeWorkQueue.execute(command);
}
/**
* Indicates that a number of records have been stored, more are still to come but after some time,
* and now would be a good time to flush records and perform any other similar operations.
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
@@ -142,18 +142,18 @@ public class VersioningDelegateHelper {
if (record.localVersion == null) {
throw new IllegalStateException("Encountered an unversioned record during versioned sync.");
}
localVersionsOfGuids.put(record.guid, record.localVersion);
this.inner.onFetchedRecord(record);
}
@Override
- public void onFetchCompleted(long fetchEnd) {
- this.inner.onFetchCompleted(fetchEnd);
+ public void onFetchCompleted() {
+ this.inner.onFetchCompleted();
}
@Override
public void onBatchCompleted() {
this.inner.onBatchCompleted();
}
@Override
@@ -192,18 +192,18 @@ public class VersioningDelegateHelper {
}
@Override
public void onRecordStoreSucceeded(String guid) {
inner.onRecordStoreSucceeded(guid);
}
@Override
- public void onStoreCompleted(long storeEnd) {
- inner.onStoreCompleted(storeEnd);
+ public void onStoreCompleted() {
+ inner.onStoreCompleted();
}
@Override
public void onStoreFailed(Exception e) {
inner.onStoreFailed(e);
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksRepositorySession.java
@@ -268,13 +268,14 @@ public class BookmarksRepositorySession
@Override
public void storeDone() {
storeWorkQueue.execute(sessionHelper.getStoreDoneRunnable(storeDelegate));
// Work queue is single-threaded, and so this should be well-ordered - onStoreComplete will run
// after the above runnable is finished.
storeWorkQueue.execute(new Runnable() {
@Override
public void run() {
- storeDelegate.onStoreCompleted(now());
+ setLastStoreTimestamp(now());
+ storeDelegate.onStoreCompleted();
}
});
}
}
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/BookmarksValidationRepository.java
@@ -180,19 +180,19 @@ public class BookmarksValidationReposito
}
@Override
public void onFetchedRecord(Record record) {
local.add((BookmarkRecord)record);
}
@Override
- public void onFetchCompleted(long fetchEnd) {
+ public void onFetchCompleted() {
validateForTelemetry();
- delegate.onFetchCompleted(fetchEnd);
+ delegate.onFetchCompleted();
}
@Override
public void onBatchCompleted() {}
@Override
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
return null;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FennecTabsRepository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FennecTabsRepository.java
@@ -154,17 +154,18 @@ public class FennecTabsRepository extend
}
} finally {
cursor.close();
}
} catch (Exception e) {
delegate.onFetchFailed(e);
return;
}
- delegate.onFetchCompleted(now());
+ setLastFetchTimestamp(now());
+ delegate.onFetchCompleted();
}
};
delegateQueue.execute(command);
}
@Override
public void fetchModified(final RepositorySessionFetchRecordsDelegate delegate) {
@@ -198,17 +199,17 @@ public class FennecTabsRepository extend
public void fetch(final String[] guids,
final RepositorySessionFetchRecordsDelegate delegate) {
// Bug 783692: Now that Bug 730039 has landed, we could implement this,
// but it's not a priority since it's not used (yet).
Logger.warn(LOG_TAG, "Not returning anything from fetch");
delegateQueue.execute(new Runnable() {
@Override
public void run() {
- delegate.onFetchCompleted(now());
+ delegate.onFetchCompleted();
}
});
}
private static final String TABS_CLIENT_GUID_IS = BrowserContract.Tabs.CLIENT_GUID + " = ?";
private static final String CLIENT_GUID_IS = BrowserContract.Clients.GUID + " = ?";
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FormHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FormHistoryRepositorySession.java
@@ -208,17 +208,18 @@ public class FormHistoryRepositorySessio
fetchFromCursor(cursor, filter, delegate); // Closes cursor.
} catch (Exception e) {
Logger.warn(LOG_TAG, "Exception during fetchHelper", e);
delegate.onFetchFailed(e);
return;
}
}
- delegate.onFetchCompleted(end);
+ setLastFetchTimestamp(end);
+ delegate.onFetchCompleted();
}
};
delegateQueue.execute(command);
}
protected static String regularBetween(long start, long end) {
return FormHistory.FIRST_USED + " >= " + Long.toString(1000 * start) + " AND " +
@@ -431,17 +432,18 @@ public class FormHistoryRepositorySessio
Runnable command = new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Checking for residual form history items to insert.");
try {
synchronized (recordsBufferMonitor) {
flushInsertQueue();
}
- storeDelegate.deferredStoreDelegate(storeWorkQueue).onStoreCompleted(now());
+ setLastStoreTimestamp(now());
+ storeDelegate.deferredStoreDelegate(storeWorkQueue).onStoreCompleted();
} catch (Exception e) {
// XXX TODO
storeDelegate.deferredStoreDelegate(storeWorkQueue).onRecordStoreFailed(e, null);
}
}
};
storeWorkQueue.execute(command);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/HistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/HistoryRepositorySession.java
@@ -128,17 +128,18 @@ public class HistoryRepositorySession ex
@Override
public void storeDone() {
storeWorkQueue.execute(sessionHelper.getStoreDoneRunnable(storeDelegate));
// Work queue is single-threaded, and so this should be well-ordered - onStoreComplete will run
// after the above runnable is finished.
storeWorkQueue.execute(new Runnable() {
@Override
public void run() {
- storeDelegate.onStoreCompleted(now());
+ setLastStoreTimestamp(now());
+ storeDelegate.onStoreCompleted();
}
});
}
@Override
public void finish(RepositorySessionFinishDelegate delegate) throws InactiveSessionException {
sessionHelper.finish();
super.finish(delegate);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/PasswordsRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/PasswordsRepositorySession.java
@@ -94,17 +94,18 @@ public class PasswordsRepositorySession
dateModifiedWhereDeleted(timestamp),
null, null);
if (!fetchAndCloseCursorDeleted(deleted, true, filter, delegate)) {
return;
}
// Success!
try {
- delegate.onFetchCompleted(end);
+ setLastFetchTimestamp(end);
+ delegate.onFetchCompleted();
} catch (Exception e) {
Logger.error(LOG_TAG, "Delegate fetch completed callback failed.", e);
// Don't call failure callback.
return;
}
} catch (Exception e) {
Logger.error(LOG_TAG, "Exception in fetch.");
delegate.onFetchFailed(e);
@@ -124,21 +125,20 @@ public class PasswordsRepositorySession
public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
this.fetchSince(-1, delegate);
}
@Override
public void fetch(final String[] guids, final RepositorySessionFetchRecordsDelegate delegate) {
if (guids == null || guids.length < 1) {
Logger.error(LOG_TAG, "No guids to be fetched.");
- final long end = now();
delegateQueue.execute(new Runnable() {
@Override
public void run() {
- delegate.onFetchCompleted(end);
+ delegate.onFetchCompleted();
}
});
return;
}
// Checks succeeded, now fetch.
final RecordFilter filter = this.storeTracker.getFilter();
final Runnable fetchRunnable = new Runnable() {
@@ -165,17 +165,18 @@ public class PasswordsRepositorySession
// Fetch records from deleted table.
Cursor deleted = deletedPasswordsHelper.safeQuery(passwordsProvider, ".fetch",
getAllDeletedColumns(),
where, guids, null);
if (!fetchAndCloseCursorDeleted(deleted, true, filter, delegate)) {
return;
}
- delegate.onFetchCompleted(end);
+ setLastFetchTimestamp(end);
+ delegate.onFetchCompleted();
} catch (Exception e) {
Logger.error(LOG_TAG, "Exception in fetch.");
delegate.onFetchFailed(e);
}
}
};
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/SessionHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/SessionHelper.java
@@ -328,17 +328,17 @@ import org.mozilla.gecko.sync.repositori
// reconcile attempts.
if (reconcileAttempt > maxReconcileAttempts) {
Logger.error(LOG_TAG, "Failed to store record within maximum number of allowed attempts: " + record.guid);
storeDelegate.onRecordStoreFailed(
new IllegalStateException("Reached maximum storage attempts for a record"),
record.guid
);
} else {
- Logger.info(LOG_TAG, "Stored after reconcile attempt #" + reconcileAttempt);
+ Logger.debug(LOG_TAG, "Stored after reconcile attempt #" + reconcileAttempt);
}
} catch (MultipleRecordsForGuidException e) {
Logger.error(LOG_TAG, "Multiple records returned for given guid: " + record.guid);
storeDelegate.onRecordStoreFailed(e, record.guid);
} catch (NoGuidForIdException e) {
Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
storeDelegate.onRecordStoreFailed(e, record.guid);
} catch (Exception e) {
@@ -561,32 +561,33 @@ import org.mozilla.gecko.sync.repositori
this.delegate = delegate;
}
/* package-private */ void fetchFromCursor(Cursor cursor, RecordFilter filter, long end) {
Logger.debug(LOG_TAG, "Fetch from cursor:");
try {
try {
if (!cursor.moveToFirst()) {
- delegate.onFetchCompleted(end);
+ delegate.onFetchCompleted();
return;
}
while (!cursor.isAfterLast()) {
Record r = retrieveDuringFetch(cursor);
if (r != null) {
if (filter == null || !filter.excludeRecord(r)) {
Logger.trace(LOG_TAG, "Processing record " + r.guid);
delegate.onFetchedRecord(transformRecord(r));
} else {
Logger.debug(LOG_TAG, "Skipping filtered record " + r.guid);
}
}
cursor.moveToNext();
}
- delegate.onFetchCompleted(end);
+ session.setLastFetchTimestamp(end);
+ delegate.onFetchCompleted();
// } catch (NoGuidForIdException e) {
// Logger.warn(LOG_TAG, "No GUID for ID.", e);
// delegate.onFetchFailed(e);
} catch (Exception e) {
Logger.warn(LOG_TAG, "Exception in fetchFromCursor.", e);
delegate.onFetchFailed(e);
return;
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionFetchRecordsDelegate.java
@@ -32,21 +32,21 @@ public class DeferredRepositorySessionFe
@Override
public void run() {
inner.onFetchFailed(ex);
}
});
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
executor.execute(new Runnable() {
@Override
public void run() {
- inner.onFetchCompleted(fetchEnd);
+ inner.onFetchCompleted();
}
});
}
@Override
public void onBatchCompleted() {
executor.execute(new Runnable() {
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -41,21 +41,21 @@ public class DeferredRepositorySessionSt
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
if (newExecutor == executor) {
return this;
}
throw new IllegalArgumentException("Can't re-defer this delegate.");
}
@Override
- public void onStoreCompleted(final long storeEnd) {
+ public void onStoreCompleted() {
executor.execute(new Runnable() {
@Override
public void run() {
- inner.onStoreCompleted(storeEnd);
+ inner.onStoreCompleted();
}
});
}
@Override
public void onStoreFailed(final Exception e) {
executor.execute(new Runnable() {
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionFetchRecordsDelegate.java
@@ -16,17 +16,17 @@ public interface RepositorySessionFetchR
* Called when all records in this fetch have been returned.
*
* @param fetchEnd
* A millisecond-resolution timestamp indicating the *remote* timestamp
* at the end of the range of records. Usually this is the timestamp at
* which the request was received.
* E.g., the (normalized) value of the X-Weave-Timestamp header.
*/
- void onFetchCompleted(final long fetchEnd);
+ void onFetchCompleted();
/**
* Called when a number of records have been returned but more are still expected to come,
* possibly after a certain pause.
*/
void onBatchCompleted();
RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -18,12 +18,12 @@ public interface RepositorySessionStoreD
// Meant for signaling that a record has been reconciled.
// Only makes sense in context of local repositories.
// Further call to onRecordStoreSucceeded is necessary.
void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion);
// Called with a GUID when store has succeeded.
void onRecordStoreSucceeded(String guid);
- void onStoreCompleted(long storeEnd);
+ void onStoreCompleted();
void onStoreFailed(Exception e);
RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -191,37 +191,40 @@ public class BatchingDownloader {
if (lastModifiedChanged) {
this.handleFetchFailed(
fetchRecordsDelegate,
new CollectionConcurrentModificationException()
);
return;
}
+ // We fetched and supposedly processed some records, so move forward the "last fetch" timestamp.
+ final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
+ repositorySession.setLastFetchTimestamp(normalizedTimestamp);
+
// If we can (or must) stop batching at this point, let the delegate know that we're all done!
final String offset = response.weaveOffset();
if (offset == null || !allowMultipleBatches) {
- final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
// This isn't great, but shouldn't be too problematic - but do see notes below.
// Failing to reset a resume context after we're done with batching means that on next
// sync we'll erroneously try to resume downloading. If resume proceeds, we will fetch
// from an older timestamp, but offset by the amount of records we've fetched prior.
// Since we're diligent about setting a X-I-U-S header, any remote collection changes
// will be caught and we'll receive a 412.
if (!BatchingDownloaderController.resetResumeContextAndCommit(this.stateProvider)) {
Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
}
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
- fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
+ fetchRecordsDelegate.onFetchCompleted();
}
});
return;
}
// This is unfortunate, but largely just means that in case we need to resume later on, it
// either won't be possible (and we'll fetch w/o resuming), or won't be as efficient (i.e.
// we'll download more records than necessary).
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -241,18 +241,22 @@ public class BatchingUploader {
payloadDispatcher.finalizeQueue(uploaderMeta.needToCommit(), new Runnable() {
@Override
public void run() {
flush(true, true);
}
});
}
- /* package-local */ void finished(AtomicLong lastModifiedTimestamp) {
- sessionStoreDelegate.deferredStoreDelegate(executor).onStoreCompleted(lastModifiedTimestamp.get());
+ /* package-local */ void setLastStoreTimestamp(AtomicLong lastModifiedTimestamp) {
+ repositorySession.setLastStoreTimestamp(lastModifiedTimestamp.get());
+ }
+
+ /* package-local */ void finished() {
+ sessionStoreDelegate.deferredStoreDelegate(executor).onStoreCompleted();
}
// Will be called from a thread dispatched by PayloadDispatcher.
// NB: Access to `uploaderMeta.isUnlimited` is guarded by the payloadLock.
/* package-local */ void setUnlimitedMode(boolean isUnlimited) {
// If we know for sure that we're not in a batching mode,
// consider our batch to be of unlimited size.
this.uploaderMeta.setIsUnlimited(isUnlimited);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -83,45 +83,46 @@ class PayloadDispatcher {
}
// We consider records to have been committed if we're not in a batching mode or this was a commit.
// If records have been committed, notify our store delegate.
if (!batchWhiteboard.getInBatchingMode() || isCommit) {
for (String guid : guids) {
uploader.sessionStoreDelegate.onRecordStoreSucceeded(guid);
}
+
+ // If we're not in a batching mode, or just committed a batch, uploaded records have
+ // been applied to the server storage and are now visible to other clients.
+ // Therefore, we bump our local "last store" timestamp.
+ bumpTimestampTo(uploadTimestamp, response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
+ uploader.setLastStoreTimestamp(uploadTimestamp);
}
// If this was our very last commit, we're done storing records.
// Get Last-Modified timestamp from the response, and pass it upstream.
if (isLastPayload) {
- finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
+ uploader.finished();
}
}
void lastPayloadFailed(Exception e) {
uploader.sessionStoreDelegate.onStoreFailed(e);
}
- private void finished(long lastModifiedTimestamp) {
- bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
- uploader.finished(uploadTimestamp);
- }
-
void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) {
executor.execute(new NonPayloadContextRunnable() {
@Override
public void run() {
// Must be called after last payload upload finishes.
if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) {
finalRunnable.run();
// Otherwise, we're done.
} else {
- uploader.finished(uploadTimestamp);
+ uploader.finished();
}
}
});
}
void recordFailed(final String recordGuid) {
recordFailed(new Server15RecordPostFailedException(), recordGuid);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -69,17 +69,16 @@ public class RecordsChannel implements
RepositorySessionStoreDelegate,
RecordsConsumerDelegate,
RepositorySessionBeginDelegate {
private static final String LOG_TAG = "RecordsChannel";
public RepositorySession source;
private RepositorySession sink;
private final RecordsChannelDelegate delegate;
- private long fetchEnd = -1;
private volatile ReflowIsNecessaryException reflowException;
private final AtomicInteger fetchedCount = new AtomicInteger();
private final AtomicInteger fetchFailedCount = new AtomicInteger();
// Expected value relationships:
// attempted = accepted + failed
@@ -172,18 +171,17 @@ public class RecordsChannel implements
failed = sink;
}
this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed));
return;
}
if (!source.dataAvailable()) {
Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
- long now = System.currentTimeMillis();
- this.delegate.onFlowCompleted(this, now, now);
+ this.delegate.onFlowCompleted(this);
return;
}
sink.setStoreDelegate(this);
fetchedCount.set(0);
fetchFailedCount.set(0);
storeAttemptedCount.set(0);
storeAcceptedCount.set(0);
@@ -231,20 +229,18 @@ public class RecordsChannel implements
@Override
public void onFetchedRecord(Record record) {
fetchedCount.incrementAndGet();
this.toProcess.add(record);
this.consumer.doNotify();
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
- Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd);
- this.fetchEnd = fetchEnd;
this.consumer.queueFilled();
}
@Override
public void onBatchCompleted() {
this.sink.storeFlush();
}
@@ -285,30 +281,29 @@ public class RecordsChannel implements
public void consumerIsDonePartial() {
Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
if (waitingForQueueDone) {
waitingForQueueDone = false;
// Let sink clean up or flush records if necessary.
this.sink.storeIncomplete();
- delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
+ delegate.onFlowCompleted(this);
}
}
@Override
- public void onStoreCompleted(long storeEnd) {
- Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
- "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
+ public void onStoreCompleted() {
+ Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted.");
// Source might have used caches used to facilitate flow of records, so now is a good
// time to clean up. Particularly pertinent for buffered sources.
// Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
// locally and results of the merge have been uploaded to the server successfully.
this.source.performCleanup();
- delegate.onFlowCompleted(this, fetchEnd, storeEnd);
+ delegate.onFlowCompleted(this);
}
@Override
public void onStoreFailed(Exception ex) {
Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
@@ -329,17 +324,17 @@ public class RecordsChannel implements
// Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
// we don't need them (case 1).
waitingForQueueDone = false;
// If consumer is still going at it, tell it to stop.
this.consumer.halt();
delegate.onFlowStoreFailed(this, ex, null);
- delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
+ delegate.onFlowCompleted(this);
}
@Override
public void onBeginFailed(Exception ex) {
delegate.onFlowBeginFailed(this, ex);
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannelDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannelDelegate.java
@@ -1,12 +1,12 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
public interface RecordsChannelDelegate {
- public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd);
+ public void onFlowCompleted(RecordsChannel recordsChannel);
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid);
}
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
@@ -24,17 +24,17 @@ import org.mozilla.gecko.sync.repositori
public class ServerLocalSynchronizerSession extends SynchronizerSession {
protected static final String LOG_TAG = "ServLocSynchronizerSess";
public ServerLocalSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
super(synchronizer, delegate);
}
@Override
- public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ public void onFirstFlowCompleted(RecordsChannel recordsChannel) {
// If a "reflow exception" was thrown, consider this synchronization failed.
final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
@@ -53,21 +53,21 @@ public class ServerLocalSynchronizerSess
int numLocalStoreFailed = recordsChannel.getStoreFailureCount();
if (numLocalStoreFailed > 0) {
final String message = "Got " + numLocalStoreFailed + " failures storing local records!";
Logger.warn(LOG_TAG, message + " Ignoring local store failures and continuing synchronizer session.");
} else {
Logger.trace(LOG_TAG, "No failures storing local records.");
}
- super.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
+ super.onFirstFlowCompleted(recordsChannel);
}
@Override
- public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ public void onSecondFlowCompleted(RecordsChannel recordsChannel) {
// If a "reflow exception" was thrown, consider this synchronization failed.
final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
@@ -87,11 +87,11 @@ public class ServerLocalSynchronizerSess
if (numRemoteStoreFailed > 0) {
final String message = "Got " + numRemoteStoreFailed + " failures storing remote records!";
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, new StoreFailedException(), message);
return;
}
Logger.trace(LOG_TAG, "No failures storing remote records.");
- super.onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
+ super.onSecondFlowCompleted(recordsChannel);
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
@@ -59,20 +59,22 @@ implements RecordsChannelDelegate,
/*
* Computed during init.
*/
private RepositorySession sessionA;
private RepositorySession sessionB;
private RepositorySessionBundle bundleA;
private RepositorySessionBundle bundleB;
- // Bug 726054: just like desktop, we track our last interaction with the server,
- // not the last record timestamp that we fetched. This ensures that we don't re-
- // download the records we just uploaded, at the cost of skipping any records
- // that a concurrently syncing client has uploaded.
+ // Bug 1392505: for each "side" of the channel, we keep track of lastFetch and lastStore timestamps.
+ // For local repositories these timestamps represent our last interactions with local data.
+ // For the remote repository these timestamps represent server collection's last-modified
+ // timestamp after a corresponding operation (GET or POST) finished. We obtain these from server's
+ // response headers.
+ // It's important that we never compare timestamps which originated from different clocks.
private long pendingATimestamp = -1;
private long pendingBTimestamp = -1;
private long storeEndATimestamp = -1;
private long storeEndBTimestamp = -1;
private boolean flowAToBCompleted = false;
private boolean flowBToACompleted = false;
private final AtomicInteger numInboundRecords = new AtomicInteger(-1);
@@ -195,18 +197,18 @@ implements RecordsChannelDelegate,
// This is the *second* record channel to flow.
// I, SynchronizerSession, am the delegate for the *second* flow.
channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
// This is the delegate for the *first* flow.
RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
@Override
- public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
- session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
+ public void onFlowCompleted(RecordsChannel recordsChannel) {
+ session.onFirstFlowCompleted(recordsChannel);
}
@Override
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
}
@@ -237,63 +239,58 @@ implements RecordsChannelDelegate,
}
}
/**
* Called after the first flow completes.
* <p>
* By default, any fetch and store failures are ignored.
* @param recordsChannel the <code>RecordsChannel</code> (for error testing).
- * @param fetchEnd timestamp when fetches completed.
- * @param storeEnd timestamp when stores completed.
*/
- public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ public void onFirstFlowCompleted(RecordsChannel recordsChannel) {
Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted.");
- Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
- pendingATimestamp = fetchEnd;
- storeEndBTimestamp = storeEnd;
+ pendingATimestamp = sessionA.getLastFetchTimestamp();
+ storeEndBTimestamp = sessionB.getLastStoreTimestamp();
+ Logger.debug(LOG_TAG, "Fetch end is " + pendingATimestamp + ". Store end is " + storeEndBTimestamp + ". Starting next.");
numInboundRecords.set(recordsChannel.getFetchCount());
numInboundRecordsStored.set(recordsChannel.getStoreAcceptedCount());
numInboundRecordsFailed.set(recordsChannel.getStoreFailureCount());
numInboundRecordsReconciled.set(recordsChannel.getStoreReconciledCount());
flowAToBCompleted = true;
channelBToA.flow();
}
/**
* Called after the second flow completes.
* <p>
* By default, any fetch and store failures are ignored.
* @param recordsChannel the <code>RecordsChannel</code> (for error testing).
- * @param fetchEnd timestamp when fetches completed.
- * @param storeEnd timestamp when stores completed.
*/
- public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ public void onSecondFlowCompleted(RecordsChannel recordsChannel) {
Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
- Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");
-
- pendingBTimestamp = fetchEnd;
- storeEndATimestamp = storeEnd;
+ pendingBTimestamp = sessionB.getLastFetchTimestamp();
+ storeEndATimestamp = sessionA.getLastStoreTimestamp();
+ Logger.debug(LOG_TAG, "Fetch end is " + pendingBTimestamp + ". Store end is " + storeEndATimestamp + ". Finishing.");
numOutboundRecords.set(recordsChannel.getFetchCount());
numOutboundRecordsStored.set(recordsChannel.getStoreAcceptedCount());
numOutboundRecordsFailed.set(recordsChannel.getStoreFailureCount());
flowBToACompleted = true;
// Finish the two sessions.
try {
this.sessionA.finish(this);
} catch (InactiveSessionException e) {
this.onFinishFailed(e);
return;
}
}
@Override
- public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
- onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
+ public void onFlowCompleted(RecordsChannel recordsChannel) {
+ onSecondFlowCompleted(recordsChannel);
}
@Override
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -621,17 +621,17 @@ public class TestBookmarks extends Andro
RepositorySessionFetchRecordsDelegate fetchDelegate = new SimpleSuccessFetchDelegate() {
@Override
public void onFetchedRecord(Record record) {
fetchedGUIDs.add(record.guid);
}
@Override
- public void onFetchCompleted(long end) {
+ public void onFetchCompleted() {
finishAndNotify(session);
}
@Override
public void onBatchCompleted() {
}
};
@@ -656,17 +656,17 @@ public class TestBookmarks extends Andro
RepositorySessionFetchRecordsDelegate fetchDelegate = new SimpleSuccessFetchDelegate() {
@Override
public void onFetchedRecord(Record record) {
fetchedRecord = record;
}
@Override
- public void onFetchCompleted(long end) {
+ public void onFetchCompleted() {
finishAndNotify(session);
}
@Override
public void onBatchCompleted() {
}
};
@@ -691,17 +691,17 @@ public class TestBookmarks extends Andro
final BookmarkRecord[] records,
final Collection<String> tracked) {
SimpleSuccessBeginDelegate beginDelegate = new SimpleSuccessBeginDelegate() {
@Override
public void onBeginSucceeded(final RepositorySession session) {
RepositorySessionStoreDelegate storeDelegate = new SimpleSuccessStoreDelegate() {
@Override
- public void onStoreCompleted(final long storeEnd) {
+ public void onStoreCompleted() {
// Pass back whatever we tracked.
if (tracked != null) {
Iterator<String> iter = session.getTrackedRecordIDs();
while (iter.hasNext()) {
tracked.add(iter.next());
}
}
finishAndNotify(session);
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/ThreadedRepositoryTestCase.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/ThreadedRepositoryTestCase.java
@@ -552,17 +552,17 @@ public abstract class ThreadedRepository
dispose(session);
}
// Special delegate so that we don't verify parenting is correct since
// at some points it won't be since parent folder hasn't been stored.
private DefaultFetchDelegate getTimestampDelegate(final String guid) {
return new DefaultFetchDelegate() {
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
assertEquals(guid, this.records.get(0).guid);
performNotify();
}
};
}
/*
* Insert a record that is marked as deleted, local has newer timestamp
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -62,43 +62,43 @@ public class TestStoreTracking extends A
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
Logger.debug(getName(), "Reconciled " + guid);
assertEq(expectedGUID, guid);
}
@Override
- public void onStoreCompleted(long storeEnd) {
- Logger.debug(getName(), "Store completed at " + storeEnd + ".");
+ public void onStoreCompleted() {
+ Logger.debug(getName(), "Store completed.");
try {
session.fetch(new String[] { expectedGUID }, new SimpleSuccessFetchDelegate() {
@Override
public void onFetchedRecord(Record record) {
Logger.debug(getName(), "Hurrah! Fetched record " + record.guid);
assertEq(expectedGUID, record.guid);
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
- Logger.debug(getName(), "Fetch completed at " + fetchEnd + ".");
+ public void onFetchCompleted() {
+ Logger.debug(getName(), "Fetch completed.");
// But fetching by time returns nothing.
session.fetchModified(new SimpleSuccessFetchDelegate() {
private AtomicBoolean fetched = new AtomicBoolean(false);
@Override
public void onFetchedRecord(Record record) {
Logger.debug(getName(), "Fetched record " + record.guid);
fetched.set(true);
performNotify(new AssertionFailedError("Should have fetched no record!"));
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
if (fetched.get()) {
Logger.debug(getName(), "Not finishing session: record retrieved.");
return;
}
try {
session.finish(new SimpleSuccessFinishDelegate() {
@Override
public void onFinishSucceeded(RepositorySession session,
@@ -158,17 +158,17 @@ public class TestStoreTracking extends A
session.fetchModified(new SimpleSuccessFetchDelegate() {
@Override
public void onFetchedRecord(Record record) {
assertEq(expectedGUID, record.guid);
}
@Override
- public void onFetchCompleted(long end) {
+ public void onFetchCompleted() {
try {
session.finish(new SimpleSuccessFinishDelegate() {
@Override
public void onFinishSucceeded(RepositorySession session,
RepositorySessionBundle bundle) {
// Hooray!
performNotify();
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultFetchDelegate.java
@@ -26,19 +26,18 @@ public class DefaultFetchDelegate extend
public ArrayList<Record> records = new ArrayList<Record>();
public Set<String> ignore = new HashSet<String>();
@Override
public void onFetchFailed(Exception ex) {
performNotify("Fetch failed.", ex);
}
- protected void onDone(ArrayList<Record> records, HashMap<String, Record> expected, long end) {
+ protected void onDone(ArrayList<Record> records, HashMap<String, Record> expected) {
Logger.debug(LOG_TAG, "onDone.");
- Logger.debug(LOG_TAG, "End timestamp is " + end);
Logger.debug(LOG_TAG, "Expected is " + expected);
Logger.debug(LOG_TAG, "Records is " + records);
Set<String> foundGuids = new HashSet<String>();
try {
int expectedCount = 0;
int expectedFound = 0;
Logger.debug(LOG_TAG, "Counting expected keys.");
for (String key : expected.keySet()) {
@@ -90,17 +89,17 @@ public class DefaultFetchDelegate extend
@Override
public void onFetchedRecord(Record record) {
Logger.debug(LOG_TAG, "onFetchedRecord(" + record.guid + ")");
records.add(record);
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
Logger.debug(LOG_TAG, "onFetchCompleted. Doing nothing.");
}
@Override
public void onBatchCompleted() {
Logger.debug(LOG_TAG, "onBatchCompleted. Doing nothing.");
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -15,17 +15,17 @@ public class DefaultStoreDelegate extend
}
@Override
public void onRecordStoreSucceeded(String guid) {
performNotify("DefaultStoreDelegate used", null);
}
@Override
- public void onStoreCompleted(long storeEnd) {
+ public void onStoreCompleted() {
performNotify("DefaultStoreDelegate used", null);
}
@Override
public void onStoreFailed(Exception ex) {
performNotify("Store failed", ex);
}
@@ -63,21 +63,21 @@ public class DefaultStoreDelegate extend
@Override
public void run() {
self.onRecordStoreReconciled(guid, null, null);
}
});
}
@Override
- public void onStoreCompleted(final long storeEnd) {
+ public void onStoreCompleted() {
executor.execute(new Runnable() {
@Override
public void run() {
- self.onStoreCompleted(storeEnd);
+ self.onStoreCompleted();
}
});
}
@Override
public void onStoreFailed(Exception e) {
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectFetchDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectFetchDelegate.java
@@ -17,16 +17,16 @@ public class ExpectFetchDelegate extends
}
@Override
public void onFetchedRecord(Record record) {
this.records.add(record);
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
- super.onDone(this.records, this.expect, fetchEnd);
+ public void onFetchCompleted() {
+ super.onDone(this.records, this.expect);
}
public Record recordAt(int i) {
return this.records.get(i);
}
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectFetchSinceDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectFetchSinceDelegate.java
@@ -19,17 +19,17 @@ public class ExpectFetchSinceDelegate ex
public ExpectFetchSinceDelegate(long timestamp, String[] guids) {
expected = guids;
earliest = timestamp;
Arrays.sort(expected);
}
@Override
- public void onFetchCompleted(final long fetchEnd) {
+ public void onFetchCompleted() {
AssertionFailedError err = null;
try {
int countSpecials = 0;
for (Record record : records) {
// Check if record should be ignored.
if (!ignore.contains(record.guid)) {
assertFalse(-1 == Arrays.binarySearch(this.expected, record.guid));
} else {
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectManyStoredDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectManyStoredDelegate.java
@@ -22,17 +22,17 @@ public class ExpectManyStoredDelegate ex
for (Record record : records) {
s.add(record.guid);
}
expectedGUIDs = s;
stored = new AtomicLong(0);
}
@Override
- public void onStoreCompleted(long storeEnd) {
+ public void onStoreCompleted() {
try {
assertEquals(expectedGUIDs.size(), stored.get());
performNotify();
} catch (AssertionFailedError e) {
performNotify(e);
}
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectStoreCompletedDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectStoreCompletedDelegate.java
@@ -6,12 +6,12 @@ package org.mozilla.gecko.background.syn
public class ExpectStoreCompletedDelegate extends DefaultStoreDelegate {
@Override
public void onRecordStoreSucceeded(String guid) {
// That's fine.
}
@Override
- public void onStoreCompleted(long storeEnd) {
+ public void onStoreCompleted() {
performNotify();
}
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectStoredDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/ExpectStoredDelegate.java
@@ -11,17 +11,17 @@ public class ExpectStoredDelegate extend
String expectedGUID;
String storedGuid;
public ExpectStoredDelegate(String guid) {
this.expectedGUID = guid;
}
@Override
- public synchronized void onStoreCompleted(long storeEnd) {
+ public synchronized void onStoreCompleted() {
try {
assertNotNull(storedGuid);
performNotify();
} catch (AssertionFailedError e) {
performNotify("GUID " + this.expectedGUID + " was not stored", e);
}
}
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/WBORepository.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/WBORepository.java
@@ -81,45 +81,48 @@ public class WBORepository extends Repos
Logger.debug(LOG_TAG, "Excluding record " + record.guid);
continue;
}
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record);
}
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void fetch(final String[] guids,
final RepositorySessionFetchRecordsDelegate delegate) {
long fetchBegan = now();
stats.fetchBegan = fetchBegan;
for (String guid : guids) {
if (wbos.containsKey(guid)) {
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(wbos.get(guid));
}
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void fetchAll(final RepositorySessionFetchRecordsDelegate delegate) {
long fetchBegan = now();
stats.fetchBegan = fetchBegan;
for (Entry<String, Record> entry : wbos.entrySet()) {
Record record = entry.getValue();
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record);
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void store(final Record record) throws NoStoreDelegateException {
if (storeDelegate == null) {
throw new NoStoreDelegateException();
}
final long now = now();
@@ -183,17 +186,18 @@ public class WBORepository extends Repos
public void storeDone() {
// TODO: this is not guaranteed to be called after all of the record
// store callbacks have completed!
final long end = now();
if (stats.storeBegan < 0) {
stats.storeBegan = end;
}
stats.storeCompleted = end;
- storeDelegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted(end);
+ setLastStoreTimestamp(end);
+ storeDelegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted();
}
}
public ConcurrentHashMap<String, Record> wbos;
public WBORepository(boolean bumpTimestamps) {
super();
this.bumpTimestamps = bumpTimestamps;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
@@ -74,18 +74,18 @@ public class SynchronizerHelpers {
}
@Override
public void onFetchFailed(Exception ex) {
delegate.onFetchFailed(ex);
}
@Override
- public void onFetchCompleted(long fetchEnd) {
- delegate.onFetchCompleted(fetchEnd);
+ public void onFetchCompleted() {
+ delegate.onFetchCompleted();
}
@Override
public void onBatchCompleted() {
}
@Override
@@ -197,17 +197,18 @@ public class SynchronizerHelpers {
synchronized (batch) {
flush();
// Do this in a Runnable so that the timestamp is grabbed after any upload.
final Runnable r = new Runnable() {
@Override
public void run() {
synchronized (batch) {
Logger.trace("XXX", "Calling storeDone.");
- storeDelegate.onStoreCompleted(now());
+ setLastStoreTimestamp(now());
+ storeDelegate.onStoreCompleted();
}
}
};
storeWorkQueue.execute(r);
}
}
}
public BatchFailStoreWBORepository(int batchSize) {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -72,18 +72,17 @@ public class TestRecordsChannel {
@Override
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
numFlowStoreFailed.incrementAndGet();
storeException = ex;
}
@Override
- @Override
- public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+ public void onFlowCompleted(RecordsChannel recordsChannel) {
numFlowCompleted.incrementAndGet();
try {
sinkSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
try {
sourceSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestSynchronizerSession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestSynchronizerSession.java
@@ -227,20 +227,20 @@ public class TestSynchronizerSession {
// Didn't lose any records.
assertFirstContainsSecond(repoA.wbos, originalWbosA);
assertFirstContainsSecond(repoB.wbos, originalWbosB);
// Didn't get records we shouldn't have fetched.
assertFirstDoesNotContainSecond(repoA.wbos, originalWbosB);
assertFirstDoesNotContainSecond(repoB.wbos, originalWbosA);
- // Timestamps updated.
+ // Timestamps not updated.
SynchronizerConfiguration sc = syncSession.getSynchronizer().save();
- TestSynchronizer.assertInRangeInclusive(before, sc.localBundle.getTimestamp(), after);
- TestSynchronizer.assertInRangeInclusive(before, sc.remoteBundle.getTimestamp(), after);
+ assertEquals(0L, sc.localBundle.getTimestamp());
+ assertEquals(0L, sc.remoteBundle.getTimestamp());
}
protected void doSkipTest(boolean remoteShouldSkip, boolean localShouldSkip) {
repoA = new ShouldSkipWBORepository(remoteShouldSkip);
repoB = new ShouldSkipWBORepository(localShouldSkip);
Synchronizer synchronizer = new Synchronizer();
synchronizer.repositoryA = repoA;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionFetchRecordsDelegate.java
@@ -27,17 +27,17 @@ public class ExpectSuccessRepositorySess
@Override
public void onFetchedRecord(Record record) {
fetchedRecords.add(record);
log("Fetched record with guid '" + record.guid + "'.");
}
@Override
- public void onFetchCompleted(long end) {
+ public void onFetchCompleted() {
log("Fetch completed.");
performNotify();
}
@Override
public void onBatchCompleted() {
log("Batch completed.");
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -23,18 +23,18 @@ public class ExpectSuccessRepositorySess
}
@Override
public void onRecordStoreSucceeded(String guid) {
log("Record store succeeded.");
}
@Override
- public void onStoreCompleted(long storeEnd) {
- log("Record store completed at " + storeEnd);
+ public void onStoreCompleted() {
+ log("Record store completed");
}
@Override
public void onStoreFailed(Exception e) {
log("Store failed.", e);
performNotify(new AssertionFailedError("onStoreFailed: store should not have failed."));
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/WBORepository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/WBORepository.java
@@ -80,45 +80,48 @@ public class WBORepository extends Repos
Logger.debug(LOG_TAG, "Excluding record " + record.guid);
continue;
}
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record);
}
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void fetch(final String[] guids,
final RepositorySessionFetchRecordsDelegate delegate) {
long fetchBegan = now();
stats.fetchBegan = fetchBegan;
for (String guid : guids) {
if (wbos.containsKey(guid)) {
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(wbos.get(guid));
}
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void fetchAll(final RepositorySessionFetchRecordsDelegate delegate) {
long fetchBegan = now();
stats.fetchBegan = fetchBegan;
for (Entry<String, Record> entry : wbos.entrySet()) {
Record record = entry.getValue();
delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record);
}
long fetchCompleted = now();
stats.fetchCompleted = fetchCompleted;
- delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted);
+ setLastFetchTimestamp(fetchCompleted);
+ delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted();
}
@Override
public void store(final Record record) throws NoStoreDelegateException {
if (storeDelegate == null) {
throw new NoStoreDelegateException();
}
final long now = now();
@@ -182,17 +185,18 @@ public class WBORepository extends Repos
public void storeDone() {
// TODO: this is not guaranteed to be called after all of the record
// store callbacks have completed!
final long end = now();
if (stats.storeBegan < 0) {
stats.storeBegan = end;
}
stats.storeCompleted = end;
- storeDelegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted(end);
+ setLastStoreTimestamp(end);
+ storeDelegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted();
}
}
public ConcurrentHashMap<String, Record> wbos;
public WBORepository(boolean bumpTimestamps) {
super();
this.bumpTimestamps = bumpTimestamps;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -91,17 +91,17 @@ public class BatchingDownloaderDelegateT
}
@Override
public void onFetchedRecord(Record record) {
}
@Override
- public void onFetchCompleted(long fetchEnd) {
+ public void onFetchCompleted() {
}
@Override
public void onBatchCompleted() {
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -134,17 +134,17 @@ public class BatchingDownloaderTest {
@Override
public void onFetchedRecord(Record record) {
this.isFetched = true;
this.record = record;
}
@Override
- public void onFetchCompleted(long fetchEnd) {
+ public void onFetchCompleted() {
this.isSuccess = true;
}
@Override
public void onBatchCompleted() {
this.batchesCompleted += 1;
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -139,17 +139,17 @@ public class BatchingUploaderTest {
}
@Override
public void onRecordStoreSucceeded(String guid) {
++recordStoreSucceeded;
}
@Override
- public void onStoreCompleted(long storeEnd) {
+ public void onStoreCompleted() {
++storeCompleted;
}
@Override
public void onStoreFailed(Exception e) {
lastStoreFailedException = e;
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -96,19 +96,17 @@ public class PayloadUploadDelegateTest {
}
@Override
public void onRecordStoreSucceeded(String guid) {
succeededGuids.add(guid);
}
@Override
- public void onStoreCompleted(long storeEnd) {
-
- }
+ public void onStoreCompleted() {}
@Override
public void onStoreFailed(Exception e) {
storeFailedException = e;
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {