--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -20,16 +20,21 @@ Cu.import("resource://gre/modules/Servic
XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
"resource://gre/modules/FxAccounts.jsm");
const CLIENTS_TTL = 1814400; // 21 days
const CLIENTS_TTL_REFRESH = 604800; // 7 days
const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
+function hasDupeCommand(commands, action) {
+ return commands.some(other => other.command == action.command &&
+ Utils.deepEquals(other.args, action.args));
+}
+
this.ClientsRec = function ClientsRec(collection, id) {
CryptoWrapper.call(this, collection, id);
}
ClientsRec.prototype = {
__proto__: CryptoWrapper.prototype,
_logName: "Sync.Record.Clients",
ttl: CLIENTS_TTL
};
@@ -145,16 +150,37 @@ ClientEngine.prototype = {
// Reupload new client record periodically.
if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
this._tracker.addChangedID(this.localID);
this.lastRecordUpload = Date.now() / 1000;
}
SyncEngine.prototype._syncStartup.call(this);
},
+ _processIncoming() {
+ // Fetch all records from the server.
+ this.lastSync = 0;
+ this._incomingClients = [];
+ try {
+ SyncEngine.prototype._processIncoming.call(this);
+ // Since clients are synced unconditionally, any records in the local store
+ // that don't exist on the server must be for disconnected clients. Remove
+ // them, so that we don't upload records with commands for clients that will
+ // never see them. We also do this to filter out stale clients from the
+ // tabs collection, since showing their list of tabs is confusing.
+ let remoteClientIDs = Object.keys(this._store._remoteClients);
+ let staleIDs = Utils.arraySub(remoteClientIDs, this._incomingClients);
+ for (let staleID of staleIDs) {
+ this._removeRemoteClient(staleID);
+ }
+ } finally {
+ this._incomingClients = null;
+ }
+ },
+
_syncFinish() {
// Record telemetry for our device types.
for (let [deviceType, count] of this.deviceTypes) {
let hid;
switch (deviceType) {
case "desktop":
hid = "WEAVE_DEVICE_COUNT_DESKTOP";
break;
@@ -165,19 +191,32 @@ ClientEngine.prototype = {
this._log.warn(`Unexpected deviceType "${deviceType}" recording device telemetry.`);
continue;
}
Services.telemetry.getHistogramById(hid).add(count);
}
SyncEngine.prototype._syncFinish.call(this);
},
- // Always process incoming items because they might have commands
- _reconcile: function _reconcile() {
- return true;
+ _reconcile: function _reconcile(item) {
+ // Every incoming record is reconciled, so we use this to track the
+ // contents of the collection on the server.
+ this._incomingClients.push(item.id);
+
+ if (!this._store.itemExists(item.id)) {
+ return true;
+ }
+ // Clients are synced unconditionally, so we'll always have new records.
+ // Unfortunately, this will cause the scheduler to use the immediate sync
+ // interval for the multi-device case, instead of the active interval. We
+ // work around this by updating the record during reconciliation, and
+ // returning false to indicate that the record doesn't need to be applied
+ // later.
+ this._store.update(item);
+ return false;
},
// Treat reset the same as wiping for locally cached clients
_resetClient() {
this._wipeClient();
},
_wipeClient: function _wipeClient() {
@@ -238,31 +277,26 @@ ClientEngine.prototype = {
_sendCommandToClient: function sendCommandToClient(command, args, clientId) {
this._log.trace("Sending " + command + " to " + clientId);
let client = this._store._remoteClients[clientId];
if (!client) {
throw new Error("Unknown remote client ID: '" + clientId + "'.");
}
- // notDupe compares two commands and returns if they are not equal.
- let notDupe = function(other) {
- return other.command != command || !Utils.deepEquals(other.args, args);
- };
-
let action = {
command: command,
args: args,
};
if (!client.commands) {
client.commands = [action];
}
// Add the new action if there are no duplicates.
- else if (client.commands.every(notDupe)) {
+ else if (!hasDupeCommand(client.commands, action)) {
client.commands.push(action);
}
// It must be a dupe. Skip.
else {
return;
}
this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
@@ -404,35 +438,50 @@ ClientEngine.prototype = {
* send this.
*/
_handleDisplayURI: function _handleDisplayURI(uri, clientId, title) {
this._log.info("Received a URI for display: " + uri + " (" + title +
") from " + clientId);
let subject = {uri: uri, client: clientId, title: title};
Svc.Obs.notify("weave:engine:clients:display-uri", subject);
- }
+ },
+
+ _removeRemoteClient(id) {
+ delete this._store._remoteClients[id];
+ this._tracker.removeChangedID(id);
+ },
};
function ClientStore(name, engine) {
Store.call(this, name, engine);
}
ClientStore.prototype = {
__proto__: Store.prototype,
create(record) {
this.update(record)
},
update: function update(record) {
// Only grab commands from the server; local name/type always wins
if (record.id == this.engine.localID)
this.engine.localCommands = record.commands;
- else
+ else {
+ let currentRecord = this._remoteClients[record.id];
+ if (currentRecord && currentRecord.commands) {
+ // Merge commands.
+ for (let action of currentRecord.commands) {
+ if (!hasDupeCommand(record.cleartext.commands, action)) {
+ record.cleartext.commands.push(action);
+ }
+ }
+ }
this._remoteClients[record.id] = record.cleartext;
+ }
},
createRecord: function createRecord(id, collection) {
let record = new ClientsRec(collection, id);
// Package the individual components into a record for the local client
if (id == this.engine.localID) {
let cb = Async.makeSpinningCallback();
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -188,16 +188,89 @@ add_test(function test_properties() {
engine.lastRecordUpload = now / 1000;
do_check_eq(engine.lastRecordUpload, Math.floor(now / 1000));
} finally {
Svc.Prefs.resetBranch("");
run_next_test();
}
});
+add_test(function test_full_sync() {
+ _("Ensure that Clients engine fetches all records for each sync.");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let activeID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
+ id: activeID,
+ name: "Active client",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ let deletedID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
+ id: deletedID,
+ name: "Client to delete",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. 2 records downloaded; our record uploaded.");
+ strictEqual(engine.lastRecordUpload, 0);
+ engine._sync();
+ ok(engine.lastRecordUpload > 0);
+ deepEqual(user.collection("clients").keys().sort(),
+ [activeID, deletedID, engine.localID].sort(),
+ "Our record should be uploaded on first sync");
+ deepEqual(Object.keys(store.getAllIDs()).sort(),
+ [activeID, deletedID, engine.localID].sort(),
+ "Other clients should be downloaded on first sync");
+
+ _("Delete a record, then sync again");
+ let collection = server.getCollection("foo", "clients");
+ collection.remove(deletedID);
+ // Simulate a timestamp update in info/collections.
+ engine.lastModified = now;
+ engine._sync();
+
+ _("Record should be updated");
+ deepEqual(Object.keys(store.getAllIDs()).sort(),
+ [activeID, engine.localID].sort(),
+ "Deleted client should be removed on next sync");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
add_test(function test_sync() {
_("Ensure that Clients engine uploads a new client record once a week.");
let contents = {
meta: {global: {engines: {clients: {version: engine.version,
syncID: engine.syncID}}}},
clients: {},
crypto: {}
@@ -449,28 +522,39 @@ add_test(function test_command_sync() {
let user = server.user("foo");
let remoteId = Utils.makeGUID();
function clientWBO(id) {
return user.collection("clients").wbo(id);
}
_("Create remote client record");
- let rec = new ClientsRec("clients", remoteId);
- engine._store.create(rec);
- let remoteRecord = engine._store.createRecord(remoteId, "clients");
- engine.sendCommand("wipeAll", []);
-
- let clientRecord = engine._store._remoteClients[remoteId];
- do_check_neq(clientRecord, undefined);
- do_check_eq(clientRecord.commands.length, 1);
+ server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+ id: remoteId,
+ name: "Remote client",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), Date.now() / 1000));
try {
_("Syncing.");
engine._sync();
+
+ _("Checking remote record was downloaded.");
+ let clientRecord = engine._store._remoteClients[remoteId];
+ do_check_neq(clientRecord, undefined);
+ do_check_eq(clientRecord.commands.length, 0);
+
+ _("Send a command to the remote client.");
+ engine.sendCommand("wipeAll", []);
+ do_check_eq(clientRecord.commands.length, 1);
+ engine._sync();
+
_("Checking record was uploaded.");
do_check_neq(clientWBO(engine.localID).payload, undefined);
do_check_true(engine.lastRecordUpload > 0);
do_check_neq(clientWBO(remoteId).payload, undefined);
Svc.Prefs.set("client.GUID", remoteId);
engine._resetClient();
@@ -482,17 +566,23 @@ add_test(function test_command_sync() {
let command = engine.localCommands[0];
do_check_eq(command.command, "wipeAll");
do_check_eq(command.args.length, 0);
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
- server.stop(run_next_test);
+
+ try {
+ let collection = server.getCollection("foo", "clients");
+ collection.remove(remoteId);
+ } finally {
+ server.stop(run_next_test);
+ }
}
});
add_test(function test_send_uri_to_client_for_display() {
_("Ensure sendURIToClientForDisplay() sends command properly.");
let tracker = engine._tracker;
let store = engine._store;
@@ -570,16 +660,19 @@ add_test(function test_receive_display_u
do_check_eq(data, null);
run_next_test();
};
Svc.Obs.add(ev, handler);
do_check_true(engine.processIncomingCommands());
+
+ engine._resetClient();
+ run_next_test();
});
add_test(function test_optional_client_fields() {
_("Ensure that we produce records with the fields added in Bug 1097222.");
const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
let local = engine._store.createRecord(engine.localID, "clients");
do_check_eq(local.name, engine.localName);
@@ -598,13 +691,161 @@ add_test(function test_optional_client_f
do_check_true(!!local.application);
// We don't currently populate device or formfactor.
// See Bug 1100722, Bug 1100723.
run_next_test();
});
+add_test(function test_merge_commands() {
+ _("Verifies local commands for remote clients are merged with the server's");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let desktopID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+ id: desktopID,
+ name: "Desktop client",
+ type: "desktop",
+ commands: [{
+ command: "displayURI",
+ args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+ }],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ let mobileID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(mobileID, encryptPayload({
+ id: mobileID,
+ name: "Mobile client",
+ type: "mobile",
+ commands: [{
+ command: "logout",
+ args: [],
+ }],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. 2 records downloaded.");
+ strictEqual(engine.lastRecordUpload, 0);
+ engine._sync();
+
+ _("Broadcast logout to all clients");
+ engine.sendCommand("logout", []);
+ engine._sync();
+
+ let collection = server.getCollection("foo", "clients");
+ let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
+ deepEqual(desktopPayload.commands, [{
+ command: "displayURI",
+ args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+ }, {
+ command: "logout",
+ args: [],
+ }], "Should send the logout command to the desktop client");
+
+ let mobilePayload = JSON.parse(JSON.parse(collection.payload(mobileID)).ciphertext);
+ deepEqual(mobilePayload.commands, [{ command: "logout", args: [] }],
+ "Should not send a duplicate logout to the mobile client");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+ engine._resetClient();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
+add_test(function test_deleted_commands() {
+ _("Verifies commands for a deleted client are discarded");
+
+ let now = Date.now() / 1000;
+ let contents = {
+ meta: {global: {engines: {clients: {version: engine.version,
+ syncID: engine.syncID}}}},
+ clients: {},
+ crypto: {}
+ };
+ let server = serverForUsers({"foo": "password"}, contents);
+ let user = server.user("foo");
+
+ new SyncTestingInfrastructure(server.server);
+ generateNewKeys(Service.collectionKeys);
+
+ let activeID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
+ id: activeID,
+ name: "Active client",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ let deletedID = Utils.makeGUID();
+ server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
+ id: deletedID,
+ name: "Client to delete",
+ type: "desktop",
+ commands: [],
+ version: "48",
+ protocols: ["1.5"],
+ }), now - 10));
+
+ try {
+ let store = engine._store;
+
+ _("First sync. 2 records downloaded.");
+ engine._sync();
+
+ _("Delete a record on the server.");
+ let collection = server.getCollection("foo", "clients");
+ collection.remove(deletedID);
+
+ _("Broadcast a command to all clients");
+ engine.sendCommand("logout", []);
+ engine._sync();
+
+ deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
+ "Should not reupload deleted clients");
+
+ let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
+ deepEqual(activePayload.commands, [{ command: "logout", args: [] }],
+ "Should send the command to the active client");
+ } finally {
+ Svc.Prefs.resetBranch("");
+ Service.recordManager.clearCache();
+ engine._resetClient();
+
+ try {
+ server.deleteCollections("foo");
+ } finally {
+ server.stop(run_next_test);
+ }
+ }
+});
+
function run_test() {
initTestLogging("Trace");
Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
run_next_test();
}