Bug 1289287 - Store Sync clients outgoing commands in a different collection until uploaded. r?markh draft
authorEdouard Oger <eoger@fastmail.com>
Thu, 11 Aug 2016 11:01:15 -0700
changeset 404138 9839581db61cd4d40127de270e0ed05a08105d0a
parent 404026 63988dbb25327ff93dd6e0c7e07b2b819795a9eb
child 529107 32db3e03da5a057545df3429c1b2cad195d11545
push id27126
push userbmo:eoger@fastmail.com
push dateMon, 22 Aug 2016 23:34:34 +0000
reviewersmarkh
bugs1289287
milestone51.0a1
Bug 1289287 - Store Sync clients outgoing commands in a different collection until uploaded. r?markh MozReview-Commit-ID: AanLQ9cYXDI
services/sync/modules-testing/fakeservices.js
services/sync/modules/engines/clients.js
services/sync/modules/util.js
services/sync/tests/unit/test_clients_engine.js
--- a/services/sync/modules-testing/fakeservices.js
+++ b/services/sync/modules-testing/fakeservices.js
@@ -31,16 +31,28 @@ this.FakeFilesystemService = function Fa
   Utils.jsonLoad = function jsonLoad(filePath, that, cb) {
     let obj;
     let json = self.fakeContents["weave/" + filePath + ".json"];
     if (json) {
       obj = JSON.parse(json);
     }
     cb.call(that, obj);
   };
+
+  Utils.jsonMove = function jsonMove(aFrom, aTo, that) {
+    const fromPath = "weave/" + aFrom + ".json";
+    self.fakeContents["weave/" + aTo + ".json"] = self.fakeContents[fromPath];
+    delete self.fakeContents[fromPath];
+    return Promise.resolve();
+  };
+
+  Utils.jsonRemove = function jsonRemove(filePath, that) {
+    delete self.fakeContents["weave/" + filePath + ".json"];
+    return Promise.resolve();
+  };
 };
 
 this.fakeSHA256HMAC = function fakeSHA256HMAC(message) {
    message = message.substr(0, 64);
    while (message.length < 64) {
      message += " ";
    }
    return message;
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -1,12 +1,30 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+/**
+ * How does the clients engine work?
+ *
+ * - We use 2 files - commands.json and commands-syncing.json.
+ *
+ * - At sync upload time, we attempt a rename of commands.json to
+ *   commands-syncing.json, and ignore errors (helps for crash during sync!).
+ * - We load commands-syncing.json and stash the contents in
+ *   _currentlySyncingCommands which lives for the duration of the upload process.
+ * - We use _currentlySyncingCommands to build the outgoing records
+ * - Immediately after successful upload, we delete commands-syncing.json from
+ *   disk (and clear _currentlySyncingCommands). We reconcile our local records
+ *   with what we just wrote in the server, and add failed IDs commands
+ *   back in commands.json
+ * - Any time we need to "save" a command for future syncs, we load
+ *   commands.json, update it, and write it back out.
+ */
+
 this.EXPORTED_SYMBOLS = [
   "ClientEngine",
   "ClientsRec"
 ];
 
 var {classes: Cc, interfaces: Ci, utils: Cu} = Components;
 
 Cu.import("resource://services-common/async.js");
@@ -50,18 +68,18 @@ Utils.deferGetSet(ClientsRec,
                    "version", "protocols",
                    "formfactor", "os", "appPackage", "application", "device",
                    "fxaDeviceId"]);
 
 
 this.ClientEngine = function ClientEngine(service) {
   SyncEngine.call(this, "Clients", service);
 
-  // Reset the client on every startup so that we fetch recent clients
-  this._resetClient();
+  // Reset the last sync timestamp on every startup so that we fetch all clients
+  this.resetLastSync();
 }
 ClientEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: ClientStore,
   _recordObj: ClientsRec,
   _trackerObj: ClientsTracker,
 
   // Always sync client data as it controls other sync behavior
@@ -185,16 +203,64 @@ ClientEngine.prototype = {
   },
 
   isMobile: function isMobile(id) {
     if (this._store._remoteClients[id])
       return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
     return false;
   },
 
