Bug 1389233 - Record outgoing batches in the android sync ping r?Grisha draft
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Wed, 31 Jan 2018 16:13:19 -0500
changeset 749682 c17598310c963343efed8102ef462dc5b1036db1
parent 748403 117e0c0d1ebe2cf5bdffc3474744add2416fc511
push id97469
push userbmo:tchiovoloni@mozilla.com
push dateWed, 31 Jan 2018 21:14:29 +0000
reviewersGrisha
bugs1389233
milestone60.0a1
Bug 1389233 - Record outgoing batches in the android sync ping r?Grisha MozReview-Commit-ID: JUHSMluUE8q
mobile/android/app/src/test/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilderTest.java
mobile/android/base/android-services.mozbuild
mobile/android/base/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilder.java
mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/SimpleSuccessStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTracker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/telemetry/TelemetryStageCollector.java
mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
mobile/android/services/src/test/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTrackerTest.java
--- a/mobile/android/app/src/test/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilderTest.java
+++ b/mobile/android/app/src/test/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilderTest.java
@@ -2,23 +2,24 @@
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 package org.mozilla.gecko.telemetry.pingbuilders;
 
 import android.os.Bundle;
 import android.os.Parcelable;
 
 import org.json.JSONException;
-import org.json.JSONObject;
+
 import org.json.simple.JSONArray;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
 import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
 import org.mozilla.gecko.sync.validation.BookmarkValidationResults;
 import org.mozilla.gecko.sync.validation.ValidationResults;
 import org.mozilla.gecko.telemetry.TelemetryLocalPing;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 
@@ -56,16 +57,17 @@ public class TelemetrySyncPingBuilderTes
                 .setRestarted(true)
                 .build();
         payload = localPing.getPayload();
         assertEquals("uid", payload.getString("uid"));
         assertEquals(Long.valueOf(123L), payload.getLong("took"));
         assertEquals("device-id", payload.getString("deviceID"));
         assertTrue(payload.getLong("when") != null);
         assertEquals(true, payload.getBoolean("restarted"));
