--- a/mobile/android/app/src/test/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilderTest.java
+++ b/mobile/android/app/src/test/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilderTest.java
@@ -2,23 +2,24 @@
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.gecko.telemetry.pingbuilders;
import android.os.Bundle;
import android.os.Parcelable;
import org.json.JSONException;
-import org.json.JSONObject;
+
import org.json.simple.JSONArray;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
import org.mozilla.gecko.sync.validation.BookmarkValidationResults;
import org.mozilla.gecko.sync.validation.ValidationResults;
import org.mozilla.gecko.telemetry.TelemetryLocalPing;
import java.util.ArrayList;
import java.util.HashMap;
@@ -56,16 +57,17 @@ public class TelemetrySyncPingBuilderTes
.setRestarted(true)
.build();
payload = localPing.getPayload();
assertEquals("uid", payload.getString("uid"));
assertEquals(Long.valueOf(123L), payload.getLong("took"));
assertEquals("device-id", payload.getString("deviceID"));
assertTrue(payload.getLong("when") != null);
assertEquals(true, payload.getBoolean("restarted"));
+
}
@Test
public void testStage() throws Exception {
HashMap<String, TelemetryStageCollector> stages = new HashMap<>();
TelemetryStageCollector stage = new TelemetryStageCollector(null);
stage.validation = new ExtendedJSONObject();
stage.validation.put("took", 1L);
@@ -77,45 +79,73 @@ public class TelemetrySyncPingBuilderTes
stage.error.put("error", "test");
stage.started = 100;
stage.finished = 105;
stage.inbound = 5;
stage.inboundStored = 3;
stage.inboundFailed = 1;
stage.reconciled = 1;
+ stage.outbound = new ArrayList<>();
+ stage.outbound.add(new StoreBatchTracker.Batch(1, 1));
+ stage.outbound.add(new StoreBatchTracker.Batch(9, 0));
+ stage.outbound.add(new StoreBatchTracker.Batch(0, 0));
stages.put("testing", stage);
+
+ TelemetryStageCollector stage2 = new TelemetryStageCollector(null);
+ // If it's actually completely empty, it will get omitted.
+ stage2.inbound = 1;
+ stage2.inboundStored = 1;
+ stages.put("testing2", stage2);
+
TelemetryLocalPing localPing = builder
.setStages(stages)
.build();
ExtendedJSONObject payload = localPing.getPayload();
- assertEquals(1, payload.getArray("engines").size());
- ExtendedJSONObject engine = (ExtendedJSONObject)payload.getArray("engines").get(0);
+ assertEquals(2, payload.getArray("engines").size());
+ ExtendedJSONObject engine = (ExtendedJSONObject) payload.getArray("engines").get(0);
assertEquals("testing", engine.getString("name"));
assertEquals(Long.valueOf(5L), engine.getLong("took"));
ExtendedJSONObject inbound = engine.getObject("incoming");
assertEquals(Integer.valueOf(stage.inbound), inbound.getIntegerSafely("applied"));
assertEquals(Integer.valueOf(stage.inboundStored), inbound.getIntegerSafely("succeeded"));
assertEquals(Integer.valueOf(stage.inboundFailed), inbound.getIntegerSafely("failed"));
assertEquals(Integer.valueOf(stage.reconciled), inbound.getIntegerSafely("reconciled"));
- // TODO: Test outbound once bug 1389233 is addressed
-
ExtendedJSONObject error = engine.getObject("failureReason");
assertEquals("unexpectederror", error.getString("name"));
assertEquals("test", error.getString("error"));
ExtendedJSONObject validation = engine.getObject("validation");
assertEquals(stage.validation.getLong("took"), validation.getLong("took"));
assertEquals(stage.validation.getLong("checked"), validation.getLong("checked"));
assertEquals(0, stage.validation.getArray("problems").size());
+
+ JSONArray outgoing = engine.getArray("outgoing");
+ assertEquals(outgoing.size(), 3);
+
+ ExtendedJSONObject firstBatch = (ExtendedJSONObject) outgoing.get(0);
+ assertEquals(firstBatch.getLong("sent", -1), 1);
+ assertEquals(firstBatch.getLong("failed", -1), 1);
+
+ ExtendedJSONObject secondBatch = (ExtendedJSONObject) outgoing.get(1);
+ assertEquals(secondBatch.getLong("sent", -1), 9);
+ assertFalse(secondBatch.containsKey("failed"));
+
+ // Ensure we include "all zero" batches, since we can actually send those on android.
+ ExtendedJSONObject lastBatch = (ExtendedJSONObject) outgoing.get(2);
+ assertFalse(lastBatch.containsKey("sent"));
+ assertFalse(lastBatch.containsKey("failed"));
+
+ ExtendedJSONObject emptyEngine = (ExtendedJSONObject) payload.getArray("engines").get(1);
+ assertFalse(emptyEngine.containsKey("outgoing"));
}
@Test
public void testDevices() throws Exception {
ArrayList<Parcelable> devices = new ArrayList<>();
TelemetryLocalPing localPing = builder
.setDevices(devices)
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -1070,16 +1070,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
'sync/synchronizer/ConcurrentRecordConsumer.java',
'sync/synchronizer/RecordConsumer.java',
'sync/synchronizer/RecordsChannel.java',
'sync/synchronizer/RecordsChannelDelegate.java',
'sync/synchronizer/RecordsConsumerDelegate.java',
'sync/synchronizer/ServerLocalSynchronizer.java',
'sync/synchronizer/ServerLocalSynchronizerSession.java',
'sync/synchronizer/SessionNotBegunException.java',
+ 'sync/synchronizer/StoreBatchTracker.java',
'sync/synchronizer/Synchronizer.java',
'sync/synchronizer/SynchronizerDelegate.java',
'sync/synchronizer/SynchronizerSession.java',
'sync/synchronizer/SynchronizerSessionDelegate.java',
'sync/synchronizer/UnbundleError.java',
'sync/synchronizer/UnexpectedSessionException.java',
'sync/SynchronizerConfiguration.java',
'sync/telemetry/TelemetryCollector.java',
--- a/mobile/android/base/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilder.java
+++ b/mobile/android/base/java/org/mozilla/gecko/telemetry/pingbuilders/TelemetrySyncPingBuilder.java
@@ -4,43 +4,47 @@
* file, you can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.mozilla.gecko.telemetry.pingbuilders;
import android.os.Bundle;
import android.os.Parcelable;
import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
import org.mozilla.gecko.sync.telemetry.TelemetryContract;
import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
import org.mozilla.gecko.telemetry.TelemetryLocalPing;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
/**
* Local ping builder which understands how to process sync data.
* Whenever hashing of data is involved, we expect it to be performed at the time of collection,
* somewhere in {@link org.mozilla.gecko.sync.telemetry.TelemetryCollector} and friends.
*/
public class TelemetrySyncPingBuilder extends TelemetryLocalPingBuilder {
public TelemetrySyncPingBuilder setStages(@NonNull final Serializable data) {
HashMap<String, TelemetryStageCollector> stages = castSyncData(data);
final JSONArray engines = new JSONArray();
for (String stageName : stages.keySet()) {
final TelemetryStageCollector stage = stages.get(stageName);
// Skip stages that did nothing.
- if (stage.inbound == 0 && stage.outbound == 0 && stage.error == null && stage.validation == null) {
+ if (stage.inbound == 0 && (stage.outbound == null || stage.outbound.size() == 0) &&
+ stage.error == null && stage.validation == null) {
continue;
}
final ExtendedJSONObject stageJSON = new ExtendedJSONObject();
stageJSON.put("name", stageName);
stageJSON.put("took", stage.finished - stage.started);
@@ -58,29 +62,20 @@ public class TelemetrySyncPingBuilder ex
incomingJSON.put("failed", stage.inboundFailed);
}
if (stage.reconciled > 0) {
incomingJSON.put("reconciled", stage.reconciled);
}
stageJSON.put("incoming", incomingJSON);
}
- if (stage.outbound > 0) {
- final ExtendedJSONObject outgoingJSON = new ExtendedJSONObject();
- // We specifically do not check if `outboundStored` is greater than zero.
- // `outbound` schema is simpler than `inbound`, namely there isn't an "attempted
- // to send" count.
- // Stage telemetry itself has that data (outbound = outboundStored + outboundFailed),
- // and so this is our way to relay slightly more information.
- // e.g. we'll know there's something wrong if `sent = 0` and `failed` is missing.
- outgoingJSON.put("sent", stage.outboundStored);
- if (stage.outboundFailed > 0) {
- outgoingJSON.put("failed", stage.outboundFailed);
- }
- stageJSON.put("outgoing", outgoingJSON);
+ JSONArray outbound = buildOutgoing(stage.outbound);
+
+ if (outbound != null) {
+ stageJSON.put("outgoing", outbound);
}
// We depend on the error builder from TelemetryCollector to produce the right schema.
// Spreading around our schema definition like that is awkward, but, alas, here we are.
if (stage.error != null) {
stageJSON.put("failureReason", stage.error);
}
// As above for validation too.
@@ -99,16 +94,36 @@ public class TelemetrySyncPingBuilder ex
return this;
}
public TelemetrySyncPingBuilder setDeviceID(@NonNull String deviceID) {
payload.put("deviceID", deviceID);
return this;
}
+ @Nullable
+ private static JSONArray buildOutgoing(List<StoreBatchTracker.Batch> batches) {
+ if (batches == null || batches.size() == 0) {
+ return null;
+ }
+ JSONArray arr = new JSONArray();
+ for (int i = 0; i < batches.size(); ++i) {
+ StoreBatchTracker.Batch batch = batches.get(i);
+ ExtendedJSONObject o = new ExtendedJSONObject();
+ if (batch.sent != 0) {
+ o.put("sent", (long) batch.sent);
+ }
+ if (batch.failed != 0) {
+ o.put("failed", (long) batch.failed);
+ }
+ addUnchecked(arr, o);
+ }
+ return arr.size() == 0 ? null : arr;
+ }
+
public TelemetrySyncPingBuilder setRestarted(boolean didRestart) {
if (!didRestart) {
return this;
}
payload.put("restarted", true);
return this;
}
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -25,16 +25,21 @@ public class DefaultStoreDelegate extend
}
@Override
public void onStoreFailed(Exception ex) {
performNotify("Store failed", ex);
}
@Override
+ public void onBatchCommitted() {
+ performNotify("Stores committed ", null);
+ }
+
+ @Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {}
@Override
public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
final RepositorySessionStoreDelegate self = this;
return new RepositorySessionStoreDelegate() {
@Override
@@ -73,16 +78,26 @@ public class DefaultStoreDelegate extend
@Override
public void run() {
self.onStoreCompleted();
}
});
}
@Override
+ public void onBatchCommitted() {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ self.onBatchCommitted();
+ }
+ });
+ }
+
+ @Override
public void onStoreFailed(Exception e) {
}
@Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
if (newExecutor == executor) {
return this;
--- a/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/SimpleSuccessStoreDelegate.java
+++ b/mobile/android/services/src/androidTest/java/org/mozilla/gecko/background/sync/helpers/SimpleSuccessStoreDelegate.java
@@ -9,12 +9,17 @@ import org.mozilla.gecko.sync.repositori
public abstract class SimpleSuccessStoreDelegate extends DefaultDelegate implements RepositorySessionStoreDelegate {
@Override
public void onRecordStoreFailed(Exception ex, String guid) {
performNotify("Store failed", ex);
}
@Override
+ public void onBatchCommitted() {
+ performNotify("Store committed", null);
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this;
}
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/VersioningDelegateHelper.java
@@ -202,13 +202,18 @@ public class VersioningDelegateHelper {
}
@Override
public void onStoreFailed(Exception e) {
inner.onStoreFailed(e);
}
@Override
+ public void onBatchCommitted() {
+ inner.onBatchCommitted();
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return inner.deferredStoreDelegate(executor);
}
}
}
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -69,9 +69,19 @@ public class DeferredRepositorySessionSt
public void onRecordStoreReconciled(final String guid, final String oldGuid, final Integer newVersion) {
executor.execute(new Runnable() {
@Override
public void run() {
inner.onRecordStoreReconciled(guid, oldGuid, newVersion);
}
});
}
+
+ @Override
+ public void onBatchCommitted() {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ inner.onBatchCommitted();
+ }
+ });
+ }
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -20,10 +20,12 @@ public interface RepositorySessionStoreD
// Only makes sense in context of local repositories.
// Further call to onRecordStoreSucceeded is necessary.
void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion);
// Called with a GUID when store has succeeded.
void onRecordStoreSucceeded(String guid);
void onStoreCompleted();
void onStoreFailed(Exception e);
+ // Only relevant for store batches, and exists to help us record correct telemetry.
+ void onBatchCommitted();
RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -91,16 +91,20 @@ class PayloadDispatcher {
// If we're not in a batching mode, or just committed a batch, uploaded records have
// been applied to the server storage and are now visible to other clients.
// Therefore, we bump our local "last store" timestamp.
bumpTimestampTo(uploadTimestamp, response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
uploader.setLastStoreTimestamp(uploadTimestamp);
batchWhiteboard.clearSuccessRecordGuids();
}
+ if (isCommit || !batchWhiteboard.getInBatchingMode()) {
+ uploader.sessionStoreDelegate.onBatchCommitted();
+ }
+
// If this was our very last commit, we're done storing records.
// Get Last-Modified timestamp from the response, and pass it upstream.
if (isLastPayload) {
uploader.finished();
}
}
void payloadFailed(Exception e) {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -39,16 +39,17 @@ import org.mozilla.gecko.sync.repositori
import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
import org.mozilla.gecko.sync.synchronizer.Synchronizer;
import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
import org.mozilla.gecko.sync.synchronizer.SynchronizerSession;
import org.mozilla.gecko.sync.telemetry.TelemetryCollector;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* Fetch from a server collection into a local repository, encrypting
* and decrypting along the way.
*
* @author rnewman
@@ -627,24 +628,22 @@ public abstract class ServerSyncStage ex
int inboundCount = synchronizerSession.getInboundCount();
int inboundCountStored = synchronizerSession.getInboundCountStored();
int inboundCountFailed = synchronizerSession.getInboundCountFailed();
int inboundCountReconciled = synchronizerSession.getInboundCountReconciled();
int outboundCount = synchronizerSession.getOutboundCount();
int outboundCountStored = synchronizerSession.getOutboundCountStored();
int outboundCountFailed = synchronizerSession.getOutboundCountFailed();
+ telemetryStageCollector.outbound = synchronizerSession.getOutboundBatches();
telemetryStageCollector.finished = stageCompleteTimestamp;
telemetryStageCollector.inbound = inboundCount;
telemetryStageCollector.inboundStored = inboundCountStored;
telemetryStageCollector.inboundFailed = inboundCountFailed;
telemetryStageCollector.reconciled = inboundCountReconciled;
- telemetryStageCollector.outbound = outboundCount;
- telemetryStageCollector.outboundStored = outboundCountStored;
- telemetryStageCollector.outboundFailed = outboundCountFailed;
Logger.info(LOG_TAG, "Stage " + getEngineName()
+ " received " + inboundCount
+ "; stored " + inboundCountStored + ", reconciling " + inboundCountReconciled
+ " and failed to store " + inboundCountFailed
+ ". Sent " + outboundCount
+ "; server accepted " + outboundCountStored + " and rejected " + outboundCountFailed
+ ". Duration: " + getStageDurationString() + ".");
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -2,16 +2,17 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.SyncException;
import org.mozilla.gecko.sync.ThreadPool;
@@ -82,16 +83,18 @@ public class RecordsChannel implements
// attempted = accepted + failed
// reconciled <= accepted <= attempted
// reconciled = accepted - `new`, where `new` is inferred.
private final AtomicInteger storeAttemptedCount = new AtomicInteger();
private final AtomicInteger storeAcceptedCount = new AtomicInteger();
private final AtomicInteger storeFailedCount = new AtomicInteger();
private final AtomicInteger storeReconciledCount = new AtomicInteger();
+ private final StoreBatchTracker storeTracker = new StoreBatchTracker();
+
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
this.source = source;
this.sink = sink;
this.delegate = delegate;
}
/*
* We push fetched records into a queue.
@@ -180,16 +183,17 @@ public class RecordsChannel implements
sink.setStoreDelegate(this);
fetchedCount.set(0);
fetchFailedCount.set(0);
storeAttemptedCount.set(0);
storeAcceptedCount.set(0);
storeFailedCount.set(0);
storeReconciledCount.set(0);
+ storeTracker.reset();
// Start a consumer thread.
this.consumer = new ConcurrentRecordConsumer(this);
ThreadPool.run(this.consumer);
waitingForQueueDone = true;
source.fetchModified(this);
}
/**
@@ -208,24 +212,29 @@ public class RecordsChannel implements
}
this.flow();
}
@Override
public void store(Record record) {
storeAttemptedCount.incrementAndGet();
+ storeTracker.onRecordStoreAttempted();
try {
sink.store(record);
} catch (NoStoreDelegateException e) {
Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
delegate.onFlowStoreFailed(this, e, record.guid);
}
}
+ /* package-local */ ArrayList<StoreBatchTracker.Batch> getStoreBatches() {
+ return this.storeTracker.getStoreBatches();
+ }
+
@Override
public void onFetchFailed(Exception ex) {
Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
fetchFailedCount.incrementAndGet();
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
delegate.onFlowFetchFailed(this, ex);
@@ -246,29 +255,37 @@ public class RecordsChannel implements
this.consumer.queueFilled();
}
@Override
public void onBatchCompleted() {
this.sink.storeFlush();
}
+ // Sent for "store" batches.
+ @Override
+ public void onBatchCommitted() {
+ storeTracker.onBatchFinished();
+ }
+
@Override
public void onRecordStoreFailed(Exception ex, String recordGuid) {
Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
storeFailedCount.incrementAndGet();
+ storeTracker.onRecordStoreFailed();
this.consumer.stored();
delegate.onFlowStoreFailed(this, ex, recordGuid);
// TODO: abort?
}
@Override
public void onRecordStoreSucceeded(String guid) {
Logger.trace(LOG_TAG, "Stored record with guid " + guid);
storeAcceptedCount.incrementAndGet();
+ storeTracker.onRecordStoreSucceeded();
this.consumer.stored();
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
Logger.trace(LOG_TAG, "Reconciled record with guid " + guid);
storeReconciledCount.incrementAndGet();
}
@@ -311,16 +328,18 @@ public class RecordsChannel implements
@Override
public void onStoreFailed(Exception ex) {
Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
+ storeTracker.onBatchFailed();
+
// NB: consumer might or might not be running at this point. There are two cases here:
// 1) If we're storing records remotely, we might fail due to a 412.
// -- we might hit 412 at any point, so consumer might be in either state.
// Action: ignore consumer state, we have nothing else to do other to inform our delegate
// that we're done with this flow. Based on the reflow exception, it'll determine what to do.
// 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
// -- we might hit a deadline only prior to attempting to merge records,
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTracker.java
@@ -0,0 +1,85 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class encapsulates most of the logic for recording telemetry information about outgoing
+ * batches.
+ */
+public class StoreBatchTracker {
+ private final AtomicInteger currentStoreBatchAttempted = new AtomicInteger();
+ private final AtomicInteger currentStoreBatchAccepted = new AtomicInteger();
+ private final AtomicInteger currentStoreBatchFailed = new AtomicInteger();
+ private final ConcurrentLinkedQueue<Batch> completedBatches = new ConcurrentLinkedQueue<>();
+
+ public static class Batch {
+ public final int sent;
+ public final int failed;
+ public Batch(final int sent, final int failed) {
+ this.sent = sent;
+ this.failed = failed;
+ }
+ }
+
+ /* package-local */ void reset() {
+ currentStoreBatchFailed.set(0);
+ currentStoreBatchAccepted.set(0);
+ currentStoreBatchAttempted.set(0);
+ completedBatches.clear();
+ }
+
+ private boolean haveUncommittedBatchData() {
+ return currentStoreBatchAttempted.get() != 0 ||
+ currentStoreBatchAccepted.get() != 0 ||
+ currentStoreBatchFailed.get() != 0;
+ }
+
+ /* package-local */ void onBatchFinished() {
+ // Not actually thread safe, but should be good enough for telemetry info. (Especially
+ // since we shouldn't be completing batches for the same channel on more than one thread)
+ final int sent = currentStoreBatchAttempted.getAndSet(0);
+ final int knownFailed = currentStoreBatchFailed.getAndSet(0);
+ final int knownSucceeded = currentStoreBatchAccepted.getAndSet(0);
+
+ // These might be different if we "forced" a failure due to an error unrelated to uploading
+ // a record.
+ final int failed = Math.max(knownFailed, sent - knownSucceeded);
+
+ completedBatches.add(new Batch(sent, failed));
+ }
+
+ /* package-local */ void onBatchFailed() {
+ if (currentStoreBatchFailed.get() == 0 && currentStoreBatchAccepted.get() == currentStoreBatchAttempted.get()) {
+ // Force the failure. It's unclear if this ever can happen for cases where we care about what
+ // is inside completedBatches (cases where we're uploading).
+ currentStoreBatchFailed.incrementAndGet();
+ }
+ onBatchFinished();
+ }
+
+ /* package-local */ void onRecordStoreFailed() {
+ currentStoreBatchFailed.incrementAndGet();
+ }
+
+ /* package-local */ void onRecordStoreSucceeded() {
+ currentStoreBatchAccepted.incrementAndGet();
+ }
+
+ /* package-local */ void onRecordStoreAttempted() {
+ currentStoreBatchAttempted.incrementAndGet();
+ }
+
+ // Note that this finishes the current batch (if any exists).
+ /* package-local */ ArrayList<Batch> getStoreBatches() {
+ if (haveUncommittedBatchData()) {
+ onBatchFinished();
+ }
+ return new ArrayList<>(completedBatches);
+ }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
@@ -1,20 +1,24 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.synchronizer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.SyncException;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
import android.content.Context;
@@ -76,16 +80,19 @@ public class SynchronizerSession impleme
private final AtomicInteger numInboundRecords = new AtomicInteger(-1);
private final AtomicInteger numInboundRecordsStored = new AtomicInteger(-1);
private final AtomicInteger numInboundRecordsFailed = new AtomicInteger(-1);
private final AtomicInteger numInboundRecordsReconciled = new AtomicInteger(-1);
private final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
private final AtomicInteger numOutboundRecordsStored = new AtomicInteger(-1);
private final AtomicInteger numOutboundRecordsFailed = new AtomicInteger(-1);
+ // Doesn't need to be ConcurrentLinkedQueue or anything like that since we don't do partial
+ // changes to it.
+ private final AtomicReference<List<Batch>> outboundBatches = new AtomicReference<>();
private Exception fetchFailedCauseException;
private Exception storeFailedCauseException;
/*
* Public API: constructor, init, synchronize.
*/
public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
@@ -179,16 +186,21 @@ public class SynchronizerSession impleme
public int getInboundCountFailed() {
return numInboundRecordsFailed.get();
}
public int getInboundCountReconciled() {
return numInboundRecordsReconciled.get();
}
+ // Returns outboundBatches.
+ public List<Batch> getOutboundBatches() {
+ return outboundBatches.get();
+ }
+
/**
* Get the number of records fetched from the second repository (usually the
* local store, hence outbound).
* <p>
* Valid only after second flow has completed.
*
* @return number of records, or -1 if not valid.
*/
@@ -223,16 +235,17 @@ public class SynchronizerSession impleme
private synchronized void synchronize() {
numInboundRecords.set(-1);
numInboundRecordsStored.set(-1);
numInboundRecordsFailed.set(-1);
numInboundRecordsReconciled.set(-1);
numOutboundRecords.set(-1);
numOutboundRecordsStored.set(-1);
numOutboundRecordsFailed.set(-1);
+ outboundBatches.set(null);
// First thing: decide whether we should.
if (sessionA.shouldSkip() ||
sessionB.shouldSkip()) {
Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
sessionA.abort();
sessionB.abort();
this.delegate.onSynchronizeSkipped(this);
@@ -315,16 +328,17 @@ public class SynchronizerSession impleme
public void onSecondFlowCompleted(RecordsChannel recordsChannel) {
Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
pendingBTimestamp = sessionB.getLastFetchTimestamp();
storeEndATimestamp = sessionA.getLastStoreTimestamp();
Logger.debug(LOG_TAG, "Fetch end is " + pendingBTimestamp + ". Store end is " + storeEndATimestamp + ". Finishing.");
numOutboundRecords.set(recordsChannel.getFetchCount());
numOutboundRecordsStored.set(recordsChannel.getStoreAcceptedCount());
numOutboundRecordsFailed.set(recordsChannel.getStoreFailureCount());
+ outboundBatches.set(recordsChannel.getStoreBatches());
flowBToACompleted = true;
// Finish the two sessions.
try {
this.sessionA.finish(this);
} catch (InactiveSessionException e) {
this.onFinishFailed(e);
return;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/telemetry/TelemetryStageCollector.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/telemetry/TelemetryStageCollector.java
@@ -1,37 +1,38 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.telemetry;
import org.mozilla.gecko.sync.ExtendedJSONObject;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
+
+import java.util.List;
/**
* Gathers telemetry details about an individual sync stage.
* Implementation note: there are no getters/setters to avoid unnecessary verboseness.
* This data expected to be write-only from within SyncStages, and read-only from TelemetryCollector.
* Although there shouldn't be concurrent access, it's possible that we'll be reading/writing these
* values from different threads - hence `volatile` to ensure visibility.
*/
public class TelemetryStageCollector {
private final TelemetryCollector syncCollector;
public volatile long started = 0L;
public volatile long finished = 0L;
public volatile int inbound = 0;
public volatile int inboundStored = 0;
public volatile int inboundFailed = 0;
- public volatile int outbound = 0;
- public volatile int outboundStored = 0;
- public volatile int outboundFailed = 0;
public volatile int reconciled = 0;
public volatile ExtendedJSONObject error = null;
public volatile ExtendedJSONObject validation = null;
+ public volatile List<StoreBatchTracker.Batch> outbound = null;
public TelemetryStageCollector(TelemetryCollector syncCollector) {
this.syncCollector = syncCollector;
}
public TelemetryCollector getSyncCollector() {
return this.syncCollector;
}
--- a/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/test/java/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -39,12 +39,17 @@ public class ExpectSuccessRepositorySess
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
log("Store reconciled record " + guid);
}
@Override
+ public void onBatchCommitted() {
+ log("Batch committed");
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this;
}
}
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -155,16 +155,20 @@ public class BatchingUploaderTest {
lastStoreFailedException = e;
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
}
@Override
+ public void onBatchCommitted() {
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this;
}
}
private ExecutorService workQueue;
private RepositorySessionStoreDelegate storeDelegate;
--- a/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -117,16 +117,20 @@ public class PayloadUploadDelegateTest {
storeFailedException = e;
}
@Override
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
}
@Override
+ public void onBatchCommitted() {
+ }
+
+ @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this;
}
}
@Before
public void setUp() throws Exception {
sessionStoreDelegate = spy(new MockRepositorySessionStoreDelegate());
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/test/java/org/mozilla/gecko/sync/synchronizer/StoreBatchTrackerTest.java
@@ -0,0 +1,124 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.synchronizer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class StoreBatchTrackerTest {
+ private StoreBatchTracker tracker;
+
+ @Before
+ public void setUp() throws Exception {
+ tracker = new StoreBatchTracker();
+ }
+
+ private void recordCounts(int attempted, int succeeded, int failed) {
+ for (int i = 0; i < attempted; ++i) {
+ tracker.onRecordStoreAttempted();
+ }
+ for (int i = 0; i < succeeded; ++i) {
+ tracker.onRecordStoreSucceeded();
+ }
+ for (int i = 0; i < failed; ++i) {
+ tracker.onRecordStoreFailed();
+ }
+ }
+
+ @Test
+ public void testSingleBatch() {
+ {
+ recordCounts(3, 3, 0);
+ tracker.onBatchFinished();
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0).failed, 0);
+ assertEquals(batches.get(0).sent, 3);
+ tracker.reset();
+ }
+ {
+ recordCounts(3, 2, 1);
+ // Don't bother calling onBatchFinished, to ensure we finish it automatically.
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0).failed, 1);
+ assertEquals(batches.get(0).sent, 3);
+ tracker.reset();
+ }
+ {
+ recordCounts(3, 0, 3);
+ tracker.onBatchFinished();
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0).failed, 3);
+ assertEquals(batches.get(0).sent, 3);
+ tracker.reset();
+ }
+ }
+
+ @Test
+ public void testBatchFail() {
+ recordCounts(3, 3, 0);
+ tracker.onBatchFailed();
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+ assertEquals(batches.size(), 1);
+ // The important thing is that there's a non-zero number of failed here.
+ assertEquals(batches.get(0).failed, 1);
+ assertEquals(batches.get(0).sent, 3);
+ }
+
+ @Test
+ public void testMultipleBatches() {
+ recordCounts(3, 3, 0);
+ tracker.onBatchFinished();
+ recordCounts(8, 8, 0);
+ tracker.onBatchFinished();
+ recordCounts(5, 5, 0);
+ tracker.onBatchFinished();
+ // Fake an empty "commit" POST.
+ tracker.onBatchFinished();
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+
+ assertEquals(batches.size(), 4);
+
+ assertEquals(batches.get(0).failed, 0);
+ assertEquals(batches.get(0).sent, 3);
+
+ assertEquals(batches.get(1).failed, 0);
+ assertEquals(batches.get(1).sent, 8);
+
+ assertEquals(batches.get(2).failed, 0);
+ assertEquals(batches.get(2).sent, 5);
+
+ assertEquals(batches.get(3).failed, 0);
+ assertEquals(batches.get(3).sent, 0);
+ }
+
+ @Test
+ public void testDroppedStores() {
+ recordCounts(3, 1, 0);
+ tracker.onBatchFinished();
+ ArrayList<Batch> batches = tracker.getStoreBatches();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0).failed, 2);
+ assertEquals(batches.get(0).sent, 3);
+ }
+
+ @Test
+ public void testReset() {
+ assertEquals(tracker.getStoreBatches().size(), 0);
+ recordCounts(2, 1, 1);
+ assertEquals(tracker.getStoreBatches().size(), 1);
+ tracker.reset();
+ assertEquals(tracker.getStoreBatches().size(), 0);
+ }
+}
\ No newline at end of file