Bug 1308337 - Pre: More granular tracking of record flow between repositories r=nalexander draft
authorGrigory Kruglov <gkruglov@mozilla.com>
Fri, 26 May 2017 17:34:07 -0400
changeset 588421 d7424fec748b9a2d07d1c98b78ce89fd418750e4
parent 588420 a16933121371818307329523916d35e82b2446c9
child 588422 501ff746ecfb3022a0fe89844e307153bfdb5164
push id62031
push userbmo:gkruglov@mozilla.com
push dateFri, 02 Jun 2017 19:52:26 +0000
reviewersnalexander
bugs1308337
milestone55.0a1
Bug 1308337 - Pre: More granular tracking of record flow between repositories r=nalexander This patch: - introduces a way to signal that a record has been reconciled; this is not a "flow control" event type, and must be used in addition to regular "recordStored" delegate call - draws a clearer distinction between "attempted to store" and "stored, as reported by session's storage layer" MozReview-Commit-ID: 99UbUJzu57w
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FormHistoryRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/PasswordsRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserRepositorySession.java
@@ -458,16 +458,20 @@ public abstract class AndroidBrowserRepo
               trace("Remote modified, local not. Deleting.");
               storeRecordDeletion(record, existingRecord);
               return;
             }
 
             trace("Both local and remote records have been modified.");
             if (record.lastModified > existingRecord.lastModified) {
               trace("Remote is newer, and deleted. Deleting local.");
+              // Note that while this counts as "reconciliation", we're probably over-counting.
+              // Currently, locallyModified above is _always_ true if a record exists locally,
+              // and so we'll consider any deletions of already present records as reconciliations.
+              storeDelegate.onRecordStoreReconciled(record.guid);
               storeRecordDeletion(record, existingRecord);
               return;
             }
 
             trace("Remote is older, local is not deleted. Ignoring.");
             return;
           }
           // End deletion logic.
@@ -512,16 +516,17 @@ public abstract class AndroidBrowserRepo
                        (toStore.deleted ? " with deleted record " : " with record ") +
                        toStore.guid);
           Record replaced = replace(toStore, existingRecord);
 
           // Note that we don't track records here; deciding that is the job
           // of reconcileRecords.
           Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid +
                                 "(" + replaced.androidID + ")");
+          storeDelegate.onRecordStoreReconciled(replaced.guid);
           storeDelegate.onRecordStoreSucceeded(replaced.guid);
           return;
 
         } catch (MultipleRecordsForGuidException e) {
           Logger.error(LOG_TAG, "Multiple records returned for given guid: " + record.guid);
           storeDelegate.onRecordStoreFailed(e, record.guid);
           return;
         } catch (NoGuidForIdException e) {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FormHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/FormHistoryRepositorySession.java
@@ -607,16 +607,20 @@ public class FormHistoryRepositorySessio
               return;
             }
 
             Logger.trace(LOG_TAG, "Both local and remote records have been modified.");
             if (record.lastModified > existingRecord.lastModified) {
               Logger.trace(LOG_TAG, "Remote is newer, and deleted. Purging local.");
               deleteExistingRecord(existingRecord);
               trackRecord(record);
+              // Note that while this counts as "reconciliation", we're probably over-counting.
+              // Currently, locallyModified above is _always_ true if a record exists locally,
+              // and so we'll consider any deletions of already present records as reconciliations.
+              storeDelegate.onRecordStoreReconciled(record.guid);
               storeDelegate.onRecordStoreSucceeded(record.guid);
               return;
             }
 
             Logger.trace(LOG_TAG, "Remote is older, local is not deleted. Ignoring.");
             return;
           }
           // End deletion logic.