+
     }
 
     @Test
     public void testStage() throws Exception {
         HashMap<String, TelemetryStageCollector> stages = new HashMap<>();
         TelemetryStageCollector stage = new TelemetryStageCollector(null);
         stage.validation = new ExtendedJSONObject();
         stage.validation.put("took", 1L);
@@ -77,45 +79,73 @@ public class TelemetrySyncPingBuilderTes
         stage.error.put("error", "test");
         stage.started = 100;
         stage.finished = 105;
         stage.inbound = 5;
         stage.inboundStored = 3;
         stage.inboundFailed = 1;
         stage.reconciled = 1;
 
+        stage.outbound = new ArrayList<>();
+        stage.outbound.add(new StoreBatchTracker.Batch(1, 1));
+        stage.outbound.add(new StoreBatchTracker.Batch(9, 0));
+        stage.outbound.add(new StoreBatchTracker.Batch(0, 0));
 
         stages.put("testing", stage);
+
+        TelemetryStageCollector stage2 = new TelemetryStageCollector(null);
+        // If it's actually completely empty, it will get omitted.
+        stage2.inbound = 1;
+        stage2.inboundStored = 1;
+        stages.put("testing2", stage2);
+
         TelemetryLocalPing localPing = builder
                 .setStages(stages)
                 .build();
         ExtendedJSONObject payload = localPing.getPayload();
 
-        assertEquals(1, payload.getArray("engines").size());
-        ExtendedJSONObject engine = (ExtendedJSONObject)payload.getArray("engines").get(0);
+        assertEquals(2, payload.getArray("engines").size());
+        ExtendedJSONObject engine = (ExtendedJSONObject) payload.getArray("engines").get(0);
 
         assertEquals("testing", engine.getString("name"));
         assertEquals(Long.valueOf(5L), engine.getLong("took"));
 
         ExtendedJSONObject inbound = engine.getObject("incoming");
         assertEquals(Integer.valueOf(stage.inbound), inbound.getIntegerSafely("applied"));
         assertEquals(Integer.valueOf(stage.inboundStored), inbound.getIntegerSafely("succeeded"));
         assertEquals(Integer.valueOf(stage.inboundFailed), inbound.getIntegerSafely("failed"));
         assertEquals(Integer.valueOf(stage.reconciled), inbound.getIntegerSafely("reconciled"));
 
-        // TODO: Test outbound once bug 1389233 is addressed
-
         ExtendedJSONObject error = engine.getObject("failureReason");
         assertEquals("unexpectederror", error.getString("name"));
         assertEquals("test", error.getString("error"));
 
         ExtendedJSONObject validation = engine.getObject("validation");
         assertEquals(stage.validation.getLong("took"), validation.getLong("took"));
         assertEquals(stage.validation.getLong("checked"), validation.getLong("checked"));
         assertEquals(0, stage.validation.getArray("problems").size());
+
+        JSONArray outgoing = engine.getArray("outgoing");
+        assertEquals(outgoing.size(), 3);
+
+        ExtendedJSONObject firstBatch = (ExtendedJSONObject) outgoing.get(0);
+        assertEquals(firstBatch.getLong("sent", -1), 1);
+        assertEquals(firstBatch.getLong("failed", -1), 1);
+
+        ExtendedJSONObject secondBatch = (ExtendedJSONObject) outgoing.get(1);
+        assertEquals(secondBatch.getLong("sent", -1), 9);
+        assertFalse(secondBatch.containsKey("failed"));
+
+        // Ensure we include "all zero" batches, since we can actually send those on android.
+        ExtendedJSONObject lastBatch = (ExtendedJSONObject) outgoing.get(2);
+        assertFalse(lastBatch.containsKey("sent"));
+        assertFalse(lastBatch.containsKey("failed"));
+
+        ExtendedJSONObject emptyEngine = (ExtendedJSONObject) payload.getArray("engines").get(1);
+        assertFalse(emptyEngine.containsKey("outgoing"));
     }
 
     @Test
     public void testDevices() throws Exception {
         ArrayList<Parcelable> devices = new ArrayList<>();
 
         TelemetryLocalPing localPing = builder
                 .setDevices(devices)
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -1070,16 +1070,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/synchronizer/ConcurrentRecordConsumer.java',
     'sync/synchronizer/RecordConsumer.java',
     'sync/synchronizer/RecordsChannel.java',
     'sync/synchronizer/RecordsChannelDelegate.java',
     'sync/synchronizer/RecordsConsumerDelegate.java',
     'sync/synchronizer/ServerLocalSynchronizer.java',
     'sync/synchronizer/ServerLocalSynchronizerSession.java',
     'sync/synchronizer/SessionNotBegunException.java',
+    'sync/synchronizer/StoreBatchTracker.java',
     'sync/synchronizer/Synchronizer.java',
     'sync/synchronizer/SynchronizerDelegate.java',
     'sync/synchronizer/SynchronizerSession.java',
     'sync/synchronizer/SynchronizerSessionDelegate.java',
     'sync/synchronizer/UnbundleError.java',
     'sync/synchronizer/UnexpectedSessionException.java',
     'sync/SynchronizerConfiguration.java',
     'sync/telemetry/TelemetryCollector.java',
--- a/mobile/android/base/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilder.java
+++ b/mobile/android/base/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilder.java
@@ -4,43 +4,47 @@
  * file, you can obtain one at http://mozilla.org/MPL/2.0/.
  */
 
 package org.mozilla.gecko.telemetry.pingbuilders;
 
 import android.os.Bundle;
 import android.os.Parcelable;
 import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
 
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
 import org.mozilla.gecko.sync.telemetry.TelemetryContract;
 import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
 import org.mozilla.gecko.telemetry.TelemetryLocalPing;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 /**
  * Local ping builder which understands how to process sync data.
  * Whenever hashing of data is involved, we expect it to be performed at the time of collection,
  * somewhere in {@link org.mozilla.gecko.sync.telemetry.TelemetryCollector} and friends.
  */
 public class TelemetrySyncPingBuilder extends TelemetryLocalPingBuilder {
     public TelemetrySyncPingBuilder setStages(@NonNull final Serializable data) {
         HashMap<String, TelemetryStageCollector> stages = castSyncData(data);
 
         final JSONArray engines = new JSONArray();
         for (String stageName : stages.keySet()) {
             final TelemetryStageCollector stage = stages.get(stageName);
 
             // Skip stages that did nothing.
-            if (stage.inbound == 0 && stage.outbound == 0 && stage.error == null && stage.validation == null) {
+            if (stage.inbound == 0 && (stage.outbound == null || stage.outbound.size() == 0) &&
+                    stage.error == null && stage.validation == null) {
                 continue;
             }
 
             final ExtendedJSONObject stageJSON = new ExtendedJSONObject();
 
             stageJSON.put("name", stageName);
             stageJSON.put("took", stage.finished - stage.started);
 
@@ -58,29 +62,20 @@ public class TelemetrySyncPingBuilder ex
                     incomingJSON.put("failed", stage.inboundFailed);
                 }
                 if (stage.reconciled > 0) {
                     incomingJSON.put("reconciled", stage.reconciled);
                 }
                 stageJSON.put("incoming", incomingJSON);
             }
 
-            if (stage.outbound > 0) {
-                final ExtendedJSONObject outgoingJSON = new ExtendedJSONObject();
-                // We specifically do not check if `outboundStored` is greater than zero.
-                // `outbound` schema is simpler than `inbound`, namely there isn't an "attempted
-                // to send" count.
-                // Stage telemetry itself has that data (outbound = outboundStored + outboundFailed),
-                // and so this is our way to relay slightly more information.
-                // e.g. we'll know there's something wrong if `sent = 0` and `failed` is missing.
-                outgoingJSON.put("sent", stage.outboundStored);
-                if (stage.outboundFailed > 0) {
-                    outgoingJSON.put("failed", stage.outboundFailed);
-                }
-                stageJSON.put("outgoing", outgoingJSON);
+            JSONArray outbound = buildOutgoing(stage.outbound);
+
+            if (outbound != null) {
+                stageJSON.put("outgoing", outbound);
             }
 
             // We depend on the error builder from TelemetryCollector to produce the right schema.
             // Spreading around our schema definition like that is awkward, but, alas, here we are.
             if (stage.error != null) {
                 stageJSON.put("failureReason", stage.error);
             }
             // As above for validation too.
@@ -99,16 +94,36 @@ public class TelemetrySyncPingBuilder ex
         return this;
     }
 
     public TelemetrySyncPingBuilder setDeviceID(@NonNull String deviceID) {
         payload.put("deviceID", deviceID);
         return this;
     }
 
+    @Nullable
+    private static JSONArray buildOutgoing(List<StoreBatchTracker.Batch> batches) {
+        if (batches == null || batches.size() == 0) {
+            return null;
+        }
+        JSONArray arr = new JSONArray();
+        for (int i = 0; i < batches.size(); ++i) {
+            StoreBatchTracker.Batch batch = batches.get(i);
+            ExtendedJSONObject o = new ExtendedJSONObject();
+            if (batch.sent != 0) {
+                o.put("sent", (long) batch.sent);
+            }
+            if (batch.failed != 0) {
+                o.put("failed", (long) batch.failed);
+            }
+            addUnchecked(arr, o);
+        }
+        return arr.size() == 0 ? null : arr;
+    }
+
     public TelemetrySyncPingBuilder setRestarted(boolean didRestart) {
         if (!didRestart) {
             return this;
         }
 
         payload.put("restarted", true);
         return this;
     }
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -25,16 +25,21 @@ public class DefaultStoreDelegate extend
   }
 
   @Override
   public void onStoreFailed(Exception ex) {
     performNotify("Store failed", ex);
   }
 
   @Override
+  public void onBatchCommitted() {
+    performNotify("Stores committed ", null);
+  }
+
+  @Override
   public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {}
 
   @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
     final RepositorySessionStoreDelegate self = this;
     return new RepositorySessionStoreDelegate() {
 
       @Override
@@ -73,16 +78,26 @@ public class DefaultStoreDelegate extend
           @Override
           public void run() {
             self.onStoreCompleted();
           }
         });
       }
 
       @Override
