Bug 1267919 - Part 3. Implement initial sync telemetry recording code. r?Dexter,markh draft
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Mon, 11 Jul 2016 13:37:23 -0400
changeset 386355 08cf76891929399ca4f0e69c59e663b14f383a17
parent 386352 67b059ed58ea63a16fa8e24f2a117ca25eb58017
child 525099 7fce17d780b9b4b204a14cedcf60626169d795cc
push id22690
push userbmo:tchiovoloni@mozilla.com
push dateMon, 11 Jul 2016 19:01:54 +0000
reviewersDexter, markh
bugs1267919
milestone50.0a1
Bug 1267919 - Part 3. Implement initial sync telemetry recording code. r?Dexter,markh MozReview-Commit-ID: 3hATEQVQjao
services/sync/modules-testing/utils.js
services/sync/modules/browserid_identity.js
services/sync/modules/engines.js
services/sync/modules/service.js
services/sync/modules/telemetry.js
services/sync/moz.build
services/sync/tests/unit/head_helpers.js
services/sync/tests/unit/test_bookmark_engine.js
services/sync/tests/unit/test_bookmark_smart_bookmarks.js
services/sync/tests/unit/test_collections_recovery.js
services/sync/tests/unit/test_corrupt_keys.js
services/sync/tests/unit/test_engine.js
services/sync/tests/unit/test_errorhandler.js
services/sync/tests/unit/test_errorhandler_filelog.js
services/sync/tests/unit/test_errorhandler_sync_checkServerError.js
services/sync/tests/unit/test_hmac_error.js
services/sync/tests/unit/test_node_reassignment.js
services/sync/tests/unit/test_service_startup.js
services/sync/tests/unit/test_service_sync_locked.js
services/sync/tests/unit/test_service_sync_remoteSetup.js
services/sync/tests/unit/test_service_sync_specified.js
services/sync/tests/unit/test_service_sync_updateEnabledEngines.js
services/sync/tests/unit/test_syncengine_sync.js
services/sync/tests/unit/test_syncscheduler.js
services/sync/tests/unit/test_telemetry.js
services/sync/tests/unit/xpcshell.ini
--- a/services/sync/modules-testing/utils.js
+++ b/services/sync/modules-testing/utils.js
@@ -140,17 +140,17 @@ this.makeIdentityConfig = function(overr
     // fxaccount specific credentials.
     fxaccount: {
       user: {
         assertion: 'assertion',
         email: 'email',
         kA: 'kA',
         kB: 'kB',
         sessionToken: 'sessionToken',
-        uid: 'user_uid',
+        uid: "a".repeat(32),
         verified: true,
       },
       token: {
         endpoint: null,
         duration: 300,
         id: "id",
         key: "key",
         // uid will be set to the username.
--- a/services/sync/modules/browserid_identity.js
+++ b/services/sync/modules/browserid_identity.js
@@ -1,15 +1,15 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  * You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 "use strict";
 
-this.EXPORTED_SYMBOLS = ["BrowserIDManager"];
+this.EXPORTED_SYMBOLS = ["BrowserIDManager", "AuthenticationError"];
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-common/async.js");
 Cu.import("resource://services-common/utils.js");
 Cu.import("resource://services-common/tokenserverclient.js");
 Cu.import("resource://services-crypto/utils.js");
@@ -60,18 +60,19 @@ function deriveKeyBundle(kB) {
 
 /*
   General authentication error for abstracting authentication
   errors from multiple sources (e.g., from FxAccounts, TokenServer).
   details is additional details about the error - it might be a string, or
   some other error object (which should do the right thing when toString() is
   called on it)
 */
-function AuthenticationError(details) {
+function AuthenticationError(details, source) {
   this.details = details;
+  this.source = source;
 }
 
 AuthenticationError.prototype = {
   toString: function() {
     return "AuthenticationError(" + this.details + ")";
   }
 }
 
@@ -107,16 +108,24 @@ this.BrowserIDManager.prototype = {
   get needsCustomization() {
     try {
       return Services.prefs.getBoolPref(PREF_SYNC_SHOW_CUSTOMIZATION);
     } catch (e) {
       return false;
     }
   },
 
+  // Get the FxA UID. Throws if there is no signed in user
+  userUID() {
+    if (!this._signedInUser) {
+      throw new Error("userUID(): No signed in user");
+    }
+    return this._signedInUser.uid;
+  },
+
   initialize: function() {
     for (let topic of OBSERVER_TOPICS) {
       Services.obs.addObserver(this, topic, false);
     }
     // and a background fetch of account data just so we can set this.account,
     // so we have a username available before we've actually done a login.
     // XXX - this is actually a hack just for tests and really shouldn't be
     // necessary. Also, you'd think it would be safe to allow this.account to
@@ -620,23 +629,23 @@ this.BrowserIDManager.prototype = {
         }
         return token;
       })
       .catch(err => {
         // TODO: unify these errors - we need to handle errors thrown by
         // both tokenserverclient and hawkclient.
         // A tokenserver error thrown based on a bad response.
         if (err.response && err.response.status === 401) {
-          err = new AuthenticationError(err);
+          err = new AuthenticationError(err, "tokenserver");
         // A hawkclient error.
         } else if (err.code && err.code === 401) {
-          err = new AuthenticationError(err);
+          err = new AuthenticationError(err, "hawkclient");
         // An FxAccounts.jsm error.
         } else if (err.message == fxAccountsCommon.ERROR_AUTH_ERROR) {
-          err = new AuthenticationError(err);
+          err = new AuthenticationError(err, "fxaccounts");
         }
 
         // TODO: write tests to make sure that different auth error cases are handled here
         // properly: auth error getting assertion, auth error getting token (invalid generation
         // and client-state error)
         if (err instanceof AuthenticationError) {
           this._log.error("Authentication error in _fetchTokenForUser", err);
           // set it to the "fatal" LOGIN_FAILED_LOGIN_REJECTED reason.
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1448,31 +1448,34 @@ SyncEngine.prototype = {
   _uploadOutgoing: function () {
     this._log.trace("Uploading local changes to server.");
 
     let modifiedIDs = Object.keys(this._modified);
     if (modifiedIDs.length) {
       this._log.trace("Preparing " + modifiedIDs.length +
                       " outgoing records");
 
+      let counts = { sent: modifiedIDs.length, failed: 0 };
+
       // collection we'll upload
       let up = new Collection(this.engineURL, null, this.service);
       let handleResponse = resp => {
         if (!resp.success) {
           this._log.debug("Uploading records failed: " + resp);
           resp.failureCode = ENGINE_UPLOAD_FAIL;
           throw resp;
         }
 
         // Update server timestamp from the upload.
         let modified = resp.headers["x-weave-timestamp"];
         if (modified > this.lastSync)
           this.lastSync = modified;
 
         let failed_ids = Object.keys(resp.obj.failed);
+        counts.failed += failed_ids.length;
         if (failed_ids.length)
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
                           + failed_ids.join(", "));
 
         // Clear successfully uploaded objects.
         for (let key in resp.obj.success) {
           let id = resp.obj.success[key];
@@ -1499,16 +1502,17 @@ SyncEngine.prototype = {
           this._log.warn("Error creating record", ex);
         }
         if (ok) {
           postQueue.enqueue(out);
         }
         this._store._sleep(0);
       }
       postQueue.flush();
+      Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   _syncFinish: function () {
     this._log.trace("Finishing up sync");
     this._tracker.resetScore();
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -27,16 +27,17 @@ Cu.import("resource://services-sync/engi
 Cu.import("resource://services-sync/identity.js");
 Cu.import("resource://services-sync/policies.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/rest.js");
 Cu.import("resource://services-sync/stages/enginesync.js");
 Cu.import("resource://services-sync/stages/declined.js");
 Cu.import("resource://services-sync/status.js");
+Cu.import("resource://services-sync/telemetry.js");
 Cu.import("resource://services-sync/userapi.js");
 Cu.import("resource://services-sync/util.js");
 
 const ENGINE_MODULES = {
   Addons: "addons.js",
   Bookmarks: "bookmarks.js",
   Form: "forms.js",
   History: "history.js",
@@ -543,17 +544,18 @@ Sync11Service.prototype = {
     } catch (ex) {
       this.errorHandler.checkServerError(ex);
       throw ex;
     }
 
     // Always check for errors; this is also where we look for X-Weave-Alert.
     this.errorHandler.checkServerError(info);
     if (!info.success) {
-      throw "Aborting sync: failed to get collections.";
+      this._log.error("Aborting sync: failed to get collections.")
+      throw info;
     }
     return info;
   },
 
   verifyAndFetchSymmetricKeys: function verifyAndFetchSymmetricKeys(infoResponse) {
 
     this._log.debug("Fetching and verifying -- or generating -- symmetric keys.");
 
new file mode 100644
--- /dev/null
+++ b/services/sync/modules/telemetry.js
@@ -0,0 +1,432 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+this.EXPORTED_SYMBOLS = ["SyncTelemetry"];
+
+Cu.import("resource://services-sync/browserid_identity.js");
+Cu.import("resource://services-sync/main.js");
+Cu.import("resource://services-sync/status.js");
+Cu.import("resource://services-sync/util.js");
+Cu.import("resource://services-common/observers.js");
+Cu.import("resource://services-common/async.js");
+Cu.import("resource://gre/modules/Log.jsm");
+Cu.import("resource://gre/modules/TelemetryController.jsm");
+Cu.import("resource://gre/modules/FxAccounts.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+
+let constants = {};
+Cu.import("resource://services-sync/constants.js", constants);
+
+var fxAccountsCommon = {};
+Cu.import("resource://gre/modules/FxAccountsCommon.js", fxAccountsCommon);
+
+XPCOMUtils.defineLazyServiceGetter(this, "Telemetry",
+                                   "@mozilla.org/base/telemetry;1",
+                                   "nsITelemetry");
+
+const log = Log.repository.getLogger("Sync.Telemetry");
+
+const TOPICS = [
+  "xpcom-shutdown",
+  "weave:service:sync:start",
+  "weave:service:sync:finish",
+  "weave:service:sync:error",
+
+  "weave:engine:sync:start",
+  "weave:engine:sync:finish",
+  "weave:engine:sync:error",
+  "weave:engine:sync:applied",
+  "weave:engine:sync:uploaded",
+];
+
+const PING_FORMAT_VERSION = 1;
+
+// The set of engines we record telemetry for - any other engines are ignored.
+const ENGINES = new Set(["addons", "bookmarks", "clients", "forms", "history",
+                         "passwords", "prefs", "tabs"]);
+
+// is it a wrapped auth error from browserid_identity?
+function isBrowerIdAuthError(error) {
+  // I can't think of what could throw on String conversion
+  // but we have absolutely no clue about the type, and
+  // there's probably some things out there that would
+  try {
+    if (String(error).startsWith("AuthenticationError")) {
+      return true;
+    }
+  } catch (e) {}
+  return false;
+}
+
+function transformError(error, engineName) {
+  if (Async.isShutdownException(error)) {
+    return { name: "shutdownerror" };
+  }
+
+  if (typeof error === "string") {
+    if (error.startsWith("error.")) {
+      // This is hacky, but I can't imagine that it's not also accurate.
+      return { name: "othererror", error };
+    }
+
+    return { name: "unexpectederror", error };
+  }
+
+  if (error.failureCode) {
+    return { name: "othererror", error: error.failureCode };
+  }
+
+  if (error instanceof AuthenticationError) {
+    return { name: "autherror", from: error.source };
+  }
+
+  let httpCode = error.status ||
+    (error.response && error.response.status) ||
+    error.code;
+
+  if (httpCode) {
+    return { name: "httperror", code: httpCode };
+  }
+
+  if (error.result) {
+    return { name: "nserror", code: error.result };
+  }
+
+  return {
+    name: "unexpectederror",
+    error: String(error),
+  }
+}
+
+function tryGetMonotonicTimestamp() {
+  try {
+    return Telemetry.msSinceProcessStart();
+  } catch (e) {
+    log.warn("Unable to get a monotonic timestamp!");
+    return -1;
+  }
+}
+
+function timeDeltaFrom(monotonicStartTime) {
+  let now = tryGetMonotonicTimestamp();
+  if (monotonicStartTime !== -1 && now !== -1) {
+    return Math.round(now - monotonicStartTime);
+  }
+  return -1;
+}
+
+class EngineRecord {
+  constructor(name) {
+    // startTime is in ms from process start, but is monotonic (unlike Date.now())
+    // so we need to keep both it and when.
+    this.startTime = tryGetMonotonicTimestamp();
+    this.name = name;
+  }
+
+  toJSON() {
+    let result = Object.assign({}, this);
+    delete result.startTime;
+    return result;
+  }
+
+  finished(error) {
+    let took = timeDeltaFrom(this.startTime);
+    if (took > 0) {
+      this.took = took;
+    }
+    if (error) {
+      this.failureReason = transformError(error, this.name);
+    }
+  }
+
+  recordApplied(counts) {
+    if (this.incoming) {
+      log.error(`Incoming records applied multiple times for engine ${this.name}!`);
+      return;
+    }
+    if (this.name === "clients" && !counts.failed) {
+      // ignore successful application of client records
+      // since otherwise they show up every time and are meaningless.
+      return;
+    }
+
+    let incomingData = {};
+    let properties = ["applied", "failed", "newFailed", "reconciled"];
+    // Only record non-zero properties and only record incoming at all if
+    // there's at least one property we care about.
+    for (let property of properties) {
+      if (counts[property]) {
+        incomingData[property] = counts[property];
+        this.incoming = incomingData;
+      }
+    }
+  }
+
+  recordUploaded(counts) {
+    if (counts.sent || counts.failed) {
+      if (!this.outgoing) {
+        this.outgoing = [];
+      }
+      this.outgoing.push({
+        sent: counts.sent || undefined,
+        failed: counts.failed || undefined,
+      });
+    }
+  }
+}
+
+class TelemetryRecord {
+  constructor(allowedEngines) {
+    this.allowedEngines = allowedEngines;
+    // Our failure reason. This property only exists in the generated ping if an
+    // error actually occurred.
+    this.failureReason = undefined;
+    this.uid = "";
+    this.when = Date.now();
+    this.startTime = tryGetMonotonicTimestamp();
+    this.took = 0; // will be set later.
+
+    // All engines that have finished (ie, does not include the "current" one)
+    // We omit this from the ping if it's empty.
+    this.engines = [];
+    // The engine that has started but not yet stopped.
+    this.currentEngine = null;
+  }
+
+  // toJSON returns the actual payload we will submit.
+  toJSON() {
+    let result = {
+      when: this.when,
+      uid: this.uid,
+      took: this.took,
+      version: PING_FORMAT_VERSION,
+      failureReason: this.failureReason,
+      status: this.status,
+    };
+    let engines = [];
+    for (let engine of this.engines) {
+      engines.push(engine.toJSON());
+    }
+    if (engines.length > 0) {
+      result.engines = engines;
+    }
+    return result;
+  }
+
+  finished(error) {
+    this.took = timeDeltaFrom(this.startTime);
+    if (this.currentEngine != null) {
+      log.error("Finished called for the sync before the current engine finished");
+      this.currentEngine.finished(null);
+      this.onEngineStop(this.currentEngine.name);
+    }
+    if (error) {
+      this.failureReason = transformError(error);
+    }
+
+    try {
+      this.uid = Weave.Service.identity.userUID();
+    } catch (e) {
+      this.uid = "0".repeat(32);
+    }
+
+    // Check for engine statuses. -- We do this now, and not in engine.finished
+    // to make sure any statuses that get set "late" are recorded
+    for (let engine of this.engines) {
+      let status = Status.engines[engine.name];
+      if (status && status !== constants.ENGINE_SUCCEEDED) {
+        engine.status = status;
+      }
+    }
+
+    let statusObject = {};
+
+    let serviceStatus = Status.service;
+    if (serviceStatus && serviceStatus !== constants.STATUS_OK) {
+      statusObject.service = serviceStatus;
+      this.status = statusObject;
+    }
+    let syncStatus = Status.sync;
+    if (syncStatus && syncStatus !== constants.SYNC_SUCCEEDED) {
+      statusObject.sync = syncStatus;
+      this.status = statusObject;
+    }
+  }
+
+  onEngineStart(engineName) {
+    if (this._shouldIgnoreEngine(engineName, false)) {
+      return;
+    }
+
+    if (this.currentEngine) {
+      log.error(`Being told that engine ${engineName} has started, but current engine ${
+        this.currentEngine.name} hasn't stopped`);
+      // Just discard the current engine rather than making up data for it.
+    }
+    this.currentEngine = new EngineRecord(engineName);
+  }
+
+  onEngineStop(engineName, error) {
+    if (error && !this.currentEngine) {
+      log.error(`Error triggered on ${engineName} when no current engine exists: ${error}`);
+      // It's possible for us to get an error before the start message of an engine
+      // (somehow), in which case we still want to record that error.
+      this.currentEngine = new EngineRecord(engineName);
+    } else if (!this.currentEngine || (engineName && this._shouldIgnoreEngine(engineName, true))) {
+      return;
+    }
+    this.currentEngine.finished(error);
+    this.engines.push(this.currentEngine);
+    this.currentEngine = null;
+  }
+
+  onEngineApplied(engineName, counts) {
+    if (this._shouldIgnoreEngine(engineName)) {
+      return;
+    }
+    this.currentEngine.recordApplied(counts);
+  }
+
+  onEngineUploaded(engineName, counts) {
+    if (this._shouldIgnoreEngine(engineName)) {
+      return;
+    }
+    this.currentEngine.recordUploaded(counts);
+  }
+
+  _shouldIgnoreEngine(engineName, shouldBeCurrent = true) {
+    if (!this.allowedEngines.has(engineName)) {
+      log.info(`Notification for engine ${engineName}, but we aren't recording telemetry for it`);
+      return true;
+    }
+    if (shouldBeCurrent) {
+      if (!this.currentEngine || engineName != this.currentEngine.name) {
+        log.error(`Notification for engine ${engineName} but it isn't current`);
+        return true;
+      }
+    }
+    return false;
+  }
+}
+
+class SyncTelemetryImpl {
+  constructor(allowedEngines) {
+    log.level = Log.Level[Svc.Prefs.get("log.logger.telemetry", "Trace")];
+    // This is accessible so we can enable custom engines during tests.
+    this.allowedEngines = allowedEngines;
+    this.current = null;
+    this.setupObservers();
+  }
+
+  setupObservers() {
+    for (let topic of TOPICS) {
+      Observers.add(topic, this, this);
+    }
+  }
+
+  shutdown() {
+    for (let topic of TOPICS) {
+      Observers.remove(topic, this, this);
+    }
+  }
+
+  submit(record) {
+    TelemetryController.submitExternalPing("sync", record);
+  }
+
+  onSyncStarted() {
+    if (this.current) {
+      log.warn("Observed weave:service:sync:start, but we're already recording a sync!");
+      // Just discard the old record, consistent with our handling of engines, above.
+    }
+    this.current = new TelemetryRecord(this.allowedEngines);
+  }
+
+  _checkCurrent(topic) {
+    if (!this.current) {
+      log.warn(`Observed notification ${topic} but no current sync is being recorded.`);
+      return false;
+    }
+    return true;
+  }
+
+  onSyncFinished(error) {
+    if (!this.current) {
+      log.warn("onSyncFinished but we aren't recording");
+      return;
+    }
+    this.current.finished(error);
+    let current = this.current;
+    this.current = null;
+    this.submit(current.toJSON());
+  }
+
+  observe(subject, topic, data) {
+    log.trace(`observed ${topic} ${data}`);
+
+    switch (topic) {
+      case "xpcom-shutdown":
+        this.shutdown();
+        break;
+
+      /* sync itself state changes */
+      case "weave:service:sync:start":
+        this.onSyncStarted();
+        break;
+
+      case "weave:service:sync:finish":
+        if (this._checkCurrent(topic)) {
+          this.onSyncFinished(null);
+        }
+        break;
+
+      case "weave:service:sync:error":
+        // argument needs to be truthy (this should always be the case)
+        this.onSyncFinished(subject || "Unknown");
+        break;
+
+      /* engine sync state changes */
+      case "weave:engine:sync:start":
+        if (this._checkCurrent(topic)) {
+          this.current.onEngineStart(data);
+        }
+        break;
+      case "weave:engine:sync:finish":
+        if (this._checkCurrent(topic)) {
+          this.current.onEngineStop(data, null);
+        }
+        break;
+
+      case "weave:engine:sync:error":
+        if (this._checkCurrent(topic)) {
+          // argument needs to be truthy (this should always be the case)
+          this.current.onEngineStop(data, subject || "Unknown");
+        }
+        break;
+
+      /* engine counts */
+      case "weave:engine:sync:applied":
+        if (this._checkCurrent(topic)) {
+          this.current.onEngineApplied(data, subject);
+        }
+        break;
+
+      case "weave:engine:sync:uploaded":
+        if (this._checkCurrent(topic)) {
+          this.current.onEngineUploaded(data, subject);
+        }
+        break;
+
+      default:
+        log.warn(`unexpected observer topic ${topic}`);
+        break;
+    }
+  }
+}
+
+this.SyncTelemetry = new SyncTelemetryImpl(ENGINES);
--- a/services/sync/moz.build
+++ b/services/sync/moz.build
@@ -30,16 +30,17 @@ EXTRA_JS_MODULES['services-sync'] += [
     'modules/main.js',
     'modules/policies.js',
     'modules/record.js',
     'modules/resource.js',
     'modules/rest.js',
     'modules/service.js',
     'modules/status.js',
     'modules/SyncedTabs.jsm',
+    'modules/telemetry.js',
     'modules/userapi.js',
     'modules/util.js',
 ]
 
 EXTRA_PP_JS_MODULES['services-sync'] += [
     'modules/constants.js',
 ]
 
--- a/services/sync/tests/unit/head_helpers.js
+++ b/services/sync/tests/unit/head_helpers.js
@@ -1,14 +1,44 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 Cu.import("resource://services-common/async.js");
 Cu.import("resource://testing-common/services/common/utils.js");
 Cu.import("resource://testing-common/PlacesTestUtils.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+
+XPCOMUtils.defineLazyGetter(this, 'SyncPingSchema', function() {
+  let ns = {};
+  Cu.import("resource://gre/modules/FileUtils.jsm", ns);
+  let stream = Cc["@mozilla.org/network/file-input-stream;1"]
+               .createInstance(Ci.nsIFileInputStream);
+  let jsonReader = Cc["@mozilla.org/dom/json;1"]
+                   .createInstance(Components.interfaces.nsIJSON);
+  let schema;
+  try {
+    let schemaFile = do_get_file("sync_ping_schema.json");
+    stream.init(schemaFile, ns.FileUtils.MODE_RDONLY, ns.FileUtils.PERMS_FILE, 0);
+    schema = jsonReader.decodeFromStream(stream, stream.available());
+  } finally {
+    stream.close();
+  }
+
+  // Allow tests to make whatever engines they want, this shouldn't cause
+  // validation failure.
+  schema.definitions.engine.properties.name = { type: "string" };
+  return schema;
+});
+
+XPCOMUtils.defineLazyGetter(this, 'SyncPingValidator', function() {
+  let ns = {};
+  Cu.import("resource://testing-common/ajv-4.1.1.js", ns);
+  let ajv = new ns.Ajv({ async: "co*" });
+  return ajv.compile(SyncPingSchema);
+});
 
 var provider = {
   getFile: function(prop, persistent) {
     persistent.value = true;
     switch (prop) {
       case "ExtPrefDL":
         return [Services.dirsvc.get("CurProcD", Ci.nsIFile)];
       default:
@@ -201,8 +231,162 @@ function mockGetWindowEnumerator(url, nu
 
 // Helper that allows checking array equality.
 function do_check_array_eq(a1, a2) {
   do_check_eq(a1.length, a2.length);
   for (let i = 0; i < a1.length; ++i) {
     do_check_eq(a1[i], a2[i]);
   }
 }
+
+// Helper function to get the sync telemetry and add the typically used test
+// engine names to its list of allowed engines.
+function get_sync_test_telemetry() {
+  let ns = {};
+  Cu.import("resource://services-sync/telemetry.js", ns);
+  let testEngines = ["rotary", "steam", "sterling", "catapult"];
+  for (let engineName of testEngines) {
+    ns.SyncTelemetry.allowedEngines.add(engineName);
+  }
+  return ns.SyncTelemetry;
+}
+
+function assert_valid_ping(record) {
+  if (record) {
+    if (!SyncPingValidator(record)) {
+      deepEqual([], SyncPingValidator.errors, "Sync telemetry ping validation failed");
+    }
+    equal(record.version, 1);
+    lessOrEqual(record.when, Date.now());
+  }
+}
+
+// Asserts that `ping` is a ping that doesn't contain any failure information
+function assert_success_ping(ping) {
+  ok(!!ping);
+  assert_valid_ping(ping);
+  ok(!ping.failureReason);
+  equal(undefined, ping.status);
+  greater(ping.engines.length, 0);
+  for (let e of ping.engines) {
+    ok(!e.failureReason);
+    equal(undefined, e.status);
+    if (e.outgoing) {
+      for (let o of e.outgoing) {
+        equal(undefined, o.failed);
+        notEqual(undefined, o.sent);
+      }
+    }
+    if (e.incoming) {
+      equal(undefined, e.incoming.failed);
+      equal(undefined, e.incoming.newFailed);
+      notEqual(undefined, e.incoming.applied || e.incoming.reconciled);
+    }
+  }
+}
+
+// Hooks into telemetry to validate all pings after calling.
+function validate_all_future_pings() {
+  let telem = get_sync_test_telemetry();
+  telem.submit = assert_valid_ping;
+}
+
+function wait_for_ping(callback, allowErrorPings) {
+  return new Promise(resolve => {
+    let telem = get_sync_test_telemetry();
+    let oldSubmit = telem.submit;
+    telem.submit = function(record) {
+      telem.submit = oldSubmit;
+      if (allowErrorPings) {
+        assert_valid_ping(record);
+      } else {
+        assert_success_ping(record);
+      }
+      resolve(record);
+    };
+    callback();
+  });
+}
+
+// Short helper for wait_for_ping
+function sync_and_validate_telem(allowErrorPings) {
+  return wait_for_ping(() => Service.sync(), allowErrorPings);
+}
+
+// Used for the (many) cases where we do a 'partial' sync, where only a single
+// engine is actually synced, but we still want to ensure we're generating a
+// valid ping. Returns a promise that resolves to the ping, or rejects with the
+// thrown error after calling an optional callback.
+function sync_engine_and_validate_telem(engine, allowErrorPings, onError) {
+  return new Promise((resolve, reject) => {
+    let telem = get_sync_test_telemetry();
+    let caughtError = null;
+    // Clear out status, so failures from previous syncs won't show up in the
+    // telemetry ping.
+    let ns = {};
+    Cu.import("resource://services-sync/status.js", ns);
+    ns.Status._engines = {};
+    ns.Status.partial = false;
+    // Ideally we'd clear these out like we do with engines, (probably via
+    // Status.resetSync()), but this causes *numerous* tests to fail, so we just
+    // assume that if no failureReason or engine failures are set, and the
+    // status properties are the same as they were initially, that it's just
+    // a leftover.
+    // This is only an issue since we're triggering the sync of just one engine,
+    // without doing any other parts of the sync.
+    let initialServiceStatus = ns.Status._service;
+    let initialSyncStatus = ns.Status._sync;
+
+    let oldSubmit = telem.submit;
+    telem.submit = function(record) {
+      telem.submit = oldSubmit;
+      if (record && record.status) {
+        // did we see anything to lead us to believe that something bad actually happened
+        let realProblem = record.failureReason || record.engines.some(e => {
+          if (e.failureReason || e.status) {
+            return true;
+          }
+          if (e.outgoing && e.outgoing.some(o => o.failed > 0)) {
+            return true;
+          }
+          return e.incoming && e.incoming.failed;
+        });
+        if (!realProblem) {
+          // no, so if the status is the same as it was initially, just assume
+          // that its leftover and that we can ignore it.
+          if (record.status.sync && record.status.sync == initialSyncStatus) {
+            delete record.status.sync;
+          }
+          if (record.status.service && record.status.service == initialServiceStatus) {
+            delete record.status.service;
+          }
+          if (!record.status.sync && !record.status.service) {
+            delete record.status;
+          }
+        }
+      }
+      if (allowErrorPings) {
+        assert_valid_ping(record);
+      } else {
+        assert_success_ping(record);
+      }
+      if (caughtError) {
+        if (onError) {
+          onError(record);
+        }
+        reject(caughtError);
+      } else {
+        resolve(record);
+      }
+    };
+    Svc.Obs.notify("weave:service:sync:start");
+    try {
+      engine.sync();
+    } catch (e) {
+      caughtError = e;
+    }
+    if (caughtError) {
+      Svc.Obs.notify("weave:service:sync:error", caughtError);
+    } else {
+      Svc.Obs.notify("weave:service:sync:finish");
+    }
+  });
+}
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -94,17 +94,17 @@ add_test(function test_ID_caching() {
 function serverForFoo(engine) {
   return serverForUsers({"foo": "password"}, {
     meta: {global: {engines: {bookmarks: {version: engine.version,
                                           syncID: engine.syncID}}}},
     bookmarks: {}
   });
 }
 
-add_test(function test_processIncoming_error_orderChildren() {
+add_task(function* test_processIncoming_error_orderChildren() {
   _("Ensure that _orderChildren() is called even when _processIncoming() throws an error.");
 
   let engine = new BookmarksEngine(Service);
   let store  = engine._store;
   let server = serverForFoo(engine);
   new SyncTestingInfrastructure(server.server);
 
   let collection = server.user("foo").collection("bookmarks");
@@ -141,36 +141,36 @@ add_test(function test_processIncoming_e
 
     // Make the 10 minutes old so it will only be synced in the toFetch phase.
     bogus_record.modified = Date.now() / 1000 - 60 * 10;
     engine.lastSync = Date.now() / 1000 - 60;
     engine.toFetch = [BOGUS_GUID];
 
     let error;
     try {
-      engine.sync();
+      yield sync_engine_and_validate_telem(engine, true)
     } catch(ex) {
       error = ex;
     }
-    do_check_true(!!error);
+    ok(!!error);
 
     // Verify that the bookmark order has been applied.
     let new_children = store.createRecord(folder1_guid).children;
     do_check_eq(new_children.length, 2);
     do_check_eq(new_children[0], folder1_payload.children[0]);
     do_check_eq(new_children[1], folder1_payload.children[1]);
 
     do_check_eq(PlacesUtils.bookmarks.getItemIndex(bmk1_id), 1);
     do_check_eq(PlacesUtils.bookmarks.getItemIndex(bmk2_id), 0);
 
   } finally {
     store.wipe();
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
-    server.stop(run_next_test);
+    yield new Promise(resolve => server.stop(resolve));
   }
 });
 
 add_task(function* test_restorePromptsReupload() {
   _("Ensure that restoring from a backup will reupload all records.");
   let engine = new BookmarksEngine(Service);
   let store  = engine._store;
   let server = serverForFoo(engine);
@@ -213,17 +213,17 @@ add_task(function* test_restorePromptsRe
       folder1_id, tburi, PlacesUtils.bookmarks.DEFAULT_INDEX, "Get Thunderbird!");
     let bmk2_guid = store.GUIDForId(bmk2_id);
     _("Get Thunderbird!: " + bmk2_id + ", " + bmk2_guid);
 
     PlacesUtils.bookmarks.removeItem(bmk1_id);
 
     let error;
     try {
-      engine.sync();
+      yield sync_engine_and_validate_telem(engine, false);
     } catch(ex) {
       error = ex;
       _("Got error: " + Log.exceptionStr(ex));
     }
     do_check_true(!error);
 
     _("Verify that there's only one bookmark on the server, and it's Thunderbird.");
     // Of course, there's also the Bookmarks Toolbar and Bookmarks Menu...
@@ -257,17 +257,17 @@ add_task(function* test_restorePromptsRe
     _("We found it: " + found);
     do_check_true(found);
 
     _("Have the correct number of IDs locally, too.");
     do_check_eq(count, ["menu", "toolbar", "mobile", "unfiled", folder1_id, bmk1_id].length);
 
     _("Sync again. This'll wipe bookmarks from the server.");
     try {
-      engine.sync();
+      yield sync_engine_and_validate_telem(engine, false);
     } catch(ex) {
       error = ex;
       _("Got error: " + Log.exceptionStr(ex));
     }
     do_check_true(!error);
 
     _("Verify that there's only one bookmark on the server, and it's Firefox.");
     // Of course, there's also the Bookmarks Toolbar and Bookmarks Menu...
--- a/services/sync/tests/unit/test_bookmark_smart_bookmarks.js
+++ b/services/sync/tests/unit/test_bookmark_smart_bookmarks.js
@@ -52,17 +52,17 @@ function serverForFoo(engine) {
     meta: {global: {engines: {bookmarks: {version: engine.version,
                                           syncID: engine.syncID}}}},
     bookmarks: {}
   });
 }
 
 // Verify that Places smart bookmarks have their annotation uploaded and
 // handled locally.
-add_test(function test_annotation_uploaded() {
+add_task(function *test_annotation_uploaded() {
   let server = serverForFoo(engine);
   new SyncTestingInfrastructure(server.server);
 
   let startCount = smartBookmarkCount();
 
   _("Start count is " + startCount);
 
   if (startCount > 0) {
@@ -105,17 +105,17 @@ add_test(function test_annotation_upload
 
   _("Our count has increased since we started.");
   do_check_eq(smartBookmarkCount(), startCount + 1);
 
   _("Sync record to the server.");
   let collection = server.user("foo").collection("bookmarks");
 
   try {
-    engine.sync();
+    yield sync_engine_and_validate_telem(engine, false);
     let wbos = collection.keys(function (id) {
                  return ["menu", "toolbar", "mobile", "unfiled"].indexOf(id) == -1;
                });
     do_check_eq(wbos.length, 1);
 
     _("Verify that the server WBO has the annotation.");
     let serverGUID = wbos[0];
     do_check_eq(serverGUID, guid);
@@ -136,17 +136,17 @@ add_test(function test_annotation_upload
       mostVisitedID, Utils.makeURI("http://something/else"));
     PlacesUtils.annotations.removeItemAnnotation(mostVisitedID,
                                                  SMART_BOOKMARKS_ANNO);
     store.wipe();
     engine.resetClient();
     do_check_eq(smartBookmarkCount(), startCount);
 
     _("Sync. Verify that the downloaded record carries the annotation.");
-    engine.sync();
+    yield sync_engine_and_validate_telem(engine, false);
 
     _("Verify that the Places DB now has an annotated bookmark.");
     _("Our count has increased again.");
     do_check_eq(smartBookmarkCount(), startCount + 1);
 
     _("Find by GUID and verify that it's annotated.");
     let newID = store.idForGUID(serverGUID);
     let newAnnoValue = PlacesUtils.annotations.getItemAnnotation(
--- a/services/sync/tests/unit/test_collections_recovery.js
+++ b/services/sync/tests/unit/test_collections_recovery.js
@@ -28,16 +28,19 @@ add_identity_test(this, function* test_m
 
   let handlers = {
     "/1.1/johndoe/info/collections": maybe_empty(johnHelper.handler),
     "/1.1/johndoe/storage/crypto/keys": johnU("crypto", new ServerWBO("keys").handler()),
     "/1.1/johndoe/storage/meta/global": johnU("meta",   new ServerWBO("global").handler())
   };
   let collections = ["clients", "bookmarks", "forms", "history",
                      "passwords", "prefs", "tabs"];
+  // Disable addon sync because AddonManager won't be initialized here.
+  Service.engineManager.unregister("addons");
+
   for (let coll of collections) {
     handlers["/1.1/johndoe/storage/" + coll] =
       johnU(coll, new ServerCollection({}, true).handler());
   }
   let server = httpd_setup(handlers);
   Service.serverURL = server.baseURI;
 
   try {
@@ -45,32 +48,32 @@ add_identity_test(this, function* test_m
     let orig  = Service._freshStart;
     Service._freshStart = function() {
       _("Called _freshStart.");
       orig.call(Service);
       fresh++;
     };
 
     _("Startup, no meta/global: freshStart called once.");
-    Service.sync();
+    yield sync_and_validate_telem();
     do_check_eq(fresh, 1);
     fresh = 0;
 
     _("Regular sync: no need to freshStart.");
     Service.sync();
     do_check_eq(fresh, 0);
 
     _("Simulate a bad info/collections.");
     delete johnColls.crypto;
-    Service.sync();
+    yield sync_and_validate_telem();
     do_check_eq(fresh, 1);
     fresh = 0;
 
     _("Regular sync: no need to freshStart.");
-    Service.sync();
+    yield sync_and_validate_telem();
     do_check_eq(fresh, 0);
 
   } finally {
     Svc.Prefs.resetBranch("");
     let deferred = Promise.defer();
     server.stop(deferred.resolve);
     yield deferred.promise;
   }
--- a/services/sync/tests/unit/test_corrupt_keys.js
+++ b/services/sync/tests/unit/test_corrupt_keys.js
@@ -54,16 +54,17 @@ add_task(function* test_locally_changed_
       getBrowserState: () => JSON.stringify(myTabs)
     };
 
     setBasicCredentials("johndoe", "password", passphrase);
     Service.serverURL = server.baseURI;
     Service.clusterURL = server.baseURI;
 
     Service.engineManager.register(HistoryEngine);
+    Service.engineManager.unregister("addons");
 
     function corrupt_local_keys() {
       Service.collectionKeys._default.keyPair = [Svc.Crypto.generateRandomKey(),
                                                  Svc.Crypto.generateRandomKey()];
     }
 
     _("Setting meta.");
 
@@ -81,17 +82,17 @@ add_task(function* test_locally_changed_
     serverKeys.encrypt(Service.identity.syncKeyBundle);
     do_check_true(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
 
     // Check that login works.
     do_check_true(Service.login("johndoe", "ilovejane", passphrase));
     do_check_true(Service.isLoggedIn);
 
     // Sync should upload records.
-    Service.sync();
+    yield sync_and_validate_telem();
 
     // Tabs exist.
     _("Tabs modified: " + johndoe.modified("tabs"));
     do_check_true(johndoe.modified("tabs") > 0);
 
     let coll_modified = Service.collectionKeys.lastModified;
 
     // Let's create some server side history records.
@@ -134,17 +135,19 @@ add_task(function* test_locally_changed_
     // Fill local key cache with bad data.
     corrupt_local_keys();
     _("Keys now: " + Service.collectionKeys.keyForCollection("history").keyPair);
 
     do_check_eq(hmacErrorCount, 0);
 
     _("HMAC error count: " + hmacErrorCount);
     // Now syncing should succeed, after one HMAC error.
-    Service.sync();
+    let ping = yield wait_for_ping(() => Service.sync(), true);
+    equal(ping.engines.find(e => e.name == "history").incoming.applied, 5);
+
     do_check_eq(hmacErrorCount, 1);
     _("Keys now: " + Service.collectionKeys.keyForCollection("history").keyPair);
 
     // And look! We downloaded history!
     let store = Service.engineManager.get("history")._store;
     do_check_true(yield promiseIsURIVisited("http://foo/bar?record-no--0"));
     do_check_true(yield promiseIsURIVisited("http://foo/bar?record-no--1"));
     do_check_true(yield promiseIsURIVisited("http://foo/bar?record-no--2"));
@@ -178,17 +181,19 @@ add_task(function* test_locally_changed_
 
     _("Server key time hasn't changed.");
     do_check_eq(johndoe.modified("crypto"), old_key_time);
 
     _("Resetting HMAC error timer.");
     Service.lastHMACEvent = 0;
 
     _("Syncing...");
-    Service.sync();
+    ping = yield sync_and_validate_telem(true);
+
+    do_check_eq(ping.engines.find(e => e.name == "history").incoming.failed, 5);
     _("Keys now: " + Service.collectionKeys.keyForCollection("history").keyPair);
     _("Server keys have been updated, and we skipped over 5 more HMAC errors without adjusting history.");
     do_check_true(johndoe.modified("crypto") > old_key_time);
     do_check_eq(hmacErrorCount, 6);
     do_check_false(yield promiseIsURIVisited("http://foo/bar?record-no--5"));
     do_check_false(yield promiseIsURIVisited("http://foo/bar?record-no--6"));
     do_check_false(yield promiseIsURIVisited("http://foo/bar?record-no--7"));
     do_check_false(yield promiseIsURIVisited("http://foo/bar?record-no--8"));
@@ -199,16 +204,17 @@ add_task(function* test_locally_changed_
     server.stop(deferred.resolve);
     yield deferred.promise;
   }
 });
 
 function run_test() {
   let logger = Log.repository.rootLogger;
   Log.repository.rootLogger.addAppender(new Log.DumpAppender());
+  validate_all_future_pings();
 
   ensureLegacyIdentityManager();
 
   run_next_test();
 }
 
 /**
  * Asynchronously check a url is visited.
--- a/services/sync/tests/unit/test_engine.js
+++ b/services/sync/tests/unit/test_engine.js
@@ -166,20 +166,22 @@ add_test(function test_enabled() {
 
 add_test(function test_sync() {
   let engine = new SteamEngine("Steam", Service);
   try {
     _("Engine.sync doesn't call _sync if it's not enabled");
     do_check_false(engine.enabled);
     do_check_false(engine.wasSynced);
     engine.sync();
+
     do_check_false(engine.wasSynced);
 
     _("Engine.sync calls _sync if it's enabled");
     engine.enabled = true;
+
     engine.sync();
     do_check_true(engine.wasSynced);
     do_check_eq(engineObserver.topics[0], "weave:engine:sync:start");
     do_check_eq(engineObserver.topics[1], "weave:engine:sync:finish");
     run_next_test();
   } finally {
     Svc.Prefs.resetBranch("");
     engine.wasSynced = false;
--- a/services/sync/tests/unit/test_errorhandler.js
+++ b/services/sync/tests/unit/test_errorhandler.js
@@ -161,17 +161,17 @@ function clean() {
   errorHandler.didReportProlongedError = false;
 }
 
 add_identity_test(this, function* test_401_logout() {
   let server = sync_httpd_setup();
   yield setUp(server);
 
   // By calling sync, we ensure we're logged in.
-  Service.sync();
+  yield sync_and_validate_telem();
   do_check_eq(Status.sync, SYNC_SUCCEEDED);
   do_check_true(Service.isLoggedIn);
 
   let deferred = Promise.defer();
   Svc.Obs.add("weave:service:sync:error", onSyncError);
   function onSyncError() {
     _("Got weave:service:sync:error in first sync.");
     Svc.Obs.remove("weave:service:sync:error", onSyncError);
@@ -196,32 +196,39 @@ add_identity_test(this, function* test_4
     Svc.Obs.add("weave:service:login:error", onLoginError);
   }
 
   // Make sync fail due to login rejected.
   yield configureIdentity({username: "janedoe"});
   Service._updateCachedURLs();
 
   _("Starting first sync.");
-  Service.sync();
+  let ping = yield sync_and_validate_telem(true);
+  deepEqual(ping.failureReason, { name: "httperror", code: 401 });
   _("First sync done.");
   yield deferred.promise;
 });
 
 add_identity_test(this, function* test_credentials_changed_logout() {
   let server = sync_httpd_setup();
   yield setUp(server);
 
   // By calling sync, we ensure we're logged in.
-  Service.sync();
+  yield sync_and_validate_telem();
   do_check_eq(Status.sync, SYNC_SUCCEEDED);
   do_check_true(Service.isLoggedIn);
 
   generateCredentialsChangedFailure();
-  Service.sync();
+
+  let ping = yield sync_and_validate_telem(true);
+  equal(ping.status.sync, CREDENTIALS_CHANGED);
+  deepEqual(ping.failureReason, {
+    name: "unexpectederror",
+    error: "Error: Aborting sync, remote setup failed"
+  });
 
   do_check_eq(Status.sync, CREDENTIALS_CHANGED);
   do_check_false(Service.isLoggedIn);
 
   // Clean up.
   Service.startOver();
   let deferred = Promise.defer();
   server.stop(deferred.resolve);
@@ -534,23 +541,30 @@ add_identity_test(this, function* test_s
   do_check_true(Service.isLoggedIn);
 
   generateCredentialsChangedFailure();
 
   let deferred = Promise.defer();
   Svc.Obs.add("weave:ui:sync:error", function onSyncError() {
     Svc.Obs.remove("weave:ui:sync:error", onSyncError);
     do_check_eq(Status.sync, CREDENTIALS_CHANGED);
-
-    clean();
-    server.stop(deferred.resolve);
+    // If we clean this tick, telemetry won't get the right error
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    });
   });
 
   setLastSync(NON_PROLONGED_ERROR_DURATION);
-  errorHandler.syncAndReportErrors();
+  let ping = yield wait_for_ping(() => errorHandler.syncAndReportErrors(), true);
+  equal(ping.status.sync, CREDENTIALS_CHANGED);
+  deepEqual(ping.failureReason, {
+    name: "unexpectederror",
+    error: "Error: Aborting sync, remote setup failed"
+  });
   yield deferred.promise;
 });
 
 // XXX - how to arrange for 'Service.identity.basicPassword = null;' in
 // an fxaccounts environment?
 add_task(function* test_login_syncAndReportErrors_prolonged_non_network_error() {
   // Test prolonged, non-network errors are
   // reported when calling syncAndReportErrors.
@@ -584,23 +598,30 @@ add_identity_test(this, function* test_s
   do_check_true(Service.isLoggedIn);
 
   generateCredentialsChangedFailure();
 
   let deferred = Promise.defer();
   Svc.Obs.add("weave:ui:sync:error", function onSyncError() {
     Svc.Obs.remove("weave:ui:sync:error", onSyncError);
     do_check_eq(Status.sync, CREDENTIALS_CHANGED);
-
-    clean();
-    server.stop(deferred.resolve);
+    // If we clean this tick, telemetry won't get the right error
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    });
   });
 
   setLastSync(PROLONGED_ERROR_DURATION);
-  errorHandler.syncAndReportErrors();
+  let ping = yield wait_for_ping(() => errorHandler.syncAndReportErrors(), true);
+  equal(ping.status.sync, CREDENTIALS_CHANGED);
+  deepEqual(ping.failureReason, {
+    name: "unexpectederror",
+    error: "Error: Aborting sync, remote setup failed"
+  });
   yield deferred.promise;
 });
 
 add_identity_test(this, function* test_login_syncAndReportErrors_network_error() {
   // Test network errors are reported when calling syncAndReportErrors.
   yield configureIdentity({username: "broken.wipe"});
   Service.serverURL  = fakeServerUrl;
   Service.clusterURL = fakeServerUrl;
@@ -710,23 +731,30 @@ add_task(function* test_sync_prolonged_n
 
   generateCredentialsChangedFailure();
 
   let deferred = Promise.defer();
   Svc.Obs.add("weave:ui:sync:error", function onSyncError() {
     Svc.Obs.remove("weave:ui:sync:error", onSyncError);
     do_check_eq(Status.sync, PROLONGED_SYNC_FAILURE);
     do_check_true(errorHandler.didReportProlongedError);
-
-    clean();
-    server.stop(deferred.resolve);
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    });
   });
 
   setLastSync(PROLONGED_ERROR_DURATION);
-  Service.sync();
+
+  let ping = yield sync_and_validate_telem(true);
+  equal(ping.status.sync, PROLONGED_SYNC_FAILURE);
+  deepEqual(ping.failureReason, {
+    name: "unexpectederror",
+    error: "Error: Aborting sync, remote setup failed"
+  });
   yield deferred.promise;
 });
 
 add_identity_test(this, function* test_login_prolonged_network_error() {
   // Test prolonged, network errors are reported
   yield configureIdentity({username: "johndoe"});
   Service.serverURL  = fakeServerUrl;
   Service.clusterURL = fakeServerUrl;
@@ -875,22 +903,27 @@ add_identity_test(this, function* test_s
   Svc.Obs.add("weave:ui:sync:finish", function onSyncFinish() {
     Svc.Obs.remove("weave:ui:sync:finish", onSyncFinish);
 
     do_check_eq(Status.service, SYNC_FAILED_PARTIAL);
     do_check_eq(Status.sync, SERVER_MAINTENANCE);
     do_check_false(errorHandler.didReportProlongedError);
 
     Svc.Obs.remove("weave:ui:sync:error", onSyncError);
-    clean();
-    server.stop(deferred.resolve);
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    })
   });
 
   setLastSync(NON_PROLONGED_ERROR_DURATION);
-  Service.sync();
+  let ping = yield sync_and_validate_telem(true);
+  equal(ping.status.sync, SERVER_MAINTENANCE);
+  deepEqual(ping.engines.find(e => e.failureReason).failureReason, { name: "httperror", code: 503 })
+
   yield deferred.promise;
 });
 
 add_identity_test(this, function* test_info_collections_login_server_maintenance_error() {
   // Test info/collections server maintenance errors are not reported.
   let server = sync_httpd_setup();
   yield setUp(server);
 
@@ -1035,24 +1068,29 @@ add_task(function* test_sync_prolonged_s
 
   let deferred = Promise.defer();
   Svc.Obs.add("weave:ui:sync:error", function onUIUpdate() {
     Svc.Obs.remove("weave:ui:sync:error", onUIUpdate);
     do_check_eq(Status.service, SYNC_FAILED);
     do_check_eq(Status.sync, PROLONGED_SYNC_FAILURE);
     do_check_true(errorHandler.didReportProlongedError);
 
-    clean();
-    server.stop(deferred.resolve);
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    });
   });
 
   do_check_eq(Status.service, STATUS_OK);
 
   setLastSync(PROLONGED_ERROR_DURATION);
-  Service.sync();
+  let ping = yield sync_and_validate_telem(true);
+  deepEqual(ping.status.sync, PROLONGED_SYNC_FAILURE);
+  deepEqual(ping.engines.find(e => e.failureReason).failureReason,
+            { name: "httperror", code: 503 });
   yield deferred.promise;
 });
 
 add_identity_test(this, function* test_info_collections_login_prolonged_server_maintenance_error(){
   // Test info/collections prolonged server maintenance errors are reported.
   let server = sync_httpd_setup();
   yield setUp(server);
 
@@ -1259,27 +1297,29 @@ add_identity_test(this, function* test_w
   Svc.Obs.add("weave:ui:sync:error", function onUIUpdate() {
     Svc.Obs.remove("weave:ui:sync:error", onUIUpdate);
     do_check_true(Status.enforceBackoff);
     do_check_eq(backoffInterval, 42);
     do_check_eq(Status.service, SYNC_FAILED);
     do_check_eq(Status.sync, PROLONGED_SYNC_FAILURE);
     do_check_eq(Svc.Prefs.get("firstSync"), "wipeRemote");
     do_check_true(errorHandler.didReportProlongedError);
-
-    clean();
-    server.stop(deferred.resolve);
+    server.stop(() => {
+      clean();
+      deferred.resolve();
+    });
   });
 
   do_check_false(Status.enforceBackoff);
   do_check_eq(Status.service, STATUS_OK);
 
   Svc.Prefs.set("firstSync", "wipeRemote");
   setLastSync(PROLONGED_ERROR_DURATION);
-  Service.sync();
+  let ping = yield sync_and_validate_telem(true);
+  deepEqual(ping.failureReason, { name: "httperror", code: 503 });
   yield deferred.promise;
 });
 
 add_task(function* test_sync_syncAndReportErrors_server_maintenance_error() {
   // Test server maintenance errors are reported
   // when calling syncAndReportErrors.
   let server = sync_httpd_setup();
   yield setUp(server);
@@ -1765,20 +1805,20 @@ add_identity_test(this, function* test_w
   setLastSync(PROLONGED_ERROR_DURATION);
   errorHandler.syncAndReportErrors();
   yield deferred.promise;
 });
 
 add_task(function* test_sync_engine_generic_fail() {
   let server = sync_httpd_setup();
 
-  let engine = engineManager.get("catapult");
+let engine = engineManager.get("catapult");
   engine.enabled = true;
   engine.sync = function sync() {
-    Svc.Obs.notify("weave:engine:sync:error", "", "catapult");
+    Svc.Obs.notify("weave:engine:sync:error", ENGINE_UNKNOWN_FAIL, "catapult");
   };
 
   let log = Log.repository.getLogger("Sync.ErrorHandler");
   Svc.Prefs.set("log.appender.file.logOnError", true);
 
   do_check_eq(Status.engines["catapult"], undefined);
 
   let deferred = Promise.defer();
@@ -1803,22 +1843,28 @@ add_task(function* test_sync_engine_gene
       let logfile = entries.getNext().QueryInterface(Ci.nsILocalFile);
       do_check_true(logfile.leafName.startsWith("error-sync-"), logfile.leafName);
 
       clean();
 
       let syncErrors = sumHistogram("WEAVE_ENGINE_SYNC_ERRORS", { key: "catapult" });
       do_check_true(syncErrors, 1);
 
-      server.stop(deferred.resolve);
+      server.stop(() => {
+        clean();
+        deferred.resolve();
+      });
     });
   });
 
   do_check_true(yield setUp(server));
-  Service.sync();
+  let ping = yield sync_and_validate_telem(true);
+  deepEqual(ping.status.service, SYNC_FAILED_PARTIAL);
+  deepEqual(ping.engines.find(e => e.status).status, ENGINE_UNKNOWN_FAIL);
+
   yield deferred.promise;
 });
 
 add_test(function test_logs_on_sync_error_despite_shouldReportError() {
   _("Ensure that an error is still logged when weave:service:sync:error " +
     "is notified, despite shouldReportError returning false.");
 
   let log = Log.repository.getLogger("Sync.ErrorHandler");
--- a/services/sync/tests/unit/test_errorhandler_filelog.js
+++ b/services/sync/tests/unit/test_errorhandler_filelog.js
@@ -30,16 +30,18 @@ function setLastSync(lastSyncValue) {
 function run_test() {
   initTestLogging("Trace");
 
   Log.repository.getLogger("Sync.LogManager").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.Service").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.SyncScheduler").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.ErrorHandler").level = Log.Level.Trace;
 
+  validate_all_future_pings();
+
   run_next_test();
 }
 
 add_test(function test_noOutput() {
   // Ensure that the log appender won't print anything.
   errorHandler._logManager._fileAppender.level = Log.Level.Fatal + 1;
 
   // Clear log output from startup.
--- a/services/sync/tests/unit/test_errorhandler_sync_checkServerError.js
+++ b/services/sync/tests/unit/test_errorhandler_sync_checkServerError.js
@@ -271,11 +271,12 @@ add_identity_test(this, function* test_r
   } finally {
     Status.resetSync();
     Service.startOver();
   }
   yield promiseStopServer(server);
 });
 
 function run_test() {
+  validate_all_future_pings();
   engineManager.register(CatapultEngine);
   run_next_test();
 }
--- a/services/sync/tests/unit/test_hmac_error.js
+++ b/services/sync/tests/unit/test_hmac_error.js
@@ -44,17 +44,17 @@ function shared_setup() {
   let global      = new ServerWBO("global", {engines: engines});
   let keysWBO     = new ServerWBO("keys");
   let rotaryColl  = new ServerCollection({}, true);
   let clientsColl = new ServerCollection({}, true);
 
   return [engine, rotaryColl, clientsColl, keysWBO, global];
 }
 
-add_test(function hmac_error_during_404() {
+add_task(function *hmac_error_during_404() {
   _("Attempt to replicate the HMAC error setup.");
   let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup();
 
   // Hand out 404s for crypto/keys.
   let keysHandler    = keysWBO.handler();
   let key404Counter  = 0;
   let keys404Handler = function (request, response) {
     if (key404Counter > 0) {
@@ -78,31 +78,32 @@ add_test(function hmac_error_during_404(
     "/1.1/foo/storage/rotary": upd("rotary", rotaryColl.handler())
   };
 
   let server = sync_httpd_setup(handlers);
   Service.serverURL = server.baseURI;
 
   try {
     _("Syncing.");
-    Service.sync();
+    yield sync_and_validate_telem();
+
     _("Partially resetting client, as if after a restart, and forcing redownload.");
     Service.collectionKeys.clear();
     engine.lastSync = 0;        // So that we redownload records.
     key404Counter = 1;
     _("---------------------------");
-    Service.sync();
+    yield sync_and_validate_telem();
     _("---------------------------");
 
     // Two rotary items, one client record... no errors.
     do_check_eq(hmacErrorCount, 0)
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
-    server.stop(run_next_test);
+    yield new Promise(resolve => server.stop(resolve));
   }
 });
 
 add_test(function hmac_error_during_node_reassignment() {
   _("Attempt to replicate an HMAC error during node reassignment.");
   let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup();
 
   let collectionsHelper = track_collections_helper();
--- a/services/sync/tests/unit/test_node_reassignment.js
+++ b/services/sync/tests/unit/test_node_reassignment.js
@@ -18,17 +18,17 @@ Service.engineManager.clear();
 function run_test() {
   Log.repository.getLogger("Sync.AsyncResource").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.ErrorHandler").level  = Log.Level.Trace;
   Log.repository.getLogger("Sync.Resource").level      = Log.Level.Trace;
   Log.repository.getLogger("Sync.RESTRequest").level   = Log.Level.Trace;
   Log.repository.getLogger("Sync.Service").level       = Log.Level.Trace;
   Log.repository.getLogger("Sync.SyncScheduler").level = Log.Level.Trace;
   initTestLogging();
-
+  validate_all_future_pings();
   ensureLegacyIdentityManager();
 
   Service.engineManager.register(RotaryEngine);
 
   // None of the failures in this file should result in a UI error.
   function onUIError() {
     do_throw("Errors should not be presented in the UI.");
   }
--- a/services/sync/tests/unit/test_service_startup.js
+++ b/services/sync/tests/unit/test_service_startup.js
@@ -5,16 +5,17 @@ Cu.import("resource://services-common/ob
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://testing-common/services/sync/utils.js");
 
 Svc.Prefs.set("registerEngines", "Tab,Bookmarks,Form,History");
 Cu.import("resource://services-sync/service.js");
 
 function run_test() {
+  validate_all_future_pings();
   _("When imported, Service.onStartup is called");
   initTestLogging("Trace");
 
   let xps = Cc["@mozilla.org/weave/service;1"]
               .getService(Ci.nsISupports)
               .wrappedJSObject;
   do_check_false(xps.enabled);
 
--- a/services/sync/tests/unit/test_service_sync_locked.js
+++ b/services/sync/tests/unit/test_service_sync_locked.js
@@ -1,15 +1,16 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
 
 function run_test() {
+  validate_all_future_pings();
   let debug = [];
   let info  = [];
 
   function augmentLogger(old) {
     let d = old.debug;
     let i = old.info;
     // For the purposes of this test we don't need to do full formatting
     // of the 2nd param, as the ones we care about are always strings.
--- a/services/sync/tests/unit/test_service_sync_remoteSetup.js
+++ b/services/sync/tests/unit/test_service_sync_remoteSetup.js
@@ -5,16 +5,17 @@ Cu.import("resource://gre/modules/Log.js
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/keys.js");
 Cu.import("resource://services-sync/service.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://testing-common/services/sync/fakeservices.js");
 Cu.import("resource://testing-common/services/sync/utils.js");
 
 function run_test() {
+  validate_all_future_pings();
   let logger = Log.repository.rootLogger;
   Log.repository.rootLogger.addAppender(new Log.DumpAppender());
 
   let guidSvc = new FakeGUIDService();
   let clients = new ServerCollection();
   let meta_global = new ServerWBO('global');
 
   let collectionsHelper = track_collections_helper();
--- a/services/sync/tests/unit/test_service_sync_specified.js
+++ b/services/sync/tests/unit/test_service_sync_specified.js
@@ -72,16 +72,17 @@ function setUp() {
   });
   new SyncTestingInfrastructure(server, "johndoe", "ilovejane",
                                 "abcdeabcdeabcdeabcdeabcdea");
   return server;
 }
 
 function run_test() {
   initTestLogging("Trace");
+  validate_all_future_pings();
   Log.repository.getLogger("Sync.Service").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.ErrorHandler").level = Log.Level.Trace;
 
   run_next_test();
 }
 
 add_test(function test_noEngines() {
   _("Test: An empty array of engines to sync does nothing.");
--- a/services/sync/tests/unit/test_service_sync_updateEnabledEngines.js
+++ b/services/sync/tests/unit/test_service_sync_updateEnabledEngines.js
@@ -81,16 +81,17 @@ function setUp(server) {
 
 const PAYLOAD = 42;
 
 
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Service").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.ErrorHandler").level = Log.Level.Trace;
+  validate_all_future_pings();
 
   run_next_test();
 }
 
 add_test(function test_newAccount() {
   _("Test: New account does not disable locally enabled engines.");
   let engine = Service.engineManager.get("steam");
   let server = sync_httpd_setup({
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -10,23 +10,32 @@ Cu.import("resource://services-sync/serv
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://testing-common/services/sync/rotaryengine.js");
 Cu.import("resource://testing-common/services/sync/utils.js");
 
 function makeRotaryEngine() {
   return new RotaryEngine(Service);
 }
 
-function cleanAndGo(server) {
+function clean() {
   Svc.Prefs.resetBranch("");
   Svc.Prefs.set("log.logger.engine.rotary", "Trace");
   Service.recordManager.clearCache();
+}
+
+function cleanAndGo(server) {
+  clean();
   server.stop(run_next_test);
 }
 
+function promiseClean(server) {
+  clean();
+  return new Promise(resolve => server.stop(resolve));
+}
+
 function configureService(server, username, password) {
   Service.clusterURL = server.baseURI;
 
   Service.identity.account = username || "foo";
   Service.identity.basicPassword = password || "password";
 }
 
 function createServerAndConfigureClient() {
@@ -662,17 +671,17 @@ add_test(function test_processIncoming_m
     }
 
   } finally {
     cleanAndGo(server);
   }
 });
 
 
-add_test(function test_processIncoming_store_toFetch() {
+add_task(function *test_processIncoming_store_toFetch() {
   _("If processIncoming fails in the middle of a batch on mobile, state is saved in toFetch and lastSync.");
   Service.identity.username = "foo";
   Svc.Prefs.set("client.type", "mobile");
 
   // A collection that throws at the fourth get.
   let collection = new ServerCollection();
   collection._get_calls = 0;
   collection._get = collection.get;
@@ -709,33 +718,32 @@ add_test(function test_processIncoming_s
   try {
 
     // Confirm initial environment
     do_check_eq(engine.lastSync, 0);
     do_check_empty(engine._store.items);
 
     let error;
     try {
-      engine.sync();
+      yield sync_engine_and_validate_telem(engine, true);
     } catch (ex) {
       error = ex;
     }
-    do_check_true(!!error);
 
     // Only the first two batches have been applied.
     do_check_eq(Object.keys(engine._store.items).length,
                 MOBILE_BATCH_SIZE * 2);
 
     // The third batch is stuck in toFetch. lastSync has been moved forward to
     // the last successful item's timestamp.
     do_check_eq(engine.toFetch.length, MOBILE_BATCH_SIZE);
     do_check_eq(engine.lastSync, collection.wbo("record-no-99").modified);
 
   } finally {
-    cleanAndGo(server);
+    yield promiseClean(server);
   }
 });
 
 
 add_test(function test_processIncoming_resume_toFetch() {
   _("toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items.");
   Service.identity.username = "foo";
 
@@ -1216,17 +1224,17 @@ add_test(function test_processIncoming_f
     do_check_eq(batchDownload(BOGUS_RECORDS.length), 4);
 
   } finally {
     cleanAndGo(server);
   }
 });
 
 
-add_test(function test_processIncoming_decrypt_failed() {
+add_task(function *test_processIncoming_decrypt_failed() {
   _("Ensure that records failing to decrypt are either replaced or refetched.");
 
   Service.identity.username = "foo";
 
   // Some good and some bogus records. One doesn't contain valid JSON,
   // the other will throw during decrypt.
   let collection = new ServerCollection();
   collection._wbos.flying = new ServerWBO(
@@ -1275,31 +1283,34 @@ add_test(function test_processIncoming_d
     let observerData;
     Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
       Svc.Obs.remove("weave:engine:sync:applied", onApplied);
       observerSubject = subject;
       observerData = data;
     });
 
     engine.lastSync = collection.wbo("nojson").modified - 1;
-    engine.sync();
+    let ping = yield sync_engine_and_validate_telem(engine, true);
+    do_check_eq(ping.engines[0].incoming.applied, 2);
+    do_check_eq(ping.engines[0].incoming.failed, 4);
+    do_check_eq(ping.engines[0].incoming.newFailed, 4);
 
     do_check_eq(engine.previousFailed.length, 4);
     do_check_eq(engine.previousFailed[0], "nojson");
     do_check_eq(engine.previousFailed[1], "nojson2");
     do_check_eq(engine.previousFailed[2], "nodecrypt");
     do_check_eq(engine.previousFailed[3], "nodecrypt2");
 
     // Ensure the observer was notified
     do_check_eq(observerData, engine.name);
     do_check_eq(observerSubject.applied, 2);
     do_check_eq(observerSubject.failed, 4);
 
   } finally {
-    cleanAndGo(server);
+    yield promiseClean(server);
   }
 });
 
 
 add_test(function test_uploadOutgoing_toEmptyServer() {
   _("SyncEngine._uploadOutgoing uploads new records to server");
 
   Service.identity.username = "foo";
@@ -1353,17 +1364,17 @@ add_test(function test_uploadOutgoing_to
     do_check_eq(collection.payload("flying"), undefined);
 
   } finally {
     cleanAndGo(server);
   }
 });
 
 
-add_test(function test_uploadOutgoing_failed() {
+add_task(function *test_uploadOutgoing_failed() {
   _("SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload.");
 
   Service.identity.username = "foo";
   let collection = new ServerCollection();
   // We only define the "flying" WBO on the server, not the "scotsman"
   // and "peppercorn" ones.
   collection._wbos.flying = new ServerWBO('flying');
 
@@ -1396,32 +1407,32 @@ add_test(function test_uploadOutgoing_fa
     // Confirm initial environment
     do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.payload("flying"), undefined);
     do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
 
     engine.enabled = true;
-    engine.sync();
+    yield sync_engine_and_validate_telem(engine, true);
 
     // Local timestamp has been set.
     do_check_true(engine.lastSyncLocal > 0);
 
     // Ensure the 'flying' record has been uploaded and is no longer marked.
     do_check_true(!!collection.payload("flying"));
     do_check_eq(engine._tracker.changedIDs['flying'], undefined);
 
     // The 'scotsman' and 'peppercorn' records couldn't be uploaded so
     // they weren't cleared from the tracker.
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
 
   } finally {
-    cleanAndGo(server);
+    yield promiseClean(server);
   }
 });
 
 
 add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
   _("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_RECORDS");
 
   Service.identity.username = "foo";
@@ -1694,17 +1705,17 @@ add_test(function test_syncFinish_delete
     do_check_eq(engine._delete.ids, undefined);
 
   } finally {
     cleanAndGo(server);
   }
 });
 
 
-add_test(function test_sync_partialUpload() {
+add_task(function *test_sync_partialUpload() {
   _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
 
   Service.identity.username = "foo";
 
   let collection = new ServerCollection();
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
@@ -1742,21 +1753,22 @@ add_test(function test_sync_partialUploa
   meta_global.payload.engines = {rotary: {version: engine.version,
                                          syncID: engine.syncID}};
 
   try {
 
     engine.enabled = true;
     let error;
     try {
-      engine.sync();
+      yield sync_engine_and_validate_telem(engine, true);
     } catch (ex) {
       error = ex;
     }
-    do_check_true(!!error);
+
+    ok(!!error);
 
     // The timestamp has been updated.
     do_check_true(engine.lastSyncLocal > 456);
 
     for (let i = 0; i < 234; i++) {
       let id = 'record-no-' + i;
       // Ensure failed records are back in the tracker:
       // * records no. 23 and 42 were rejected by the server,
@@ -1764,17 +1776,17 @@ add_test(function test_sync_partialUploa
       //   hard on the 3rd upload.
       if ((i == 23) || (i == 42) || (i >= 200))
         do_check_eq(engine._tracker.changedIDs[id], i);
       else
         do_check_false(id in engine._tracker.changedIDs);
     }
 
   } finally {
-    cleanAndGo(server);
+    yield promiseClean(server);
   }
 });
 
 add_test(function test_canDecrypt_noCryptoKeys() {
   _("SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection.");
   Service.identity.username = "foo";
 
   // Wipe collection keys so we can test the desired scenario.
--- a/services/sync/tests/unit/test_syncscheduler.js
+++ b/services/sync/tests/unit/test_syncscheduler.js
@@ -85,16 +85,17 @@ function cleanUpAndGo(server) {
   return deferred.promise;
 }
 
 function run_test() {
   initTestLogging("Trace");
 
   Log.repository.getLogger("Sync.Service").level = Log.Level.Trace;
   Log.repository.getLogger("Sync.scheduler").level = Log.Level.Trace;
+  validate_all_future_pings();
 
   // The scheduler checks Weave.fxaEnabled to determine whether to use
   // FxA defaults or legacy defaults.  As .fxaEnabled checks the username, we
   // set a username here then reset the default to ensure they are used.
   ensureLegacyIdentityManager();
   setBasicCredentials("johndoe");
   scheduler.setDefaults();
 
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_telemetry.js
@@ -0,0 +1,409 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+Cu.import("resource://services-common/observers.js");
+Cu.import("resource://services-sync/telemetry.js");
+Cu.import("resource://services-sync/service.js");
+Cu.import("resource://services-sync/record.js");
+Cu.import("resource://services-sync/resource.js");
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://services-sync/engines.js");
+Cu.import("resource://services-sync/engines/bookmarks.js");
+Cu.import("resource://services-sync/engines/clients.js");
+Cu.import("resource://testing-common/services/sync/utils.js");
+Cu.import("resource://testing-common/services/sync/fxa_utils.js");
+Cu.import("resource://testing-common/services/sync/rotaryengine.js");
+
+Cu.import("resource://gre/modules/PlacesUtils.jsm");
+Cu.import("resource://services-sync/util.js");
+
+initTestLogging("Trace");
+
+function SteamStore(engine) {
+  Store.call(this, "Steam", engine);
+}
+
+SteamStore.prototype = {
+  __proto__: Store.prototype,
+};
+
+function SteamTracker(name, engine) {
+  Tracker.call(this, name || "Steam", engine);
+}
+
+SteamTracker.prototype = {
+  __proto__: Tracker.prototype
+};
+
+function SteamEngine(service) {
+  Engine.call(this, "steam", service);
+}
+
+SteamEngine.prototype = {
+  __proto__: Engine.prototype,
+  _storeObj: SteamStore,
+  _trackerObj: SteamTracker,
+  _errToThrow: null,
+  _sync() {
+    if (this._errToThrow) {
+      throw this._errToThrow;
+    }
+  }
+};
+
+function cleanAndGo(server) {
+  Svc.Prefs.resetBranch("");
+  Svc.Prefs.set("log.logger.engine.rotary", "Trace");
+  Service.recordManager.clearCache();
+  return new Promise(resolve => server.stop(resolve));
+}
+
+// Avoid addon manager complaining about not being initialized
+Service.engineManager.unregister("addons");
+
+add_identity_test(this, function *test_basic() {
+  let helper = track_collections_helper();
+  let upd = helper.with_updated_collection;
+
+  yield configureIdentity({ username: "johndoe" });
+  let handlers = {
+    "/1.1/johndoe/info/collections": helper.handler,
+    "/1.1/johndoe/storage/crypto/keys": upd("crypto", new ServerWBO("keys").handler()),
+    "/1.1/johndoe/storage/meta/global": upd("meta",  new ServerWBO("global").handler())
+  };
+
+  let collections = ["clients", "bookmarks", "forms", "history", "passwords", "prefs", "tabs"];
+
+  for (let coll of collections) {
+    handlers["/1.1/johndoe/storage/" + coll] = upd(coll, new ServerCollection({}, true).handler());
+  }
+
+  let server = httpd_setup(handlers);
+  Service.serverURL = server.baseURI;
+
+  yield sync_and_validate_telem(true);
+
+  yield new Promise(resolve => server.stop(resolve));
+});
+
+add_task(function* test_processIncoming_error() {
+  let engine = new BookmarksEngine(Service);
+  let store  = engine._store;
+  let server = serverForUsers({"foo": "password"}, {
+    meta: {global: {engines: {bookmarks: {version: engine.version,
+                                           syncID: engine.syncID}}}},
+    bookmarks: {}
+  });
+  new SyncTestingInfrastructure(server.server);
+  let collection = server.user("foo").collection("bookmarks");
+  try {
+    // Create a bogus record that when synced down will provoke a
+    // network error which in turn provokes an exception in _processIncoming.
+    const BOGUS_GUID = "zzzzzzzzzzzz";
+    let bogus_record = collection.insert(BOGUS_GUID, "I'm a bogus record!");
+    bogus_record.get = function get() {
+      throw "Sync this!";
+    };
+    // Make the 10 minutes old so it will only be synced in the toFetch phase.
+    bogus_record.modified = Date.now() / 1000 - 60 * 10;
+    engine.lastSync = Date.now() / 1000 - 60;
+    engine.toFetch = [BOGUS_GUID];
+
+    let error, ping;
+    try {
+      yield sync_engine_and_validate_telem(engine, true, errPing => ping = errPing);
+    } catch(ex) {
+      error = ex;
+    }
+    ok(!!error);
+    ok(!!ping);
+    equal(ping.uid, "0".repeat(32));
+    deepEqual(ping.failureReason, {
+      name: "othererror",
+      error: "error.engine.reason.record_download_fail"
+    });
+
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].name, "bookmarks");
+    deepEqual(ping.engines[0].failureReason, {
+      name: "othererror",
+      error: "error.engine.reason.record_download_fail"
+    });
+
+  } finally {
+    store.wipe();
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function *test_uploading() {
+  let engine = new BookmarksEngine(Service);
+  let store  = engine._store;
+  let server = serverForUsers({"foo": "password"}, {
+    meta: {global: {engines: {bookmarks: {version: engine.version,
+                                           syncID: engine.syncID}}}},
+    bookmarks: {}
+  });
+  new SyncTestingInfrastructure(server.server);
+
+  let parent = PlacesUtils.toolbarFolderId;
+  let uri = Utils.makeURI("http://getfirefox.com/");
+  let title = "Get Firefox";
+
+  let bmk_id = PlacesUtils.bookmarks.insertBookmark(parent, uri,
+    PlacesUtils.bookmarks.DEFAULT_INDEX, "Get Firefox!");
+
+  let guid = store.GUIDForId(bmk_id);
+  let record = store.createRecord(guid);
+
+  let collection = server.user("foo").collection("bookmarks");
+  try {
+    let ping = yield sync_engine_and_validate_telem(engine, false);
+    ok(!!ping);
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].name, "bookmarks");
+    ok(!!ping.engines[0].outgoing);
+    greater(ping.engines[0].outgoing[0].sent, 0)
+    ok(!ping.engines[0].incoming);
+
+    PlacesUtils.bookmarks.setItemTitle(bmk_id, "New Title");
+
+    store.wipe();
+    engine.resetClient();
+
+    ping = yield sync_engine_and_validate_telem(engine, false);
+
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].name, "bookmarks");
+    equal(ping.engines[0].outgoing.length, 1);
+    ok(!!ping.engines[0].incoming);
+
+  } finally {
+    // Clean up.
+    store.wipe();
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function *test_upload_failed() {
+  Service.identity.username = "foo";
+  let collection = new ServerCollection();
+  collection._wbos.flying = new ServerWBO('flying');
+
+  let server = sync_httpd_setup({
+      "/1.1/foo/storage/rotary": collection.handler()
+  });
+
+  let syncTesting = new SyncTestingInfrastructure(server);
+
+  let engine = new RotaryEngine(Service);
+  engine.lastSync = 123; // needs to be non-zero so that tracker is queried
+  engine.lastSyncLocal = 456;
+  engine._store.items = {
+    flying: "LNER Class A3 4472",
+    scotsman: "Flying Scotsman",
+    peppercorn: "Peppercorn Class"
+  };
+  const FLYING_CHANGED = 12345;
+  const SCOTSMAN_CHANGED = 23456;
+  const PEPPERCORN_CHANGED = 34567;
+  engine._tracker.addChangedID("flying", FLYING_CHANGED);
+  engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
+  engine._tracker.addChangedID("peppercorn", PEPPERCORN_CHANGED);
+
+  let meta_global = Service.recordManager.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = { rotary: { version: engine.version, syncID: engine.syncID } };
+
+  try {
+    engine.enabled = true;
+    let ping = yield sync_engine_and_validate_telem(engine, true);
+    ok(!!ping);
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].incoming, null);
+    deepEqual(ping.engines[0].outgoing, [{ sent: 3, failed: 2 }]);
+    engine.lastSync = 123;
+    engine.lastSyncLocal = 456;
+
+    ping = yield sync_engine_and_validate_telem(engine, true);
+    ok(!!ping);
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].incoming.reconciled, 1);
+    deepEqual(ping.engines[0].outgoing, [{ sent: 2, failed: 2 }]);
+
+  } finally {
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function *test_sync_partialUpload() {
+  Service.identity.username = "foo";
+
+  let collection = new ServerCollection();
+  let server = sync_httpd_setup({
+      "/1.1/foo/storage/rotary": collection.handler()
+  });
+  let syncTesting = new SyncTestingInfrastructure(server);
+  generateNewKeys(Service.collectionKeys);
+
+  let engine = new RotaryEngine(Service);
+  engine.lastSync = 123;
+  engine.lastSyncLocal = 456;
+
+
+  // Create a bunch of records (and server side handlers)
+  for (let i = 0; i < 234; i++) {
+    let id = 'record-no-' + i;
+    engine._store.items[id] = "Record No. " + i;
+    engine._tracker.addChangedID(id, i);
+    // Let two items in the first upload batch fail.
+    if (i != 23 && i != 42) {
+      collection.insert(id);
+    }
+  }
+
+  let meta_global = Service.recordManager.set(engine.metaURL,
+                                              new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {rotary: {version: engine.version,
+                                          syncID: engine.syncID}};
+
+  try {
+    engine.enabled = true;
+    let ping = yield sync_engine_and_validate_telem(engine, true);
+
+    ok(!!ping);
+    ok(!ping.failureReason);
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].name, "rotary");
+    ok(!ping.engines[0].incoming);
+    ok(!ping.engines[0].failureReason);
+    deepEqual(ping.engines[0].outgoing, [{ sent: 234, failed: 2 }]);
+
+    collection.post = function() { throw "Failure"; }
+
+    engine._store.items["record-no-1000"] = "Record No. 1000";
+    engine._tracker.addChangedID("record-no-1000", 1000);
+    collection.insert("record-no-1000", 1000);
+
+    engine.lastSync = 123;
+    engine.lastSyncLocal = 456;
+    ping = null;
+
+    try {
+      // should throw
+      yield sync_engine_and_validate_telem(engine, true, errPing => ping = errPing);
+    } catch (e) {}
+    // It would be nice if we had a more descriptive error for this...
+    let uploadFailureError = {
+      name: "othererror",
+      error: "error.engine.reason.record_upload_fail"
+    };
+
+    ok(!!ping);
+    deepEqual(ping.failureReason, uploadFailureError);
+    equal(ping.engines.length, 1);
+    equal(ping.engines[0].name, "rotary");
+    deepEqual(ping.engines[0].incoming, {
+      failed: 1,
+      newFailed: 1,
+      reconciled: 232
+    });
+    ok(!ping.engines[0].outgoing);
+    deepEqual(ping.engines[0].failureReason, uploadFailureError);
+
+  } finally {
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function* test_generic_engine_fail() {
+  Service.engineManager.register(SteamEngine);
+  let engine = Service.engineManager.get("steam");
+  engine.enabled = true;
+  let store  = engine._store;
+  let server = serverForUsers({"foo": "password"}, {
+    meta: {global: {engines: {steam: {version: engine.version,
+                                      syncID: engine.syncID}}}},
+    steam: {}
+  });
+  new SyncTestingInfrastructure(server.server);
+  let e = new Error("generic failure message")
+  engine._errToThrow = e;
+
+  try {
+    let ping = yield sync_and_validate_telem(true);
+    equal(ping.status.service, SYNC_FAILED_PARTIAL);
+    deepEqual(ping.engines.find(e => e.name === "steam").failureReason, {
+      name: "unexpectederror",
+      error: String(e)
+    });
+  } finally {
+    Service.engineManager.unregister(engine);
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function* test_initial_sync_engines() {
+  Service.engineManager.register(SteamEngine);
+  let engine = Service.engineManager.get("steam");
+  engine.enabled = true;
+  let store  = engine._store;
+  let engines = {};
+  // These are the only ones who actually have things to sync at startup.
+  let engineNames = ["clients", "bookmarks", "prefs", "tabs"];
+  let conf = { meta: { global: { engines } } };
+  for (let e of engineNames) {
+    engines[e] = { version: engine.version, syncID: engine.syncID };
+    conf[e] = {};
+  }
+  let server = serverForUsers({"foo": "password"}, conf);
+  new SyncTestingInfrastructure(server.server);
+  try {
+    let ping = yield wait_for_ping(() => Service.sync(), true);
+
+    equal(ping.engines.find(e => e.name === "clients").outgoing[0].sent, 1);
+    equal(ping.engines.find(e => e.name === "tabs").outgoing[0].sent, 1);
+
+    // for the rest we don't care about specifics
+    for (let e of ping.engines) {
+      if (!engineNames.includes(engine.name)) {
+        continue;
+      }
+      greaterOrEqual(e.took, 1);
+      ok(!!e.outgoing)
+      equal(e.outgoing.length, 1);
+      notEqual(e.outgoing[0].sent, undefined);
+      equal(e.outgoing[0].failed, undefined);
+    }
+  } finally {
+    yield cleanAndGo(server);
+  }
+});
+
+add_task(function* test_nserror() {
+  Service.engineManager.register(SteamEngine);
+  let engine = Service.engineManager.get("steam");
+  engine.enabled = true;
+  let store  = engine._store;
+  let server = serverForUsers({"foo": "password"}, {
+    meta: {global: {engines: {steam: {version: engine.version,
+                                      syncID: engine.syncID}}}},
+    steam: {}
+  });
+  new SyncTestingInfrastructure(server.server);
+  engine._errToThrow = Components.Exception("NS_ERROR_UNKNOWN_HOST", Cr.NS_ERROR_UNKNOWN_HOST);
+  try {
+    let ping = yield sync_and_validate_telem(true);
+    deepEqual(ping.status, {
+      service: SYNC_FAILED_PARTIAL,
+      sync: LOGIN_FAILED_NETWORK_ERROR
+    });
+    let enginePing = ping.engines.find(e => e.name === "steam");
+    deepEqual(enginePing.failureReason, {
+      name: "nserror",
+      code: Cr.NS_ERROR_UNKNOWN_HOST
+    });
+  } finally {
+    Service.engineManager.unregister(engine);
+    yield cleanAndGo(server);
+  }
+});
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -6,16 +6,17 @@ skip-if = toolkit == 'gonk'
 support-files =
   addon1-search.xml
   bootstrap1-search.xml
   fake_login_manager.js
   missing-sourceuri.xml
   missing-xpi-search.xml
   places_v10_from_v11.sqlite
   rewrite-search.xml
+  sync_ping_schema.json
   !/services/common/tests/unit/head_helpers.js
   !/toolkit/mozapps/extensions/test/xpcshell/head_addons.js
 
 # The manifest is roughly ordered from low-level to high-level. When making
 # systemic sweeping changes, this makes it easier to identify errors closer to
 # the source.
 
 # Ensure we can import everything.
@@ -180,8 +181,10 @@ support-files = prefs_test_prefs_store.j
 
 [test_warn_on_truncated_response.js]
 
 # FxA migration
 [test_fxa_migration.js]
 
 # Synced tabs.
 [test_syncedtabs.js]
+
+[test_telemetry.js]