--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -246,20 +246,21 @@ ClientEngine.prototype = {
})
);
},
_addClientCommand(clientId, command) {
const allCommands = this._readCommands();
const clientCommands = allCommands[clientId] || [];
if (hasDupeCommand(clientCommands, command)) {
- return;
+ return false;
}
allCommands[clientId] = clientCommands.concat(command);
this._saveCommands(allCommands);
+ return true;
},
_syncStartup: function _syncStartup() {
// Reupload new client record periodically.
if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
this._tracker.addChangedID(this.localID);
this.lastRecordUpload = Date.now() / 1000;
}
@@ -472,35 +473,45 @@ ClientEngine.prototype = {
/**
* Sends a command+args pair to a specific client.
*
* @param command Command string
* @param args Array of arguments/data for command
* @param clientId Client to send command to
*/
- _sendCommandToClient: function sendCommandToClient(command, args, clientId) {
+ _sendCommandToClient: function sendCommandToClient(command, args, clientId, flowID = null) {
this._log.trace("Sending " + command + " to " + clientId);
let client = this._store._remoteClients[clientId];
if (!client) {
throw new Error("Unknown remote client ID: '" + clientId + "'.");
}
if (client.stale) {
throw new Error("Stale remote client ID: '" + clientId + "'.");
}
let action = {
command: command,
args: args,
+ flowID: flowID || Utils.makeGUID(), // used for telemetry.
};
- this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
- this._addClientCommand(clientId, action);
- this._tracker.addChangedID(clientId);
+ if (this._addClientCommand(clientId, action)) {
+ this._log.trace(`Client ${clientId} got a new action`, [command, args]);
+ this._tracker.addChangedID(clientId);
+ let deviceID;
+ try {
+ deviceID = this.service.identity.hashedDeviceID(clientId);
+ } catch (_) {}
+ this.service.recordTelemetryEvent("sendcommand", command, undefined,
+ { flowID: action.flowID, deviceID });
+ } else {
+ this._log.trace(`Client ${clientId} got a duplicate action`, [command, args]);
+ }
},
/**
* Check if the local client has any remote commands and perform them.
*
* @return false to abort sync
*/
processIncomingCommands: function processIncomingCommands() {
@@ -510,19 +521,22 @@ ClientEngine.prototype = {
}
const clearedCommands = this._readCommands()[this.localID];
const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
let URIsToDisplay = [];
// Process each command in order.
for (let rawCommand of commands) {
- let {command, args} = rawCommand;
+ let {command, args, flowID} = rawCommand;
this._log.debug("Processing command: " + command + "(" + args + ")");
+ this.service.recordTelemetryEvent("processcommand", command, undefined,
+ { flowID });
+
let engines = [args[0]];
switch (command) {
case "resetAll":
engines = null;
// Fallthrough
case "resetEngine":
this.service.resetClient(engines);
break;
@@ -565,37 +579,40 @@ ClientEngine.prototype = {
*
* @param command
* Command to invoke on remote clients
* @param args
* Array of arguments to give to the command
* @param clientId
* Client ID to send command to. If undefined, send to all remote
* clients.
+ * @param flowID
+ * A unique identifier used to track success for this operation across
+ * devices.
*/
- sendCommand: function sendCommand(command, args, clientId) {
+ sendCommand: function sendCommand(command, args, clientId, flowID = null) {
let commandData = this._commands[command];
// Don't send commands that we don't know about.
if (!commandData) {
this._log.error("Unknown command to send: " + command);
return;
}
// Don't send a command with the wrong number of arguments.
else if (!args || args.length != commandData.args) {
this._log.error("Expected " + commandData.args + " args for '" +
command + "', but got " + args);
return;
}
if (clientId) {
- this._sendCommandToClient(command, args, clientId);
+ this._sendCommandToClient(command, args, clientId, flowID);
} else {
for (let [id, record] of Object.entries(this._store._remoteClients)) {
if (!record.stale) {
- this._sendCommandToClient(command, args, id);
+ this._sendCommandToClient(command, args, id, flowID);
}
}
}
},
/**
* Send a URI to another client for display.
*
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -384,16 +384,17 @@ add_test(function test_send_command() {
let clientCommands = engine._readCommands()[remoteId];
notEqual(newRecord, undefined);
equal(clientCommands.length, 1);
let command = clientCommands[0];
equal(command.command, action);
equal(command.args.length, 2);
deepEqual(command.args, args);
+ ok(command.flowID);
notEqual(tracker.changedIDs[remoteId], undefined);
engine._tracker.clearChangedIDs();
run_next_test();
});
add_test(function test_command_validation() {
@@ -627,22 +628,22 @@ add_task(async function test_filter_dupl
equal(counts.newFailed, 0);
_("Broadcast logout to all clients");
engine.sendCommand("logout", []);
engine._sync();
let collection = server.getCollection("foo", "clients");
let recentPayload = JSON.parse(JSON.parse(collection.payload(recentID)).ciphertext);
- deepEqual(recentPayload.commands, [{ command: "logout", args: [] }],
- "Should send commands to the recent client");
+ compareCommands(recentPayload.commands, [{ command: "logout", args: [] }],
+ "Should send commands to the recent client");
let oldPayload = JSON.parse(JSON.parse(collection.payload(oldID)).ciphertext);
- deepEqual(oldPayload.commands, [{ command: "logout", args: [] }],
- "Should send commands to the week-old client");
+ compareCommands(oldPayload.commands, [{ command: "logout", args: [] }],
+ "Should send commands to the week-old client");
let dupePayload = JSON.parse(JSON.parse(collection.payload(dupeID)).ciphertext);
deepEqual(dupePayload.commands, [],
"Should not send commands to the dupe client");
_("Update the dupe client's modified time");
server.insertWBO("foo", "clients", new ServerWBO(dupeID, encryptPayload({
id: dupeID,
@@ -909,29 +910,31 @@ add_task(async function test_merge_comma
let desktopID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
id: desktopID,
name: "Desktop client",
type: "desktop",
commands: [{
command: "displayURI",
args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+ flowID: Utils.makeGUID(),
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
let mobileID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(mobileID, encryptPayload({
id: mobileID,
name: "Mobile client",
type: "mobile",
commands: [{
command: "logout",
args: [],
+ flowID: Utils.makeGUID(),
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
try {
let store = engine._store;
@@ -940,27 +943,27 @@ add_task(async function test_merge_comma
engine._sync();
_("Broadcast logout to all clients");
engine.sendCommand("logout", []);
engine._sync();
let collection = server.getCollection("foo", "clients");
let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
- deepEqual(desktopPayload.commands, [{
+ compareCommands(desktopPayload.commands, [{
command: "displayURI",
args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
}, {
command: "logout",
args: [],
}], "Should send the logout command to the desktop client");
let mobilePayload = JSON.parse(JSON.parse(collection.payload(mobileID)).ciphertext);
- deepEqual(mobilePayload.commands, [{ command: "logout", args: [] }],
- "Should not send a duplicate logout to the mobile client");
+ compareCommands(mobilePayload.commands, [{ command: "logout", args: [] }],
+ "Should not send a duplicate logout to the mobile client");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
try {
server.deleteCollections("foo");
} finally {
@@ -1017,17 +1020,17 @@ add_task(async function test_duplicate_r
}), now - 10));
_("Send another tab to the desktop client");
engine.sendCommand("displayURI", ["https://foobar.com", engine.localID, "Foo bar!"], desktopID);
engine._sync();
let collection = server.getCollection("foo", "clients");
let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
- deepEqual(desktopPayload.commands, [{
+ compareCommands(desktopPayload.commands, [{
command: "displayURI",
args: ["https://foobar.com", engine.localID, "Foo bar!"],
}], "Should only send the second command to the desktop client");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
@@ -1057,17 +1060,19 @@ add_task(async function test_upload_afte
let deviceBID = Utils.makeGUID();
let deviceCID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
id: deviceBID,
name: "Device B",
type: "desktop",
commands: [{
- command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ command: "displayURI",
+ args: ["https://deviceclink.com", deviceCID, "Device C link"],
+ flowID: Utils.makeGUID(),
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
id: deviceCID,
name: "Device C",
type: "desktop",
@@ -1087,17 +1092,17 @@ add_task(async function test_upload_afte
engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"], deviceBID);
const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
engine._sync();
let collection = server.getCollection("foo", "clients");
let deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
- deepEqual(deviceBPayload.commands, [{
+ compareCommands(deviceBPayload.commands, [{
command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
}], "Should be the same because the upload failed");
_("Simulate the client B consuming the command and syncing to the server");
server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
id: deviceBID,
name: "Device B",
type: "desktop",
@@ -1108,17 +1113,17 @@ add_task(async function test_upload_afte
// Simulate reboot
SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
engine = Service.clientsEngine = new ClientEngine(Service);
engine._sync();
deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
- deepEqual(deviceBPayload.commands, [{
+ compareCommands(deviceBPayload.commands, [{
command: "displayURI",
args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
}], "Should only had written our outgoing command");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
@@ -1148,20 +1153,24 @@ add_task(async function test_keep_cleare
let deviceBID = Utils.makeGUID();
let deviceCID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
id: engine.localID,
name: "Device A",
type: "desktop",
commands: [{
- command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+ command: "displayURI",
+ args: ["https://deviceblink.com", deviceBID, "Device B link"],
+ flowID: Utils.makeGUID(),
},
{
- command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ command: "displayURI",
+ args: ["https://deviceclink.com", deviceCID, "Device C link"],
+ flowID: Utils.makeGUID(),
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
id: deviceBID,
name: "Device B",
type: "desktop",
@@ -1190,36 +1199,42 @@ add_task(async function test_keep_cleare
let commandsProcessed = 0;
engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
engine._sync();
engine.processIncomingCommands(); // Not called by the engine.sync(), gotta call it ourselves
equal(commandsProcessed, 2, "We processed 2 commands");
let localRemoteRecord = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
- deepEqual(localRemoteRecord.commands, [{
+ compareCommands(localRemoteRecord.commands, [{
command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
},
{
command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
}], "Should be the same because the upload failed");
// Another client sends another link
server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
id: engine.localID,
name: "Device A",
type: "desktop",
commands: [{
- command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+ command: "displayURI",
+ args: ["https://deviceblink.com", deviceBID, "Device B link"],
+ flowID: Utils.makeGUID(),
},
{
- command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+ command: "displayURI",
+ args: ["https://deviceclink.com", deviceCID, "Device C link"],
+ flowID: Utils.makeGUID(),
},
{
- command: "displayURI", args: ["https://deviceclink2.com", deviceCID, "Device C link 2"]
+ command: "displayURI",
+ args: ["https://deviceclink2.com", deviceCID, "Device C link 2"],
+ flowID: Utils.makeGUID(),
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
// Simulate reboot
SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
engine = Service.clientsEngine = new ClientEngine(Service);
@@ -1297,17 +1312,17 @@ add_task(async function test_deleted_com
_("Broadcast a command to all clients");
engine.sendCommand("logout", []);
engine._sync();
deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
"Should not reupload deleted clients");
let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
- deepEqual(activePayload.commands, [{ command: "logout", args: [] }],
+ compareCommands(activePayload.commands, [{ command: "logout", args: [] }],
"Should send the command to the active client");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
try {
server.deleteCollections("foo");
@@ -1341,28 +1356,29 @@ add_task(async function test_send_uri_ac
let collection = server.getCollection("foo", "clients");
let ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
ok(ourPayload, "Should upload our client record");
_("Send a URL to the device on the server");
ourPayload.commands = [{
command: "displayURI",
args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+ flowID: Utils.makeGUID(),
}];
server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload(ourPayload), now));
_("Sync again");
engine._sync();
- deepEqual(engine.localCommands, [{
+ compareCommands(engine.localCommands, [{
command: "displayURI",
args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
}], "Should receive incoming URI");
ok(engine.processIncomingCommands(), "Should process incoming commands");
const clearedCommands = engine._readCommands()[engine.localID];
- deepEqual(clearedCommands, [{
+ compareCommands(clearedCommands, [{
command: "displayURI",
args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
}], "Should mark the commands as cleared after processing");
_("Check that the command was removed on the server");
engine._sync();
ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
ok(ourPayload, "Should upload the synced client record");