+  _readCommands() {
+    let cb = Async.makeSpinningCallback();
+    Utils.jsonLoad("commands", this, commands => cb(null, commands));
+    return cb.wait() || {};
+  },
+
+  /**
+   * Low level function, do not use directly (use _addClientCommand instead).
+   */
+  _saveCommands(commands) {
+    let cb = Async.makeSpinningCallback();
+    Utils.jsonSave("commands", this, commands, error => {
+      if (error) {
+        this._log.error("Failed to save JSON outgoing commands", error);
+      }
+      cb();
+    });
+    cb.wait();
+  },
+
+  _prepareCommandsForUpload() {
+    let cb = Async.makeSpinningCallback();
+    Utils.jsonMove("commands", "commands-syncing", this).catch(() => {}) // Ignore errors
+      .then(() => {
+        Utils.jsonLoad("commands-syncing", this, commands => cb(null, commands));
+      });
+    return cb.wait() || {};
+  },
+
+  _deleteUploadedCommands() {
+    delete this._currentlySyncingCommands;
+    Async.promiseSpinningly(
+      Utils.jsonRemove("commands-syncing", this).catch(err => {
+        this._log.error("Failed to delete syncing-commands file", err);
+      })
+    );
+  },
+
+  _addClientCommand(clientId, command) {
+    const allCommands = this._readCommands();
+    const clientCommands = allCommands[clientId] || [];
+    if (hasDupeCommand(clientCommands, command)) {
+      return;
+    }
+    allCommands[clientId] = clientCommands.concat(command);
+    this._saveCommands(allCommands);
+  },
+
   _syncStartup: function _syncStartup() {
     // Reupload new client record periodically.
     if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
       this._tracker.addChangedID(this.localID);
       this.lastRecordUpload = Date.now() / 1000;
     }
     SyncEngine.prototype._syncStartup.call(this);
   },
