Bug 1291821 - Wrap local repositories in buffering middleware r=rnewman
History stage does not wrap history respository in a buffer, because we'd like to
use a high-water-mark and offset resuming later on, and using a persistent buffer
for this stage does not make sense.
MozReview-Commit-ID: FS1swml2bIC
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
@@ -186,23 +186,28 @@ public class AndroidBrowserHistoryReposi
}
trackRecord(succeeded);
storeDelegate.onRecordStoreSucceeded(succeeded.guid); // At this point, we are really inserted.
}
}
@Override
public void storeDone() {
+ storeDone(System.currentTimeMillis());
+ }
+
+ @Override
+ public void storeDone(final long end) {
storeWorkQueue.execute(new Runnable() {
@Override
public void run() {
synchronized (recordsBufferMonitor) {
try {
flushNewRecords();
} catch (Exception e) {
Logger.warn(LOG_TAG, "Error flushing records to database.", e);
}
}
- storeDone(System.currentTimeMillis());
+ AndroidBrowserHistoryRepositorySession.super.storeDone(end);
}
});
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -2,16 +2,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.MetaGlobalException;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
@@ -48,17 +50,21 @@ public class AndroidBrowserBookmarksServ
session.config.infoConfiguration,
BOOKMARKS_BATCH_LIMIT,
BOOKMARKS_SORT,
true /* allow multiple batches */);
}
@Override
protected Repository getLocalRepository() {
- return new AndroidBrowserBookmarksRepository();
+ return new BufferingMiddlewareRepository(
+ session.getSyncDeadline(),
+ new MemoryBufferStorage(),
+ new AndroidBrowserBookmarksRepository()
+ );
}
@Override
protected RecordFactory getRecordFactory() {
return new BookmarkRecordFactory();
}
@Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -1,14 +1,16 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FennecTabsRepository;
import org.mozilla.gecko.sync.repositories.domain.TabsRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class FennecTabsServerSyncStage extends ServerSyncStage {
private static final String COLLECTION = "tabs";
@@ -25,16 +27,20 @@ public class FennecTabsServerSyncStage e
@Override
public Integer getStorageVersion() {
return VersionConstants.TABS_ENGINE_VERSION;
}
@Override
protected Repository getLocalRepository() {
- return new FennecTabsRepository(session.getClientsDelegate());
+ return new BufferingMiddlewareRepository(
+ session.getSyncDeadline(),
+ new MemoryBufferStorage(),
+ new FennecTabsRepository(session.getClientsDelegate())
+ );
}
@Override
protected RecordFactory getRecordFactory() {
return new TabsRecordFactory();
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -2,16 +2,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
import java.net.URISyntaxException;
import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
@@ -49,17 +51,21 @@ public class FormHistoryServerSyncStage
session.config.infoConfiguration,
FORM_HISTORY_BATCH_LIMIT,
FORM_HISTORY_SORT,
true /* allow multiple batches */);
}
@Override
protected Repository getLocalRepository() {
- return new FormHistoryRepositorySession.FormHistoryRepository();
+ return new BufferingMiddlewareRepository(
+ session.getSyncDeadline(),
+ new MemoryBufferStorage(),
+ new FormHistoryRepositorySession.FormHistoryRepository()
+ );
}
public class FormHistoryRecordFactory extends RecordFactory {
@Override
public Record createRecord(Record record) {
FormHistoryRecord r = new FormHistoryRecord();
r.initFromEnvelope((CryptoRecord) record);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -1,14 +1,16 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.stage;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.PasswordsRepositorySession;
import org.mozilla.gecko.sync.repositories.domain.PasswordRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
public class PasswordsServerSyncStage extends ServerSyncStage {
@Override
@@ -23,16 +25,20 @@ public class PasswordsServerSyncStage ex
@Override
public Integer getStorageVersion() {
return VersionConstants.PASSWORDS_ENGINE_VERSION;
}
@Override
protected Repository getLocalRepository() {
- return new PasswordsRepositorySession.PasswordsRepository();
+ return new BufferingMiddlewareRepository(
+ session.getSyncDeadline(),
+ new MemoryBufferStorage(),
+ new PasswordsRepositorySession.PasswordsRepository()
+ );
}
@Override
protected RecordFactory getRecordFactory() {
return new PasswordRecordFactory();
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -165,17 +165,26 @@ public class RecordsChannel implements
numFetched.set(0);
numFetchFailed.set(0);
numStored.set(0);
numStoreFailed.set(0);
// Start a consumer thread.
this.consumer = new ConcurrentRecordConsumer(this);
ThreadPool.run(this.consumer);
waitingForQueueDone = true;
- source.fetchSince(source.getLastSyncTimestamp(), this);
+
+ // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
+ // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
+ // starting from sink's high water mark timestamp.
+ // If there was no previous flow (first sync, or data was cleared...), fetch everything.
+ // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
+ final long highWaterMark = sink.getHighWaterMarkTimestamp();
+ final long lastSync = source.getLastSyncTimestamp();
+ final long sinceTimestamp = Math.max(highWaterMark, lastSync);
+ source.fetchSince(sinceTimestamp, this);
}
/**
* Begin both sessions, invoking flow() when done.
* @throws InvalidSessionTransitionException
*/
public void beginAndFlow() throws InvalidSessionTransitionException {
Logger.trace(LOG_TAG, "Beginning source.");