@@ -658,16 +662,17 @@ public class FormHistoryRepositorySessio
             return;
           }
 
           Logger.trace(LOG_TAG, "Both local and remote records have been modified.");
           if (record.lastModified > existingRecord.lastModified) {
             Logger.trace(LOG_TAG, "Remote is newer, and not deleted. Storing.");
             replaceExistingRecordWithRegularRecord(record, existingRecord);
             trackRecord(record);
+            storeDelegate.onRecordStoreReconciled(record.guid);
             storeDelegate.onRecordStoreSucceeded(record.guid);
             return;
           }
 
           Logger.trace(LOG_TAG, "Remote is older, local is not deleted. Ignoring.");
         } catch (Exception e) {
           Logger.error(LOG_TAG, "Store failed for " + record.guid, e);
           storeDelegate.onRecordStoreFailed(e, record.guid);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/PasswordsRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/PasswordsRepositorySession.java
@@ -306,16 +306,20 @@ public class PasswordsRepositorySession 
             trace("Remote modified, local not. Deleting.");
             storeRecordDeletion(remoteRecord);
             return;
           }
 
           trace("Both local and remote records have been modified.");
           if (remoteRecord.lastModified > existingRecord.lastModified) {
             trace("Remote is newer, and deleted. Deleting local.");
+            // Note that while this counts as "reconciliation", we're probably over-counting.
+            // Currently, locallyModified above is _always_ true if a record exists locally,
+            // and so we'll consider any deletions of already present records as reconciliations.
+            storeDelegate.onRecordStoreReconciled(record.guid);
             storeRecordDeletion(remoteRecord);
             return;
           }
 
           trace("Remote is older, local is not deleted. Ignoring.");
 
           return;
         }
@@ -384,16 +388,17 @@ public class PasswordsRepositorySession 
           storeDelegate.onRecordStoreFailed(e, record.guid);
           return;
         }
 
         // Note that we don't track records here; deciding that is the job
         // of reconcileRecords.
         Logger.debug(LOG_TAG, "Calling delegate callback with guid " + replaced.guid +
                               "(" + replaced.androidID + ")");
+        storeDelegate.onRecordStoreReconciled(record.guid);
         storeDelegate.onRecordStoreSucceeded(record.guid);
         return;
       }
     };
     storeWorkQueue.execute(storeRunnable);
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -59,9 +59,19 @@ public class DeferredRepositorySessionSt
   public void onStoreFailed(final Exception e) {
     executor.execute(new Runnable() {
       @Override
       public void run() {
         inner.onStoreFailed(e);
       }
     });
   }
