Bug 1253740 - Add code that syncs and tests, r=kmag draft
author Ethan Glasser-Camp <eglassercamp@mozilla.com>
Thu, 08 Sep 2016 14:16:09 -0400
changeset 437980 14a7f3cd0e623d1d35416b8db7b9bca61608d84e
parent 437979 3d1f55156b97abf3af78e85f7b01221a8d11b4f1
child 437981 196d52ea899c236960860b76be5137369511b589
push id 35578
push user eglassercamp@mozilla.com
push date Sat, 12 Nov 2016 03:33:15 +0000
reviewers kmag
bugs 1253740
milestone 52.0a1
Bug 1253740 - Add code that syncs and tests, r=kmag MozReview-Commit-ID: 8pm2jG92hCs
modules/libpref/init/all.js
toolkit/components/extensions/ExtensionStorageSync.jsm
toolkit/components/extensions/ext-storage.js
toolkit/components/extensions/test/xpcshell/test_ext_storage_sync.js
toolkit/components/extensions/test/xpcshell/xpcshell.ini
--- a/modules/libpref/init/all.js
+++ b/modules/libpref/init/all.js
@@ -5460,16 +5460,21 @@ pref("toolkit.pageThumbs.minHeight", 0);
 
 pref("webextensions.tests", false);
 
 // 16MB default non-parseable upload limit for requestBody.raw.bytes
 pref("webextensions.webRequest.requestBodyMaxRawBytes", 16777216);
 
 // This functionality is still experimental
 pref("webextensions.storage.sync.enabled", false);
+#ifdef RELEASE_OR_BETA
+pref("webextensions.storage.sync.serverURL", "https://webextensions.settings.services.mozilla.com/v1");
+#else
+pref("webextensions.storage.sync.serverURL", "https://webextensions.dev.mozaws.net/v1");
+#endif
 
 // Allow customization of the fallback directory for file uploads
 pref("dom.input.fallbackUploadDir", "");
 
 // Turn rewriting of youtube embeds on/off
 pref("plugins.rewrite_youtube_embeds", true);
 
 // Don't hide Flash from navigator.plugins when it is click-to-activate
--- a/toolkit/components/extensions/ExtensionStorageSync.jsm
+++ b/toolkit/components/extensions/ExtensionStorageSync.jsm
@@ -1,51 +1,83 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+// TODO:
+// * find out how the Chrome implementation deals with conflicts
+
 "use strict";
 
 this.EXPORTED_SYMBOLS = ["ExtensionStorageSync"];
 
 const Ci = Components.interfaces;
 const Cc = Components.classes;
 const Cu = Components.utils;
 const Cr = Components.results;
 const global = this;
 
+Cu.import("resource://gre/modules/AppConstants.jsm");
+const KINTO_PROD_SERVER_URL = "https://webextensions.settings.services.mozilla.com/v1";
+const KINTO_DEV_SERVER_URL = "https://webextensions.dev.mozaws.net/v1";
+const KINTO_DEFAULT_SERVER_URL = AppConstants.RELEASE_OR_BETA ? KINTO_PROD_SERVER_URL : KINTO_DEV_SERVER_URL;
+
 const STORAGE_SYNC_ENABLED_PREF = "webextensions.storage.sync.enabled";
+const STORAGE_SYNC_SERVER_URL_PREF = "webextensions.storage.sync.serverURL";
+const STORAGE_SYNC_SCOPE = "sync:addon_storage";
+const STORAGE_SYNC_CRYPTO_COLLECTION_NAME = "storage-sync-crypto";
+const STORAGE_SYNC_CRYPTO_KEYRING_RECORD_ID = "keys";
+const FXA_OAUTH_OPTIONS = {
+  scope: STORAGE_SYNC_SCOPE,
+};
+// Default is 5sec, which seems a bit aggressive on the open internet
+const KINTO_REQUEST_TIMEOUT = 30000;
 
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 const {
   runSafeSyncWithoutClone,
 } = Cu.import("resource://gre/modules/ExtensionUtils.jsm");
 
 XPCOMUtils.defineLazyModuleGetter(this, "AppsUtils",
                                   "resource://gre/modules/AppsUtils.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
                                   "resource://gre/modules/AsyncShutdown.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "CollectionKeyManager",
+                                  "resource://services-sync/record.js");
+XPCOMUtils.defineLazyModuleGetter(this, "EncryptionRemoteTransformer",
+                                  "resource://services-sync/engines/extension-storage.js");
 XPCOMUtils.defineLazyModuleGetter(this, "ExtensionStorage",
                                   "resource://gre/modules/ExtensionStorage.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
+                                  "resource://gre/modules/FxAccounts.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "loadKinto",
                                   "resource://services-common/kinto-offline-client.js");
+XPCOMUtils.defineLazyModuleGetter(this, "Log",
+                                  "resource://gre/modules/Log.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "Observers",
                                   "resource://services-common/observers.js");
 XPCOMUtils.defineLazyModuleGetter(this, "Sqlite",
                                   "resource://gre/modules/Sqlite.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "Task",
                                   "resource://gre/modules/Task.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "KeyRingEncryptionRemoteTransformer",
+                                  "resource://services-sync/engines/extension-storage.js");
 XPCOMUtils.defineLazyPreferenceGetter(this, "prefPermitsStorageSync",
                                       STORAGE_SYNC_ENABLED_PREF, false);
+XPCOMUtils.defineLazyPreferenceGetter(this, "prefStorageSyncServerURL",
+                                      STORAGE_SYNC_SERVER_URL_PREF,
+                                      KINTO_DEFAULT_SERVER_URL);
 
-/* globals prefPermitsStorageSync */
+/* globals prefPermitsStorageSync, prefStorageSyncServerURL */
 
 // Map of Extensions to Set<Contexts> to track contexts that are still
 // "live" and use storage.sync.
-const extensionContexts = new WeakMap();
+const extensionContexts = new Map();
+// Borrow logger from Sync.
+const log = Log.repository.getLogger("Sync.Engine.Extension-Storage");
 
 /**
  * A Promise that centralizes initialization of ExtensionStorageSync.
  *
  * This centralizes the use of the Sqlite database, to which there is
  * only one connection which is shared by all threads.
  *
  * Fields in the object returned by this Promise:
@@ -60,16 +92,17 @@ const storageSyncInit = Task.spawn(funct
   const opts = {path, sharedMemoryCache: false};
   const connection = yield Sqlite.openConnection(opts);
   yield Kinto.adapters.FirefoxAdapter._init(connection);
   return {
     connection,
     kinto: new Kinto({
       adapter: Kinto.adapters.FirefoxAdapter,
       adapterOptions: {sqliteHandle: connection},
+      timeout: KINTO_REQUEST_TIMEOUT,
     }),
   };
 });
 
 AsyncShutdown.profileBeforeChange.addBlocker(
   "ExtensionStorageSync: close Sqlite handle",
   Task.async(function* () {
     const ret = yield storageSyncInit;
@@ -124,16 +157,119 @@ const storageSyncIdSchema = {
   },
 
   // See keyToId and idToKey for more details.
   validate(id) {
     return idToKey(id) !== null;
   },
 };
 
+// An "id schema" used for the system collection, which doesn't
+// require validation or generation of IDs.
+const cryptoCollectionIdSchema = {
+  generate() {
+    throw new Error("cannot generate IDs for system collection");
+  },
+
+  validate(id) {
+    return true;
+  },
+};
+
+/**
+ * Wrapper around the crypto collection providing some handy utilities.
+ */
+const cryptoCollection = this.cryptoCollection = {
+  getCollection: Task.async(function* () {
+    const {kinto} = yield storageSyncInit;
+    return kinto.collection(STORAGE_SYNC_CRYPTO_COLLECTION_NAME, {
+      idSchema: cryptoCollectionIdSchema,
+      remoteTransformers: [new KeyRingEncryptionRemoteTransformer()],
+    });
+  }),
+
+  /**
+   * Retrieve the keyring record from the crypto collection.
+   *
+   * You can use this if you want to check metadata on the keyring
+   * record rather than use the keyring itself.
+   *
+   * @returns {Promise<Object>}
+   */
+  getKeyRingRecord: Task.async(function* () {
+    const collection = yield this.getCollection();
+    const cryptoKeyRecord = yield collection.getAny(STORAGE_SYNC_CRYPTO_KEYRING_RECORD_ID);
+    return cryptoKeyRecord.data;
+  }),
+
+  /**
+   * Retrieve the actual keyring from the crypto collection.
+   *
+   * @returns {Promise<CollectionKeyManager>}
+   */
+  getKeyRing: Task.async(function* () {
+    const cryptoKeyRecord = yield this.getKeyRingRecord();
+    const collectionKeys = new CollectionKeyManager();
+    if (cryptoKeyRecord) {
+      collectionKeys.setContents(cryptoKeyRecord.keys, cryptoKeyRecord.last_modified);
+    } else {
+      // We never actually use the default key, so it's OK if we
+      // generate one multiple times.
+      collectionKeys.generateDefaultKey();
+    }
+    return collectionKeys;
+  }),
+
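+  /**
+   * Store the given record in the crypto collection, overwriting
+   * any existing record with the same ID.
+   */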
+  upsert: Task.async(function* (record) {
+    const collection = yield this.getCollection();
+    yield collection.upsert(record);
+  }),
+
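+  /**
+   * Sync the crypto collection against the server.
+   *
+   * Uses the "server_wins" strategy, so a keyring that another
+   * client has already uploaded always takes precedence over ours.
+   */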
+  sync: Task.async(function* () {
+    const collection = yield this.getCollection();
+    return yield ExtensionStorageSync._syncCollection(collection, {
+      strategy: "server_wins",
+    });
+  }),
+
+  // Used only for testing.
+  _clear: Task.async(function* () {
+    const collection = yield this.getCollection();
+    yield collection.clear();
+  }),
+};
+
+/**
+ * An EncryptionRemoteTransformer that uses the special "keys" record
+ * to find a key for a given extension.
+ *
+ * @param {string} extensionId The extension ID for which to find a key.
+ */
+class CollectionKeyEncryptionRemoteTransformer extends EncryptionRemoteTransformer {
+  constructor(extensionId) {
+    super();
+    this.extensionId = extensionId;
+  }
+
+  getKeys() {
+    const self = this;
+    return Task.spawn(function* () {
+      // FIXME: cache the crypto record for the duration of a sync cycle?
+      const collectionKeys = yield cryptoCollection.getKeyRing();
+      if (!collectionKeys.hasKeysFor([self.extensionId])) {
+        // This should never happen. Keys should be created (and
+        // synced) at the beginning of the sync cycle.
+        throw new Error(`tried to encrypt records for ${self.extensionId}, but key is not present`);
+      }
+      return collectionKeys.keyForCollection(self.extensionId);
+    });
+  }
+}
+global.CollectionKeyEncryptionRemoteTransformer = CollectionKeyEncryptionRemoteTransformer;
+
 /**
  * Clean up now that one context is no longer using this extension's collection.
  *
  * @param {Extension} extension
  *                    The extension whose context just ended.
  * @param {Context} context
  *                  The context that just ended.
  */
@@ -161,27 +297,180 @@ function cleanUpForContext(extension, co
  *                  close.
  * @returns {Promise<Collection>}
  */
 const openCollection = Task.async(function* (extension, context) {
   // FIXME: This leaks metadata about what extensions a user has
   // installed.  We should calculate collection ID using a hash of
   // user ID, extension ID, and some secret.
   let collectionId = extension.id;
-  // TODO: implement sync process
   const {kinto} = yield storageSyncInit;
   const coll = kinto.collection(collectionId, {
     idSchema: storageSyncIdSchema,
+    remoteTransformers: [new CollectionKeyEncryptionRemoteTransformer(extension.id)],
   });
   return coll;
 });
 
 this.ExtensionStorageSync = {
+  _fxaService: fxAccounts,
   listeners: new WeakMap(),
 
+  syncAll: Task.async(function* () {
+    const extensions = extensionContexts.keys();
+    const extIds = Array.from(extensions, extension => extension.id);
+    log.debug(`Syncing extension settings for ${JSON.stringify(extIds)}\n`);
+    if (extIds.length == 0) {
+      // No extensions to sync. Get out.
+      return;
+    }
+    yield this.ensureKeysFor(extIds);
+    const promises = Array.from(extensionContexts.keys(), extension => {
+      return openCollection(extension).then(coll => {
+        return this.sync(extension, coll);
+      });
+    });
+    yield Promise.all(promises);
+  }),
+
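+  /**
+   * Sync the collection for one extension and notify its
+   * storage.onChanged listeners of any records that changed.
+   *
+   * @param {Extension} extension
+   *                    The extension whose collection is being synced.
+   * @param {Collection} collection
+   *                     The Kinto collection backing that extension.
+   */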
+  sync: Task.async(function* (extension, collection) {
+    const signedInUser = yield this._fxaService.getSignedInUser();
+    if (!signedInUser) {
+      // FIXME: this should support syncing to self-hosted
+      log.info("User was not signed into FxA; cannot sync");
+      throw new Error("Not signed in to FxA");
+    }
+    // FIXME: this leaks metadata about what extensions are being used
+    const collectionId = extension.id;
+    let syncResults;
+    try {
+      syncResults = yield this._syncCollection(collection, {
+        strategy: "client_wins",
+        collection: collectionId,
+      });
+    } catch (err) {
+      log.warn("Syncing failed", err);
+      throw err;
+    }
+
+    let changes = {};
+    for (const record of syncResults.created) {
+      changes[record.key] = {
+        newValue: record.data,
+      };
+    }
+    for (const record of syncResults.updated) {
+      // N.B. It's safe to just pick old.key because it's not
+      // possible to "rename" a record in the storage.sync API.
+      const key = record.old.key;
+      changes[key] = {
+        oldValue: record.old.data,
+        newValue: record.new.data,
+      };
+    }
+    for (const record of syncResults.deleted) {
+      changes[record.key] = {
+        oldValue: record.data,
+      };
+    }
+    for (const conflict of syncResults.resolved) {
+      // FIXME: Should we even send a notification? If so, what are
+      // the best values for "old" and "new"? This might violate
+      // client code's assumptions, since from their perspective,
+      // we were in state L, but this diff is from R -> L.
+      changes[conflict.remote.key] = {
+        oldValue: conflict.local.data,
+        newValue: conflict.remote.data,
+      };
+    }
+    if (Object.keys(changes).length > 0) {
+      this.notifyListeners(extension, changes);
+    }
+  }),
+
+  /**
+   * Utility function that handles the parts of syncing that are
+   * common to all Kinto collections (including "meta" collections
+   * like the crypto one).
+   *
+   * @param {Collection} collection
+   * @param {Object} options
+   *                 Additional options to be passed to sync().
+   * @returns {Promise<SyncResultObject>}
+   */
+  _syncCollection: Task.async(function* (collection, options) {
+    // FIXME: this should support syncing to self-hosted
+    return yield this._requestWithToken(`Syncing ${collection.name}`, function* (token) {
+      const allOptions = Object.assign({}, {
+        remote: prefStorageSyncServerURL,
+        headers: {
+          Authorization: "Bearer " + token,
+        },
+      }, options);
+
+      return yield collection.sync(allOptions);
+    });
+  }),
+
+  // Make a Kinto request with a current FxA token.
+  // If the response indicates that the token might have expired,
+  // retry the request.
+  _requestWithToken: Task.async(function* (description, f) {
+    const fxaToken = yield this._fxaService.getOAuthToken(FXA_OAUTH_OPTIONS);
+    try {
+      return yield f(fxaToken);
+    } catch (e) {
+      log.error(`${description}: request failed`, e);
+      if (e && e.data && e.data.code == 401) {
+        // Our token might have expired. Refresh and retry.
+        log.info("Token might have expired");
+        yield this._fxaService.removeCachedOAuthToken({token: fxaToken});
+        const newToken = yield this._fxaService.getOAuthToken(FXA_OAUTH_OPTIONS);
+
+        // If this fails too, let it go.
+        return yield f(newToken);
+      }
+      // Otherwise, we don't know how to handle this error, so just reraise.
+      throw e;
+    }
+  }),
+
+  /**
+   * Recursive function that resolves when both our local
+   * collectionKeys and the keyring on the server have keys for all
+   * the extensions in extIds.
+   *
+   * @param {Array<string>} extIds
+   *                        The IDs of the extensions which need keys.
+   * @returns {Promise<CollectionKeyManager>}
+   */
+  ensureKeysFor: Task.async(function* (extIds) {
+    const collectionKeys = yield cryptoCollection.getKeyRing();
+    if (collectionKeys.hasKeysFor(extIds)) {
+      return collectionKeys;
+    }
+
+    const newKeys = yield collectionKeys.ensureKeysFor(extIds);
+    const newRecord = {
+      id: STORAGE_SYNC_CRYPTO_KEYRING_RECORD_ID,
+      keys: newKeys.asWBO().cleartext,
+    };
+    yield cryptoCollection.upsert(newRecord);
+    const result = yield cryptoCollection.sync();
+    if (result.resolved.length != 0) {
+      // We had a conflict which was automatically resolved. We now
+      // have a new keyring which might not have keys for all the
+      // extensions we need, so recurse to check and add them.
+      return yield this.ensureKeysFor(extIds);
+    }
+
+    // No conflicts. We're good.
+    return newKeys;
+  }),
+
   /**
    * Get the collection for an extension, and register the extension
    * as being "in use".
    *
    * @param {Extension} extension
    *                    The extension for which we are seeking
    *                    a collection.
    * @param {Context} context
@@ -298,20 +587,23 @@ this.ExtensionStorageSync = {
       if (res.data && res.data._status != "deleted") {
         records[res.data.key] = res.data.data;
       }
     }
 
     return records;
   }),
 
-  addOnChangedListener(extension, listener) {
+  addOnChangedListener(extension, listener, context) {
     let listeners = this.listeners.get(extension) || new Set();
     listeners.add(listener);
     this.listeners.set(extension, listeners);
+
+    // Force opening the collection so that we will sync for this extension.
+    return this.getCollection(extension, context);
   },
 
   removeOnChangedListener(extension, listener) {
     let listeners = this.listeners.get(extension);
     listeners.delete(listener);
     if (listeners.size == 0) {
       this.listeners.delete(extension);
     }
--- a/toolkit/components/extensions/ext-storage.js
+++ b/toolkit/components/extensions/ext-storage.js
@@ -50,17 +50,17 @@ function storageApiFactory(context) {
         let listenerLocal = changes => {
           fire(changes, "local");
         };
         let listenerSync = changes => {
           fire(changes, "sync");
         };
 
         ExtensionStorage.addOnChangedListener(extension.id, listenerLocal);
-        ExtensionStorageSync.addOnChangedListener(extension, listenerSync);
+        ExtensionStorageSync.addOnChangedListener(extension, listenerSync, context);
         return () => {
           ExtensionStorage.removeOnChangedListener(extension.id, listenerLocal);
           ExtensionStorageSync.removeOnChangedListener(extension, listenerSync);
         };
       }).api(),
     },
   };
 }
--- a/toolkit/components/extensions/test/xpcshell/test_ext_storage_sync.js
+++ b/toolkit/components/extensions/test/xpcshell/test_ext_storage_sync.js
@@ -1,14 +1,356 @@
 /* Any copyright is dedicated to the Public Domain.
  * http://creativecommons.org/publicdomain/zero/1.0/ */
 
 "use strict";
 
-const {keyToId, idToKey} = Cu.import("resource://gre/modules/ExtensionStorageSync.jsm");
+do_get_profile();   // so we can use FxAccounts
+
+Cu.import("resource://testing-common/httpd.js");
+Cu.import("resource://services-common/utils.js");
+
+Cu.import("resource://gre/modules/ExtensionStorageSync.jsm");
+const {
+  CollectionKeyEncryptionRemoteTransformer,
+  cryptoCollection,
+  idToKey,
+  keyToId,
+} = Cu.import("resource://gre/modules/ExtensionStorageSync.jsm");
+Cu.import("resource://services-sync/engines/extension-storage.js");
+Cu.import("resource://services-sync/keys.js");
+
+/* globals BulkKeyBundle, CommonUtils, EncryptionRemoteTransformer */
+/* globals KeyRingEncryptionRemoteTransformer */
+
+function handleCannedResponse(cannedResponse, request, response) {
+  response.setStatusLine(null, cannedResponse.status.status,
+                         cannedResponse.status.statusText);
+  // send the headers
+  for (let headerLine of cannedResponse.sampleHeaders) {
+    let headerElements = headerLine.split(":");
+    response.setHeader(headerElements[0], headerElements[1].trimLeft());
+  }
+  response.setHeader("Date", (new Date()).toUTCString());
+
+  response.write(cannedResponse.responseBody);
+}
+
+function collectionRecordsPath(collectionId) {
+  return `/buckets/default/collections/${collectionId}/records`;
+}
+
+class KintoServer {
+  constructor() {
+    // Set up an HTTP Server
+    this.httpServer = new HttpServer();
+    this.httpServer.start(-1);
+
+    // Map<CollectionId, Set<Object>> corresponding to the data in the
+    // Kinto server
+    this.collections = new Map();
+
+    // ETag to serve with responses
+    this.etag = 1;
+
+    this.port = this.httpServer.identity.primaryPort;
+    // POST requests we receive from the client go here
+    this.posts = [];
+    // Anything in here will force the next POST to generate a conflict
+    this.conflicts = [];
+
+    this.installConfigPath();
+    this.installBatchPath();
+    this.installCatchAll();
+  }
+
+  clearPosts() {
+    this.posts = [];
+  }
+
+  getPosts() {
+    return this.posts;
+  }
+
+  installConfigPath() {
+    const configPath = "/v1/";
+    const responseBody = JSON.stringify({
+      "settings": {"batch_max_requests": 25},
+      "url": `http://localhost:${this.port}/v1/`,
+      "documentation": "https://kinto.readthedocs.org/",
+      "version": "1.5.1",
+      "commit": "cbc6f58",
+      "hello": "kinto",
+    });
+    const configResponse = {
+      "sampleHeaders": [
+        "Access-Control-Allow-Origin: *",
+        "Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff",
+        "Content-Type: application/json; charset=UTF-8",
+        "Server: waitress",
+      ],
+      "status": {status: 200, statusText: "OK"},
+      "responseBody": responseBody,
+    };
+
+    function handleGetConfig(request, response) {
+      if (request.method != "GET") {
+        dump(`ARGH, got ${request.method}\n`);
+      }
+      return handleCannedResponse(configResponse, request, response);
+    }
+
+    this.httpServer.registerPathHandler(configPath, handleGetConfig);
+  }
+
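+  // Serve the Kinto "batch" endpoint: record every request in
+  // this.posts and respond with 201s, unless a conflict has been
+  // queued, in which case respond with 412s.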
+  installBatchPath() {
+    const batchPath = "/v1/batch";
+
+    function handlePost(request, response) {
+      let bodyStr = CommonUtils.readBytesFromInputStream(request.bodyInputStream);
+      let body = JSON.parse(bodyStr);
+      let defaults = body.defaults;
+      for (let req of body.requests) {
+        let headers = Object.assign({}, defaults && defaults.headers || {}, req.headers);
+        // FIXME: assert auth is "Bearer ...token..."
+        this.posts.push(Object.assign({}, req, {headers}));
+      }
+
+      response.setStatusLine(null, 200, "OK");
+      response.setHeader("Content-Type", "application/json; charset=UTF-8");
+      response.setHeader("Date", (new Date()).toUTCString());
+
+      let postResponse = {
+        responses: body.requests.map(req => {
+          return {
+            path: req.path,
+            status: 201,   // FIXME -- only for new posts??
+            headers: {"ETag": 3000},   // FIXME???
+            body: {"data": Object.assign({}, req.body.data, {last_modified: this.etag}),
+                   "permissions": []},
+          };
+        }),
+      };
+
+      if (this.conflicts.length > 0) {
+        const {collectionId, encrypted} = this.conflicts.shift();
+        this.collections.get(collectionId).add(encrypted);
+        dump(`responding with etag ${this.etag}\n`);
+        postResponse = {
+          responses: body.requests.map(req => {
+            return {
+              path: req.path,
+              status: 412,
+              headers: {"ETag": this.etag}, // is this correct??
+              body: {
+                details: {
+                  existing: encrypted,
+                },
+              },
+            };
+          }),
+        };
+      }
+
+      response.write(JSON.stringify(postResponse));
+
+      //   "sampleHeaders": [
+      //     "Access-Control-Allow-Origin: *",
+      //     "Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff",
+      //     "Server: waitress",
+      //     "Etag: \"4000\""
+      //   ],
+    }
+
+    this.httpServer.registerPathHandler(batchPath, handlePost.bind(this));
+  }
+
+  installCatchAll() {
+    this.httpServer.registerPathHandler("/", (request, response) => {
+      dump(`got request: ${request.method}:${request.path}?${request.queryString}\n`);
+      dump(`${CommonUtils.readBytesFromInputStream(request.bodyInputStream)}\n`);
+    });
+  }
+
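+  // Register a records endpoint for the given collection that
+  // serves whatever records are currently in this.collections.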
+  installCollection(collectionId) {
+    this.collections.set(collectionId, new Set());
+
+    const remoteRecordsPath = "/v1" + collectionRecordsPath(encodeURIComponent(collectionId));
+
+    function handleGetRecords(request, response) {
+      if (request.method != "GET") {
+        do_throw(`only GET is supported on ${remoteRecordsPath}`);
+      }
+
+      response.setStatusLine(null, 200, "OK");
+      response.setHeader("Content-Type", "application/json; charset=UTF-8");
+      response.setHeader("Date", (new Date()).toUTCString());
+      response.setHeader("ETag", this.etag.toString());
+
+      const body = JSON.stringify({
+        // Can't JSON a Set directly, so convert to Array
+        "data": Array.from(this.collections.get(collectionId)),
+      });
+      response.write(body);
+    }
+
+    this.httpServer.registerPathHandler(remoteRecordsPath, handleGetRecords.bind(this));
+  }
+
+  // Utility function to install a keyring at the start of a test.
+  installKeyRing(keysData, etag, {conflict = false} = {}) {
+    this.installCollection("storage-sync-crypto");
+    const keysRecord = {
+      "id": "keys",
+      "keys": keysData,
+      "last_modified": etag,
+    };
+    this.etag = etag;
+    const methodName = conflict ? "encryptAndAddRecordWithConflict" : "encryptAndAddRecord";
+    this[methodName](new KeyRingEncryptionRemoteTransformer(),
+                     "storage-sync-crypto", keysRecord);
+  }
+
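+  // Encrypt a record using the given transformer and add it to the
+  // server-side data for the given collection.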
+  encryptAndAddRecord(transformer, collectionId, record) {
+    return transformer.encode(record).then(encrypted => {
+      this.collections.get(collectionId).add(encrypted);
+    });
+  }
+
+  // Conflicts block the next push and then appear in the collection specified.
+  encryptAndAddRecordWithConflict(transformer, collectionId, record) {
+    return transformer.encode(record).then(encrypted => {
+      this.conflicts.push({collectionId, encrypted});
+    });
+  }
+
+  clearCollection(collectionId) {
+    this.collections.get(collectionId).clear();
+  }
+
+  stop() {
+    this.httpServer.stop(() => { });
+  }
+}
+
+// Run a block of code with access to a KintoServer.
+function* withServer(f) {
+  let server = new KintoServer();
+  // Point the sync.storage client to use the test server we've just started.
+  Services.prefs.setCharPref("webextensions.storage.sync.serverURL",
+                             `http://localhost:${server.port}/v1`);
+  try {
+    yield* f(server);
+  } finally {
+    server.stop();
+  }
+}
+
+// Run a block of code with access to both a sync context and a
+// KintoServer. This is meant as a workaround for eslint's refusal to
+// let me have 5 nested callbacks.
+function* withContextAndServer(f) {
+  yield* withSyncContext(function* (context) {
+    yield* withServer(function* (server) {
+      yield* f(context, server);
+    });
+  });
+}
+
+// Run a block of code with fxa mocked out to return a specific user.
+function* withSignedInUser(user, f) {
+  const oldESSFxAccounts = ExtensionStorageSync._fxaService;
+  const oldERTFxAccounts = EncryptionRemoteTransformer.prototype._fxaService;
+  ExtensionStorageSync._fxaService = EncryptionRemoteTransformer.prototype._fxaService = {
+    getSignedInUser() {
+      return Promise.resolve(user);
+    },
+    getOAuthToken() {
+      return Promise.resolve("some-access-token");
+    },
+  };
+
+  try {
+    yield* f();
+  } finally {
+    ExtensionStorageSync._fxaService = oldESSFxAccounts;
+    EncryptionRemoteTransformer.prototype._fxaService = oldERTFxAccounts;
+  }
+}
+
+// Some assertions that make it easier to write tests about what was
+// posted and when.
+
+// Assert that the request was made with the correct access token.
+// This should be true of all requests, so this is usually called from
+// another assertion.
+function assertAuthenticatedRequest(post) {
+  equal(post.headers.Authorization, "Bearer some-access-token");
+}
+
+// Assert that this post was made with the correct request headers to
+// create a new resource while protecting against someone else
+// creating it at the same time (in other words, "If-None-Match: *").
+// Also calls assertAuthenticatedRequest(post).
+function assertPostedNewRecord(post) {
+  assertAuthenticatedRequest(post);
+  equal(post.headers["If-None-Match"], "*");
+}
+
+// Assert that this post was made with the correct request headers to
+// update an existing resource while protecting against concurrent
+// modification (in other words, `If-Match: "${etag}"`).
+// Also calls assertAuthenticatedRequest(post).
+function assertPostedUpdatedRecord(post, since) {
+  assertAuthenticatedRequest(post);
+  equal(post.headers["If-Match"], `"${since}"`);
+}
+
+// Assert that this post was an encrypted keyring, and produce the
+// decrypted body. Sanity check the body while we're here.
+const assertPostedEncryptedKeys = Task.async(function* (post) {
+  equal(post.path, collectionRecordsPath("storage-sync-crypto") + "/keys");
+
+  let body = yield new KeyRingEncryptionRemoteTransformer().decode(post.body.data);
+  ok(body.keys, `keys object should be present in decoded body`);
+  ok(body.keys.default, `keys object should have a default key`);
+  return body;
+});
+
+// assertEqual, but for keyring[extensionId] == key.
+function assertKeyRingKey(keyRing, extensionId, expectedKey, message) {
+  if (!message) {
+    message = `expected keyring's key for ${extensionId} to match ${expectedKey.keyPairB64}`;
+  }
+  ok(keyRing.hasKeysFor([extensionId]),
+     `expected keyring to have a key for ${extensionId}\n`);
+  deepEqual(keyRing.keyForCollection(extensionId).keyPairB64, expectedKey.keyPairB64,
+            message);
+}
+
+// Tests using this ID will share keys in local storage, so be careful.
+const defaultExtensionId = "{13bdde76-4dc7-11e6-9bdc-54ee758d6342}";
+// FIXME: need to access whatever mechanism we use in the syncing code
+const defaultCollectionId = defaultExtensionId;
+const defaultExtension = {id: defaultExtensionId};
+
+const BORING_KB = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
+const loggedInUser = {
+  kB: BORING_KB,
+  oauthTokens: {
+    "sync:addon-storage": {
+      token: "some-access-token",
+    },
+  },
+};
+
+function uuid() {
+  const uuidgen = Cc["@mozilla.org/uuid-generator;1"].getService(Ci.nsIUUIDGenerator);
+  return uuidgen.generateUUID();
+}
 
 add_task(function* test_key_to_id() {
   equal(keyToId("foo"), "key-foo");
   equal(keyToId("my-new-key"), "key-my_2D_new_2D_key");
   equal(keyToId(""), "key-");
   equal(keyToId("™"), "key-_2122_");
   equal(keyToId("\b"), "key-_8_");
   equal(keyToId("abc\ndef"), "key-abc_A_def");
@@ -24,8 +366,252 @@ add_task(function* test_key_to_id() {
   equal(idToKey("key--abcd"), null);
   equal(idToKey("key-%"), null);
   equal(idToKey("key-_HI"), null);
   equal(idToKey("key-_HI_"), null);
   equal(idToKey("key-"), "");
   equal(idToKey("key-1"), "1");
   equal(idToKey("key-_2D_"), "-");
 });
+
+add_task(function* ensureKeysFor_posts_new_keys() {
+  const extensionId = uuid();
+  yield* withContextAndServer(function* (context, server) {
+    yield* withSignedInUser(loggedInUser, function* () {
+      server.installCollection("storage-sync-crypto");
+      server.etag = 1000;
+
+      let newKeys = yield ExtensionStorageSync.ensureKeysFor([extensionId]);
+      ok(newKeys.hasKeysFor([extensionId]), `key should be present for ${extensionId}`);
+
+      let posts = server.getPosts();
+      equal(posts.length, 1);
+      const post = posts[0];
+      assertPostedNewRecord(post);
+      const body = yield assertPostedEncryptedKeys(post);
+      ok(body.keys.collections[extensionId], `keys object should have a key for ${extensionId}`);
+    });
+  });
+});
+
+add_task(function* ensureKeysFor_pulls_key() {
+  // ensureKeysFor is implemented by adding a key to our local record
+  // and doing a sync. This means that if the same key exists
+  // remotely, we get a "conflict". Ensure that we handle this
+  // correctly -- we keep the server key (since presumably it's
+  // already been used to encrypt records) and we don't wipe out other
+  // collections' keys.
+  const extensionId = uuid();
+  const extensionId2 = uuid();
+  const DEFAULT_KEY = new BulkKeyBundle("[default]");
+  DEFAULT_KEY.generateRandom();
+  const RANDOM_KEY = new BulkKeyBundle(extensionId);
+  RANDOM_KEY.generateRandom();
+  yield* withContextAndServer(function* (context, server) {
+    yield* withSignedInUser(loggedInUser, function* () {
+      const keysData = {
+        "default": DEFAULT_KEY.keyPairB64,
+        "collections": {
+          [extensionId]: RANDOM_KEY.keyPairB64,
+        },
+      };
+      server.installKeyRing(keysData, 999);
+
+      let collectionKeys = yield ExtensionStorageSync.ensureKeysFor([extensionId]);
+      assertKeyRingKey(collectionKeys, extensionId, RANDOM_KEY);
+
+      let posts = server.getPosts();
+      equal(posts.length, 0,
+            "ensureKeysFor shouldn't push when the server keyring has the right key");
+
+      // Another client generates a key for extensionId2
+      const newKey = new BulkKeyBundle(extensionId2);
+      newKey.generateRandom();
+      keysData.collections[extensionId2] = newKey.keyPairB64;
+      server.clearCollection("storage-sync-crypto");
+      server.installKeyRing(keysData, 1000);
+
+      let newCollectionKeys = yield ExtensionStorageSync.ensureKeysFor([extensionId, extensionId2]);
+      assertKeyRingKey(newCollectionKeys, extensionId2, newKey);
+      assertKeyRingKey(newCollectionKeys, extensionId, RANDOM_KEY,
+                       `ensureKeysFor shouldn't lose the old key for ${extensionId}`);
+
+      posts = server.getPosts();
+      equal(posts.length, 0, "ensureKeysFor shouldn't push when updating keys");
+    });
+  });
+});
+
+add_task(function* ensureKeysFor_handles_conflicts() {
+  // Syncing is done through a pull followed by a push of any merged
+  // changes. Accordingly, the only way to have a "true" conflict --
+  // i.e. with the server rejecting a change -- is if
+  // someone pushes changes between our pull and our push. Ensure that
+  // if this happens, we still behave sensibly (keep the remote key).
+  const extensionId = uuid();
+  const DEFAULT_KEY = new BulkKeyBundle("[default]");
+  DEFAULT_KEY.generateRandom();
+  const RANDOM_KEY = new BulkKeyBundle(extensionId);
+  RANDOM_KEY.generateRandom();
+  yield* withContextAndServer(function* (context, server) {
+    yield* withSignedInUser(loggedInUser, function* () {
+      const keysData = {
+        "default": DEFAULT_KEY.keyPairB64,
+        "collections": {
+          [extensionId]: RANDOM_KEY.keyPairB64,
+        },
+      };
+      server.installKeyRing(keysData, 765, {conflict: true});
+
+      yield cryptoCollection._clear();
+
+      let collectionKeys = yield ExtensionStorageSync.ensureKeysFor([extensionId]);
+      assertKeyRingKey(collectionKeys, extensionId, RANDOM_KEY,
+                       `syncing keyring should keep the server key for ${extensionId}`);
+
+      let posts = server.getPosts();
+      equal(posts.length, 1,
+            "syncing keyring should have tried to post a keyring");
+      const failedPost = posts[0];
+      assertPostedNewRecord(failedPost);
+      let body = yield assertPostedEncryptedKeys(failedPost);
+      // This key will be the one the client generated locally, so
+      // we don't know what its value will be
+      ok(body.keys.collections[extensionId],
+         `decrypted failed post should have a key for ${extensionId}`);
+      notEqual(body.keys.collections[extensionId], RANDOM_KEY.keyPairB64,
+               `decrypted failed post should have a randomly-generated key for ${extensionId}`);
+    });
+  });
+});
+
+add_task(function* test_storage_sync_pulls_changes() {
+  const extensionId = defaultExtensionId;
+  const collectionId = defaultCollectionId;
+  const extension = defaultExtension;
+  yield* withContextAndServer(function* (context, server) {
+    yield* withSignedInUser(loggedInUser, function* () {
+      let transformer = new CollectionKeyEncryptionRemoteTransformer(extensionId);
+      server.installCollection(collectionId);
+      server.installCollection("storage-sync-crypto");
+
+      let calls = [];
+      yield ExtensionStorageSync.addOnChangedListener(extension, function() {
+        calls.push(arguments);
+      }, context);
+
+      yield ExtensionStorageSync.ensureKeysFor([extensionId]);
+      yield server.encryptAndAddRecord(transformer, collectionId, {
+        "id": "key-remote_2D_key",
+        "key": "remote-key",
+        "data": 6,
+      });
+
+      yield ExtensionStorageSync.syncAll();
+      const remoteValue = (yield ExtensionStorageSync.get(extension, "remote-key", context))["remote-key"];
+      equal(remoteValue, 6,
+            "ExtensionStorageSync.get() returns value retrieved from sync");
+
+      equal(calls.length, 1,
+            "syncing calls on-changed listener");
+      deepEqual(calls[0][0], {"remote-key": {newValue: 6}});
+      calls = [];
+
+      // Syncing again doesn't do anything
+      yield ExtensionStorageSync.syncAll();
+
+      equal(calls.length, 0,
+            "syncing again shouldn't call on-changed listener");
+
+      // Updating the server causes us to pull down the new value
+      server.etag = 1000;
+      server.clearCollection(collectionId);
+      yield server.encryptAndAddRecord(transformer, collectionId, {
+        "id": "key-remote_2D_key",
+        "key": "remote-key",
+        "data": 7,
+      });
+
+      yield ExtensionStorageSync.syncAll();
+      const remoteValue2 = (yield ExtensionStorageSync.get(extension, "remote-key", context))["remote-key"];
+      equal(remoteValue2, 7,
+            "ExtensionStorageSync.get() returns value updated from sync");
+
+      equal(calls.length, 1,
+            "syncing calls on-changed listener on update");
+      deepEqual(calls[0][0], {"remote-key": {oldValue: 6, newValue: 7}});
+    });
+  });
+});
+
+add_task(function* test_storage_sync_pushes_changes() {
+  const extensionId = defaultExtensionId;
+  const collectionId = defaultCollectionId;
+  const extension = defaultExtension;
+  yield* withContextAndServer(function* (context, server) {
+    yield* withSignedInUser(loggedInUser, function* () {
+      let transformer = new CollectionKeyEncryptionRemoteTransformer(extensionId);
+      server.installCollection(collectionId);
+      server.installCollection("storage-sync-crypto");
+      server.etag = 1000;
+
+      yield ExtensionStorageSync.set(extension, {"my-key": 5}, context);
+
+      // install this AFTER we set the key to 5...
+      let calls = [];
+      ExtensionStorageSync.addOnChangedListener(extension, function() {
+        calls.push(arguments);
+      }, context);
+
+      yield ExtensionStorageSync.syncAll();
+      const localValue = (yield ExtensionStorageSync.get(extension, "my-key", context))["my-key"];
+      equal(localValue, 5,
+            "pushing an ExtensionStorageSync value shouldn't change local value");
+
+      let posts = server.getPosts();
+      equal(posts.length, 1,
+            "pushing a value should cause a post to the server");
+      const post = posts[0];
+      assertPostedNewRecord(post);
+      equal(post.path, collectionRecordsPath(collectionId) + "/key-my_2D_key",
+            "pushing a value should have a path corresponding to its id");
+
+      const encrypted = post.body.data;
+      ok(encrypted.ciphertext,
+         "pushing a value should post an encrypted record");
+      ok(!encrypted.data,
+         "pushing a value should not have any plaintext data");
+      equal(encrypted.id, "key-my_2D_key",
+            "pushing a value should use a kinto-friendly record ID");
+
+      const record = yield transformer.decode(encrypted);
+      equal(record.key, "my-key",
+            "when decrypted, a pushed value should have a key field corresponding to its storage.sync key");
+      equal(record.data, 5,
+            "when decrypted, a pushed value should have a data field corresponding to its storage.sync value");
+      equal(record.id, "key-my_2D_key",
+            "when decrypted, a pushed value should have an id field corresponding to its record ID");
+
+      equal(calls.length, 0,
+            "pushing a value shouldn't call the on-changed listener");
+
+      yield ExtensionStorageSync.set(extension, {"my-key": 6}, context);
+      yield ExtensionStorageSync.syncAll();
+
+      // Doesn't push keys because keys were pushed by a previous test.
+      posts = server.getPosts();
+      equal(posts.length, 2,
+            "updating a value should trigger another push");
+      const updatePost = posts[1];
+      assertPostedUpdatedRecord(updatePost, 1000);
+      equal(updatePost.path, collectionRecordsPath(collectionId) + "/key-my_2D_key",
+            "pushing an updated value should go to the same path");
+
+      const updateEncrypted = updatePost.body.data;
+      ok(updateEncrypted.ciphertext,
+         "pushing an updated value should still be encrypted");
+      ok(!updateEncrypted.data,
+         "pushing an updated value should not have any plaintext visible");
+      equal(updateEncrypted.id, "key-my_2D_key",
+            "pushing an updated value should maintain the same ID");
+    });
+  });
+});
--- a/toolkit/components/extensions/test/xpcshell/xpcshell.ini
+++ b/toolkit/components/extensions/test/xpcshell/xpcshell.ini
@@ -53,16 +53,17 @@ skip-if = release_or_beta
 [test_ext_runtime_sendMessage_self.js]
 [test_ext_schemas.js]
 [test_ext_schemas_api_injection.js]
 [test_ext_schemas_async.js]
 [test_ext_schemas_allowed_contexts.js]
 [test_ext_simple.js]
 [test_ext_storage.js]
 [test_ext_storage_sync.js]
+head = head.js head_sync.js
 skip-if = os == "android"
 [test_ext_topSites.js]
 skip-if = os == "android"
 [test_getAPILevelForWindow.js]
 [test_ext_legacy_extension_context.js]
 [test_ext_legacy_extension_embedding.js]
 [test_locale_converter.js]
 [test_locale_data.js]