+      public void onBatchCommitted() {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            self.onBatchCommitted();
+          }
+        });
+      }
+
+      @Override
       public void onStoreFailed(Exception e) {
 
       }
 
       @Override
       public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
         if (newExecutor == executor) {
           return this;
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/SimpleSuccessStoreDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/SimpleSuccessStoreDelegate.java
@@ -9,12 +9,17 @@ import org.mozilla.gecko.sync.repositori
 
 public abstract class SimpleSuccessStoreDelegate extends DefaultDelegate implements RepositorySessionStoreDelegate {
   @Override
   public void onRecordStoreFailed(Exception ex, String guid) {
     performNotify("Store failed", ex);
   }
 
   @Override
+  public void onBatchCommitted() {
+    performNotify("Store committed", null);
+  }
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
     return this;
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
@@ -202,13 +202,18 @@ public class VersioningDelegateHelper {
         }
 
         @Override
         public void onStoreFailed(Exception e) {
             inner.onStoreFailed(e);
         }
 
         @Override
+        public void onBatchCommitted() {
+            inner.onBatchCommitted();
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return inner.deferredStoreDelegate(executor);
         }
     }
 }
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -69,9 +69,19 @@ public class DeferredRepositorySessionSt
   public void onRecordStoreReconciled(final String guid, final String oldGuid, final Integer newVersion) {
     executor.execute(new Runnable() {
       @Override
       public void run() {
         inner.onRecordStoreReconciled(guid, oldGuid, newVersion);
       }
     });
   }