+
+  @Override
+  public void onRecordStoreReconciled(final String guid) {
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        inner.onRecordStoreReconciled(guid);
+      }
+    });
+  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -11,14 +11,19 @@ import java.util.concurrent.ExecutorServ
  * need help doing this.
  *
  * @author rnewman
  *
  */
 public interface RepositorySessionStoreDelegate {
   void onRecordStoreFailed(Exception ex, String recordGuid);
 
+  // Meant for signaling that a record has been reconciled.
+  // Only makes sense in context of local repositories.
+  // Further call to onRecordStoreSucceeded is necessary.
+  void onRecordStoreReconciled(String guid);
+
   // Called with a GUID when store has succeeded.
   void onRecordStoreSucceeded(String guid);
   void onStoreCompleted(long storeEnd);
   void onStoreFailed(Exception e);
   RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -68,26 +68,33 @@ import org.mozilla.gecko.sync.repositori
 public class RecordsChannel implements
   RepositorySessionFetchRecordsDelegate,
   RepositorySessionStoreDelegate,
   RecordsConsumerDelegate,
   RepositorySessionBeginDelegate {
 
   private static final String LOG_TAG = "RecordsChannel";
   public RepositorySession source;
-  public RepositorySession sink;
+  private RepositorySession sink;
   private final RecordsChannelDelegate delegate;
   private long fetchEnd = -1;
 
   private volatile ReflowIsNecessaryException reflowException;
 
-  protected final AtomicInteger numFetched = new AtomicInteger();
-  protected final AtomicInteger numFetchFailed = new AtomicInteger();
-  protected final AtomicInteger numStored = new AtomicInteger();
-  protected final AtomicInteger numStoreFailed = new AtomicInteger();
+  private final AtomicInteger fetchedCount = new AtomicInteger();
+  private final AtomicInteger fetchFailedCount = new AtomicInteger();
+
+  // Expected value relationships:
+  // attempted = accepted + failed
+  // reconciled <= accepted <= attempted
+  // reconciled = accepted - `new`, where `new` is inferred.
+  private final AtomicInteger storeAttemptedCount = new AtomicInteger();
+  private final AtomicInteger storeAcceptedCount = new AtomicInteger();
+  private final AtomicInteger storeFailedCount = new AtomicInteger();
+  private final AtomicInteger storeReconciledCount = new AtomicInteger();
 
   public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
     this.source    = source;
     this.sink      = sink;
     this.delegate  = delegate;
   }
 
   /*
@@ -113,44 +120,52 @@ public class RecordsChannel implements
   }
 
   /**
    * Get the number of records fetched so far.
    *
    * @return number of fetches.
    */
   public int getFetchCount() {
-    return numFetched.get();
+    return fetchedCount.get();
   }
 
   /**
    * Get the number of fetch failures recorded so far.
    *
    * @return number of fetch failures.
    */
   public int getFetchFailureCount() {
-    return numFetchFailed.get();
+    return fetchFailedCount.get();
   }
 
   /**
    * Get the number of store attempts (successful or not) so far.
    *
    * @return number of stores attempted.
    */
-  public int getStoreCount() {
-    return numStored.get();
+  public int getStoreAttemptedCount() {
+    return storeAttemptedCount.get();
+  }
+
+  public int getStoreAcceptedCount() {
+    return storeAcceptedCount.get();
   }
 
   /**
    * Get the number of store failures recorded so far.
    *
    * @return number of store failures.
    */
   public int getStoreFailureCount() {
-    return numStoreFailed.get();
+    return storeFailedCount.get();
+  }
+
+  public int getStoreReconciledCount() {
+    return storeReconciledCount.get();
   }
 
   /**
    * Start records flowing through the channel.
    */
   public void flow() {
     if (!isReady()) {
       RepositorySession failed = source;
@@ -164,20 +179,22 @@ public class RecordsChannel implements
     if (!source.dataAvailable()) {
       Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
       long now = System.currentTimeMillis();
       this.delegate.onFlowCompleted(this, now, now);
       return;
     }
 
     sink.setStoreDelegate(this);
-    numFetched.set(0);
-    numFetchFailed.set(0);
-    numStored.set(0);
-    numStoreFailed.set(0);
+    fetchedCount.set(0);
+    fetchFailedCount.set(0);
+    storeAttemptedCount.set(0);
+    storeAcceptedCount.set(0);
+    storeFailedCount.set(0);
+    storeReconciledCount.set(0);
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
     source.fetchSince(source.getLastSyncTimestamp(), this);
   }
 
   /**
@@ -186,40 +203,40 @@ public class RecordsChannel implements
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
     Logger.trace(LOG_TAG, "Beginning source.");
     source.begin(this);
   }
 
   @Override
   public void store(Record record) {
-    numStored.incrementAndGet();
+    storeAttemptedCount.incrementAndGet();
     try {
       sink.store(record);
     } catch (NoStoreDelegateException e) {
       Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
       delegate.onFlowStoreFailed(this, e, record.guid);
     }
   }
 
   @Override
   public void onFetchFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
-    numFetchFailed.incrementAndGet();
+    fetchFailedCount.incrementAndGet();
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
     delegate.onFlowFetchFailed(this, ex);
     // Sink will be informed once consumer finishes.
     this.consumer.halt();
   }
 
   @Override
   public void onFetchedRecord(Record record) {
-    numFetched.incrementAndGet();
+    fetchedCount.incrementAndGet();
     this.toProcess.add(record);
     this.consumer.doNotify();
   }
 
   @Override
   public void onFetchCompleted(final long fetchEnd) {
     Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
     Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd);
@@ -230,29 +247,36 @@ public class RecordsChannel implements
   @Override
   public void onBatchCompleted() {
     this.sink.storeFlush();
   }
 
   @Override
   public void onRecordStoreFailed(Exception ex, String recordGuid) {
     Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
-    numStoreFailed.incrementAndGet();
+    storeFailedCount.incrementAndGet();
     this.consumer.stored();
     delegate.onFlowStoreFailed(this, ex, recordGuid);
     // TODO: abort?
   }
 
   @Override
   public void onRecordStoreSucceeded(String guid) {
     Logger.trace(LOG_TAG, "Stored record with guid " + guid);
+    storeAcceptedCount.incrementAndGet();
     this.consumer.stored();
   }
 
   @Override
+  public void onRecordStoreReconciled(String guid) {
+    Logger.trace(LOG_TAG, "Reconciled record with guid " + guid);
+    storeReconciledCount.incrementAndGet();
+  }
+
+  @Override
   public void consumerIsDoneFull() {
     Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
     if (waitingForQueueDone) {
       waitingForQueueDone = false;
 
       // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
       this.sink.storeDone();
     }
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -743,16 +743,20 @@ public class TestBookmarks extends Andro
             finishAndNotify(session);
           }
 
           @Override
           public void onRecordStoreSucceeded(String guid) {
           }
 
           @Override
+          public void onRecordStoreReconciled(String guid) {
+          }
+
+          @Override
           public void onStoreFailed(Exception e) {
 
           }
         };
         session.setStoreDelegate(storeDelegate);
         for (BookmarkRecord record : records) {
           try {
             session.store(record);
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -56,16 +56,22 @@ public class TestStoreTracking extends A
 
       @Override
       public void onRecordStoreSucceeded(String guid) {
         Logger.debug(getName(), "Stored " + guid);
         assertEq(expectedGUID, guid);
       }
 
       @Override
+      public void onRecordStoreReconciled(String guid) {
+        Logger.debug(getName(), "Reconciled " + guid);
+        assertEq(expectedGUID, guid);
+      }
+
+      @Override
       public void onStoreCompleted(long storeEnd) {
         Logger.debug(getName(), "Store completed at " + storeEnd + ".");
         try {
           session.fetch(new String[] { expectedGUID }, new SimpleSuccessFetchDelegate() {
             @Override
             public void onFetchedRecord(Record record) {
               Logger.debug(getName(), "Hurrah! Fetched record " + record.guid);
               assertEq(expectedGUID, record.guid);
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -25,16 +25,19 @@ public class DefaultStoreDelegate extend
   }
 
   @Override
   public void onStoreFailed(Exception ex) {
     performNotify("Store failed", ex);
   }
 
   @Override
+  public void onRecordStoreReconciled(String guid) {}
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
     final RepositorySessionStoreDelegate self = this;
     return new RepositorySessionStoreDelegate() {
 
       @Override
       public void onRecordStoreSucceeded(final String guid) {
         executor.execute(new Runnable() {
           @Override
@@ -50,16 +53,26 @@ public class DefaultStoreDelegate extend
           @Override
           public void run() {
             self.onRecordStoreFailed(ex, guid);
           }
         });
       }
 
       @Override
+      public void onRecordStoreReconciled(final String guid) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            self.onRecordStoreReconciled(guid);
+          }
+        });
+      }
+
+      @Override
       public void onStoreCompleted(final long storeEnd) {
         executor.execute(new Runnable() {
           @Override
           public void run() {
             self.onStoreCompleted(storeEnd);
           }
         });
       }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -198,46 +198,46 @@ public class TestRecordsChannel {
     sinkRepository = empty();
     doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(0, numFlowStoreFailed.get());
     assertEquals(sourceRepository.wbos, sinkRepository.wbos);
     assertEquals(0, recordsChannel.getFetchFailureCount());
     assertEquals(0, recordsChannel.getStoreFailureCount());
-    assertEquals(6, recordsChannel.getStoreCount());
+    assertEquals(6, recordsChannel.getStoreAttemptedCount());
   }
 
   @Test
   public void testFetchFail() throws Exception {
     sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
     sinkRepository = empty();
     doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
     assertTrue(recordsChannel.getFetchFailureCount() > 0);
     assertEquals(0, recordsChannel.getStoreFailureCount());
-    assertTrue(recordsChannel.getStoreCount() < 6);
+    assertTrue(recordsChannel.getStoreAttemptedCount() < 6);
   }
 
   @Test
   public void testStoreFetchFailedCollectionModified() throws Exception {
     sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
     sinkRepository = empty();
     doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
 
     assertTrue(recordsChannel.getFetchFailureCount() > 0);
     assertEquals(0, recordsChannel.getStoreFailureCount());
-    assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+    assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
 
     assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
     final Exception ex = recordsChannel.getReflowException();
     assertNotNull(ex);
     assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
   }
 
   @Test
@@ -247,17 +247,17 @@ public class TestRecordsChannel {
     doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
     assertTrue(sinkRepository.wbos.size() < 6);
 
     assertTrue(recordsChannel.getFetchFailureCount() > 0);
     assertEquals(0, recordsChannel.getStoreFailureCount());
-    assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+    assertTrue(recordsChannel.getStoreAttemptedCount() < sourceRepository.wbos.size());
 
     assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
     final Exception ex = recordsChannel.getReflowException();
     assertNotNull(ex);
     assertEquals(SyncDeadlineReachedException.class, ex.getClass());
   }
 
   @Test
@@ -270,17 +270,17 @@ public class TestRecordsChannel {
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(1, numFlowStoreFailed.get());
     // We will fail to store one of the records but expect flow to continue.
     assertEquals(5, sinkRepository.wbos.size());
 
     assertEquals(0, recordsChannel.getFetchFailureCount());
     assertEquals(1, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
-    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 
   @Test
   public void testStoreSerialFailCollectionModified() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
             SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
     doFlow();
@@ -309,28 +309,28 @@ public class TestRecordsChannel {
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
     assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
 
     assertEquals(0, recordsChannel.getFetchFailureCount());
     assertEquals(3, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
-    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 
 
   @Test
   public void testStoreOneBigBatchFail() throws Exception {
     sourceRepository = full();
     sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
     doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
     assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
 
     assertEquals(0, recordsChannel.getFetchFailureCount());
     assertEquals(6, recordsChannel.getStoreFailureCount());
     // Number of store attempts.
-    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreAttemptedCount());
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -34,12 +34,17 @@ public class ExpectSuccessRepositorySess
 
   @Override
   public void onStoreFailed(Exception e) {
     log("Store failed.", e);
     performNotify(new AssertionFailedError("onStoreFailed: store should not have failed."));
   }
 
   @Override
+  public void onRecordStoreReconciled(String guid) {
+    log("Store reconciled record " + guid);
+  }
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
     return this;
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -149,16 +149,20 @@ public class BatchingUploaderTest {
         }
 
         @Override
         public void onStoreFailed(Exception e) {
             lastStoreFailedException = e;
         }
 
         @Override
+        public void onRecordStoreReconciled(String guid) {
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return this;
         }
     }
 
     private ExecutorService workQueue;
     private RepositorySessionStoreDelegate storeDelegate;
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -107,16 +107,20 @@ public class PayloadUploadDelegateTest {
         }
 
         @Override
         public void onStoreFailed(Exception e) {
             storeFailedException = e;
         }
 
         @Override
+        public void onRecordStoreReconciled(String guid) {
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return null;
         }
     }
 
     @Before
     public void setUp() throws Exception {
         sessionStoreDelegate = new MockRepositorySessionStoreDelegate();