@@ -236,21 +302,60 @@ ClientEngine.prototype = {
         }
       }
     } finally {
       this._incomingClients = null;
     }
   },
 
   _uploadOutgoing() {
-    this._clearedCommands = null;
+    this._currentlySyncingCommands = this._prepareCommandsForUpload();
+    const clientWithPendingCommands = Object.keys(this._currentlySyncingCommands);
+    for (let clientId of clientWithPendingCommands) {
+      if (this._store._remoteClients[clientId] || this.localID == clientId) {
+        this._modified[clientId] = 0;
+      }
+    }
     SyncEngine.prototype._uploadOutgoing.call(this);
   },
 
   _onRecordsWritten(succeeded, failed) {
+    // Reconcile the status of the local records with what we just wrote on the
+    // server
+    for (let id of succeeded) {
+      const commandChanges = this._currentlySyncingCommands[id];
+      if (id == this.localID) {
+        if (this.localCommands) {
+          this.localCommands = this.localCommands.filter(command => !hasDupeCommand(commandChanges, command));
+        }
+      } else {
+        const clientRecord = this._store._remoteClients[id];
+        if (!commandChanges || !clientRecord) {
+          // should be impossible, else we wouldn't have been writing it.
+          this._log.warn("No command/No record changes for a client we uploaded");
+          continue;
+        }
+        // fixup the client record, so our copy of _remoteClients matches what we uploaded.
+        clientRecord.commands = this._store.createRecord(id);
+        // we could do better and pass the reference to the record we just uploaded,
+        // but this will do for now
+      }
+    }
+
+    // Re-add failed commands
+    for (let id of failed) {
+      const commandChanges = this._currentlySyncingCommands[id];
+      if (!commandChanges) {
+        continue;
+      }
+      this._addClientCommand(id, commandChanges);
+    }
+
+    this._deleteUploadedCommands();
+
     // Notify other devices that their own client collection changed
     const idsToNotify = succeeded.reduce((acc, id) => {
       if (id == this.localID) {
         return acc;
       }
       const fxaDeviceId = this.getClientFxaDeviceId(id);
       return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
     }, []);
@@ -317,16 +422,21 @@ ClientEngine.prototype = {
   _resetClient() {
     this._wipeClient();
   },
 
   _wipeClient: function _wipeClient() {
     SyncEngine.prototype._resetClient.call(this);
     delete this.localCommands;
     this._store.wipe();
+    const logRemoveError = err => this._log.warn("Could not delete json file", err);
+    Async.promiseSpinningly(
+      Utils.jsonRemove("commands", this).catch(logRemoveError)
+        .then(Utils.jsonRemove("commands-syncing", this).catch(logRemoveError))
+    );
   },
 
   removeClientData: function removeClientData() {
     let res = this.service.resource(this.engineURL + "/" + this.localID);
     res.delete();
   },
 
   // Override the default behavior to delete bad records from the server.
@@ -355,30 +465,16 @@ ClientEngine.prototype = {
     resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
     wipeAll:     { args: 0, desc: "Delete all client data for all engines" },
     wipeEngine:  { args: 1, desc: "Delete all client data for engine" },
     logout:      { args: 0, desc: "Log out client" },
     displayURI:  { args: 3, desc: "Instruct a client to display a URI" },
   },
 
   /**
-   * Remove any commands for the local client and mark it for upload.
-   */
-  clearCommands: function clearCommands() {
-    if (!this._clearedCommands) {
-      this._clearedCommands = [];
-    }
-    // Keep track of cleared local commands until the next sync, so that we
-    // don't reupload them.
-    this._clearedCommands = this._clearedCommands.concat(this.localCommands);
-    delete this.localCommands;
-    this._tracker.addChangedID(this.localID);
-  },
-
-  /**
    * Sends a command+args pair to a specific client.
    *
    * @param command Command string
    * @param args Array of arguments/data for command
    * @param clientId Client to send command to
    */
   _sendCommandToClient: function sendCommandToClient(command, args, clientId) {
     this._log.trace("Sending " + command + " to " + clientId);
@@ -391,51 +487,39 @@ ClientEngine.prototype = {
       throw new Error("Stale remote client ID: '" + clientId + "'.");
     }
 
     let action = {
       command: command,
       args: args,
     };
 
-    if (!client.commands) {
-      client.commands = [action];
-    }
-    // Add the new action if there are no duplicates.
-    else if (!hasDupeCommand(client.commands, action)) {
-      client.commands.push(action);
-    }
-    // It must be a dupe. Skip.
-    else {
-      return;
-    }
-
     this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
+    this._addClientCommand(clientId, action);
     this._tracker.addChangedID(clientId);
   },
 
   /**
    * Check if the local client has any remote commands and perform them.
    *
    * @return false to abort sync
    */
   processIncomingCommands: function processIncomingCommands() {
     return this._notify("clients:process-commands", "", function() {
-      let commands = this.localCommands;
-
-      // Immediately clear out the commands as we've got them locally.
-      this.clearCommands();
-
-      // Process each command in order.
-      if (!commands) {
+      if (!this.localCommands) {
         return true;
       }
+
+      const clearedCommands = this._readCommands()[this.localID];
+      const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
+
       let URIsToDisplay = [];
-      for (let key in commands) {
-        let {command, args} = commands[key];
+      // Process each command in order.
+      for (let rawCommand of commands) {
+        let {command, args} = rawCommand;
         this._log.debug("Processing command: " + command + "(" + args + ")");
 
         let engines = [args[0]];
         switch (command) {
           case "resetAll":
             engines = null;
             // Fallthrough
           case "resetEngine":
@@ -453,17 +537,21 @@ ClientEngine.prototype = {
           case "displayURI":
             let [uri, clientId, title] = args;
             URIsToDisplay.push({ uri, clientId, title });
             break;
           default:
             this._log.debug("Received an unknown command: " + command);
             break;
         }
+        // Add the command to the "cleared" commands list
+        this._addClientCommand(this.localID, rawCommand)
       }
+      this._tracker.addChangedID(this.localID);
+
       if (URIsToDisplay.length) {
         this._handleDisplayURIs(URIsToDisplay);
       }
 
       return true;
     })();
   },
 
@@ -566,101 +654,76 @@ ClientEngine.prototype = {
 };
 
 function ClientStore(name, engine) {
   Store.call(this, name, engine);
 }
 ClientStore.prototype = {
   __proto__: Store.prototype,
 
+  _remoteClients: {},
+
   create(record) {
-    this.update(record)
+    this.update(record);
   },
 
   update: function update(record) {
     if (record.id == this.engine.localID) {
-      this._updateLocalRecord(record);
+      // Only grab commands from the server; local name/type always wins
+      this.engine.localCommands = record.commands;
     } else {
-      this._updateRemoteRecord(record);
-    }
-  },
-
-  _updateLocalRecord(record) {
-    // Local changes for our client means we're clearing commands or
-    // uploading a new record.
-    let incomingCommands = record.commands;
-    if (incomingCommands) {
-      // Filter out incoming commands that we've cleared.
-      incomingCommands = incomingCommands.filter(action =>
-        !hasDupeCommand(this.engine._clearedCommands, action));
-      if (!incomingCommands.length) {
-        // Use `undefined` instead of `null` to avoid creating a null field
-        // in the uploaded record.
-        incomingCommands = undefined;
-      }
+      this._remoteClients[record.id] = record.cleartext;
     }
-    // Only grab commands from the server; local name/type always wins
-    this.engine.localCommands = incomingCommands;
-  },
-
-  _updateRemoteRecord(record) {
-    let currentRecord = this._remoteClients[record.id];
-    if (!currentRecord || !currentRecord.commands ||
-        !(record.id in this.engine._modified)) {
-
-      // If we have a new incoming record or no outgoing commands, use the
-      // full incoming record from the server.
-      this._remoteClients[record.id] = record.cleartext;
-      return;
-    }
-
-    // Otherwise, we have outgoing commands for a client, so merge them
-    // with the commands that we downloaded from the server.
-    for (let action of currentRecord.commands) {
-      if (hasDupeCommand(record.cleartext.commands, action)) {
-        // Ignore commands the server already knows about.
-        continue;
-      }
-      if (record.cleartext.commands) {
-        record.cleartext.commands.push(action);
-      } else {
-        record.cleartext.commands = [action];
-      }
-    }
-    this._remoteClients[record.id] = record.cleartext;
   },
 
   createRecord: function createRecord(id, collection) {
     let record = new ClientsRec(collection, id);
 
+    const commandsChanges = this.engine._currentlySyncingCommands ?
+                            this.engine._currentlySyncingCommands[id] :
+                            [];
+
     // Package the individual components into a record for the local client
     if (id == this.engine.localID) {
       let cb = Async.makeSpinningCallback();
       fxAccounts.getDeviceId().then(id => cb(null, id), cb);
       try {
         record.fxaDeviceId = cb.wait();
       } catch(error) {
         this._log.warn("failed to get fxa device id", error);
       }
       record.name = this.engine.localName;
       record.type = this.engine.localType;
-      record.commands = this.engine.localCommands;
       record.version = Services.appinfo.version;
       record.protocols = SUPPORTED_PROTOCOL_VERSIONS;
 
+      // Substract the commands we recorded that we've already executed
+      if (commandsChanges && commandsChanges.length &&
+          this.engine.localCommands && this.engine.localCommands.length) {
+        record.commands = this.engine.localCommands.filter(command => !hasDupeCommand(commandsChanges, command));
+      }
+
       // Optional fields.
       record.os = Services.appinfo.OS;             // "Darwin"
       record.appPackage = Services.appinfo.ID;
       record.application = this.engine.brandName   // "Nightly"
 
       // We can't compute these yet.
       // record.device = "";            // Bug 1100723
       // record.formfactor = "";        // Bug 1100722
     } else {
       record.cleartext = this._remoteClients[id];
+
+      // Add the commands we have to send
+      if (commandsChanges && commandsChanges.length) {
+        const recordCommands = record.cleartext.commands || [];
+        const newCommands = commandsChanges.filter(command => !hasDupeCommand(recordCommands, command));
+        record.cleartext.commands = recordCommands.concat(newCommands);
+      }
+
       if (record.cleartext.stale) {
         // It's almost certainly a logic error for us to upload a record we
         // consider stale, so make log noise, but still remove the flag.
         this._log.error(`Preparing to upload record ${id} that we consider stale`);
         delete record.cleartext.stale;
       }
     }
 
--- a/services/sync/modules/util.js
+++ b/services/sync/modules/util.js
@@ -404,16 +404,62 @@ this.Utils = {
       error = e
     }
 
     if (typeof callback == "function") {
       callback.call(that, error);
     }
   }),
 
+  /**
+   * Move a json file in the profile directory. Will fail if a file exists at the
+   * destination.
+   *
+   * @returns a promise that resolves to undefined on success, or rejects on failure
+   *
+   * @param aFrom
+   *        Current path to the JSON file saved on disk, relative to profileDir/weave
+   *        .json will be appended to the file name.
+   * @param aTo
+   *        New path to the JSON file saved on disk, relative to profileDir/weave
+   *        .json will be appended to the file name.
+   * @param that
+   *        Object to use for logging
+   */
+  jsonMove(aFrom, aTo, that) {
+    let pathFrom = OS.Path.join(OS.Constants.Path.profileDir, "weave",
+                                ...(aFrom + ".json").split("/"));
+    let pathTo = OS.Path.join(OS.Constants.Path.profileDir, "weave",
+                              ...(aTo + ".json").split("/"));
+    if (that._log) {
+      that._log.trace("Moving " + pathFrom + " to " + pathTo);
+    }
+    return OS.File.move(pathFrom, pathTo, { noOverwrite: true });
+  },
+
+  /**
+   * Removes a json file in the profile directory.
+   *
+   * @returns a promise that resolves to undefined on success, or rejects on failure
+   *
+   * @param filePath
+   *        Current path to the JSON file saved on disk, relative to profileDir/weave
+   *        .json will be appended to the file name.
+   * @param that
+   *        Object to use for logging
+   */
+  jsonRemove(filePath, that) {
+    let path = OS.Path.join(OS.Constants.Path.profileDir, "weave",
+                            ...(filePath + ".json").split("/"));
+    if (that._log) {
+      that._log.trace("Deleting " + path);
+    }
+    return OS.File.remove(path, { ignoreAbsent: true });
+  },
+
   getErrorString: function Utils_getErrorString(error, args) {
     try {
       return Str.errors.get(error, args || null);
     } catch (e) {}
 
     // basically returns "Unknown Error"
     return Str.errors.get("error.reason.unknown");
   },
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -364,23 +364,24 @@ add_test(function test_send_command() {
   let remoteRecord = store.createRecord(remoteId, "clients");
 
   let action = "testCommand";
   let args = ["foo", "bar"];
 
   engine._sendCommandToClient(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
+  let clientCommands = engine._readCommands()[remoteId];
   notEqual(newRecord, undefined);
-  equal(newRecord.commands.length, 1);
+  equal(clientCommands.length, 1);
 
-  let command = newRecord.commands[0];
+  let command = clientCommands[0];
   equal(command.command, action);
   equal(command.args.length, 2);
-  equal(command.args, args);
+  deepEqual(command.args, args);
 
   notEqual(tracker.changedIDs[remoteId], undefined);
 
   run_next_test();
 });
 
 add_test(function test_command_validation() {
   _("Verifies that command validation works properly.");
@@ -408,29 +409,31 @@ add_test(function test_command_validatio
     store.create(rec);
     store.createRecord(remoteId, "clients");
 
     engine.sendCommand(action, args, remoteId);
 
     let newRecord = store._remoteClients[remoteId];
     notEqual(newRecord, undefined);
 
+    let clientCommands = engine._readCommands()[remoteId];
+
     if (expectedResult) {
       _("Ensuring command is sent: " + action);
-      equal(newRecord.commands.length, 1);
+      equal(clientCommands.length, 1);
 
-      let command = newRecord.commands[0];
+      let command = clientCommands[0];
       equal(command.command, action);
-      equal(command.args, args);
+      deepEqual(command.args, args);
 
       notEqual(engine._tracker, undefined);
       notEqual(engine._tracker.changedIDs[remoteId], undefined);
     } else {
       _("Ensuring command is scrubbed: " + action);
-      equal(newRecord.commands, undefined);
+      equal(clientCommands, undefined);
 
       if (store._tracker) {
         equal(engine._tracker[remoteId], undefined);
       }
     }
 
   }
   run_next_test();
@@ -447,29 +450,31 @@ add_test(function test_command_duplicati
 
   let action = "resetAll";
   let args = [];
 
   engine.sendCommand(action, args, remoteId);
   engine.sendCommand(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
-  equal(newRecord.commands.length, 1);
+  let clientCommands = engine._readCommands()[remoteId];
+  equal(clientCommands.length, 1);
 
   _("Check variant args length");
-  newRecord.commands = [];
+  engine._saveCommands({});
 
   action = "resetEngine";
   engine.sendCommand(action, [{ x: "foo" }], remoteId);
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
   _("Make sure we spot a real dupe argument.");
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
-  equal(newRecord.commands.length, 2);
+  clientCommands = engine._readCommands()[remoteId];
+  equal(clientCommands.length, 2);
 
   run_next_test();
 });
 
 add_test(function test_command_invalid_client() {
   _("Ensures invalid client IDs are caught");
 
   let id = Utils.makeGUID();
@@ -703,17 +708,18 @@ add_test(function test_command_sync() {
 
     _("Checking remote record was downloaded.");
     let clientRecord = engine._store._remoteClients[remoteId];
     notEqual(clientRecord, undefined);
     equal(clientRecord.commands.length, 0);
 
     _("Send a command to the remote client.");
     engine.sendCommand("wipeAll", []);
-    equal(clientRecord.commands.length, 1);
+    let clientCommands = engine._readCommands()[remoteId];
+    equal(clientCommands.length, 1);
     engine._sync();
 
     _("Checking record was uploaded.");
     notEqual(clientWBO(engine.localID).payload, undefined);
     ok(engine.lastRecordUpload > 0);
 
     notEqual(clientWBO(remoteId).payload, undefined);
 
@@ -759,19 +765,20 @@ add_test(function test_send_uri_to_clien
 
   let uri = "http://www.mozilla.org/";
   let title = "Title of the Page";
   engine.sendURIToClientForDisplay(uri, remoteId, title);
 
   let newRecord = store._remoteClients[remoteId];
 
   notEqual(newRecord, undefined);
-  equal(newRecord.commands.length, 1);
+  let clientCommands = engine._readCommands()[remoteId];
+  equal(clientCommands.length, 1);
 
-  let command = newRecord.commands[0];
+  let command = clientCommands[0];
   equal(command.command, "displayURI");
   equal(command.args.length, 3);
   equal(command.args[0], uri);
   equal(command.args[1], engine.localID);
   equal(command.args[2], title);
 
   ok(tracker.score > initialScore);
   ok(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
@@ -937,16 +944,295 @@ add_test(function test_merge_commands() 
     try {
       server.deleteCollections("foo");
     } finally {
       server.stop(run_next_test);
     }
   }
 });
 
+add_test(function test_duplicate_remote_commands() {
+  _("Verifies local commands for remote clients are sent only once (bug 1289287)");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let desktopID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+    id: desktopID,
+    name: "Desktop client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. 1 record downloaded.");
+    strictEqual(engine.lastRecordUpload, 0);
+    engine._sync();
+
+    _("Send tab to client");
+    engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"]);
+    engine._sync();
+
+    _("Simulate the desktop client consuming the command and syncing to the server");
+    server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+      id: desktopID,
+      name: "Desktop client",
+      type: "desktop",
+      commands: [],
+      version: "48",
+      protocols: ["1.5"],
+    }), now - 10));
+
+    _("Send another tab to the desktop client");
+    engine.sendCommand("displayURI", ["https://foobar.com", engine.localID, "Foo bar!"], desktopID);
+    engine._sync();
+
+    let collection = server.getCollection("foo", "clients");
+    let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
+    deepEqual(desktopPayload.commands, [{
+      command: "displayURI",
+      args: ["https://foobar.com", engine.localID, "Foo bar!"],
+    }], "Should only send the second command to the desktop client");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
+add_test(function test_upload_after_reboot() {
+  _("Multiple downloads, reboot, then upload (bug 1289287)");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let deviceBID = Utils.makeGUID();
+  let deviceCID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+    id: deviceBID,
+    name: "Device B",
+    type: "desktop",
+    commands: [{
+      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+    }],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+  server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
+    id: deviceCID,
+    name: "Device C",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. 2 records downloaded.");
+    strictEqual(engine.lastRecordUpload, 0);
+    engine._sync();
+
+    _("Send tab to client");
+    engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"], deviceBID);
+
+    const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
+    SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
+    engine._sync();
+
+    let collection = server.getCollection("foo", "clients");
+    let deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+    deepEqual(deviceBPayload.commands, [{
+      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+    }], "Should be the same because the upload failed");
+
+    _("Simulate the client B consuming the command and syncing to the server");
+    server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+      id: deviceBID,
+      name: "Device B",
+      type: "desktop",
+      commands: [],
+      version: "48",
+      protocols: ["1.5"],
+    }), now - 10));
+
+    // Simulate reboot
+    SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
+    engine = Service.clientsEngine = new ClientEngine(Service);
+
+    engine._sync();
+
+    deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+    deepEqual(deviceBPayload.commands, [{
+      command: "displayURI",
+      args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+    }], "Should only had written our outgoing command");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
+add_test(function test_keep_cleared_commands_after_reboot() {
+  _("Download commands, fail upload, reboot, then apply new commands (bug 1289287)");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let deviceBID = Utils.makeGUID();
+  let deviceCID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
+    id: engine.localID,
+    name: "Device A",
+    type: "desktop",
+    commands: [{
+      command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+    },
+    {
+      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+    }],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+  server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+    id: deviceBID,
+    name: "Device B",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+  server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
+    id: deviceCID,
+    name: "Device C",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. Download remote and our record.");
+    strictEqual(engine.lastRecordUpload, 0);
+
+    let collection = server.getCollection("foo", "clients");
+    const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
+    SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
+    let commandsProcessed = 0;
+    engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
+
+    engine._sync();
+    engine.processIncomingCommands(); // Not called by the engine.sync(), gotta call it ourselves
+    equal(commandsProcessed, 2, "We processed 2 commands");
+
+    let localRemoteRecord = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
+    deepEqual(localRemoteRecord.commands, [{
+      command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+    },
+    {
+      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+    }], "Should be the same because the upload failed");
+
+    // Another client sends another link
+    server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
+      id: engine.localID,
+      name: "Device A",
+      type: "desktop",
+      commands: [{
+        command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+      },
+      {
+        command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+      },
+      {
+        command: "displayURI", args: ["https://deviceclink2.com", deviceCID, "Device C link 2"]
+      }],
+      version: "48",
+      protocols: ["1.5"],
+    }), now - 10));
+
+    // Simulate reboot
+    SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
+    engine = Service.clientsEngine = new ClientEngine(Service);
+
+    commandsProcessed = 0;
+    engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
+    engine._sync();
+    engine.processIncomingCommands();
+    equal(commandsProcessed, 1, "We processed one command (the other were cleared)");
+
+    localRemoteRecord = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+    deepEqual(localRemoteRecord.commands, [], "Should be empty");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+
+    // Reset service (remove mocks)
+    engine = Service.clientsEngine = new ClientEngine(Service);
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 add_test(function test_deleted_commands() {
   _("Verifies commands for a deleted client are discarded");
 
   let now = Date.now() / 1000;
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
                                         syncID: engine.syncID}}}},
     clients: {},
@@ -1045,23 +1331,27 @@ add_test(function test_send_uri_ack() {
 
     _("Sync again");
     engine._sync();
     deepEqual(engine.localCommands, [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
     }], "Should receive incoming URI");
     ok(engine.processIncomingCommands(), "Should process incoming commands");
-    ok(!engine.localCommands, "Should clear commands after processing");
+    const clearedCommands = engine._readCommands()[engine.localID];
+    deepEqual(clearedCommands, [{
+      command: "displayURI",
+      args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+    }], "Should mark the commands as cleared after processing");
 
     _("Check that the command was removed on the server");
     engine._sync();
     ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     ok(ourPayload, "Should upload the synced client record");
-    ok(!ourPayload.commands, "Should not reupload cleared commands");
+    deepEqual(ourPayload.commands, [], "Should not reupload cleared commands");
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     engine._resetClient();
 
     try {
       server.deleteCollections("foo");
     } finally {