+
+  @Override
+  public void onBatchCommitted() {
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        inner.onBatchCommitted();
+      }
+    });
+  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -20,10 +20,12 @@ public interface RepositorySessionStoreD
   // Only makes sense in context of local repositories.
   // Further call to onRecordStoreSucceeded is necessary.
   void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion);
 
   // Called with a GUID when store has succeeded.
   void onRecordStoreSucceeded(String guid);
   void onStoreCompleted();
   void onStoreFailed(Exception e);
+  // Only relevant for store batches, and exists to help us record correct telemetry.
+  void onBatchCommitted();
   RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -91,16 +91,20 @@ class PayloadDispatcher {
             // If we're not in a batching mode, or just committed a batch, uploaded records have
             // been applied to the server storage and are now visible to other clients.
             // Therefore, we bump our local "last store" timestamp.
             bumpTimestampTo(uploadTimestamp, response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
             uploader.setLastStoreTimestamp(uploadTimestamp);
             batchWhiteboard.clearSuccessRecordGuids();
         }
 
+        if (isCommit || !batchWhiteboard.getInBatchingMode()) {
+            uploader.sessionStoreDelegate.onBatchCommitted();
+        }
+
         // If this was our very last commit, we're done storing records.
         // Get Last-Modified timestamp from the response, and pass it upstream.
         if (isLastPayload) {
             uploader.finished();
         }
     }
 
     void payloadFailed(Exception e) {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -39,16 +39,17 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
 import org.mozilla.gecko.sync.synchronizer.Synchronizer;
 import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
 import org.mozilla.gecko.sync.synchronizer.SynchronizerSession;
 import org.mozilla.gecko.sync.telemetry.TelemetryCollector;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 /**
  * Fetch from a server collection into a local repository, encrypting
  * and decrypting along the way.
  *
  * @author rnewman
@@ -627,24 +628,22 @@ public abstract class ServerSyncStage ex
     int inboundCount = synchronizerSession.getInboundCount();
     int inboundCountStored = synchronizerSession.getInboundCountStored();
     int inboundCountFailed = synchronizerSession.getInboundCountFailed();
     int inboundCountReconciled = synchronizerSession.getInboundCountReconciled();
     int outboundCount = synchronizerSession.getOutboundCount();
     int outboundCountStored = synchronizerSession.getOutboundCountStored();
     int outboundCountFailed = synchronizerSession.getOutboundCountFailed();
 
+    telemetryStageCollector.outbound = synchronizerSession.getOutboundBatches();
     telemetryStageCollector.finished = stageCompleteTimestamp;
     telemetryStageCollector.inbound = inboundCount;
     telemetryStageCollector.inboundStored = inboundCountStored;
     telemetryStageCollector.inboundFailed = inboundCountFailed;
     telemetryStageCollector.reconciled = inboundCountReconciled;
-    telemetryStageCollector.outbound = outboundCount;
-    telemetryStageCollector.outboundStored = outboundCountStored;
-    telemetryStageCollector.outboundFailed = outboundCountFailed;
 
     Logger.info(LOG_TAG, "Stage " + getEngineName()
             + " received " + inboundCount
             + "; stored " + inboundCountStored + ", reconciling " + inboundCountReconciled
             + " and failed to store " + inboundCountFailed
             + ". Sent " + outboundCount
             + "; server accepted " + outboundCountStored + " and rejected " + outboundCountFailed
             + ". Duration: " + getStageDurationString() + ".");
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -2,16 +2,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import android.support.annotation.NonNull;
 import android.support.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.SyncException;
 import org.mozilla.gecko.sync.ThreadPool;
@@ -82,16 +83,18 @@ public class RecordsChannel implements
   // attempted = accepted + failed
   // reconciled <= accepted <= attempted
   // reconciled = accepted - `new`, where `new` is inferred.
   private final AtomicInteger storeAttemptedCount = new AtomicInteger();
   private final AtomicInteger storeAcceptedCount = new AtomicInteger();
   private final AtomicInteger storeFailedCount = new AtomicInteger();
   private final AtomicInteger storeReconciledCount = new AtomicInteger();
 
+  private final StoreBatchTracker storeTracker = new StoreBatchTracker();
+
   public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
     this.source    = source;
     this.sink      = sink;
     this.delegate  = delegate;
   }
 
   /*
    * We push fetched records into a queue.
@@ -180,16 +183,17 @@ public class RecordsChannel implements
 
     sink.setStoreDelegate(this);
     fetchedCount.set(0);
     fetchFailedCount.set(0);
     storeAttemptedCount.set(0);
     storeAcceptedCount.set(0);
     storeFailedCount.set(0);
     storeReconciledCount.set(0);
+    storeTracker.reset();
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
     source.fetchModified(this);
   }
 
   /**
@@ -208,24 +212,29 @@ public class RecordsChannel implements
     }
 
     this.flow();
   }
 
   @Override
   public void store(Record record) {
     storeAttemptedCount.incrementAndGet();
+    storeTracker.onRecordStoreAttempted();
     try {
       sink.store(record);
     } catch (NoStoreDelegateException e) {
       Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
       delegate.onFlowStoreFailed(this, e, record.guid);
     }
   }
 
+  /* package-local */ ArrayList<StoreBatchTracker.Batch> getStoreBatches() {
+    return this.storeTracker.getStoreBatches();
+  }
+
   @Override
   public void onFetchFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
     fetchFailedCount.incrementAndGet();
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
     delegate.onFlowFetchFailed(this, ex);
@@ -246,29 +255,37 @@ public class RecordsChannel implements
     this.consumer.queueFilled();
   }
 
   @Override
   public void onBatchCompleted() {
     this.sink.storeFlush();
   }
 
