--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -1,12 +1,30 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+/**
+ * How does the clients engine work?
+ *
+ * - We use 2 files - commands.json and commands-syncing.json.
+ *
+ * - At sync upload time, we attempt a rename of commands.json to
+ * commands-syncing.json, and ignore errors (helps for crash during sync!).
+ * - We load commands-syncing.json and stash the contents in
+ * _currentlySyncingCommands which lives for the duration of the upload process.
+ * - We use _currentlySyncingCommands to build the outgoing records
+ * - Immediately after successful upload, we delete commands-syncing.json from
+ * disk (and clear _currentlySyncingCommands). We reconcile our local records
+ * with what we just wrote in the server, and add failed IDs commands
+ * back in commands.json
+ * - Any time we need to "save" a command for future syncs, we load
+ * commands.json, update it, and write it back out.
+ */
+
this.EXPORTED_SYMBOLS = [
"ClientEngine",
"ClientsRec"
];
var {classes: Cc, interfaces: Ci, utils: Cu} = Components;
Cu.import("resource://services-common/async.js");
@@ -50,18 +68,18 @@ Utils.deferGetSet(ClientsRec,
"version", "protocols",
"formfactor", "os", "appPackage", "application", "device",
"fxaDeviceId"]);
this.ClientEngine = function ClientEngine(service) {
SyncEngine.call(this, "Clients", service);
- // Reset the client on every startup so that we fetch recent clients
- this._resetClient();
+ // Reset the last sync timestamp on every startup so that we fetch all clients
+ this.resetLastSync();
}
ClientEngine.prototype = {
__proto__: SyncEngine.prototype,
_storeObj: ClientStore,
_recordObj: ClientsRec,
_trackerObj: ClientsTracker,
// Always sync client data as it controls other sync behavior
@@ -185,16 +203,64 @@ ClientEngine.prototype = {
},
isMobile: function isMobile(id) {
if (this._store._remoteClients[id])
return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
return false;
},
+ _readCommands() {
+ let cb = Async.makeSpinningCallback();
+ Utils.jsonLoad("commands", this, commands => cb(null, commands));
+ return cb.wait() || {};
+ },
+
+ /**
+ * Low level function, do not use directly (use _addClientCommand instead).
+ */
+ _saveCommands(commands) {
+ let cb = Async.makeSpinningCallback();
+ Utils.jsonSave("commands", this, commands, error => {
+ if (error) {
+ this._log.error("Failed to save JSON outgoing commands", error);
+ }
+ cb();
+ });
+ cb.wait();
+ },
+
+ _prepareCommandsForUpload() {
+ let cb = Async.makeSpinningCallback();
+ Utils.jsonMove("commands", "commands-syncing", this).catch(() => {}) // Ignore errors
+ .then(() => {
+ Utils.jsonLoad("commands-syncing", this, commands => cb(null, commands));
+ });
+ return cb.wait() || {};
+ },
+
+ _deleteUploadedCommands() {
+ delete this._currentlySyncingCommands;
+ Async.promiseSpinningly(
+ Utils.jsonRemove("commands-syncing", this).catch(err => {
+ this._log.error("Failed to delete syncing-commands file", err);
+ })
+ );
+ },
+
+ _addClientCommand(clientId, command) {
+ const allCommands = this._readCommands();
+ const clientCommands = allCommands[clientId] || [];
+ if (hasDupeCommand(clientCommands, command)) {
+ return;
+ }
+ allCommands[clientId] = clientCommands.concat(command);
+ this._saveCommands(allCommands);
+ },
+
_syncStartup: function _syncStartup() {
// Reupload new client record periodically.
if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
this._tracker.addChangedID(this.localID);
this.lastRecordUpload = Date.now() / 1000;
}
SyncEngine.prototype._syncStartup.call(this);
},
@@ -236,21 +302,60 @@ ClientEngine.prototype = {
}
}
} finally {
this._incomingClients = null;
}
},
_uploadOutgoing() {
- this._clearedCommands = null;
+ this._currentlySyncingCommands = this._prepareCommandsForUpload();
+ const clientWithPendingCommands = Object.keys(this._currentlySyncingCommands);
+ for (let clientId of clientWithPendingCommands) {
+ if (this._store._remoteClients[clientId] || this.localID == clientId) {
+ this._modified[clientId] = 0;
+ }
+ }
SyncEngine.prototype._uploadOutgoing.call(this);
},
_onRecordsWritten(succeeded, failed) {
+ // Reconcile the status of the local records with what we just wrote on the
+ // server
+ for (let id of succeeded) {
+ const commandChanges = this._currentlySyncingCommands[id];
+ if (id == this.localID) {
+ if (this.localCommands) {
+ this.localCommands = this.localCommands.filter(command => !hasDupeCommand(commandChanges, command));
+ }
+ } else {
+ const clientRecord = this._store._remoteClients[id];
+ if (!commandChanges || !clientRecord) {
+ // should be impossible, else we wouldn't have been writing it.
+ this._log.warn("No command/No record changes for a client we uploaded");
+ continue;
+ }
+ // fixup the client record, so our copy of _remoteClients matches what we uploaded.
+ clientRecord.commands = this._store.createRecord(id);
+ // we could do better and pass the reference to the record we just uploaded,
+ // but this will do for now
+ }
+ }
+
+ // Re-add failed commands
+ for (let id of failed) {
+ const commandChanges = this._currentlySyncingCommands[id];
+ if (!commandChanges) {
+ continue;
+ }
+ this._addClientCommand(id, commandChanges);
+ }
+
+ this._deleteUploadedCommands();
+
// Notify other devices that their own client collection changed
const idsToNotify = succeeded.reduce((acc, id) => {
if (id == this.localID) {
return acc;
}
const fxaDeviceId = this.getClientFxaDeviceId(id);
return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
}, []);
@@ -317,16 +422,21 @@ ClientEngine.prototype = {
_resetClient() {
this._wipeClient();
},
_wipeClient: function _wipeClient() {
SyncEngine.prototype._resetClient.call(this);
delete this.localCommands;
this._store.wipe();
+ const logRemoveError = err => this._log.warn("Could not delete json file", err);
+ Async.promiseSpinningly(
+ Utils.jsonRemove("commands", this).catch(logRemoveError)
+ .then(Utils.jsonRemove("commands-syncing", this).catch(logRemoveError))
+ );
},
removeClientData: function removeClientData() {
let res = this.service.resource(this.engineURL + "/" + this.localID);
res.delete();
},
// Override the default behavior to delete bad records from the server.
@@ -355,30 +465,16 @@ ClientEngine.prototype = {
resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
wipeAll: { args: 0, desc: "Delete all client data for all engines" },
wipeEngine: { args: 1, desc: "Delete all client data for engine" },
logout: { args: 0, desc: "Log out client" },
displayURI: { args: 3, desc: "Instruct a client to display a URI" },
},
/**
- * Remove any commands for the local client and mark it for upload.
- */
- clearCommands: function clearCommands() {
- if (!this._clearedCommands) {
- this._clearedCommands = [];
- }
- // Keep track of cleared local commands until the next sync, so that we
- // don't reupload them.
- this._clearedCommands = this._clearedCommands.concat(this.localCommands);
- delete this.localCommands;
- this._tracker.addChangedID(this.localID);
- },
-
- /**
* Sends a command+args pair to a specific client.
*
* @param command Command string
* @param args Array of arguments/data for command
* @param clientId Client to send command to
*/
_sendCommandToClient: function sendCommandToClient(command, args, clientId) {
this._log.trace("Sending " + command + " to " + clientId);
@@ -391,51 +487,39 @@ ClientEngine.prototype = {
throw new Error("Stale remote client ID: '" + clientId + "'.");
}
let action = {
command: command,
args: args,
};
- if (!client.commands) {
- client.commands = [action];
- }
- // Add the new action if there are no duplicates.
- else if (!hasDupeCommand(client.commands, action)) {
- client.commands.push(action);
- }
- // It must be a dupe. Skip.
- else {
- return;
- }
-
this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
+ this._addClientCommand(clientId, action);
this._tracker.addChangedID(clientId);
},
/**
* Check if the local client has any remote commands and perform them.
*
* @return false to abort sync
*/
processIncomingCommands: function processIncomingCommands() {
return this._notify("clients:process-commands", "", function() {
- let commands = this.localCommands;
-
- // Immediately clear out the commands as we've got them locally.
- this.clearCommands();
-
- // Process each command in order.
- if (!commands) {
+ if (!this.localCommands) {
return true;
}
+
+ const clearedCommands = this._readCommands()[this.localID];
+ const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
+
let URIsToDisplay = [];
- for (let key in commands) {
- let {command, args} = commands[key];
+ // Process each command in order.
+ for (let rawCommand of commands) {
+ let {command, args} = rawCommand;
this._log.debug("Processing command: " + command + "(" + args + ")");
let engines = [args[0]];
switch (command) {
case "resetAll":
engines = null;
// Fallthrough
case "resetEngine":
@@ -453,17 +537,21 @@ ClientEngine.prototype = {
case "displayURI":
let [uri, clientId, title] = args;
URIsToDisplay.push({ uri, clientId, title });
break;
default:
this._log.debug("Received an unknown command: " + command);
break;
}
+ // Add the command to the "cleared" commands list
+ this._addClientCommand(this.localID, rawCommand)
}
+ this._tracker.addChangedID(this.localID);
+
if (URIsToDisplay.length) {
this._handleDisplayURIs(URIsToDisplay);
}
return true;
})();
},
@@ -566,101 +654,76 @@ ClientEngine.prototype = {
};
function ClientStore(name, engine) {
Store.call(this, name, engine);
}
ClientStore.prototype = {
__proto__: Store.prototype,
+ _remoteClients: {},
+
create(record) {
- this.update(record)
+ this.update(record);
},
update: function update(record) {
if (record.id == this.engine.localID) {
- this._updateLocalRecord(record);
+ // Only grab commands from the server; local name/type always wins
+ this.engine.localCommands = record.commands;
} else {
- this._updateRemoteRecord(record);
- }
- },
-
- _updateLocalRecord(record) {
- // Local changes for our client means we're clearing commands or
- // uploading a new record.
- let incomingCommands = record.commands;
- if (incomingCommands) {
- // Filter out incoming commands that we've cleared.
- incomingCommands = incomingCommands.filter(action =>
- !hasDupeCommand(this.engine._clearedCommands, action));
- if (!incomingCommands.length) {
- // Use `undefined` instead of `null` to avoid creating a null field
- // in the uploaded record.
- incomingCommands = undefined;
- }
+ this._remoteClients[record.id] = record.cleartext;
}
- // Only grab commands from the server; local name/type always wins
- this.engine.localCommands = incomingCommands;
- },
-
- _updateRemoteRecord(record) {
- let currentRecord = this._remoteClients[record.id];
- if (!currentRecord || !currentRecord.commands ||
- !(record.id in this.engine._modified)) {
-
- // If we have a new incoming record or no outgoing commands, use the
- // full incoming record from the server.
- this._remoteClients[record.id] = record.cleartext;
- return;
- }
-
- // Otherwise, we have outgoing commands for a client, so merge them
- // with the commands that we downloaded from the server.
- for (let action of currentRecord.commands) {
- if (hasDupeCommand(record.cleartext.commands, action)) {
- // Ignore commands the server already knows about.
- continue;
- }
- if (record.cleartext.commands) {
- record.cleartext.commands.push(action);
- } else {
- record.cleartext.commands = [action];
- }
- }
- this._remoteClients[record.id] = record.cleartext;
},
createRecord: function createRecord(id, collection) {
let record = new ClientsRec(collection, id);
+ const commandsChanges = this.engine._currentlySyncingCommands ?
+ this.engine._currentlySyncingCommands[id] :
+ [];
+
// Package the individual components into a record for the local client
if (id == this.engine.localID) {
let cb = Async.makeSpinningCallback();
fxAccounts.getDeviceId().then(id => cb(null, id), cb);
try {
record.fxaDeviceId = cb.wait();
} catch(error) {
this._log.warn("failed to get fxa device id", error);
}
record.name = this.engine.localName;
record.type = this.engine.localType;
- record.commands = this.engine.localCommands;
record.version = Services.appinfo.version;
record.protocols = SUPPORTED_PROTOCOL_VERSIONS;
+ // Substract the commands we recorded that we've already executed
+ if (commandsChanges && commandsChanges.length &&
+ this.engine.localCommands && this.engine.localCommands.length) {
+ record.commands = this.engine.localCommands.filter(command => !hasDupeCommand(commandsChanges, command));
+ }
+
// Optional fields.
record.os = Services.appinfo.OS; // "Darwin"
record.appPackage = Services.appinfo.ID;
record.application = this.engine.brandName // "Nightly"
// We can't compute these yet.
// record.device = ""; // Bug 1100723
// record.formfactor = ""; // Bug 1100722
} else {
record.cleartext = this._remoteClients[id];
+
+ // Add the commands we have to send
+ if (commandsChanges && commandsChanges.length) {
+ const recordCommands = record.cleartext.commands || [];
+ const newCommands = commandsChanges.filter(command => !hasDupeCommand(recordCommands, command));
+ record.cleartext.commands = recordCommands.concat(newCommands);
+ }
+
if (record.cleartext.stale) {
// It's almost certainly a logic error for us to upload a record we
// consider stale, so make log noise, but still remove the flag.
this._log.error(`Preparing to upload record ${id} that we consider stale`);
delete record.cleartext.stale;
}
}
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -364,23 +364,24 @@ add_test(function test_send_command() {
let remoteRecord = store.createRecord(remoteId, "clients");
let action = "testCommand";
let args = ["foo", "bar"];
engine._sendCommandToClient(action, args, remoteId);
let newRecord = store._remoteClients[remoteId];
+ let clientCommands = engine._readCommands()[remoteId];
notEqual(newRecord, undefined);
- equal(newRecord.commands.length, 1);
+ equal(clientCommands.length, 1);
- let command = newRecord.commands[0];
+ let command = clientCommands[0];
equal(command.command, action);
equal(command.args.length, 2);
- equal(command.args, args);
+ deepEqual(command.args, args);
notEqual(tracker.changedIDs[remoteId], undefined);
run_next_test();
});
add_test(function test_command_validation() {
_("Verifies that command validation works properly.");
@@ -408,29 +409,31 @@ add_test(function test_command_validatio
store.create(rec);
store.createRecord(remoteId, "clients");
engine.sendCommand(action, args, remoteId);
let newRecord = store._remoteClients[remoteId];
notEqual(newRecord, undefined);
+ let clientCommands = engine._readCommands()[remoteId];
+
if (expectedResult) {
_("Ensuring command is sent: " + action);
- equal(newRecord.commands.length, 1);
+ equal(clientCommands.length, 1);
- let command = newRecord.commands[0];
+ let command = clientCommands[0];
equal(command.command, action);
- equal(command.args, args);
+ deepEqual(command.args, args);
notEqual(engine._tracker, undefined);
notEqual(engine._tracker.changedIDs[remoteId], undefined);
} else {
_("Ensuring command is scrubbed: " + action);
- equal(newRecord.commands, undefined);
+ equal(clientCommands, undefined);
if (store._tracker) {
equal(engine._tracker[remoteId], undefined);
}
}
}
run_next_test();
@@ -447,29 +450,31 @@ add_test(function test_command_duplicati
let action = "resetAll";
let args = [];
engine.sendCommand(action, args, remoteId);
engine.sendCommand(action, args, remoteId);
let newRecord = store._remoteClients[remoteId];
- equal(newRecord.commands.length, 1);
+ let clientCommands = engine._readCommands()[remoteId];
+ equal(clientCommands.length, 1);
_("Check variant args length");
- newRecord.commands = [];
+ engine._saveCommands({});
action = "resetEngine";
engine.sendCommand(action, [{ x: "foo" }], remoteId);
engine.sendCommand(action, [{ x: "bar" }], remoteId);
_("Make sure we spot a real dupe argument.");
engine.sendCommand(action, [{ x: "bar" }], remoteId);
- equal(newRecord.commands.length, 2);
+ clientCommands = engine._readCommands()[remoteId];
+ equal(clientCommands.length, 2);
run_next_test();
});
add_test(function test_command_invalid_client() {
_("Ensures invalid client IDs are caught");
let id = Utils.makeGUID();
@@ -703,17 +708,18 @@ add_test(function test_command_sync() {
_("Checking remote record was downloaded.");
let clientRecord = engine._store._remoteClients[remoteId];
notEqual(clientRecord, undefined);
equal(clientRecord.commands.length, 0);
_("Send a command to the remote client.");
engine.sendCommand("wipeAll", []);
- equal(clientRecord.commands.length, 1);
+ let clientCommands = engine._readCommands()[remoteId];
+ equal(clientCommands.length, 1);
engine._sync();
_("Checking record was uploaded.");
notEqual(clientWBO(engine.localID).payload, undefined);
ok(engine.lastRecordUpload > 0);
notEqual(clientWBO(remoteId).payload, undefined);
@@ -759,19 +765,20 @@ add_test(function test_send_uri_to_clien
let uri = "http://www.mozilla.org/";
let title = "Title of the Page";
engine.sendURIToClientForDisplay(uri, remoteId, title);
let newRecord = store._remoteClients[remoteId];
notEqual(newRecord, undefined);
- equal(newRecord.commands.length, 1);
+ let clientCommands = engine._readCommands()[remoteId];
+ equal(clientCommands.length, 1);
- let command = newRecord.commands[0];
+ let command = clientCommands[0];
equal(command.command, "displayURI");
equal(command.args.length, 3);
equal(command.args[0], uri);
equal(command.args[1], engine.localID);
equal(command.args[2], title);
ok(tracker.score > initialScore);
ok(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
@@ -937,16 +944,295 @@ add_test(function test_merge_commands()
try {
server.deleteCollections("foo");
} finally {
server.stop(run_next_test);
}
}
});
+add_test(function test_duplicate_remote_commands() {
+ _("Verifies local commands for remote clients are sent only once (bug 1289287)");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let desktopID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+ id: desktopID,
+ name: "Desktop client",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. 1 record downloaded.");
+ strictEqual(engine.lastRecordUpload, 0);
+ engine._sync();
+
+ _("Send tab to client");
+ engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"]);
+ engine._sync();
+
+ _("Simulate the desktop client consuming the command and syncing to the server");
+ server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+ id: desktopID,
+ name: "Desktop client",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ _("Send another tab to the desktop client");
+ engine.sendCommand("displayURI", ["https://foobar.com", engine.localID, "Foo bar!"], desktopID);
+ engine._sync();
+
+ let collection = server.getCollection("foo", "clients");
+ let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
+ deepEqual(desktopPayload.commands, [{
+ command: "displayURI",
+ args: ["https://foobar.com", engine.localID, "Foo bar!"],
+ }], "Should only send the second command to the desktop client");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+ engine._resetClient();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
+add_test(function test_upload_after_reboot() {
+ _("Multiple downloads, reboot, then upload (bug 1289287)");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let deviceBID = Utils.makeGUID();
+ let deviceCID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+ id: deviceBID,
+ name: "Device B",
+ type: "desktop",
+ commands: [{
+ command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ }],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+ server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
+ id: deviceCID,
+ name: "Device C",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. 2 records downloaded.");
+ strictEqual(engine.lastRecordUpload, 0);
+ engine._sync();
+
+ _("Send tab to client");
+ engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"], deviceBID);
+
+ const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
+ SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
+ engine._sync();
+
+ let collection = server.getCollection("foo", "clients");
+ let deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+ deepEqual(deviceBPayload.commands, [{
+ command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ }], "Should be the same because the upload failed");
+
+ _("Simulate the client B consuming the command and syncing to the server");
+ server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+ id: deviceBID,
+ name: "Device B",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ // Simulate reboot
+ SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
+ engine = Service.clientsEngine = new ClientEngine(Service);
+
+ engine._sync();
+
+ deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+ deepEqual(deviceBPayload.commands, [{
+ command: "displayURI",
+ args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+ }], "Should only had written our outgoing command");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+ engine._resetClient();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
+add_test(function test_keep_cleared_commands_after_reboot() {
+ _("Download commands, fail upload, reboot, then apply new commands (bug 1289287)");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let deviceBID = Utils.makeGUID();
+ let deviceCID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
+ id: engine.localID,
+ name: "Device A",
+ type: "desktop",
+ commands: [{
+ command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+ },
+ {
+ command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ }],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+ server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
+ id: deviceBID,
+ name: "Device B",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+ server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
+ id: deviceCID,
+ name: "Device C",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. Download remote and our record.");
+ strictEqual(engine.lastRecordUpload, 0);
+
+ let collection = server.getCollection("foo", "clients");
+ const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
+ SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
+ let commandsProcessed = 0;
+ engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
+
+ engine._sync();
+ engine.processIncomingCommands(); // Not called by the engine.sync(), gotta call it ourselves
+ equal(commandsProcessed, 2, "We processed 2 commands");
+
+ let localRemoteRecord = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
+ deepEqual(localRemoteRecord.commands, [{
+ command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+ },
+ {
+ command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ }], "Should be the same because the upload failed");
+
+ // Another client sends another link
+ server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
+ id: engine.localID,
+ name: "Device A",
+ type: "desktop",
+ commands: [{
+ command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+ },
+ {
+ command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ },
+ {
+ command: "displayURI", args: ["https://deviceclink2.com", deviceCID, "Device C link 2"]
+ }],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ // Simulate reboot
+ SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
+ engine = Service.clientsEngine = new ClientEngine(Service);
+
+ commandsProcessed = 0;
+ engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
+ engine._sync();
+ engine.processIncomingCommands();
+ equal(commandsProcessed, 1, "We processed one command (the other were cleared)");
+
+ localRemoteRecord = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
+ deepEqual(localRemoteRecord.commands, [], "Should be empty");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+
+ // Reset service (remove mocks)
+ engine = Service.clientsEngine = new ClientEngine(Service);
+ engine._resetClient();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
add_test(function test_deleted_commands() {
_("Verifies commands for a deleted client are discarded");
let now = Date.now() / 1000;
let contents = {
meta: {global: {engines: {clients: {version: engine.version,
syncID: engine.syncID}}}},
clients: {},
@@ -1045,23 +1331,27 @@ add_test(function test_send_uri_ack() {
_("Sync again");
engine._sync();
deepEqual(engine.localCommands, [{
command: "displayURI",
args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
}], "Should receive incoming URI");
ok(engine.processIncomingCommands(), "Should process incoming commands");
- ok(!engine.localCommands, "Should clear commands after processing");
+ const clearedCommands = engine._readCommands()[engine.localID];
+ deepEqual(clearedCommands, [{
+ command: "displayURI",
+ args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+ }], "Should mark the commands as cleared after processing");
_("Check that the command was removed on the server");
engine._sync();
ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
ok(ourPayload, "Should upload the synced client record");
- ok(!ourPayload.commands, "Should not reupload cleared commands");
+ deepEqual(ourPayload.commands, [], "Should not reupload cleared commands");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
try {
server.deleteCollections("foo");
} finally {