--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -896,20 +896,24 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'sync/InfoConfiguration.java',
'sync/InfoCounts.java',
'sync/JSONRecordFetcher.java',
'sync/KeyBundleProvider.java',
'sync/MetaGlobal.java',
'sync/MetaGlobalException.java',
'sync/MetaGlobalMissingEnginesException.java',
'sync/MetaGlobalNotSetException.java',
+ 'sync/middleware/BufferingMiddlewareRepository.java',
+ 'sync/middleware/BufferingMiddlewareRepositorySession.java',
'sync/middleware/Crypto5MiddlewareRepository.java',
'sync/middleware/Crypto5MiddlewareRepositorySession.java',
'sync/middleware/MiddlewareRepository.java',
'sync/middleware/MiddlewareRepositorySession.java',
+ 'sync/middleware/storage/BufferStorage.java',
+ 'sync/middleware/storage/MemoryBufferStorage.java',
'sync/net/AbstractBearerTokenAuthHeaderProvider.java',
'sync/net/AuthHeaderProvider.java',
'sync/net/BaseResource.java',
'sync/net/BaseResourceDelegate.java',
'sync/net/BasicAuthHeaderProvider.java',
'sync/net/BearerAuthHeaderProvider.java',
'sync/net/BrowserIDAuthHeaderProvider.java',
'sync/net/ConnectionMonitorThread.java',
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
@@ -0,0 +1,60 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.content.Context;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
+
+/**
+ * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
+ * a sync deadline, buffer storage implementation and a consistency checker implementation.
+ *
+ * @author grisha
+ */
+public class BufferingMiddlewareRepository extends MiddlewareRepository {
+ private final long syncDeadline;
+ private final Repository inner;
+ private final BufferStorage bufferStorage;
+
+ private class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate {
+ private final BufferingMiddlewareRepository repository;
+ private final RepositorySessionCreationDelegate outerDelegate;
+
+ private BufferingMiddlewareRepositorySessionCreationDelegate(BufferingMiddlewareRepository repository, RepositorySessionCreationDelegate outerDelegate) {
+ this.repository = repository;
+ this.outerDelegate = outerDelegate;
+ }
+
+ @Override
+ public void onSessionCreateFailed(Exception ex) {
+ this.outerDelegate.onSessionCreateFailed(ex);
+ }
+
+ @Override
+ public void onSessionCreated(RepositorySession session) {
+ outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession(
+ session, this.repository, syncDeadline, bufferStorage
+ ));
+ }
+ }
+
+ public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
+ this.syncDeadline = syncDeadline;
+ this.inner = wrappedRepository;
+ this.bufferStorage = bufferStore;
+ }
+
+ @Override
+ public void createSession(RepositorySessionCreationDelegate delegate, Context context) {
+ this.inner.createSession(
+ new BufferingMiddlewareRepositorySessionCreationDelegate(this, delegate),
+ context
+ );
+ }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -0,0 +1,193 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.os.SystemClock;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.InactiveSessionException;
+import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Buffering middleware which is intended to wrap local RepositorySessions.
+ *
+ * Configure it:
+ * - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
+ *
+ * Fetch is pass-through, store is buffered.
+ *
+ * @author grisha
+ */
+/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
+ private final BufferStorage bufferStorage;
+ private final long syncDeadlineMillis;
+
+ private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
+
+ private volatile boolean storeMarkedIncomplete = false;
+
+ /* package-local */ BufferingMiddlewareRepositorySession(
+ RepositorySession repositorySession, MiddlewareRepository repository,
+ long syncDeadlineMillis, BufferStorage bufferStorage) {
+ super(repositorySession, repository);
+ this.syncDeadlineMillis = syncDeadlineMillis;
+ this.bufferStorage = bufferStorage;
+ }
+
+ @Override
+ public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) {
+ this.inner.fetchSince(timestamp, delegate);
+ }
+
+ @Override
+ public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
+ this.inner.fetch(guids, delegate);
+ }
+
+ @Override
+ public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
+ this.inner.fetchAll(delegate);
+ }
+
+ /**
+ * Will be called when this repository is acting as a `source`, and a flow of records into `sink`
+ * was completed. That is, we've uploaded merged records to the server, so now is a good time
+ * to clean up our buffer for this repository.
+ */
+ @Override
+ public void performCleanup() {
+ bufferStorage.clear();
+ }
+
+ @Override
+ public void store(Record record) throws NoStoreDelegateException {
+ bufferStorage.addOrReplace(record);
+ }
+
+ @Override
+ public void storeIncomplete() {
+ storeMarkedIncomplete = true;
+ }
+
+ @Override
+ public void storeDone() {
+ storeDone(System.currentTimeMillis());
+ }
+
+ @Override
+ public void storeFlush() {
+ bufferStorage.flush();
+ }
+
+ @Override
+ public void storeDone(final long end) {
+ doStoreDonePrepare();
+
+ // Determine if we have enough time to perform consistency checks on the buffered data and
+ // then store it. If we don't have enough time now, we keep our buffer and try again later.
+ // We don't store results of a buffer consistency check anywhere, so we can't treat it
+ // separately from storage.
+ if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
+ super.abort();
+ storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
+ return;
+ }
+
+ // Separate actual merge, so that it may be tested without involving system clock.
+ doStoreDone(end);
+ }
+
+ @VisibleForTesting
+ public void doStoreDonePrepare() {
+ // Now that records stopped flowing, persist them.
+ bufferStorage.flush();
+ }
+
+ @VisibleForTesting
+ public void doStoreDone(final long end) {
+ final Collection<Record> buffer = bufferStorage.all();
+
+ // Trivial case of an empty buffer.
+ if (buffer.isEmpty()) {
+ super.storeDone(end);
+ return;
+ }
+
+ // Flush our buffer to the wrapped local repository. Data goes live!
+ try {
+ for (Record record : buffer) {
+ this.inner.store(record);
+ }
+ } catch (NoStoreDelegateException e) {
+ // At this point we should have a delegate, so this won't happen.
+ }
+
+ // And, we're done!
+ super.storeDone(end);
+ }
+
+ /**
+ * When source fails to provide more records, we need to decide what to do with the buffer.
+ * We might fail because of a network partition, or because of a concurrent modification of a
+ * collection. Either way we do not clear the buffer in a general case. If a collection has been
+ * modified, affected records' last-modified timestamps will be bumped, and we will receive those
+ * records during the next sync. If we already have them in our buffer, we replace our now-old
+ * copy. Otherwise, they are new records and we just append them.
+ *
+ * We depend on GUIDs to be a primary key for incoming records.
+ *
+ * @param e indicates reason of failure.
+ */
+ @Override
+ public void sourceFailed(Exception e) {
+ bufferStorage.flush();
+ super.sourceFailed(e);
+ }
+
+ /**
+ * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
+ * Clean up after ourselves, if there's anything to clean up.
+ */
+ @Override
+ public void abort() {
+ bufferStorage.flush();
+ super.abort();
+ }
+
+ @Override
+ public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
+ inner.setStoreDelegate(delegate);
+ this.storeDelegate = delegate;
+ }
+
+ @Override
+ public long getHighWaterMarkTimestamp() {
+ return bufferStorage.latestModifiedTimestamp();
+ }
+
+ private boolean mayProceedToMergeBuffer() {
+ // If our buffer storage is not persistent, disallowing merging after buffer has been filled
+ // means throwing away records only to re-download them later.
+ // In this case allow merge to proceed even if we're past the deadline.
+ if (!bufferStorage.isPersistent()) {
+ return true;
+ }
+
+ // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
+ // let's do a simple thing for now and say that we may proceed if we have couple of minutes
+ // of runtime left. That surely is enough, right?
+ final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
+ return timeLeftMillis > 1000 * 60 * 2;
+ }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -0,0 +1,35 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+
+/**
+ * A contract between BufferingMiddleware and specific storage implementations.
+ *
+ * @author grisha
+ */
+public interface BufferStorage {
+ // Returns all of the records currently present in the buffer.
+ Collection<Record> all();
+
+ // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
+ // what's already present in the storage layer.
+ // NB: For a database-backed storage, "replace" happens at a transaction level.
+ void addOrReplace(Record record);
+
+ // For database-backed implementations, commits any records that came in up to this point.
+ void flush();
+
+ void clear();
+
+ // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
+ // a sync.
+ long latestModifiedTimestamp();
+
+ boolean isPersistent();
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -0,0 +1,70 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A trivial, memory-backed, transient implementation of a BufferStorage.
+ * Its intended use is to buffer syncing of small collections.
+ * Thread-safe.
+ *
+ * @author grisha
+ */
+public class MemoryBufferStorage implements BufferStorage {
+ private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
+
+ @Override
+ public boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ public Collection<Record> all() {
+ synchronized (recordBuffer) {
+ return new ArrayList<>(recordBuffer.values());
+ }
+ }
+
+ @Override
+ public void addOrReplace(Record record) {
+ recordBuffer.put(record.guid, record);
+ }
+
+ @Override
+ public void flush() {
+ // This is a no-op; flush intended for database-backed stores.
+ }
+
+ @Override
+ public void clear() {
+ recordBuffer.clear();
+ }
+
+ @Override
+ public long latestModifiedTimestamp() {
+ long lastModified = 0;
+
+ synchronized (recordBuffer) {
+ if (recordBuffer.size() == 0) {
+ return lastModified;
+ }
+
+ for (Record record : recordBuffer.values()) {
+ if (record.lastModified > lastModified) {
+ lastModified = record.lastModified;
+ }
+ }
+ }
+
+ return lastModified;
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,16 +71,21 @@ public abstract class RepositorySession
// The time that the last sync on this collection completed, in milliseconds since epoch.
private long lastSyncTimestamp = 0;
public long getLastSyncTimestamp() {
return lastSyncTimestamp;
}
+ // Override this in the buffering wrappers.
+ public long getHighWaterMarkTimestamp() {
+ return 0;
+ }
+
public static long now() {
return System.currentTimeMillis();
}
public RepositorySession(Repository repository) {
this.repository = repository;
}
@@ -141,16 +146,37 @@ public abstract class RepositorySession
@Override
public void run() {
delegate.onStoreCompleted(end);
}
};
storeWorkQueue.execute(command);
}
+ /**
+ * Indicates that a number of records have been stored, more are still to come but after some time,
+ * and now would be a good time to flush records and perform any other similar operations.
+ */
+ public void storeFlush() {
+ }
+
+ /**
+ * During flow of records, indicates that source failed.
+ *
+ * @param e indicates reason of failure.
+ */
+ public void sourceFailed(Exception e) {
+ }
+
+ /**
+ * Indicates that a flow of records have been completed.
+ */
+ public void performCleanup() {
+ }
+
public abstract void wipe(RepositorySessionWipeDelegate delegate);
/**
* Synchronously perform the shared work of beginning. Throws on failure.
* @throws InvalidSessionTransitionException
*
*/
protected void sharedBegin() throws InvalidSessionTransitionException {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -243,16 +243,19 @@ public class RecordsChannel implements
this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
}
}
@Override
public void onStoreCompleted(long storeEnd) {
Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
"Fetch end is " + fetchEnd + ", store end is " + storeEnd);
+ // Source might have used caches used to facilitate flow of records, so now is a good
+ // time to clean up. Particularly pertinent for buffered sources.
+ this.source.performCleanup();
// TODO: synchronize on consumer callback?
delegate.onFlowCompleted(this, fetchEnd, storeEnd);
}
@Override
public void onBeginFailed(Exception ex) {
delegate.onFlowBeginFailed(this, ex);
}
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -0,0 +1,234 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.middleware;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mozilla.gecko.background.testhelpers.MockRecord;
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class BufferingMiddlewareRepositorySessionTest {
+ private RepositorySession innerRepositorySession;
+ private BufferingMiddlewareRepositorySession bufferingSession;
+ private BufferingMiddlewareRepositorySession bufferingSessionMocked;
+ private BufferStorage bufferStorage;
+ private BufferStorage bufferStorageMocked;
+
+ @Before
+ public void setUp() throws Exception {
+ BufferingMiddlewareRepository bufferingRepository;
+ Repository innerRepositoy;
+
+ innerRepositoy = mock(Repository.class);
+ innerRepositorySession = mock(RepositorySession.class);
+ bufferingRepository = new BufferingMiddlewareRepository(
+ 0L,
+ new MemoryBufferStorage(),
+ innerRepositoy
+ );
+
+ bufferStorage = new MemoryBufferStorage();
+ bufferStorageMocked = mock(MemoryBufferStorage.class);
+
+ bufferingSession = new BufferingMiddlewareRepositorySession(
+ innerRepositorySession, bufferingRepository, 0L,
+ bufferStorage);
+
+ bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
+ innerRepositorySession, bufferingRepository, 0L,
+ bufferStorageMocked);
+ }
+
+ @Test
+ public void store() throws Exception {
+ assertEquals(0, bufferStorage.all().size());
+
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+ assertEquals(1, bufferStorage.all().size());
+
+ MockRecord record1 = new MockRecord("guid2", null, 1, false);
+ bufferingSession.store(record1);
+ assertEquals(2, bufferStorage.all().size());
+ assertEquals(1, bufferStorage.latestModifiedTimestamp());
+
+ // record2 must replace record.
+ MockRecord record2 = new MockRecord("guid1", null, 2, false);
+ bufferingSession.store(record2);
+ assertEquals(2, bufferStorage.all().size());
+ assertEquals(2, bufferStorage.latestModifiedTimestamp());
+
+ // Ensure inner session doesn't see incoming records.
+ verify(innerRepositorySession, never()).store(record);
+ verify(innerRepositorySession, never()).store(record1);
+ verify(innerRepositorySession, never()).store(record2);
+ }
+
+ @Test
+ public void storeDone() throws Exception {
+ // Verify that storage's flush is called.
+ verify(bufferStorageMocked, times(0)).flush();
+ bufferingSessionMocked.doStoreDonePrepare();
+ verify(bufferStorageMocked, times(1)).flush();
+
+ // Trivial case, no records to merge.
+ bufferingSession.doStoreDone(123L);
+ verify(innerRepositorySession, times(1)).storeDone(123L);
+ verify(innerRepositorySession, never()).store(any(Record.class));
+
+ // Reset call counters.
+ reset(innerRepositorySession);
+
+ // Now store some records.
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+
+ MockRecord record2 = new MockRecord("guid2", null, 13, false);
+ bufferingSession.store(record2);
+
+ MockRecord record3 = new MockRecord("guid3", null, 5, false);
+ bufferingSession.store(record3);
+
+ // NB: same guid as above.
+ MockRecord record4 = new MockRecord("guid3", null, -1, false);
+ bufferingSession.store(record4);
+
+ // Done storing.
+ bufferingSession.doStoreDone(123L);
+
+ // Ensure all records where stored in the wrapped session.
+ verify(innerRepositorySession, times(1)).store(record);
+ verify(innerRepositorySession, times(1)).store(record2);
+ verify(innerRepositorySession, times(1)).store(record4);
+
+ // Ensure storeDone was called on the wrapped session.
+ verify(innerRepositorySession, times(1)).storeDone(123L);
+
+ // Ensure buffer wasn't cleared on the wrapped session.
+ assertEquals(3, bufferStorage.all().size());
+ }
+
+ @Test
+ public void storeFlush() throws Exception {
+ verify(bufferStorageMocked, times(0)).flush();
+ bufferingSessionMocked.storeFlush();
+ verify(bufferStorageMocked, times(1)).flush();
+ }
+
+ @Test
+ public void performCleanup() throws Exception {
+ // Baseline.
+ assertEquals(0, bufferStorage.all().size());
+
+ // Test that we can call cleanup with an empty buffer storage.
+ bufferingSession.performCleanup();
+ assertEquals(0, bufferStorage.all().size());
+
+ // Store a couple of records.
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+
+ MockRecord record2 = new MockRecord("guid2", null, 13, false);
+ bufferingSession.store(record2);
+
+ // Confirm it worked.
+ assertEquals(2, bufferStorage.all().size());
+
+ // Test that buffer storage is cleaned up.
+ bufferingSession.performCleanup();
+ assertEquals(0, bufferStorage.all().size());
+ }
+
+ @Test
+ public void sourceFailed() throws Exception {
+ // Source failes before any records have been stored.
+ bufferingSession.sourceFailed(new Exception());
+ assertEquals(0, bufferStorage.all().size());
+
+ // Store some records now.
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+
+ MockRecord record2 = new MockRecord("guid2", null, 13, false);
+ bufferingSession.store(record2);
+
+ MockRecord record3 = new MockRecord("guid3", null, 5, false);
+ bufferingSession.store(record3);
+
+ // Verify that buffer is intact after source fails.
+ bufferingSession.sourceFailed(new Exception());
+ assertEquals(3, bufferStorage.all().size());
+
+ // Verify that buffer is flushed after source fails.
+ verify(bufferStorageMocked, times(0)).flush();
+ bufferingSessionMocked.sourceFailed(new Exception());
+ verify(bufferStorageMocked, times(1)).flush();
+ }
+
+ @Test
+ public void abort() throws Exception {
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+
+ MockRecord record2 = new MockRecord("guid2", null, 13, false);
+ bufferingSession.store(record2);
+
+ MockRecord record3 = new MockRecord("guid3", null, 5, false);
+ bufferingSession.store(record3);
+
+ // NB: same guid as above.
+ MockRecord record4 = new MockRecord("guid3", null, -1, false);
+ bufferingSession.store(record4);
+
+ bufferingSession.abort();
+
+ // Verify number of records didn't change.
+ // Abort shouldn't clear the buffer.
+ assertEquals(3, bufferStorage.all().size());
+ }
+
+ @Test
+ public void setStoreDelegate() throws Exception {
+ RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
+ bufferingSession.setStoreDelegate(delegate);
+ verify(innerRepositorySession).setStoreDelegate(delegate);
+ }
+
+ @Test
+ public void getHighWaterMarkTimestamp() throws Exception {
+ // Trivial case, empty buffer.
+ assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
+
+ MockRecord record = new MockRecord("guid1", null, 1, false);
+ bufferingSession.store(record);
+ assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+ MockRecord record3 = new MockRecord("guid3", null, 5, false);
+ bufferingSession.store(record3);
+ assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
+
+ // NB: same guid as above.
+ MockRecord record4 = new MockRecord("guid3", null, -1, false);
+ bufferingSession.store(record4);
+ assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+ MockRecord record2 = new MockRecord("guid2", null, 13, false);
+ bufferingSession.store(record2);
+ assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
+ }
+}
\ No newline at end of file