+  // Sent for "store" batches.
+  @Override
+  public void onBatchCommitted() {
+    storeTracker.onBatchFinished();
+  }
+
   @Override
   public void onRecordStoreFailed(Exception ex, String recordGuid) {
     Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
     storeFailedCount.incrementAndGet();
+    storeTracker.onRecordStoreFailed();
     this.consumer.stored();
     delegate.onFlowStoreFailed(this, ex, recordGuid);
     // TODO: abort?
   }
 
   @Override
   public void onRecordStoreSucceeded(String guid) {
     Logger.trace(LOG_TAG, "Stored record with guid " + guid);
     storeAcceptedCount.incrementAndGet();
+    storeTracker.onRecordStoreSucceeded();
     this.consumer.stored();
   }
 
   @Override
   public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
     Logger.trace(LOG_TAG, "Reconciled record with guid " + guid);
     storeReconciledCount.incrementAndGet();
   }
@@ -311,16 +328,18 @@ public class RecordsChannel implements
 
   @Override
   public void onStoreFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
 
+    storeTracker.onBatchFailed();
+
     // NB: consumer might or might not be running at this point. There are two cases here:
     // 1) If we're storing records remotely, we might fail due to a 412.
     // -- we might hit 412 at any point, so consumer might be in either state.
     // Action: ignore consumer state, we have nothing else to do other to inform our delegate
     // that we're done with this flow. Based on the reflow exception, it'll determine what to do.
 
     // 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
     // -- we might hit a deadline only prior to attempting to merge records,
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTracker.java
@@ -0,0 +1,85 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class encapsulates most of the logic for recording telemetry information about outgoing
+ * batches.
+ */
+public class StoreBatchTracker {
+    private final AtomicInteger currentStoreBatchAttempted = new AtomicInteger();
+    private final AtomicInteger currentStoreBatchAccepted = new AtomicInteger();
+    private final AtomicInteger currentStoreBatchFailed = new AtomicInteger();
+    private final ConcurrentLinkedQueue<Batch> completedBatches = new ConcurrentLinkedQueue<>();
+
+    public static class Batch {
+        public final int sent;
+        public final int failed;
+        public Batch(final int sent, final int failed) {
+            this.sent = sent;
+            this.failed = failed;
+        }
+    }
+
+    /* package-local */ void reset() {
+        currentStoreBatchFailed.set(0);
+        currentStoreBatchAccepted.set(0);
+        currentStoreBatchAttempted.set(0);
+        completedBatches.clear();
+    }
+
+    private boolean haveUncommittedBatchData() {
+        return currentStoreBatchAttempted.get() != 0 ||
+                currentStoreBatchAccepted.get() != 0 ||
+                currentStoreBatchFailed.get() != 0;
+    }
+
+    /* package-local */ void onBatchFinished() {
+        // Not actually thread safe, but should be good enough for telemetry info. (Especially
+        // since we shouldn't be completing batches for the same channel on more than one thread)
+        final int sent = currentStoreBatchAttempted.getAndSet(0);
+        final int knownFailed = currentStoreBatchFailed.getAndSet(0);
+        final int knownSucceeded = currentStoreBatchAccepted.getAndSet(0);
+
+        // These might be different if we "forced" a failure due to an error unrelated to uploading
+        // a record.
+        final int failed = Math.max(knownFailed, sent - knownSucceeded);
+
+        completedBatches.add(new Batch(sent, failed));
+    }
+
+    /* package-local */ void onBatchFailed() {
+        if (currentStoreBatchFailed.get() == 0 && currentStoreBatchAccepted.get() == currentStoreBatchAttempted.get()) {
+            // Force the failure. It's unclear if this ever can happen for cases where we care about what
+            // is inside completedBatches (cases where we're uploading).
+            currentStoreBatchFailed.incrementAndGet();
+        }
+        onBatchFinished();
+    }
+
+    /* package-local */ void onRecordStoreFailed() {
+        currentStoreBatchFailed.incrementAndGet();
+    }
+
+    /* package-local */ void onRecordStoreSucceeded() {
+        currentStoreBatchAccepted.incrementAndGet();
+    }
+
+    /* package-local */ void onRecordStoreAttempted() {
+        currentStoreBatchAttempted.incrementAndGet();
+    }
+
+    // Note that this finishes the current batch (if any exists).
+    /* package-local */ ArrayList<Batch> getStoreBatches() {
+        if (haveUncommittedBatchData()) {
+            onBatchFinished();
+        }
+        return new ArrayList<>(completedBatches);
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
@@ -1,20 +1,24 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.SyncException;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
 
 import android.content.Context;
@@ -76,16 +80,19 @@ public class SynchronizerSession impleme
 
   private final AtomicInteger numInboundRecords = new AtomicInteger(-1);
   private final AtomicInteger numInboundRecordsStored = new AtomicInteger(-1);
   private final AtomicInteger numInboundRecordsFailed = new AtomicInteger(-1);
   private final AtomicInteger numInboundRecordsReconciled = new AtomicInteger(-1);
   private final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
   private final AtomicInteger numOutboundRecordsStored = new AtomicInteger(-1);
   private final AtomicInteger numOutboundRecordsFailed = new AtomicInteger(-1);
+  // Doesn't need to be ConcurrentLinkedQueue or anything like that since we don't do partial
+  // changes to it.
+  private final AtomicReference<List<Batch>> outboundBatches = new AtomicReference<>();
 
   private Exception fetchFailedCauseException;
   private Exception storeFailedCauseException;
 
   /*
    * Public API: constructor, init, synchronize.
    */
   public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
@@ -179,16 +186,21 @@ public class SynchronizerSession impleme
   public int getInboundCountFailed() {
     return numInboundRecordsFailed.get();
   }
 
   public int getInboundCountReconciled() {
     return numInboundRecordsReconciled.get();
   }
 
+  // Returns outboundBatches.
+  public List<Batch> getOutboundBatches() {
+    return outboundBatches.get();
+  }
+
   /**
    * Get the number of records fetched from the second repository (usually the
    * local store, hence outbound).
    * <p>
    * Valid only after second flow has completed.
    *
    * @return number of records, or -1 if not valid.
    */
@@ -223,16 +235,17 @@ public class SynchronizerSession impleme
   private synchronized void synchronize() {
     numInboundRecords.set(-1);
     numInboundRecordsStored.set(-1);
     numInboundRecordsFailed.set(-1);
     numInboundRecordsReconciled.set(-1);
     numOutboundRecords.set(-1);
     numOutboundRecordsStored.set(-1);
     numOutboundRecordsFailed.set(-1);
+    outboundBatches.set(null);
 
     // First thing: decide whether we should.
     if (sessionA.shouldSkip() ||
         sessionB.shouldSkip()) {
       Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
       sessionA.abort();
       sessionB.abort();
       this.delegate.onSynchronizeSkipped(this);
@@ -315,16 +328,17 @@ public class SynchronizerSession impleme
   public void onSecondFlowCompleted(RecordsChannel recordsChannel) {
     Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
     pendingBTimestamp = sessionB.getLastFetchTimestamp();
     storeEndATimestamp = sessionA.getLastStoreTimestamp();
     Logger.debug(LOG_TAG, "Fetch end is " + pendingBTimestamp + ". Store end is " + storeEndATimestamp + ". Finishing.");
     numOutboundRecords.set(recordsChannel.getFetchCount());
     numOutboundRecordsStored.set(recordsChannel.getStoreAcceptedCount());
     numOutboundRecordsFailed.set(recordsChannel.getStoreFailureCount());
+    outboundBatches.set(recordsChannel.getStoreBatches());
     flowBToACompleted = true;
 
     // Finish the two sessions.
     try {
       this.sessionA.finish(this);
     } catch (InactiveSessionException e) {
       this.onFinishFailed(e);
       return;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/telemetry/TelemetryStageCollector.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/telemetry/TelemetryStageCollector.java
@@ -1,37 +1,38 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.telemetry;
 
 import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
+
+import java.util.List;
 
 /**
  * Gathers telemetry details about an individual sync stage.
  * Implementation note: there are no getters/setters to avoid unnecessary verboseness.
  * This data expected to be write-only from within SyncStages, and read-only from TelemetryCollector.
  * Although there shouldn't be concurrent access, it's possible that we'll be reading/writing these
  * values from different threads - hence `volatile` to ensure visibility.
  */
 public class TelemetryStageCollector {
     private final TelemetryCollector syncCollector;
 
     public volatile long started = 0L;
     public volatile long finished = 0L;
     public volatile int inbound = 0;
     public volatile int inboundStored = 0;
     public volatile int inboundFailed = 0;
-    public volatile int outbound = 0;
-    public volatile int outboundStored = 0;
-    public volatile int outboundFailed = 0;
     public volatile int reconciled = 0;
     public volatile ExtendedJSONObject error = null;
     public volatile ExtendedJSONObject validation = null;
+    public volatile List<StoreBatchTracker.Batch> outbound = null;
 
     public TelemetryStageCollector(TelemetryCollector syncCollector) {
         this.syncCollector = syncCollector;
     }
 
     public TelemetryCollector getSyncCollector() {
         return this.syncCollector;
     }
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -39,12 +39,17 @@ public class ExpectSuccessRepositorySess
   }
 
   @Override
   public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
     log("Store reconciled record " + guid);
   }
 
   @Override
+  public void onBatchCommitted() {
+    log("Batch committed");
+  }
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
     return this;
   }
 }
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -155,16 +155,20 @@ public class BatchingUploaderTest {
             lastStoreFailedException = e;
         }
 
         @Override
         public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
         }
 
         @Override
+        public void onBatchCommitted() {
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return this;
         }
     }
 
     private ExecutorService workQueue;
     private RepositorySessionStoreDelegate storeDelegate;
 
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -117,16 +117,20 @@ public class PayloadUploadDelegateTest {
             storeFailedException = e;
         }
 
         @Override
         public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
         }
 
         @Override
