Bug 1317223 (part 1) - a collection_repair module (without any repairers) and integration with the clients engine. r?rnewman,eoger
This creates a collection_repair module, somewhat analogous to the existing
collection_validator module. This defines the public interface to request a
new repair and respond to a remote repair request, and also includes changes
to clients.js to call this public interface. collection_repair also defines
abstract base classes for the implementation of the requestors/responders, but
defines neither a concrete requestor nor a responder - in other words, the
interface exists but no concrete requestors or repairers will be found.
There are also a couple of changes to clients.js used by later patches, namely
a way to query the commands queued for a client and to get the client record
itself.
The main TODO here is to consider how safe the new "don't remove the repair
request from the client engine until the repair is done" change is, and to
consider if there is anything else we should do to make it less likely that some
obscure error will cause the command to remain pending forever (although I
believe the telemetry we record here should be enough to tell us if this is
actually happening).
Another thing we should consider is not allowing multiple repair requests to
be pending at once.
MozReview-Commit-ID: 9JPpRrLgFoR
new file mode 100644
--- /dev/null
+++ b/services/sync/modules/collection_repair.js
@@ -0,0 +1,123 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+"use strict";
+
+const Cu = Components.utils;
+
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+Cu.import("resource://services-sync/main.js");
+
+this.EXPORTED_SYMBOLS = ["getRepairRequestor", "getAllRepairRequestors",
+ "CollectionRepairRequestor",
+ "getRepairResponder",
+ "CollectionRepairResponder"];
+
+// The individual requestors/responders, lazily loaded.
+const REQUESTORS = {
+}
+
+const RESPONDERS = {
+}
+
+// Should we maybe enforce the requestors being a singleton?
+function _getRepairConstructor(which, collection) {
+ if (!(collection in which)) {
+ return null;
+ }
+ let [modname, symbolname] = which[collection];
+ let ns = {};
+ Cu.import("resource://services-sync/" + modname, ns);
+ return ns[symbolname];
+}
+
+function getRepairRequestor(collection) {
+ let ctor = _getRepairConstructor(REQUESTORS, collection);
+ if (!ctor) {
+ return null;
+ }
+ return new ctor();
+}
+
+function getAllRepairRequestors() {
+ let result = {};
+ for (let collection of Object.keys(REQUESTORS)) {
+ let ctor = _getRepairConstructor(REQUESTORS, collection);
+ result[collection] = new ctor();
+ }
+ return result;
+}
+
+function getRepairResponder(collection) {
+ let ctor = _getRepairConstructor(RESPONDERS, collection);
+ if (!ctor) {
+ return null;
+ }
+ return new ctor();
+}
+
+// The abstract classes.
+class CollectionRepairRequestor {
+ constructor(service = null) {
+ // allow service to be mocked in tests.
+ this.service = service || Weave.Service;
+ }
+
+ /* See if the repairer is willing and able to begin a repair process given
+ the specified validation information.
+ Returns true if a repair was started and false otherwise.
+
+ @param validationInfo {Object}
+ The validation info as returned by the collection's validator.
+
+ @param flowID {String}
+ A guid that uniquely identifies this repair process for this
+ collection, and which should be sent to any requestors and
+ reported in telemetry.
+
+ */
+ startRepairs(validationInfo, flowID) {
+ throw new Error("not implemented");
+ }
+
+ /* Work out what state our current repair request is in, and whether it can
+ proceed to a new state.
+ Returns true if we could continue the repair - even if the state didn't
+ actually move. Returns false if we aren't actually repairing.
+
+ @param responseInfo {Object}
+ An optional response to a previous repair request, as returned
+ by a remote repair responder.
+
+ */
+ continueRepairs(responseInfo = null) {
+ throw new Error("not implemented");
+ }
+}
+
+class CollectionRepairResponder {
+ constructor(service = null) {
+ // allow service to be mocked in tests.
+ this.service = service || Weave.Service;
+ }
+
+ /* Take some action in response to a repair request. Returns a promise that
+ resolves once the repair process has started, or rejects if there
+ was an error starting the repair.
+
+ Note that when the promise resolves the repair is not yet complete - at
+ some point in the future the repair will auto-complete, at which time
+ |rawCommand| will be removed from the list of client commands for this
+ client.
+
+ @param request {Object}
+ The repair request as sent by another client.
+
+ @param rawCommand {Object}
+ The command object as stored in the clients engine, and which
+ will be automatically removed once a repair completes.
+ */
+ async repair(request, rawCommand) {
+ throw new Error("not implemented");
+ }
+}
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -34,16 +34,22 @@ Cu.import("resource://services-sync/engi
Cu.import("resource://services-sync/record.js");
Cu.import("resource://services-sync/resource.js");
Cu.import("resource://services-sync/util.js");
Cu.import("resource://gre/modules/Services.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
"resource://gre/modules/FxAccounts.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "getRepairRequestor",
+ "resource://services-sync/collection_repair.js");
+
+XPCOMUtils.defineLazyModuleGetter(this, "getRepairResponder",
+ "resource://services-sync/collection_repair.js");
+
const CLIENTS_TTL = 1814400; // 21 days
const CLIENTS_TTL_REFRESH = 604800; // 7 days
const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days
const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
function hasDupeCommand(commands, action) {
if (!commands) {
@@ -95,19 +101,23 @@ ClientEngine.prototype = {
Svc.Prefs.set(this.name + ".lastRecordUpload", Math.floor(value));
},
get remoteClients() {
// return all non-stale clients for external consumption.
return Object.values(this._store._remoteClients).filter(v => !v.stale);
},
- remoteClientExists(id) {
+ remoteClient(id) {
let client = this._store._remoteClients[id];
- return !!(client && !client.stale);
+ return client && !client.stale ? client : null;
+ },
+
+ remoteClientExists(id) {
+ return !!this.remoteClient(id);
},
// Aggregate some stats on the composition of clients on this account
get stats() {
let stats = {
hasMobile: this.localType == DEVICE_TYPE_MOBILE,
names: [this.localName],
numClients: 1,
@@ -245,16 +255,27 @@ ClientEngine.prototype = {
delete this._currentlySyncingCommands;
Async.promiseSpinningly(
Utils.jsonRemove("commands-syncing", this).catch(err => {
this._log.error("Failed to delete syncing-commands file", err);
})
);
},
+ getClientCommands(clientId) {
+ const allCommands = this._readCommands();
+ return allCommands[clientId] || [];
+ },
+
+ removeLocalCommand(command) {
+ // the implementation of this engine is such that adding a command to
+ // the local client is how commands are deleted! ¯\_(ツ)_/¯
+ this._addClientCommand(this.localID, command);
+ },
+
_addClientCommand(clientId, command) {
const allCommands = this._readCommands();
const clientCommands = allCommands[clientId] || [];
if (hasDupeCommand(clientCommands, command)) {
return false;
}
allCommands[clientId] = clientCommands.concat(command);
this._saveCommands(allCommands);
@@ -475,16 +496,18 @@ ClientEngine.prototype = {
*/
_commands: {
resetAll: { args: 0, desc: "Clear temporary local data for all engines" },
resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
wipeAll: { args: 0, desc: "Delete all client data for all engines" },
wipeEngine: { args: 1, desc: "Delete all client data for engine" },
logout: { args: 0, desc: "Log out client" },
displayURI: { args: 3, desc: "Instruct a client to display a URI" },
+ repairRequest: {args: 1, desc: "Instruct a client to initiate a repair"},
+ repairResponse: {args: 1, desc: "Instruct a client a repair request is complete"},
},
/**
* Sends a command+args pair to a specific client.
*
* @param command Command string
* @param args Array of arguments/data for command
* @param clientId Client to send command to
@@ -529,22 +552,24 @@ ClientEngine.prototype = {
processIncomingCommands: function processIncomingCommands() {
return this._notify("clients:process-commands", "", function() {
if (!this.localCommands) {
return true;
}
const clearedCommands = this._readCommands()[this.localID];
const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
+ let didRemoveCommand = false;
let URIsToDisplay = [];
// Process each command in order.
for (let rawCommand of commands) {
+ let shouldRemoveCommand = true; // most commands are auto-removed.
let {command, args, flowID} = rawCommand;
- this._log.debug("Processing command: " + command + "(" + args + ")");
+ this._log.debug("Processing command " + command, args);
this.service.recordTelemetryEvent("processcommand", command, undefined,
{ flowID });
let engines = [args[0]];
switch (command) {
case "resetAll":
engines = null;
@@ -560,24 +585,75 @@ ClientEngine.prototype = {
break;
case "logout":
this.service.logout();
return false;
case "displayURI":
let [uri, clientId, title] = args;
URIsToDisplay.push({ uri, clientId, title });
break;
+ case "repairResponse": {
+ // When we send a repair request to another device that understands
+ // it, that device will send a response indicating what it did.
+ let response = args[0];
+ let requestor = getRepairRequestor(response.collection);
+ if (!requestor) {
+ this._log.warn("repairResponse for unknown collection", response);
+ break;
+ }
+ if (!requestor.continueRepairs(response)) {
+ this._log.warn("repairResponse couldn't continue the repair", response);
+ }
+ break;
+ }
+ case "repairRequest": {
+ // Another device has sent us a request to make some repair.
+ let request = args[0];
+ let responder = getRepairResponder(request.collection);
+ if (!responder) {
+ this._log.warn("repairRequest for unknown collection", request);
+ break;
+ }
+ try {
+ if (Async.promiseSpinningly(responder.repair(request, rawCommand))) {
+ // We've started a repair - once that collection has synced it
+ // will write a "response" command and arrange for this repair
+ // request to be removed from the local command list - if we
+ // removed it now we might fail to write a response in cases of
+ // premature shutdown etc.
+ shouldRemoveCommand = false;
+ }
+ } catch (ex) {
+ if (Async.isShutdownException(ex)) {
+ // Let's assume this error was caused by the shutdown, so let
+ // it try again next time.
+ throw ex;
+ }
+ // otherwise there are no second chances - the command is removed
+ // and will not be tried again.
+ // (Note that this shouldn't be hit in the normal case - it's
+ // expected the responder will handle all reasonable failures and
+ // write a response indicating that it couldn't do what was asked.)
+ this._log.error("Failed to handle a repair request", ex);
+ }
+ break;
+ }
default:
- this._log.debug("Received an unknown command: " + command);
+ this._log.warn("Received an unknown command: " + command);
break;
}
// Add the command to the "cleared" commands list
- this._addClientCommand(this.localID, rawCommand)
+ if (shouldRemoveCommand) {
+ this.removeLocalCommand(rawCommand);
+ didRemoveCommand = true;
+ }
}
- this._tracker.addChangedID(this.localID);
+ if (didRemoveCommand) {
+ this._tracker.addChangedID(this.localID);
+ }
if (URIsToDisplay.length) {
this._handleDisplayURIs(URIsToDisplay);
}
return true;
})();
},
--- a/services/sync/moz.build
+++ b/services/sync/moz.build
@@ -16,16 +16,17 @@ EXTRA_COMPONENTS += [
'Weave.js',
]
EXTRA_JS_MODULES['services-sync'] += [
'modules/addonsreconciler.js',
'modules/addonutils.js',
'modules/bookmark_validator.js',
'modules/browserid_identity.js',
+ 'modules/collection_repair.js',
'modules/collection_validator.js',
'modules/engines.js',
'modules/keys.js',
'modules/main.js',
'modules/policies.js',
'modules/record.js',
'modules/resource.js',
'modules/rest.js',
--- a/tools/lint/eslint/modules.json
+++ b/tools/lint/eslint/modules.json
@@ -30,16 +30,17 @@
"clients.js": ["ClientEngine", "ClientsRec"],
"CloudSyncAdapters.jsm": ["Adapters"],
"CloudSyncBookmarks.jsm": ["Bookmarks"],
"CloudSyncBookmarksFolderCache.jsm": ["FolderCache"],
"CloudSyncEventSource.jsm": ["EventSource"],
"CloudSyncLocal.jsm": ["Local"],
"CloudSyncPlacesWrapper.jsm": ["PlacesWrapper"],
"CloudSyncTabs.jsm": ["Tabs"],
+ "collection_repair.js": ["getRepairRequestor", "getAllRepairRequestors", "CollectionRepairRequestor", "getRepairResponder", "CollectionRepairResponder"],
"collection_validator.js": ["CollectionValidator", "CollectionProblemData"],
"Console.jsm": ["console", "ConsoleAPI"],
"constants.js": ["WEAVE_VERSION", "SYNC_API_VERSION", "USER_API_VERSION", "MISC_API_VERSION", "STORAGE_VERSION", "PREFS_BRANCH", "PWDMGR_HOST", "PWDMGR_PASSWORD_REALM", "PWDMGR_PASSPHRASE_REALM", "PWDMGR_KEYBUNDLE_REALM", "DEFAULT_KEYBUNDLE_NAME", "HMAC_INPUT", "SYNC_KEY_ENCODED_LENGTH", "SYNC_KEY_DECODED_LENGTH", "SYNC_KEY_HYPHENATED_LENGTH", "NO_SYNC_NODE_INTERVAL", "MAX_ERROR_COUNT_BEFORE_BACKOFF", "MAX_IGNORE_ERROR_COUNT", "MINIMUM_BACKOFF_INTERVAL", "MAXIMUM_BACKOFF_INTERVAL", "HMAC_EVENT_INTERVAL", "MASTER_PASSWORD_LOCKED_RETRY_INTERVAL", "DEFAULT_BLOCK_PERIOD", "MOBILE_BATCH_SIZE", "DEFAULT_GUID_FETCH_BATCH_SIZE", "DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE", "DEFAULT_STORE_BATCH_SIZE", "HISTORY_STORE_BATCH_SIZE", "FORMS_STORE_BATCH_SIZE", "PASSWORDS_STORE_BATCH_SIZE", "ADDONS_STORE_BATCH_SIZE", "APPS_STORE_BATCH_SIZE", "DEFAULT_DOWNLOAD_BATCH_SIZE", "SINGLE_USER_THRESHOLD", "MULTI_DEVICE_THRESHOLD", "SCORE_INCREMENT_SMALL", "SCORE_INCREMENT_MEDIUM", "SCORE_INCREMENT_XLARGE", "SCORE_UPDATE_DELAY", "IDLE_OBSERVER_BACK_DELAY", "MAX_UPLOAD_RECORDS", "MAX_UPLOAD_BYTES", "MAX_HISTORY_UPLOAD", "MAX_HISTORY_DOWNLOAD", "NOTIFY_TAB_SENT_TTL_SECS", "STATUS_OK", "SYNC_FAILED", "LOGIN_FAILED", "SYNC_FAILED_PARTIAL", "CLIENT_NOT_CONFIGURED", "STATUS_DISABLED", "MASTER_PASSWORD_LOCKED", "LOGIN_SUCCEEDED", "SYNC_SUCCEEDED", "ENGINE_SUCCEEDED", "LOGIN_FAILED_NO_USERNAME", "LOGIN_FAILED_NO_PASSWORD", "LOGIN_FAILED_NO_PASSPHRASE", "LOGIN_FAILED_NETWORK_ERROR", "LOGIN_FAILED_SERVER_ERROR", "LOGIN_FAILED_INVALID_PASSPHRASE", "LOGIN_FAILED_LOGIN_REJECTED", "METARECORD_DOWNLOAD_FAIL", "VERSION_OUT_OF_DATE", "DESKTOP_VERSION_OUT_OF_DATE", "SETUP_FAILED_NO_PASSPHRASE", "CREDENTIALS_CHANGED", "ABORT_SYNC_COMMAND", "NO_SYNC_NODE_FOUND", "OVER_QUOTA", "PROLONGED_SYNC_FAILURE", "SERVER_MAINTENANCE", "RESPONSE_OVER_QUOTA", "ENGINE_UPLOAD_FAIL", "ENGINE_DOWNLOAD_FAIL", "ENGINE_UNKNOWN_FAIL", "ENGINE_APPLY_FAIL", "ENGINE_METARECORD_DOWNLOAD_FAIL", "ENGINE_METARECORD_UPLOAD_FAIL", 
"ENGINE_BATCH_INTERRUPTED", "JPAKE_ERROR_CHANNEL", "JPAKE_ERROR_NETWORK", "JPAKE_ERROR_SERVER", "JPAKE_ERROR_TIMEOUT", "JPAKE_ERROR_INTERNAL", "JPAKE_ERROR_INVALID", "JPAKE_ERROR_NODATA", "JPAKE_ERROR_KEYMISMATCH", "JPAKE_ERROR_WRONGMESSAGE", "JPAKE_ERROR_USERABORT", "JPAKE_ERROR_DELAYUNSUPPORTED", "INFO_COLLECTIONS", "INFO_COLLECTION_USAGE", "INFO_COLLECTION_COUNTS", "INFO_QUOTA", "kSyncMasterPasswordLocked", "kSyncWeaveDisabled", "kSyncNetworkOffline", "kSyncBackoffNotMet", "kFirstSyncChoiceNotMade", "FIREFOX_ID", "FENNEC_ID", "SEAMONKEY_ID", "TEST_HARNESS_ID", "MIN_PP_LENGTH", "MIN_PASS_LENGTH", "DEVICE_TYPE_DESKTOP", "DEVICE_TYPE_MOBILE", "SQLITE_MAX_VARIABLE_NUMBER"],
"Constants.jsm": ["Roles", "Events", "Relations", "Filters", "States", "Prefilters"],
"ContactDB.jsm": ["ContactDB", "DB_NAME", "STORE_NAME", "SAVED_GETALL_STORE_NAME", "REVISION_STORE", "DB_VERSION"],
"content-server.jsm": ["init"],
"content.jsm": ["registerContentFrame"],
"ContentCrashHandlers.jsm": ["TabCrashHandler", "PluginCrashReporter", "UnsubmittedCrashHandler"],