+        public void onBatchCommitted() {
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return this;
         }
     }
 
     @Before
     public void setUp() throws Exception {
         sessionStoreDelegate = spy(new MockRepositorySessionStoreDelegate());
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTrackerTest.java
@@ -0,0 +1,124 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class StoreBatchTrackerTest {
+    private StoreBatchTracker tracker;
+
+    @Before
+    public void setUp() throws Exception {
+        tracker = new StoreBatchTracker();
+    }
+
+    private void recordCounts(int attempted, int succeeded, int failed) {
+        for (int i = 0; i < attempted; ++i) {
+            tracker.onRecordStoreAttempted();
+        }
+        for (int i = 0; i < succeeded; ++i) {
+            tracker.onRecordStoreSucceeded();
+        }
+        for (int i = 0; i < failed; ++i) {
+            tracker.onRecordStoreFailed();
+        }
+    }
+
+    @Test
+    public void testSingleBatch() {
+        {
+            recordCounts(3, 3, 0);
+            tracker.onBatchFinished();
+            ArrayList<Batch> batches = tracker.getStoreBatches();
+            assertEquals(batches.size(), 1);
+            assertEquals(batches.get(0).failed, 0);
+            assertEquals(batches.get(0).sent, 3);
+            tracker.reset();
+        }
+        {
+            recordCounts(3, 2, 1);
+            // Don't bother calling onBatchFinished, to ensure we finish it automatically.
+            ArrayList<Batch> batches = tracker.getStoreBatches();
+            assertEquals(batches.size(), 1);
+            assertEquals(batches.get(0).failed, 1);
+            assertEquals(batches.get(0).sent, 3);
+            tracker.reset();
+        }
+        {
+            recordCounts(3, 0, 3);
+            tracker.onBatchFinished();
+            ArrayList<Batch> batches = tracker.getStoreBatches();
+            assertEquals(batches.size(), 1);
+            assertEquals(batches.get(0).failed, 3);
+            assertEquals(batches.get(0).sent, 3);
+            tracker.reset();
+        }
+    }
+
+    @Test
+    public void testBatchFail() {
+        recordCounts(3, 3, 0);
+        tracker.onBatchFailed();
+        ArrayList<Batch> batches = tracker.getStoreBatches();
+        assertEquals(batches.size(), 1);
+        // The important thing is that there's a non-zero number of failed here.
+        assertEquals(batches.get(0).failed, 1);
+        assertEquals(batches.get(0).sent, 3);
+    }
+
+    @Test
+    public void testMultipleBatches() {
+        recordCounts(3, 3, 0);
+        tracker.onBatchFinished();
+        recordCounts(8, 8, 0);
+        tracker.onBatchFinished();
+        recordCounts(5, 5, 0);
+        tracker.onBatchFinished();
+        // Fake an empty "commit" POST.
+        tracker.onBatchFinished();
+        ArrayList<Batch> batches = tracker.getStoreBatches();
+
+        assertEquals(batches.size(), 4);
+
+        assertEquals(batches.get(0).failed, 0);
+        assertEquals(batches.get(0).sent, 3);
+
+        assertEquals(batches.get(1).failed, 0);
+        assertEquals(batches.get(1).sent, 8);
+
+        assertEquals(batches.get(2).failed, 0);
+        assertEquals(batches.get(2).sent, 5);
+
+        assertEquals(batches.get(3).failed, 0);
+        assertEquals(batches.get(3).sent, 0);
+    }
+
+    @Test
+    public void testDroppedStores() {
+        recordCounts(3, 1, 0);
+        tracker.onBatchFinished();
+        ArrayList<Batch> batches = tracker.getStoreBatches();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0).failed, 2);
+        assertEquals(batches.get(0).sent, 3);
+    }
+
+    @Test
+    public void testReset() {
+        assertEquals(tracker.getStoreBatches().size(), 0);
+        recordCounts(2, 1, 1);
+        assertEquals(tracker.getStoreBatches().size(), 1);
+        tracker.reset();
+        assertEquals(tracker.getStoreBatches().size(), 0);
+    }
+}
\ No newline at end of file