--- a/browser/base/content/browser-sync.js
+++ b/browser/base/content/browser-sync.js
@@ -278,17 +278,19 @@ var gSync = {
let url = await fxAccounts.promiseAccountsManageDevicesURI(entryPoint);
switchToTabHavingURI(url, true, {
replaceQueryString: true,
triggeringPrincipal: Services.scriptSecurityManager.getSystemPrincipal(),
});
},
sendTabToDevice(url, clientId, title) {
- Weave.Service.clientsEngine.sendURIToClientForDisplay(url, clientId, title);
+ Weave.Service.clientsEngine.sendURIToClientForDisplay(url, clientId, title).catch(e => {
+ console.error("Could not send tab to device", e);
+ });
},
populateSendTabToDevicesMenu(devicesPopup, url, title, createDeviceNodeFn) {
if (!createDeviceNodeFn) {
createDeviceNodeFn = (clientId, name, clientType) => {
let eltName = name ? "menuitem" : "menuseparator";
return document.createElement(eltName);
};
@@ -436,17 +438,16 @@ var gSync = {
doSync() {
if (!UIState.isReady()) {
return;
}
const state = UIState.get();
if (state.status == UIState.STATUS_SIGNED_IN) {
setTimeout(() => Weave.Service.errorHandler.syncAndReportErrors(), 0);
}
- Services.obs.notifyObservers(null, "cloudsync:user-sync");
},
openPrefs(entryPoint = "syncbutton", origin = undefined) {
window.openPreferences("paneSync", { origin, urlParams: { entrypoint: entryPoint } });
},
openSyncedTabsPanel() {
let placement = CustomizableUI.getPlacementOfWidget("sync-button");
--- a/services/common/async.js
+++ b/services/common/async.js
@@ -112,17 +112,17 @@ this.Async = {
/**
* Check if the app is still ready (not quitting). Returns true, or throws an
* exception if not ready.
*/
checkAppReady: function checkAppReady() {
// Watch for app-quit notification to stop any sync calls
Services.obs.addObserver(function onQuitApplication() {
Services.obs.removeObserver(onQuitApplication, "quit-application");
- Async.checkAppReady = function() {
+ Async.checkAppReady = Async.promiseYield = function() {
let exception = Components.Exception("App. Quitting", Cr.NS_ERROR_ABORT);
exception.appIsShuttingDown = true;
throw exception;
};
}, "quit-application");
// In the common case, checkAppReady just returns true
return (Async.checkAppReady = function() { return true; })();
},
@@ -226,9 +226,37 @@ this.Async = {
let cb = Async.makeSpinningCallback();
promise.then(result => {
cb(null, result);
}, err => {
cb(err || new Error("Promise rejected without explicit error"));
});
return cb.wait();
},
+
+ /**
+ * A "tight loop" of promises can still lock up the browser for some time.
+ * Periodically waiting for a promise returned by this function will solve
+ * that.
+ * You should probably not use this method directly and instead use jankYielder
+ * below.
+ * Some reference here:
+ * - https://gist.github.com/jesstelford/bbb30b983bddaa6e5fef2eb867d37678
+ * - https://bugzilla.mozilla.org/show_bug.cgi?id=1094248
+ */
+ promiseYield() {
+ return new Promise(resolve => {
+ Services.tm.currentThread.dispatch(resolve, Ci.nsIThread.DISPATCH_NORMAL);
+ });
+ },
+
+  // Returns an async function that yields control to the event loop every
+  // `yieldEvery` calls; invoke it on each iteration of a tight loop.
+ jankYielder(yieldEvery = 50) {
+ let iterations = 0;
+ return async () => {
+ Async.checkAppReady(); // Let it throw!
+ if (++iterations % yieldEvery === 0) {
+ await Async.promiseYield();
+ }
+    };
+ }
};
--- a/services/sync/modules/SyncedTabs.jsm
+++ b/services/sync/modules/SyncedTabs.jsm
@@ -136,48 +136,42 @@ let SyncedTabsInternal = {
// filter it if they care.
ntabs += clientRepr.tabs.length;
result.push(clientRepr);
}
log.info(`Final tab list has ${result.length} clients with ${ntabs} tabs.`);
return result;
},
- syncTabs(force) {
+ async syncTabs(force) {
if (!force) {
// Don't bother refetching tabs if we already did so recently
let lastFetch = Preferences.get("services.sync.lastTabFetch", 0);
let now = Math.floor(Date.now() / 1000);
if (now - lastFetch < TABS_FRESH_ENOUGH_INTERVAL) {
log.info("_refetchTabs was done recently, do not doing it again");
- return Promise.resolve(false);
+ return false;
}
}
// If Sync isn't configured don't try and sync, else we will get reports
// of a login failure.
if (Weave.Status.checkSetup() == Weave.CLIENT_NOT_CONFIGURED) {
log.info("Sync client is not configured, so not attempting a tab sync");
- return Promise.resolve(false);
+ return false;
}
// Ask Sync to just do the tabs engine if it can.
- // Sync is currently synchronous, so do it after an event-loop spin to help
- // keep the UI responsive.
- return new Promise((resolve, reject) => {
- Services.tm.dispatchToMainThread(() => {
- try {
- log.info("Doing a tab sync.");
- Weave.Service.sync(["tabs"]);
- resolve(true);
- } catch (ex) {
- log.error("Sync failed", ex);
- reject(ex);
- }
- });
- });
+ try {
+ log.info("Doing a tab sync.");
+ await Weave.Service.sync(["tabs"]);
+ return true;
+ } catch (ex) {
+ log.error("Sync failed", ex);
+ throw ex;
+ }
},
observe(subject, topic, data) {
log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`);
switch (topic) {
case "weave:engine:sync:finish":
if (data != "tabs") {
return;
--- a/services/sync/modules/addonsreconciler.js
+++ b/services/sync/modules/addonsreconciler.js
@@ -53,24 +53,24 @@ this.EXPORTED_SYMBOLS = ["AddonsReconcil
*
* An instance of this is bound to an AddonsEngine instance. In reality, it
* likely exists as a singleton. To AddonsEngine, it functions as a store and
* an entity which emits events for tracking.
*
* The usage pattern for instances of this class is:
*
* let reconciler = new AddonsReconciler();
- * reconciler.loadState(null, function(error) { ... });
+ * await reconciler.ensureStateLoaded();
*
* // At this point, your instance should be ready to use.
*
* When you are finished with the instance, please call:
*
* reconciler.stopListening();
- * reconciler.saveState(...);
+ * await reconciler.saveState(...);
*
* There are 2 classes of listeners in the AddonManager: AddonListener and
* InstallListener. This class is a listener for both (member functions just
* get called directly).
*
* When an add-on is installed, listeners are called in the following order:
*
* IL.onInstallStarted, AL.onInstalling, IL.onInstallEnded, AL.onInstalled
@@ -120,23 +120,16 @@ this.AddonsReconciler = function AddonsR
Svc.Obs.add("xpcom-shutdown", this.stopListening, this);
};
AddonsReconciler.prototype = {
/** Flag indicating whether we are listening to AddonManager events. */
_listening: false,
/**
- * Whether state has been loaded from a file.
- *
- * State is loaded on demand if an operation requires it.
- */
- _stateLoaded: false,
-
- /**
* Define this as false if the reconciler should not persist state
* to disk when handling events.
*
* This allows test code to avoid spinning to write during observer
* notifications and xpcom shutdown, which appears to cause hangs on WinXP
* (Bug 873861).
*/
_shouldPersist: true,
@@ -166,91 +159,77 @@ AddonsReconciler.prototype = {
_listeners: [],
/**
* Accessor for add-ons in this object.
*
* Returns an object mapping add-on IDs to objects containing metadata.
*/
get addons() {
- this._ensureStateLoaded();
return this._addons;
},
+ async ensureStateLoaded() {
+ if (!this._promiseStateLoaded) {
+ this._promiseStateLoaded = this.loadState();
+ }
+ return this._promiseStateLoaded;
+ },
+
/**
* Load reconciler state from a file.
*
* The path is relative to the weave directory in the profile. If no
* path is given, the default one is used.
*
* If the file does not exist or there was an error parsing the file, the
* state will be transparently defined as empty.
*
- * @param path
+ * @param file
* Path to load. ".json" is appended automatically. If not defined,
* a default path will be consulted.
- * @param callback
- * Callback to be executed upon file load. The callback receives a
- * truthy error argument signifying whether an error occurred and a
- * boolean indicating whether data was loaded.
*/
- loadState: function loadState(path, callback) {
- let file = path || DEFAULT_STATE_FILE;
- Utils.jsonLoad(file, this, function(json) {
- this._addons = {};
- this._changes = [];
+ async loadState(file = DEFAULT_STATE_FILE) {
+ let json = await Utils.jsonLoad(file, this);
+ this._addons = {};
+ this._changes = [];
- if (!json) {
- this._log.debug("No data seen in loaded file: " + file);
- if (callback) {
- callback(null, false);
- }
-
- return;
- }
+ if (!json) {
+ this._log.debug("No data seen in loaded file: " + file);
+ return false;
+ }
- let version = json.version;
- if (!version || version != 1) {
- this._log.error("Could not load JSON file because version not " +
- "supported: " + version);
- if (callback) {
- callback(null, false);
- }
-
- return;
- }
+ let version = json.version;
+ if (!version || version != 1) {
+ this._log.error("Could not load JSON file because version not " +
+ "supported: " + version);
+ return false;
+ }
- this._addons = json.addons;
- for (let id in this._addons) {
- let record = this._addons[id];
- record.modified = new Date(record.modified);
- }
+ this._addons = json.addons;
+ for (let id in this._addons) {
+ let record = this._addons[id];
+ record.modified = new Date(record.modified);
+ }
- for (let [time, change, id] of json.changes) {
- this._changes.push([new Date(time), change, id]);
- }
+ for (let [time, change, id] of json.changes) {
+ this._changes.push([new Date(time), change, id]);
+ }
- if (callback) {
- callback(null, true);
- }
- });
+ return true;
},
/**
* Saves the current state to a file in the local profile.
*
- * @param path
+ * @param file
* String path in profile to save to. If not defined, the default
* will be used.
- * @param callback
- * Function to be invoked on save completion. No parameters will be
- * passed to callback.
*/
- saveState: function saveState(path, callback) {
- let file = path || DEFAULT_STATE_FILE;
+ async saveState(file = DEFAULT_STATE_FILE) {
let state = {version: 1, addons: {}, changes: []};
for (let [id, record] of Object.entries(this._addons)) {
state.addons[id] = {};
for (let [k, v] of Object.entries(record)) {
if (k == "modified") {
state.addons[id][k] = v.getTime();
} else {
@@ -259,17 +238,17 @@ AddonsReconciler.prototype = {
}
}
for (let [time, change, id] of this._changes) {
state.changes.push([time.getTime(), change, id]);
}
this._log.info("Saving reconciler state to file: " + file);
- Utils.jsonSave(file, this, state, callback);
+ await Utils.jsonSave(file, this, state);
},
/**
* Registers a change listener with this instance.
*
* Change listeners are called every time a change is recorded. The listener
* is an object with the function "changeListener" that takes 3 arguments,
* the Date at which the change happened, the type of change (a CHANGE_*
@@ -336,93 +315,86 @@ AddonsReconciler.prototype = {
AddonManager.removeInstallListener(this);
AddonManager.removeAddonListener(this);
this._listening = false;
},
/**
* Refreshes the global state of add-ons by querying the AddonManager.
*/
- refreshGlobalState: function refreshGlobalState(callback) {
+ async refreshGlobalState() {
this._log.info("Refreshing global state from AddonManager.");
- this._ensureStateLoaded();
let installs;
+ let addons = await AddonManager.getAllAddons();
- AddonManager.getAllAddons(addons => {
- let ids = {};
+ let ids = {};
- for (let addon of addons) {
- ids[addon.id] = true;
- this.rectifyStateFromAddon(addon);
+ for (let addon of addons) {
+ ids[addon.id] = true;
+ this.rectifyStateFromAddon(addon);
+ }
+
+ // Look for locally-defined add-ons that no longer exist and update their
+ // record.
+ for (let [id, addon] of Object.entries(this._addons)) {
+ if (id in ids) {
+ continue;
}
- // Look for locally-defined add-ons that no longer exist and update their
- // record.
- for (let [id, addon] of Object.entries(this._addons)) {
- if (id in ids) {
- continue;
- }
+ // If the id isn't in ids, it means that the add-on has been deleted or
+ // the add-on is in the process of being installed. We detect the
+ // latter by seeing if an AddonInstall is found for this add-on.
- // If the id isn't in ids, it means that the add-on has been deleted or
- // the add-on is in the process of being installed. We detect the
- // latter by seeing if an AddonInstall is found for this add-on.
-
- if (!installs) {
- let cb = Async.makeSyncCallback();
- AddonManager.getAllInstalls(cb);
- installs = Async.waitForSyncCallback(cb);
- }
+ if (!installs) {
+ installs = await AddonManager.getAllInstalls();
+ }
- let installFound = false;
- for (let install of installs) {
- if (install.addon && install.addon.id == id &&
- install.state == AddonManager.STATE_INSTALLED) {
-
- installFound = true;
- break;
- }
- }
+ let installFound = false;
+ for (let install of installs) {
+ if (install.addon && install.addon.id == id &&
+ install.state == AddonManager.STATE_INSTALLED) {
- if (installFound) {
- continue;
- }
-
- if (addon.installed) {
- addon.installed = false;
- this._log.debug("Adding change because add-on not present in " +
- "Add-on Manager: " + id);
- this._addChange(new Date(), CHANGE_UNINSTALLED, addon);
+ installFound = true;
+ break;
}
}
- // See note for _shouldPersist.
- if (this._shouldPersist) {
- this.saveState(null, callback);
- } else {
- callback();
+ if (installFound) {
+ continue;
}
- });
+
+ if (addon.installed) {
+ addon.installed = false;
+ this._log.debug("Adding change because add-on not present in " +
+ "Add-on Manager: " + id);
+ this._addChange(new Date(), CHANGE_UNINSTALLED, addon);
+ }
+ }
+
+ // See note for _shouldPersist.
+ if (this._shouldPersist) {
+ await this.saveState();
+ }
},
/**
* Rectifies the state of an add-on from an Addon instance.
*
* This basically says "given an Addon instance, assume it is truth and
* apply changes to the local state to reflect it."
*
* This function could result in change listeners being called if the local
* state differs from the passed add-on's state.
*
* @param addon
* Addon instance being updated.
*/
- rectifyStateFromAddon: function rectifyStateFromAddon(addon) {
+ rectifyStateFromAddon(addon) {
this._log.debug(`Rectifying state for addon ${addon.name} (version=${addon.version}, id=${addon.id})`);
- this._ensureStateLoaded();
let id = addon.id;
let enabled = !addon.userDisabled;
let guid = addon.syncGUID;
let now = new Date();
if (!(id in this._addons)) {
let record = {
@@ -499,98 +471,74 @@ AddonsReconciler.prototype = {
*
* This will return an array of arrays. Each entry in the array has the
* elements [date, change_type, id], where
*
* date - Date instance representing when the change occurred.
* change_type - One of CHANGE_* constants.
* id - ID of add-on that changed.
*/
- getChangesSinceDate: function getChangesSinceDate(date) {
- this._ensureStateLoaded();
-
+ getChangesSinceDate(date) {
let length = this._changes.length;
for (let i = 0; i < length; i++) {
if (this._changes[i][0] >= date) {
return this._changes.slice(i);
}
}
return [];
},
/**
* Prunes all recorded changes from before the specified Date.
*
* @param date
* Entries older than this Date will be removed.
*/
- pruneChangesBeforeDate: function pruneChangesBeforeDate(date) {
- this._ensureStateLoaded();
-
+ pruneChangesBeforeDate(date) {
this._changes = this._changes.filter(function test_age(change) {
return change[0] >= date;
});
},
/**
* Obtains the set of all known Sync GUIDs for add-ons.
- *
- * @return Object with guids as keys and values of true.
*/
- getAllSyncGUIDs: function getAllSyncGUIDs() {
+ getAllSyncGUIDs() {
let result = {};
for (let id in this.addons) {
result[id] = true;
}
return result;
},
/**
* Obtain the add-on state record for an add-on by Sync GUID.
*
* If the add-on could not be found, returns null.
*
* @param guid
* Sync GUID of add-on to retrieve.
- * @return Object on success on null on failure.
*/
- getAddonStateFromSyncGUID: function getAddonStateFromSyncGUID(guid) {
+ getAddonStateFromSyncGUID(guid) {
for (let id in this.addons) {
let addon = this.addons[id];
if (addon.guid == guid) {
return addon;
}
}
return null;
},
/**
- * Ensures that state is loaded before continuing.
- *
- * This is called internally by anything that accesses the internal data
- * structures. It effectively just-in-time loads serialized state.
- */
- _ensureStateLoaded: function _ensureStateLoaded() {
- if (this._stateLoaded) {
- return;
- }
-
- let cb = Async.makeSpinningCallback();
- this.loadState(null, cb);
- cb.wait();
- this._stateLoaded = true;
- },
-
- /**
* Handler that is invoked as part of the AddonManager listeners.
*/
- _handleListener: function _handlerListener(action, addon, requiresRestart) {
+ _handleListener(action, addon, requiresRestart) {
// Since this is called as an observer, we explicitly trap errors and
// log them to ourselves so we don't see errors reported elsewhere.
try {
let id = addon.id;
this._log.debug("Add-on change: " + action + " to " + id);
// We assume that every event for non-restartless add-ons is
// followed by another event and that this follow-up event is the most
@@ -624,19 +572,17 @@ AddonsReconciler.prototype = {
this._log.debug("Adding change because of uninstall listener: " +
id);
this._addChange(now, CHANGE_UNINSTALLED, record);
}
}
// See note for _shouldPersist.
if (this._shouldPersist) {
- let cb = Async.makeSpinningCallback();
- this.saveState(null, cb);
- cb.wait();
+ Async.promiseSpinningly(this.saveState());
}
} catch (ex) {
this._log.warn("Exception", ex);
}
},
// AddonListeners
onEnabling: function onEnabling(addon, requiresRestart) {
--- a/services/sync/modules/addonutils.js
+++ b/services/sync/modules/addonutils.js
@@ -148,53 +148,52 @@ AddonUtilsInternal.prototype = {
} catch (ex) {
this._log.error("Error installing add-on", ex);
cb(ex, null);
}
});
},
/**
- * Uninstalls the Addon instance and invoke a callback when it is done.
+ * Uninstalls the addon instance.
*
* @param addon
* Addon instance to uninstall.
- * @param cb
- * Function to be invoked when uninstall has finished. It receives a
- * truthy value signifying error and the add-on which was uninstalled.
*/
- uninstallAddon: function uninstallAddon(addon, cb) {
- let listener = {
- onUninstalling(uninstalling, needsRestart) {
- if (addon.id != uninstalling.id) {
- return;
- }
+ async uninstallAddon(addon) {
+ return new Promise(res => {
+ let listener = {
+ onUninstalling(uninstalling, needsRestart) {
+ if (addon.id != uninstalling.id) {
+ return;
+ }
- // We assume restartless add-ons will send the onUninstalled event
- // soon.
- if (!needsRestart) {
- return;
- }
+ // We assume restartless add-ons will send the onUninstalled event
+ // soon.
+ if (!needsRestart) {
+ return;
+ }
- // For non-restartless add-ons, we issue the callback on uninstalling
- // because we will likely never see the uninstalled event.
- AddonManager.removeAddonListener(listener);
- cb(null, addon);
- },
- onUninstalled(uninstalled) {
- if (addon.id != uninstalled.id) {
- return;
+ // For non-restartless add-ons, we issue the callback on uninstalling
+ // because we will likely never see the uninstalled event.
+ AddonManager.removeAddonListener(listener);
+ res(addon);
+ },
+ onUninstalled(uninstalled) {
+ if (addon.id != uninstalled.id) {
+ return;
+ }
+
+ AddonManager.removeAddonListener(listener);
+ res(addon);
}
-
- AddonManager.removeAddonListener(listener);
- cb(null, addon);
- }
- };
- AddonManager.addAddonListener(listener);
- addon.uninstall();
+ };
+ AddonManager.addAddonListener(listener);
+ addon.uninstall();
+ });
},
/**
* Installs multiple add-ons specified by metadata.
*
* The first argument is an array of objects. Each object must have the
* following keys:
*
--- a/services/sync/modules/bookmark_repair.js
+++ b/services/sync/modules/bookmark_repair.js
@@ -193,17 +193,17 @@ class BookmarkRepairRequestor extends Co
engine.toFetch = Array.from(new Set(toFetch));
return true;
}
/* See if the repairer is willing and able to begin a repair process given
the specified validation information.
Returns true if a repair was started and false otherwise.
*/
- startRepairs(validationInfo, flowID) {
+ async startRepairs(validationInfo, flowID) {
if (this._currentState != STATE.NOT_REPAIRING) {
log.info(`Can't start a repair - repair with ID ${this._flowID} is already in progress`);
return false;
}
let ids = this.getProblemIDs(validationInfo);
if (ids.size > MAX_REQUESTED_IDS) {
log.info("Not starting a repair as there are over " + MAX_REQUESTED_IDS + " problems");
@@ -233,32 +233,32 @@ class BookmarkRepairRequestor extends Co
return this.continueRepairs();
}
/* Work out what state our current repair request is in, and whether it can
proceed to a new state.
Returns true if we could continue the repair - even if the state didn't
actually move. Returns false if we aren't actually repairing.
*/
- continueRepairs(response = null) {
+ async continueRepairs(response = null) {
// Note that "ABORTED" and "FINISHED" should never be current when this
// function returns - this function resets to NOT_REPAIRING in those cases.
if (this._currentState == STATE.NOT_REPAIRING) {
return false;
}
let state, newState;
let abortReason;
// we loop until the state doesn't change - but enforce a max of 10 times
// to prevent errors causing infinite loops.
for (let i = 0; i < 10; i++) {
state = this._currentState;
log.info("continueRepairs starting with state", state);
try {
- newState = this._continueRepairs(state, response);
+ newState = await this._continueRepairs(state, response);
log.info("continueRepairs has next state", newState);
} catch (ex) {
if (!(ex instanceof AbortRepairError)) {
throw ex;
}
log.info(`Repair has been aborted: ${ex.reason}`);
newState = STATE.ABORTED;
abortReason = ex.reason;
@@ -289,17 +289,17 @@ class BookmarkRepairRequestor extends Co
this.service.recordTelemetryEvent("repair", object, undefined, extra);
// reset our state and flush our prefs.
this.prefs.resetBranch();
Services.prefs.savePrefFile(null); // flush prefs.
}
return true;
}
- _continueRepairs(state, response = null) {
+ async _continueRepairs(state, response = null) {
if (this.anyClientsRepairing(this._flowID)) {
throw new AbortRepairError("other clients repairing");
}
switch (state) {
case STATE.SENT_REQUEST:
case STATE.SENT_SECOND_REQUEST:
let flowID = this._flowID;
let clientID = this._currentRemoteClient;
@@ -321,17 +321,17 @@ class BookmarkRepairRequestor extends Co
state = STATE.NEED_NEW_CLIENT;
let extra = {
deviceID: this.service.identity.hashedDeviceID(clientID),
flowID,
}
this.service.recordTelemetryEvent("repair", "abandon", "missing", extra);
break;
}
- if (this._isCommandPending(clientID, flowID)) {
+    if (await this._isCommandPending(clientID, flowID)) {
// So the command we previously sent is still queued for the client
// (ie, that client is yet to have synced). Let's see if we should
// give up on that client.
let lastRequestSent = this.prefs.get(PREF.REPAIR_WHEN);
let timeLeft = lastRequestSent + RESPONSE_INTERVAL_TIMEOUT - this._now();
if (timeLeft <= 0) {
log.info(`previous request to client ${clientID} is pending, but has taken too long`);
state = STATE.NEED_NEW_CLIENT;
@@ -351,34 +351,34 @@ class BookmarkRepairRequestor extends Co
// it another go (as that client may have cleared the command but is yet
// to complete the sync)
// XXX - note that this is no longer true - the responders don't remove
// their command until they have written a response. This might mean
// we could drop the entire STATE.SENT_SECOND_REQUEST concept???
if (state == STATE.SENT_REQUEST) {
log.info(`previous request to client ${clientID} was removed - trying a second time`);
state = STATE.SENT_SECOND_REQUEST;
- this._writeRequest(clientID);
+ await this._writeRequest(clientID);
} else {
// this was the second time around, so give up on this client
log.info(`previous 2 requests to client ${clientID} were removed - need a new client`);
state = STATE.NEED_NEW_CLIENT;
}
break;
case STATE.NEED_NEW_CLIENT:
// We need to find a new client to request.
let newClientID = this._findNextClient();
if (!newClientID) {
state = STATE.FINISHED;
break;
}
this._addToPreviousRemoteClients(this._currentRemoteClient);
this._currentRemoteClient = newClientID;
- this._writeRequest(newClientID);
+ await this._writeRequest(newClientID);
state = STATE.SENT_REQUEST;
break;
case STATE.ABORTED:
break; // our caller will take the abort action.
case STATE.FINISHED:
break;
@@ -429,32 +429,32 @@ class BookmarkRepairRequestor extends Co
numIDs: response.ids.length.toString(),
}
this.service.recordTelemetryEvent("repair", "response", "upload", extra);
return state;
}
/* Issue a repair request to a specific client.
*/
- _writeRequest(clientID) {
+ async _writeRequest(clientID) {
log.trace("writing repair request to client", clientID);
let ids = this._currentIDs;
if (!ids) {
throw new AbortRepairError("Attempting to write a request, but there are no IDs");
}
let flowID = this._flowID;
// Post a command to that client.
let request = {
collection: "bookmarks",
request: "upload",
requestor: this.service.clientsEngine.localID,
ids,
flowID,
}
- this.service.clientsEngine.sendCommand("repairRequest", [request], clientID, { flowID });
+ await this.service.clientsEngine.sendCommand("repairRequest", [request], clientID, { flowID });
this.prefs.set(PREF.REPAIR_WHEN, Math.floor(this._now()));
// record telemetry about this
let extra = {
deviceID: this.service.identity.hashedDeviceID(clientID),
flowID,
numIDs: ids.length.toString(),
}
this.service.recordTelemetryEvent("repair", "request", "upload", extra);
@@ -481,21 +481,22 @@ class BookmarkRepairRequestor extends Co
_isSuitableClient(client) {
// filter only desktop firefox running > 53 (ie, any 54)
return (client.type == DEVICE_TYPE_DESKTOP &&
Services.vc.compare(client.version, 53) > 0);
}
/* Is our command still in the "commands" queue for the specific client?
*/
- _isCommandPending(clientID, flowID) {
+ async _isCommandPending(clientID, flowID) {
// getClientCommands() is poorly named - it's only outgoing commands
// from us we have yet to write. For our purposes, we want to check
// them and commands previously written (which is in .commands)
- let commands = [...this.service.clientsEngine.getClientCommands(clientID),
+ let clientCommands = await this.service.clientsEngine.getClientCommands(clientID);
+ let commands = [...clientCommands,
...this.service.clientsEngine.remoteClient(clientID).commands || []];
for (let command of commands) {
if (command.command != "repairRequest" || command.args.length != 1) {
continue;
}
let arg = command.args[0];
if (arg.collection == "bookmarks" && arg.request == "upload" &&
arg.flowID == flowID) {
@@ -554,18 +555,18 @@ class BookmarkRepairRequestor extends Co
}
}
/* An object that responds to repair requests initiated by some other device.
*/
class BookmarkRepairResponder extends CollectionRepairResponder {
async repair(request, rawCommand) {
if (request.request != "upload") {
- this._abortRepair(request, rawCommand,
- `Don't understand request type '${request.request}'`);
+ await this._abortRepair(request, rawCommand,
+ `Don't understand request type '${request.request}'`);
return;
}
// Note that we don't try and guard against multiple repairs being in
// progress as we don't do anything too smart that could cause problems,
// but just upload items. If we get any smarter we should re-think this
// (but when we do, note that checking this._currentState isn't enough as
// this responder is not a singleton)
@@ -600,28 +601,28 @@ class BookmarkRepairResponder extends Co
// end up doing the upload for some obscure reason.
let eventExtra = {
flowID: request.flowID,
numIDs: this._currentState.ids.length.toString(),
};
this.service.recordTelemetryEvent("repairResponse", "uploading", undefined, eventExtra);
} else {
// We were unable to help with the repair, so report that we are done.
- this._finishRepair();
+ await this._finishRepair();
}
} catch (ex) {
if (Async.isShutdownException(ex)) {
// this repair request will be tried next time.
throw ex;
}
// On failure, we still write a response so the requestor knows to move
// on, but we record the failure reason in telemetry.
log.error("Failed to respond to the repair request", ex);
this._currentState.failureReason = SyncTelemetry.transformError(ex);
- this._finishRepair();
+ await this._finishRepair();
}
}
async _fetchItemsToUpload(request) {
let toUpload = new Set(); // items we will upload.
let toDelete = new Set(); // items we will delete.
let requested = new Set(request.ids);
@@ -685,51 +686,51 @@ class BookmarkRepairResponder extends Co
}
onUploaded(subject, data) {
if (data != "bookmarks") {
return;
}
Svc.Obs.remove("weave:engine:sync:uploaded", this.onUploaded, this);
log.debug(`bookmarks engine has uploaded stuff - creating a repair response`);
- this._finishRepair();
+ Async.promiseSpinningly(this._finishRepair());
}
- _finishRepair() {
+ async _finishRepair() {
let clientsEngine = this.service.clientsEngine;
let flowID = this._currentState.request.flowID;
let response = {
request: this._currentState.request.request,
collection: "bookmarks",
clientID: clientsEngine.localID,
flowID,
ids: this._currentState.ids,
}
let clientID = this._currentState.request.requestor;
- clientsEngine.sendCommand("repairResponse", [response], clientID, { flowID });
+ await clientsEngine.sendCommand("repairResponse", [response], clientID, { flowID });
// and nuke the request from our client.
- clientsEngine.removeLocalCommand(this._currentState.rawCommand);
+ await clientsEngine.removeLocalCommand(this._currentState.rawCommand);
let eventExtra = {
flowID,
numIDs: response.ids.length.toString(),
}
if (this._currentState.failureReason) {
// *sob* - recording this in "extra" means the value must be a string of
// max 85 chars.
eventExtra.failureReason = JSON.stringify(this._currentState.failureReason).substring(0, 85)
this.service.recordTelemetryEvent("repairResponse", "failed", undefined, eventExtra);
} else {
this.service.recordTelemetryEvent("repairResponse", "finished", undefined, eventExtra);
}
this._currentState = null;
}
- _abortRepair(request, rawCommand, why) {
+ async _abortRepair(request, rawCommand, why) {
log.warn(`aborting repair request: ${why}`);
- this.service.clientsEngine.removeLocalCommand(rawCommand);
+ await this.service.clientsEngine.removeLocalCommand(rawCommand);
// record telemetry for this.
let eventExtra = {
flowID: request.flowID,
reason: why,
};
this.service.recordTelemetryEvent("repairResponse", "aborted", undefined, eventExtra);
// We could also consider writing a response here so the requestor can take
// some immediate action rather than timing out, but we abort only in cases
--- a/services/sync/modules/bookmark_validator.js
+++ b/services/sync/modules/bookmark_validator.js
@@ -2,20 +2,22 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const { interfaces: Ci, utils: Cu } = Components;
Cu.import("resource://gre/modules/Services.jsm");
-Cu.import("resource://gre/modules/Timer.jsm");
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://services-common/utils.js");
+XPCOMUtils.defineLazyModuleGetter(this, "Async",
+ "resource://services-common/async.js");
+
XPCOMUtils.defineLazyModuleGetter(this, "PlacesUtils",
"resource://gre/modules/PlacesUtils.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "PlacesSyncUtils",
"resource://gre/modules/PlacesSyncUtils.jsm");
Cu.importGlobalProperties(["URLSearchParams"]);
@@ -286,21 +288,19 @@ class BookmarkValidator {
}
async createClientRecordsFromTree(clientTree) {
// Iterate over the treeNode, converting it to something more similar to what
// the server stores.
let records = [];
let recordsByGuid = new Map();
let syncedRoots = SYNCED_ROOTS;
- let yieldCounter = 0;
+ let maybeYield = Async.jankYielder();
async function traverse(treeNode, synced) {
- if (++yieldCounter % 50 === 0) {
- await new Promise(resolve => setTimeout(resolve, 50));
- }
+ await maybeYield();
if (!synced) {
synced = syncedRoots.includes(treeNode.guid);
} else if (isNodeIgnored(treeNode)) {
synced = false;
}
let guid = PlacesSyncUtils.bookmarks.guidToSyncId(treeNode.guid);
let itemType = "item";
treeNode.ignored = !synced;
@@ -401,18 +401,19 @@ class BookmarkValidator {
let deletedRecords = [];
let folders = [];
let problemData = new BookmarkProblemData();
let resultRecords = [];
- let yieldCounter = 0;
+ let maybeYield = Async.jankYielder();
for (let record of serverRecords) {
+ await maybeYield();
if (!record.id) {
++problemData.missingIDs;
continue;
}
if (record.deleted) {
deletedItemIds.add(record.id);
} else if (idToRecord.has(record.id)) {
problemData.duplicates.push(record.id);
@@ -440,19 +441,16 @@ class BookmarkValidator {
// The children array stores special guids as their local guid values,
// e.g. 'menu________' instead of 'menu', but all other parts of the
// serverside bookmark info stores it as the special value ('menu').
record.childGUIDs = record.children;
record.children = record.children.map(childID => {
return PlacesSyncUtils.bookmarks.guidToSyncId(childID);
});
}
- if (++yieldCounter % 50 === 0) {
- await new Promise(resolve => setTimeout(resolve, 50));
- }
}
for (let deletedId of deletedItemIds) {
let record = idToRecord.get(deletedId);
if (record && !record.isDeleted) {
deletedRecords.push(record);
record.isDeleted = true;
}
--- a/services/sync/modules/browserid_identity.js
+++ b/services/sync/modules/browserid_identity.js
@@ -277,27 +277,27 @@ this.BrowserIDManager.prototype = {
if (!firstLogin) {
// We still want to trigger these even if it isn't our first login.
// Note that the promise returned by `initializeWithCurrentIdentity`
// is resolved at the start of authentication, but we don't want to fire
// this event or start the next sync until after authentication is done
// (which is signaled by `this.whenReadyToAuthenticate.promise` resolving).
this.whenReadyToAuthenticate.promise.then(() => {
Services.obs.notifyObservers(null, "weave:service:setup-complete");
- return new Promise(resolve => { Weave.Utils.nextTick(resolve, null); })
+ return Async.promiseYield();
}).then(() => {
- Weave.Service.sync();
+ return Weave.Service.sync();
}).catch(e => {
this._log.warn("Failed to trigger setup complete notification", e);
});
}
} break;
case fxAccountsCommon.ONLOGOUT_NOTIFICATION:
- Weave.Service.startOver();
+ Async.promiseSpinningly(Weave.Service.startOver());
// startOver will cause this instance to be thrown away, so there's
// nothing else to do.
break;
case fxAccountsCommon.ON_ACCOUNT_STATE_CHANGE_NOTIFICATION:
// throw away token and fetch a new one
this.resetCredentials();
this._ensureValidToken().catch(err =>
--- a/services/sync/modules/collection_repair.js
+++ b/services/sync/modules/collection_repair.js
@@ -87,31 +87,31 @@ class CollectionRepairRequestor {
The validation info as returned by the collection's validator.
@param flowID {String}
A guid that uniquely identifies this repair process for this
collection, and which should be sent to any requestors and
reported in telemetry.
*/
- startRepairs(validationInfo, flowID) {
+ async startRepairs(validationInfo, flowID) {
throw new Error("not implemented");
}
/* Work out what state our current repair request is in, and whether it can
proceed to a new state.
Returns true if we could continue the repair - even if the state didn't
actually move. Returns false if we aren't actually repairing.
@param responseInfo {Object}
An optional response to a previous repair request, as returned
by a remote repair responder.
*/
- continueRepairs(responseInfo = null) {
+ async continueRepairs(responseInfo = null) {
throw new Error("not implemented");
}
}
class CollectionRepairResponder {
constructor(service = null) {
// allow service to be mocked in tests.
this.service = service || Weave.Service;
--- a/services/sync/modules/collection_validator.js
+++ b/services/sync/modules/collection_validator.js
@@ -1,15 +1,19 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const Cu = Components.utils;
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+
+XPCOMUtils.defineLazyModuleGetter(this, "Async",
+ "resource://services-common/async.js");
this.EXPORTED_SYMBOLS = ["CollectionValidator", "CollectionProblemData"];
class CollectionProblemData {
constructor() {
this.missingIDs = 0;
this.duplicates = [];
@@ -96,17 +100,17 @@ class CollectionValidator {
// Turn the client item into something that can be compared with the server item,
// and is also safe to mutate.
normalizeClientItem(item) {
return Cu.cloneInto(item, {});
}
// Turn the server item into something that can be easily compared with the client
// items.
- normalizeServerItem(item) {
+ async normalizeServerItem(item) {
return item;
}
// Return whether or not a server item should be present on the client. Expected
// to be overridden.
clientUnderstands(item) {
return true;
}
@@ -131,25 +135,34 @@ class CollectionValidator {
return differences;
}
// Returns an object containing
// problemData: an instance of the class returned by emptyProblemData(),
// clientRecords: Normalized client records
// records: Normalized server records,
// deletedRecords: Array of ids that were marked as deleted by the server.
- compareClientWithServer(clientItems, serverItems) {
- clientItems = clientItems.map(item => this.normalizeClientItem(item));
- serverItems = serverItems.map(item => this.normalizeServerItem(item));
+ async compareClientWithServer(clientItems, serverItems) {
+ let maybeYield = Async.jankYielder();
+ const clientRecords = [];
+ for (let item of clientItems) {
+ await maybeYield();
+ clientRecords.push(this.normalizeClientItem(item));
+ }
+ const serverRecords = [];
+ for (let item of serverItems) {
+ await maybeYield();
+ serverRecords.push((await this.normalizeServerItem(item)));
+ }
let problems = this.emptyProblemData();
let seenServer = new Map();
let serverDeleted = new Set();
let allRecords = new Map();
- for (let record of serverItems) {
+ for (let record of serverRecords) {
let id = record[this.idProp];
if (!id) {
++problems.missingIDs;
continue;
}
if (record.deleted) {
serverDeleted.add(record);
} else {
@@ -160,17 +173,17 @@ class CollectionValidator {
seenServer.set(id, record);
allRecords.set(id, { server: record, client: null, });
}
record.understood = this.clientUnderstands(record);
}
}
let seenClient = new Map();
- for (let record of clientItems) {
+ for (let record of clientRecords) {
let id = record[this.idProp];
record.shouldSync = this.syncedByClient(record);
seenClient.set(id, record);
let combined = allRecords.get(id);
if (combined) {
combined.client = record;
} else {
allRecords.set(id, { client: record, server: null });
@@ -198,17 +211,17 @@ class CollectionValidator {
let differences = this.getDifferences(client, server);
if (differences && differences.length) {
problems.differences.push({ id, differences });
}
}
}
return {
problemData: problems,
- clientRecords: clientItems,
- records: serverItems,
+ clientRecords,
+ records: serverRecords,
deletedRecords: [...serverDeleted]
};
}
}
// Default to 0, some engines may override.
CollectionValidator.prototype.version = 0;
--- a/services/sync/modules/doctor.js
+++ b/services/sync/modules/doctor.js
@@ -67,17 +67,17 @@ this.Doctor = {
await this._runValidators(engineInfos);
// We are called at the end of a sync, which is a good time to periodically
// check each repairer to see if it can advance.
if (this._now() - this.lastRepairAdvance > REPAIR_ADVANCE_PERIOD) {
try {
for (let [collection, requestor] of Object.entries(this._getAllRepairRequestors())) {
try {
- let advanced = requestor.continueRepairs();
+ let advanced = await requestor.continueRepairs();
log.info(`${collection} reparier ${advanced ? "advanced" : "did not advance"}.`);
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
log.error(`${collection} repairer failed`, ex);
}
}
@@ -132,17 +132,17 @@ this.Doctor = {
if (Object.keys(engineInfos).length == 0) {
log.info("Skipping validation: no engines qualify");
return;
}
if (Object.values(engineInfos).filter(i => i.maxRecords != -1).length != 0) {
// at least some of the engines have maxRecord restrictions which require
// us to ask the server for the counts.
- let countInfo = this._fetchCollectionCounts();
+ let countInfo = await this._fetchCollectionCounts();
for (let [engineName, recordCount] of Object.entries(countInfo)) {
if (engineName in engineInfos) {
engineInfos[engineName].recordCount = recordCount;
}
}
}
for (let [engineName, { engine, maxRecords, recordCount }] of Object.entries(engineInfos)) {
@@ -188,43 +188,43 @@ this.Doctor = {
log.error(`Failed to run validation on ${engine.name}!`, ex);
Observers.notify("weave:engine:validate:error", ex, engine.name)
// Keep validating -- there's no reason to think that a failure for one
// validator would mean the others will fail.
}
}
},
- _maybeCure(engine, validationResults, flowID) {
+ async _maybeCure(engine, validationResults, flowID) {
if (!this._shouldRepair(engine)) {
log.info(`Skipping repair of ${engine.name} - disabled via preferences`);
return;
}
let requestor = this._getRepairRequestor(engine.name);
let didStart = false;
if (requestor) {
if (requestor.tryServerOnlyRepairs(validationResults)) {
return; // TODO: It would be nice if we could request a validation to be
// done on next sync.
}
- didStart = requestor.startRepairs(validationResults, flowID);
+ didStart = await requestor.startRepairs(validationResults, flowID);
}
log.info(`${didStart ? "did" : "didn't"} start a repair of ${engine.name} with flowID ${flowID}`);
},
_shouldRepair(engine) {
return Svc.Prefs.get(`engine.${engine.name}.repair.enabled`, false);
},
// mainly for mocking.
- _fetchCollectionCounts() {
+ async _fetchCollectionCounts() {
let collectionCountsURL = Service.userBaseURL + "info/collection_counts";
try {
- let infoResp = Service._fetchInfo(collectionCountsURL);
+ let infoResp = await Service._fetchInfo(collectionCountsURL);
if (!infoResp.success) {
log.error("Can't fetch collection counts: request to info/collection_counts responded with "
+ infoResp.status);
return {};
}
return infoResp.obj; // might throw because obj is a getter which parses json.
} catch (ex) {
if (Async.isShutdownException(ex)) {
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -9,17 +9,16 @@ this.EXPORTED_SYMBOLS = [
"Tracker",
"Store",
"Changeset"
];
var {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
-Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/JSONFile.jsm");
Cu.import("resource://gre/modules/Log.jsm");
Cu.import("resource://services-common/async.js");
Cu.import("resource://services-common/observers.js");
Cu.import("resource://services-sync/constants.js");
Cu.import("resource://services-sync/record.js");
Cu.import("resource://services-sync/resource.js");
Cu.import("resource://services-sync/util.js");
@@ -305,54 +304,49 @@ this.Store = function Store(name, engine
this._log.level = Log.Level[level];
XPCOMUtils.defineLazyGetter(this, "_timer", function() {
return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
});
}
Store.prototype = {
- _sleep: function _sleep(delay) {
- let cb = Async.makeSyncCallback();
- this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
- Async.waitForSyncCallback(cb);
- },
-
/**
* Apply multiple incoming records against the store.
*
* This is called with a set of incoming records to process. The function
* should look at each record, reconcile with the current local state, and
* make the local changes required to bring its state in alignment with the
* record.
*
* The default implementation simply iterates over all records and calls
* applyIncoming(). Store implementations may overwrite this function
* if desired.
*
* @param records Array of records to apply
* @return Array of record IDs which did not apply cleanly
*/
- applyIncomingBatch(records) {
+ async applyIncomingBatch(records) {
let failed = [];
+ let maybeYield = Async.jankYielder();
for (let record of records) {
+ await maybeYield();
try {
- this.applyIncoming(record);
+ await this.applyIncoming(record);
} catch (ex) {
if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
// This kind of exception should have a 'cause' attribute, which is an
// originating exception.
// ex.cause will carry its stack with it when rethrown.
throw ex.cause;
}
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.warn("Failed to apply incoming record " + record.id, ex);
- this.engine._noteApplyFailure();
failed.push(record.id);
}
}
return failed;
},
/**
* Apply a single record against the store.
@@ -362,77 +356,77 @@ Store.prototype = {
*
* The default implementation calls one of remove(), create(), or update()
* depending on the state obtained from the store itself. Store
* implementations may overwrite this function if desired.
*
* @param record
* Record to apply
*/
- applyIncoming(record) {
+ async applyIncoming(record) {
if (record.deleted)
- this.remove(record);
- else if (!this.itemExists(record.id))
- this.create(record);
+ await this.remove(record);
+ else if (!(await this.itemExists(record.id)))
+ await this.create(record);
else
- this.update(record);
+ await this.update(record);
},
// override these in derived objects
/**
* Create an item in the store from a record.
*
* This is called by the default implementation of applyIncoming(). If using
* applyIncomingBatch(), this won't be called unless your store calls it.
*
* @param record
* The store record to create an item from
*/
- create(record) {
+ async create(record) {
throw "override create in a subclass";
},
/**
* Remove an item in the store from a record.
*
* This is called by the default implementation of applyIncoming(). If using
* applyIncomingBatch(), this won't be called unless your store calls it.
*
* @param record
* The store record to delete an item from
*/
- remove(record) {
+ async remove(record) {
throw "override remove in a subclass";
},
/**
* Update an item from a record.
*
* This is called by the default implementation of applyIncoming(). If using
* applyIncomingBatch(), this won't be called unless your store calls it.
*
* @param record
* The record to use to update an item from
*/
- update(record) {
+ async update(record) {
throw "override update in a subclass";
},
/**
* Determine whether a record with the specified ID exists.
*
* Takes a string record ID and returns a booleans saying whether the record
* exists.
*
* @param id
* string record ID
* @return boolean indicating whether record exists locally
*/
- itemExists(id) {
+ async itemExists(id) {
throw "override itemExists in a subclass";
},
/**
* Create a record from the specified ID.
*
* If the ID is known, the record should be populated with metadata from
* the store. If the ID is not known, the record should be created with the
@@ -440,53 +434,53 @@ Store.prototype = {
*
* @param id
* string record ID
* @param collection
* Collection to add record to. This is typically passed into the
* constructor for the newly-created record.
* @return record type for this engine
*/
- createRecord(id, collection) {
+ async createRecord(id, collection) {
throw "override createRecord in a subclass";
},
/**
* Change the ID of a record.
*
* @param oldID
* string old/current record ID
* @param newID
* string new record ID
*/
- changeItemID(oldID, newID) {
+ async changeItemID(oldID, newID) {
throw "override changeItemID in a subclass";
},
/**
* Obtain the set of all known record IDs.
*
* @return Object with ID strings as keys and values of true. The values
* are ignored.
*/
- getAllIDs() {
+ async getAllIDs() {
throw "override getAllIDs in a subclass";
},
/**
* Wipe all data in the store.
*
* This function is called during remote wipes or when replacing local data
* with remote data.
*
* This function should delete all local data that the store is managing. It
* can be thought of as clearing out all state and restoring the "new
* browser" state.
*/
- wipe() {
+ async wipe() {
throw "override wipe in a subclass";
}
};
this.EngineManager = function EngineManager(service) {
this.service = service;
this._engines = {};
@@ -596,28 +590,33 @@ EngineManager.prototype = {
/**
* Register an Engine to the service. Alternatively, give an array of engine
* objects to register.
*
* @param engineObject
* Engine object used to get an instance of the engine
* @return The engine object if anything failed
*/
- register(engineObject) {
+ async register(engineObject) {
if (Array.isArray(engineObject)) {
- engineObject.map(this.register, this);
+ for (const e of engineObject) {
+ await this.register(e);
+ }
return;
}
try {
let engine = new engineObject(this.service);
let name = engine.name;
if (name in this._engines) {
this._log.error("Engine '" + name + "' is already registered!");
} else {
+ if (engine.initialize) {
+ await engine.initialize();
+ }
this._engines[name] = engine;
}
} catch (ex) {
let name = engineObject || "";
name = name.prototype || "";
name = name.name || "";
this._log.error(`Could not initialize engine ${name}`, ex);
@@ -656,17 +655,17 @@ this.Engine = function Engine(name, serv
this._notify = Utils.notify("weave:engine:");
this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
this._log.level = Log.Level[level];
this._modified = this.emptyChangeset();
this._tracker; // initialize tracker to load previously changed IDs
- this._log.debug("Engine initialized");
+ this._log.debug("Engine constructed");
}
Engine.prototype = {
// _storeObj, and _trackerObj should to be overridden in subclasses
_storeObj: Store,
_trackerObj: Tracker,
// Override this method to return a new changeset type.
emptyChangeset() {
@@ -706,50 +705,50 @@ Engine.prototype = {
},
get _tracker() {
let tracker = new this._trackerObj(this.Name, this);
this.__defineGetter__("_tracker", () => tracker);
return tracker;
},
- sync() {
+ async sync() {
if (!this.enabled) {
- return;
+ return false;
}
if (!this._sync) {
throw "engine does not implement _sync method";
}
- this._notify("sync", this.name, this._sync)();
+ return this._notify("sync", this.name, this._sync)();
},
/**
* Get rid of any local meta-data.
*/
- resetClient() {
+ async resetClient() {
if (!this._resetClient) {
throw "engine does not implement _resetClient method";
}
- this._notify("reset-client", this.name, this._resetClient)();
+ return this._notify("reset-client", this.name, this._resetClient)();
},
- _wipeClient() {
- this.resetClient();
+ async _wipeClient() {
+ await this.resetClient();
this._log.debug("Deleting all local data");
this._tracker.ignoreAll = true;
- this._store.wipe();
+ await this._store.wipe();
this._tracker.ignoreAll = false;
this._tracker.clearChangedIDs();
},
- wipeClient() {
- this._notify("wipe-client", this.name, this._wipeClient)();
+ async wipeClient() {
+ return this._notify("wipe-client", this.name, this._wipeClient)();
},
/**
* If one exists, initialize and return a validator for this engine (which
* must have a `validate(engine)` method that returns a promise to an object
* with a getSummary method). Otherwise return null.
*/
getValidator() {
@@ -759,18 +758,18 @@ Engine.prototype = {
async finalize() {
await this._tracker.finalize();
},
};
this.SyncEngine = function SyncEngine(name, service) {
Engine.call(this, name || "SyncEngine", service);
- this.loadToFetch();
- this.loadPreviousFailed();
+ // Async initializations can be made in the initialize() method.
+
// The set of records needing a weak reupload.
// The difference between this and a "normal" reupload is that these records
// are only tracked in memory, and if the reupload attempt fails (shutdown,
// 412, etc), we abort uploading the "weak" set.
//
// The rationale here is for the cases where we receive a record from the
// server that we know is wrong in some (small) way. For example, the
// dateAdded field on bookmarks -- maybe we have a better date, or the server
@@ -813,16 +812,22 @@ SyncEngine.prototype = {
// How many records to pull at one time when specifying IDs. This is to avoid
// URI length limitations.
guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
// How many records to process in a single batch.
applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
+ async initialize() {
+ await this.loadToFetch();
+ await this.loadPreviousFailed();
+ this._log.debug("SyncEngine initialized", this.name);
+ },
+
get storageURL() {
return this.service.storageURL;
},
get engineURL() {
return this.storageURL + this.name;
},
@@ -861,70 +866,65 @@ SyncEngine.prototype = {
Svc.Prefs.set(this.name + ".lastSync", "0");
this.lastSyncLocal = 0;
},
get toFetch() {
return this._toFetch;
},
set toFetch(val) {
- let cb = (error) => {
- if (error) {
- this._log.error("Failed to read JSON records to fetch", error);
- }
- }
// Coerce the array to a string for more efficient comparison.
if (val + "" == this._toFetch) {
return;
}
this._toFetch = val;
Utils.namedTimer(function() {
- Utils.jsonSave("toFetch/" + this.name, this, val, cb);
+ try {
+ Async.promiseSpinningly(Utils.jsonSave("toFetch/" + this.name, this, val));
+ } catch (error) {
+        this._log.error("Failed to save JSON records to fetch", error);
+ }
}, 0, this, "_toFetchDelay");
},
- loadToFetch() {
+ async loadToFetch() {
// Initialize to empty if there's no file.
this._toFetch = [];
- Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
- if (toFetch) {
- this._toFetch = toFetch;
- }
- });
+ let toFetch = await Utils.jsonLoad("toFetch/" + this.name, this);
+ if (toFetch) {
+ this._toFetch = toFetch;
+ }
},
get previousFailed() {
return this._previousFailed;
},
set previousFailed(val) {
- let cb = (error) => {
- if (error) {
- this._log.error("Failed to set previousFailed", error);
- } else {
- this._log.debug("Successfully wrote previousFailed.");
- }
- }
// Coerce the array to a string for more efficient comparison.
if (val + "" == this._previousFailed) {
return;
}
this._previousFailed = val;
Utils.namedTimer(function() {
- Utils.jsonSave("failed/" + this.name, this, val, cb);
+ Utils.jsonSave("failed/" + this.name, this, val).then(() => {
+ this._log.debug("Successfully wrote previousFailed.");
+ })
+ .catch((error) => {
+ this._log.error("Failed to set previousFailed", error);
+ });
}, 0, this, "_previousFailedDelay");
},
- loadPreviousFailed() {
+ async loadPreviousFailed() {
// Initialize to empty if there's no file
this._previousFailed = [];
- Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
- if (previousFailed) {
- this._previousFailed = previousFailed;
- }
- });
+ let previousFailed = await Utils.jsonLoad("failed/" + this.name, this);
+ if (previousFailed) {
+ this._previousFailed = previousFailed;
+ }
},
/*
* lastSyncLocal is a timestamp in local time.
*/
get lastSyncLocal() {
return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
},
@@ -940,42 +940,42 @@ SyncEngine.prototype = {
}
return DEFAULT_MAX_RECORD_PAYLOAD_BYTES;
},
/*
* Returns a changeset for this sync. Engine implementations can override this
* method to bypass the tracker for certain or all changed items.
*/
- getChangedIDs() {
+ async getChangedIDs() {
return this._tracker.changedIDs;
},
// Create a new record using the store and add in metadata.
- _createRecord(id) {
- let record = this._store.createRecord(id, this.name);
+ async _createRecord(id) {
+ let record = await this._store.createRecord(id, this.name);
record.id = id;
record.collection = this.name;
return record;
},
// Creates a tombstone Sync record with additional metadata.
_createTombstone(id) {
let tombstone = new this._recordObj(this.name, id);
tombstone.id = id;
tombstone.collection = this.name;
tombstone.deleted = true;
return tombstone;
},
// Any setup that needs to happen at the beginning of each sync.
- _syncStartup() {
+ async _syncStartup() {
// Determine if we need to wipe on outdated versions
- let metaGlobal = Async.promiseSpinningly(this.service.recordManager.get(this.metaURL));
+ let metaGlobal = await this.service.recordManager.get(this.metaURL);
let engines = metaGlobal.payload.engines || {};
let engineData = engines[this.name] || {};
let needsWipe = false;
// Assume missing versions are 0 and wipe the server
if ((engineData.version || 0) < this.version) {
this._log.debug("Old engine data: " + [engineData.version, this.version]);
@@ -996,38 +996,38 @@ SyncEngine.prototype = {
// Don't sync this engine if the server has newer data
let error = new String("New data: " + [engineData.version, this.version]);
error.failureCode = VERSION_OUT_OF_DATE;
throw error;
} else if (engineData.syncID != this.syncID) {
// Changes to syncID mean we'll need to upload everything
this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
this.syncID = engineData.syncID;
- this._resetClient();
+ await this._resetClient();
}
// Delete any existing data and reupload on bad version or missing meta.
// No crypto component here...? We could regenerate per-collection keys...
if (needsWipe) {
- this.wipeServer();
+ await this.wipeServer();
}
// Save objects that need to be uploaded in this._modified. We also save
// the timestamp of this fetch in this.lastSyncLocal. As we successfully
// upload objects we remove them from this._modified. If an error occurs
// or any objects fail to upload, they will remain in this._modified. At
// the end of a sync, or after an error, we add all objects remaining in
// this._modified to the tracker.
this.lastSyncLocal = Date.now();
let initialChanges;
if (this.lastSync) {
- initialChanges = this.pullNewChanges();
+ initialChanges = await this.pullNewChanges();
} else {
this._log.debug("First sync, uploading all items");
- initialChanges = this.pullAllChanges();
+ initialChanges = await this.pullAllChanges();
}
this._modified.replace(initialChanges);
// Clear the tracker now. If the sync fails we'll add the ones we failed
// to upload back.
this._tracker.clearChangedIDs();
this._log.info(this._modified.count() +
" outgoing items pre-reconciliation");
@@ -1045,17 +1045,17 @@ SyncEngine.prototype = {
return new Collection(this.engineURL, this._recordObj, this.service);
},
/**
* Process incoming records.
* In the most awful and untestable way possible.
* This now accepts something that makes testing vaguely less impossible.
*/
- _processIncoming(newitems) {
+ async _processIncoming(newitems) {
this._log.trace("Downloading & applying server changes");
// Figure out how many total items to fetch this sync; do less on mobile.
let batchSize = this.downloadLimit || Infinity;
if (!newitems) {
newitems = this.itemSource();
}
@@ -1081,54 +1081,54 @@ SyncEngine.prototype = {
// Reset previousFailed for each sync since previously failed items may not fail again.
this.previousFailed = [];
// Used (via exceptions) to allow the record handler/reconciliation/etc.
// methods to signal that they would like processing of incoming records to
// cease.
let aborting = undefined;
- function doApplyBatch() {
+ async function doApplyBatch() {
this._tracker.ignoreAll = true;
try {
- failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
+ failed = failed.concat((await this._store.applyIncomingBatch(applyBatch)));
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
// Catch any error that escapes from applyIncomingBatch. At present
// those will all be abort events.
this._log.warn("Got exception, aborting processIncoming", ex);
aborting = ex;
}
this._tracker.ignoreAll = false;
applyBatch = [];
}
- function doApplyBatchAndPersistFailed() {
+ async function doApplyBatchAndPersistFailed() {
// Apply remaining batch.
if (applyBatch.length) {
- doApplyBatch.call(this);
+ await doApplyBatch.call(this);
}
// Persist failed items so we refetch them.
if (failed.length) {
this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
failed = [];
}
}
let key = this.service.collectionKeys.keyForCollection(this.name);
// Not binding this method to 'this' for performance reasons. It gets
// called for every incoming record.
let self = this;
- let recordHandler = function(item) {
+ let recordHandler = async function(item) {
if (aborting) {
return;
}
// Grab a later last modified if possible
if (self.lastModified == null || item.modified > self.lastModified)
self.lastModified = item.modified;
@@ -1140,112 +1140,109 @@ SyncEngine.prototype = {
try {
try {
item.decrypt(key);
} catch (ex) {
if (!Utils.isHMACMismatch(ex)) {
throw ex;
}
- let strategy = self.handleHMACMismatch(item, true);
+ let strategy = await self.handleHMACMismatch(item, true);
if (strategy == SyncEngine.kRecoveryStrategy.retry) {
// You only get one retry.
try {
// Try decrypting again, typically because we've got new keys.
self._log.info("Trying decrypt again...");
key = self.service.collectionKeys.keyForCollection(self.name);
item.decrypt(key);
strategy = null;
} catch (ex) {
if (!Utils.isHMACMismatch(ex)) {
throw ex;
}
- strategy = self.handleHMACMismatch(item, false);
+ strategy = await self.handleHMACMismatch(item, false);
}
}
switch (strategy) {
case null:
// Retry succeeded! No further handling.
break;
case SyncEngine.kRecoveryStrategy.retry:
self._log.debug("Ignoring second retry suggestion.");
// Fall through to error case.
case SyncEngine.kRecoveryStrategy.error:
self._log.warn("Error decrypting record", ex);
- self._noteApplyFailure();
failed.push(item.id);
return;
case SyncEngine.kRecoveryStrategy.ignore:
self._log.debug("Ignoring record " + item.id +
" with bad HMAC: already handled.");
return;
}
}
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
self._log.warn("Error decrypting record", ex);
- self._noteApplyFailure();
failed.push(item.id);
return;
}
if (self._shouldDeleteRemotely(item)) {
self._log.trace("Deleting item from server without applying", item);
self._deleteId(item.id);
return;
}
let shouldApply;
try {
- shouldApply = self._reconcile(item);
+ shouldApply = await self._reconcile(item);
} catch (ex) {
if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
self._log.warn("Reconciliation failed: aborting incoming processing.");
- self._noteApplyFailure();
failed.push(item.id);
aborting = ex.cause;
} else if (!Async.isShutdownException(ex)) {
self._log.warn("Failed to reconcile incoming record " + item.id, ex);
- self._noteApplyFailure();
failed.push(item.id);
return;
} else {
throw ex;
}
}
if (shouldApply) {
count.applied++;
applyBatch.push(item);
} else {
count.reconciled++;
self._log.trace("Skipping reconciled incoming item " + item.id);
}
if (applyBatch.length == self.applyIncomingBatchSize) {
- doApplyBatch.call(self);
+ await doApplyBatch.call(self);
}
- self._store._sleep(0);
};
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
- let { response, records } = Async.promiseSpinningly(newitems.getBatched());
+ let { response, records } = await newitems.getBatched();
if (!response.success) {
response.failureCode = ENGINE_DOWNLOAD_FAIL;
throw response;
}
+ let maybeYield = Async.jankYielder();
for (let record of records) {
- recordHandler(record);
+ await maybeYield();
+ await recordHandler(record);
}
- doApplyBatchAndPersistFailed.call(this);
+ await doApplyBatchAndPersistFailed.call(this);
if (aborting) {
throw aborting;
}
}
// Mobile: check if we got the maximum that we requested; get the rest if so.
if (handled.length == newitems.limit) {
@@ -1253,17 +1250,17 @@ SyncEngine.prototype = {
// Sort and limit so that on mobile we only get the last X records.
guidColl.limit = this.downloadLimit;
guidColl.newer = this.lastSync;
// index: Orders by the sortindex descending (highest weight first).
guidColl.sort = "index";
- let guids = Async.promiseSpinningly(guidColl.get());
+ let guids = await guidColl.get();
if (!guids.success)
throw guids;
// Figure out which guids weren't just fetched then remove any guids that
// were already waiting and prepend the new ones
let extra = Utils.arraySub(guids.obj, handled);
if (extra.length > 0) {
fetchBatch = Utils.arrayUnion(extra, fetchBatch);
@@ -1284,26 +1281,28 @@ SyncEngine.prototype = {
while (fetchBatch.length && !aborting) {
// Reuse the original query, but get rid of the restricting params
// and batch remaining records.
newitems.limit = 0;
newitems.newer = 0;
newitems.ids = fetchBatch.slice(0, batchSize);
- let resp = Async.promiseSpinningly(newitems.get());
+ let resp = await newitems.get();
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
+ let maybeYield = Async.jankYielder();
for (let json of resp.obj) {
+ await maybeYield();
let record = new this._recordObj();
record.deserialize(json);
- recordHandler(record);
+ await recordHandler(record);
}
// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
if (failed.length) {
@@ -1317,22 +1316,21 @@ SyncEngine.prototype = {
}
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
}
}
// Apply remaining items.
- doApplyBatchAndPersistFailed.call(this);
+ await doApplyBatchAndPersistFailed.call(this);
count.newFailed = this.previousFailed.reduce((count, engine) => {
if (failedInPreviousSync.indexOf(engine) == -1) {
count++;
- this._noteApplyNewFailure();
}
return count;
}, 0);
count.succeeded = Math.max(0, count.applied - count.failed);
this._log.info(["Records:",
count.applied, "applied,",
count.succeeded, "successfully,",
count.failed, "failed to apply,",
@@ -1343,95 +1341,87 @@ SyncEngine.prototype = {
// Indicates whether an incoming item should be deleted from the server at
// the end of the sync. Engines can override this method to clean up records
// that shouldn't be on the server.
_shouldDeleteRemotely(remoteItem) {
return false;
},
- _noteApplyFailure() {
- // here would be a good place to record telemetry...
- },
-
- _noteApplyNewFailure() {
- // here would be a good place to record telemetry...
- },
-
/**
* Find a GUID of an item that is a duplicate of the incoming item but happens
* to have a different GUID
*
* @return GUID of the similar item; falsy otherwise
*/
- _findDupe(item) {
+ async _findDupe(item) {
// By default, assume there's no dupe items for the engine
},
/**
* Called before a remote record is discarded due to failed reconciliation.
* Used by bookmark sync to merge folder child orders.
*/
beforeRecordDiscard(localRecord, remoteRecord, remoteIsNewer) {
},
// Called when the server has a record marked as deleted, but locally we've
// changed it more recently than the deletion. If we return false, the
// record will be deleted locally. If we return true, we'll reupload the
// record to the server -- any extra work that's needed as part of this
// process should be done at this point (such as mark the record's parent
// for reuploading in the case of bookmarks).
- _shouldReviveRemotelyDeletedRecord(remoteItem) {
+ async _shouldReviveRemotelyDeletedRecord(remoteItem) {
return true;
},
_deleteId(id) {
this._tracker.removeChangedID(id);
this._noteDeletedId(id);
},
// Marks an ID for deletion at the end of the sync.
_noteDeletedId(id) {
if (this._delete.ids == null)
this._delete.ids = [id];
else
this._delete.ids.push(id);
},
- _switchItemToDupe(localDupeGUID, incomingItem) {
+ async _switchItemToDupe(localDupeGUID, incomingItem) {
// The local, duplicate ID is always deleted on the server.
this._deleteId(localDupeGUID);
// We unconditionally change the item's ID in case the engine knows of
// an item but doesn't expose it through itemExists. If the API
// contract were stronger, this could be changed.
this._log.debug("Switching local ID to incoming: " + localDupeGUID + " -> " +
incomingItem.id);
- this._store.changeItemID(localDupeGUID, incomingItem.id);
+ return this._store.changeItemID(localDupeGUID, incomingItem.id);
},
/**
* Reconcile incoming record with local state.
*
* This function essentially determines whether to apply an incoming record.
*
* @param item
* Record from server to be tested for application.
* @return boolean
* Truthy if incoming record should be applied. False if not.
*/
- _reconcile(item) {
+ async _reconcile(item) {
if (this._log.level <= Log.Level.Trace) {
this._log.trace("Incoming: " + item);
}
// We start reconciling by collecting a bunch of state. We do this here
// because some state may change during the course of this function and we
// need to operate on the original values.
- let existsLocally = this._store.itemExists(item.id);
+ let existsLocally = await this._store.itemExists(item.id);
let locallyModified = this._modified.has(item.id);
// TODO Handle clock drift better. Tracked in bug 721181.
let remoteAge = AsyncResource.serverTime - item.modified;
let localAge = locallyModified ?
(Date.now() / 1000 - this._modified.getModifiedTimestamp(item.id)) : null;
let remoteIsNewer = remoteAge < localAge;
@@ -1464,56 +1454,56 @@ SyncEngine.prototype = {
this._log.trace("Incoming record is deleted but we had local changes.");
if (remoteIsNewer) {
this._log.trace("Remote record is newer -- deleting local record.");
return true;
}
// If the local record is newer, we defer to individual engines for
// how to handle this. By default, we revive the record.
- let willRevive = this._shouldReviveRemotelyDeletedRecord(item);
+ let willRevive = await this._shouldReviveRemotelyDeletedRecord(item);
this._log.trace("Local record is newer -- reviving? " + willRevive);
return !willRevive;
}
// At this point the incoming record is not for a deletion and must have
// data. If the incoming record does not exist locally, we check for a local
// duplicate existing under a different ID. The default implementation of
// _findDupe() is empty, so engines have to opt in to this functionality.
//
// If we find a duplicate, we change the local ID to the incoming ID and we
// refresh the metadata collected above. See bug 710448 for the history
// of this logic.
if (!existsLocally) {
- let localDupeGUID = this._findDupe(item);
+ let localDupeGUID = await this._findDupe(item);
if (localDupeGUID) {
this._log.trace("Local item " + localDupeGUID + " is a duplicate for " +
"incoming item " + item.id);
// The current API contract does not mandate that the ID returned by
// _findDupe() actually exists. Therefore, we have to perform this
// check.
- existsLocally = this._store.itemExists(localDupeGUID);
+ existsLocally = await this._store.itemExists(localDupeGUID);
// If the local item was modified, we carry its metadata forward so
// appropriate reconciling can be performed.
if (this._modified.has(localDupeGUID)) {
locallyModified = true;
localAge = this._tracker._now() - this._modified.getModifiedTimestamp(localDupeGUID);
remoteIsNewer = remoteAge < localAge;
this._modified.changeID(localDupeGUID, item.id);
} else {
locallyModified = false;
localAge = null;
}
// Tell the engine to do whatever it needs to switch the items.
- this._switchItemToDupe(localDupeGUID, item);
+ await this._switchItemToDupe(localDupeGUID, item);
this._log.debug("Local item after duplication: age=" + localAge +
"; modified=" + locallyModified + "; exists=" +
existsLocally);
} else {
this._log.trace("No duplicate found for incoming item: " + item.id);
}
}
@@ -1548,17 +1538,17 @@ SyncEngine.prototype = {
}
// If the remote and local records are the same, there is nothing to be
// done, so we don't do anything. In the ideal world, this logic wouldn't
// be here and the engine would take a record and apply it. The reason we
// want to defer this logic is because it would avoid a redundant and
// possibly expensive dip into the storage layer to query item state.
// This should get addressed in the async rewrite, so we ignore it for now.
- let localRecord = this._createRecord(item.id);
+ let localRecord = await this._createRecord(item.id);
let recordsEqual = Utils.deepEquals(item.cleartext,
localRecord.cleartext);
// If the records are the same, we don't need to do anything. This does
// potentially throw away a local modification time. But, if the records
// are the same, does it matter?
if (recordsEqual) {
this._log.trace("Ignoring incoming item because the local item is " +
@@ -1584,32 +1574,32 @@ SyncEngine.prototype = {
item.id);
if (!remoteIsNewer) {
this.beforeRecordDiscard(localRecord, item, remoteIsNewer);
}
return remoteIsNewer;
},
// Upload outgoing records.
- _uploadOutgoing() {
+ async _uploadOutgoing() {
this._log.trace("Uploading local changes to server.");
// collection we'll upload
let up = new Collection(this.engineURL, null, this.service);
let modifiedIDs = this._modified.ids();
let counts = { failed: 0, sent: 0 };
if (modifiedIDs.length) {
this._log.trace("Preparing " + modifiedIDs.length +
" outgoing records");
counts.sent = modifiedIDs.length;
let failed = [];
let successful = [];
- let handleResponse = (resp, batchOngoing = false) => {
+ let handleResponse = async (resp, batchOngoing = false) => {
// Note: We don't want to update this.lastSync, or this._modified until
// the batch is complete, however we want to remember success/failure
// indicators for when that happens.
if (!resp.success) {
this._log.debug("Uploading records failed: " + resp);
resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
throw resp;
}
@@ -1634,30 +1624,30 @@ SyncEngine.prototype = {
}
counts.failed += failed.length;
for (let id of successful) {
this._modified.delete(id);
}
- this._onRecordsWritten(successful, failed);
+ await this._onRecordsWritten(successful, failed);
// clear for next batch
failed.length = 0;
successful.length = 0;
};
let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
for (let id of modifiedIDs) {
let out;
let ok = false;
try {
- out = this._createRecord(id);
+ out = await this._createRecord(id);
if (this._log.level <= Log.Level.Trace)
this._log.trace("Outgoing: " + out);
out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
let payloadLength = JSON.stringify(out.payload).length;
if (payloadLength > this.maxRecordPayloadBytes) {
if (this.allowSkippedRecord) {
this._modified.delete(id); // Do not attempt to sync that record again
@@ -1670,196 +1660,196 @@ SyncEngine.prototype = {
++counts.failed;
if (Async.isShutdownException(ex) || !this.allowSkippedRecord) {
Observers.notify("weave:engine:sync:uploaded", counts, this.name);
throw ex;
}
}
this._needWeakReupload.delete(id);
if (ok) {
- let { enqueued, error } = postQueue.enqueue(out);
+ let { enqueued, error } = await postQueue.enqueue(out);
if (!enqueued) {
++counts.failed;
if (!this.allowSkippedRecord) {
throw error;
}
this._modified.delete(id);
this._log.warn(`Failed to enqueue record "${id}" (skipping)`, error);
}
}
- this._store._sleep(0);
+ await Async.promiseYield();
}
- postQueue.flush(true);
+ await postQueue.flush(true);
}
if (this._needWeakReupload.size) {
try {
- const { sent, failed } = this._weakReupload(up);
+ const { sent, failed } = await this._weakReupload(up);
counts.sent += sent;
counts.failed += failed;
} catch (e) {
if (Async.isShutdownException(e)) {
throw e;
}
this._log.warn("Weak reupload failed", e);
}
}
if (counts.sent || counts.failed) {
Observers.notify("weave:engine:sync:uploaded", counts, this.name);
}
},
- _weakReupload(collection) {
+ async _weakReupload(collection) {
const counts = { sent: 0, failed: 0 };
let pendingSent = 0;
let postQueue = collection.newPostQueue(this._log, this.lastSync, (resp, batchOngoing = false) => {
if (!resp.success) {
this._needWeakReupload.clear();
this._log.warn("Uploading records (weak) failed: " + resp);
resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
throw resp;
}
if (!batchOngoing) {
counts.sent += pendingSent;
pendingSent = 0;
}
});
- let pendingWeakReupload = this.buildWeakReuploadMap(this._needWeakReupload);
+ let pendingWeakReupload = await this.buildWeakReuploadMap(this._needWeakReupload);
for (let [id, encodedRecord] of pendingWeakReupload) {
try {
this._log.trace("Outgoing (weak)", encodedRecord);
encodedRecord.encrypt(this.service.collectionKeys.keyForCollection(this.name));
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.warn(`Failed to encrypt record "${id}" during weak reupload`, ex);
++counts.failed;
continue;
}
// Note that general errors (network error, 412, etc.) will throw here.
// `enqueued` is only false if the specific item failed to enqueue, but
// other items should be/are fine. For example, enqueued will be false if
// it is larger than the max post or WBO size.
- let { enqueued } = postQueue.enqueue(encodedRecord);
+ let { enqueued } = await postQueue.enqueue(encodedRecord);
if (!enqueued) {
++counts.failed;
} else {
++pendingSent;
}
- this._store._sleep(0);
+ await Async.promiseYield();
}
- postQueue.flush(true);
+ await postQueue.flush(true);
return counts;
},
- _onRecordsWritten(succeeded, failed) {
+ async _onRecordsWritten(succeeded, failed) {
// Implement this method to take specific actions against successfully
// uploaded records and failed records.
},
// Any cleanup necessary.
// Save the current snapshot so as to calculate changes at next sync
- _syncFinish() {
+ async _syncFinish() {
this._log.trace("Finishing up sync");
this._tracker.resetScore();
- let doDelete = Utils.bind2(this, function(key, val) {
+ let doDelete = async (key, val) => {
let coll = new Collection(this.engineURL, this._recordObj, this.service);
coll[key] = val;
- Async.promiseSpinningly(coll.delete());
- });
+ await coll.delete();
+ };
for (let [key, val] of Object.entries(this._delete)) {
// Remove the key for future uses
delete this._delete[key];
// Send a simple delete for the property
if (key != "ids" || val.length <= 100)
- doDelete(key, val);
+ await doDelete(key, val);
else {
// For many ids, split into chunks of at most 100
while (val.length > 0) {
- doDelete(key, val.slice(0, 100));
+ await doDelete(key, val.slice(0, 100));
val = val.slice(100);
}
}
}
},
- _syncCleanup() {
+ async _syncCleanup() {
this._needWeakReupload.clear();
if (!this._modified) {
return;
}
try {
// Mark failed WBOs as changed again so they are reuploaded next time.
- this.trackRemainingChanges();
+ await this.trackRemainingChanges();
} finally {
this._modified.clear();
}
},
- _sync() {
+ async _sync() {
try {
- this._syncStartup();
+ await this._syncStartup();
Observers.notify("weave:engine:sync:status", "process-incoming");
- this._processIncoming();
+ await this._processIncoming();
Observers.notify("weave:engine:sync:status", "upload-outgoing");
- this._uploadOutgoing();
- this._syncFinish();
+ await this._uploadOutgoing();
+ await this._syncFinish();
} finally {
- this._syncCleanup();
+ await this._syncCleanup();
}
},
- canDecrypt() {
+ async canDecrypt() {
// Report failure even if there's nothing to decrypt
let canDecrypt = false;
// Fetch the most recently uploaded record and try to decrypt it
let test = new Collection(this.engineURL, this._recordObj, this.service);
test.limit = 1;
test.sort = "newest";
test.full = true;
let key = this.service.collectionKeys.keyForCollection(this.name);
// Any failure fetching/decrypting will just result in false
try {
this._log.trace("Trying to decrypt a record from the server..");
- let json = Async.promiseSpinningly(test.get()).obj[0];
+ let json = (await test.get()).obj[0];
let record = new this._recordObj();
record.deserialize(json);
record.decrypt(key);
canDecrypt = true;
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.debug("Failed test decrypt", ex);
}
return canDecrypt;
},
- _resetClient() {
+ async _resetClient() {
this.resetLastSync();
this.previousFailed = [];
this.toFetch = [];
},
- wipeServer() {
- let response = Async.promiseSpinningly(this.service.resource(this.engineURL).delete());
+ async wipeServer() {
+ let response = await this.service.resource(this.engineURL).delete();
if (response.status != 200 && response.status != 404) {
throw response;
}
- this._resetClient();
+ await this._resetClient();
},
async removeClientData() {
// Implement this method in engines that store client specific data
// on the server.
},
/*
@@ -1873,74 +1863,78 @@ SyncEngine.prototype = {
* handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
* kRecoveryStrategy.error.
*
* Subclasses of SyncEngine can override this method to allow for different
* behavior -- e.g., to delete and ignore erroneous entries.
*
* All return values will be part of the kRecoveryStrategy enumeration.
*/
- handleHMACMismatch(item, mayRetry) {
+ async handleHMACMismatch(item, mayRetry) {
// By default we either try again, or bail out noisily.
- return (this.service.handleHMACEvent() && mayRetry) ?
+ return ((await this.service.handleHMACEvent()) && mayRetry) ?
SyncEngine.kRecoveryStrategy.retry :
SyncEngine.kRecoveryStrategy.error;
},
/**
* Returns a changeset containing all items in the store. The default
* implementation returns a changeset with timestamps from long ago, to
* ensure we always use the remote version if one exists.
*
* This function is only called for the first sync. Subsequent syncs call
* `pullNewChanges`.
*
* @return A `Changeset` object.
*/
- pullAllChanges() {
+ async pullAllChanges() {
let changes = {};
- for (let id in this._store.getAllIDs()) {
+ let ids = await this._store.getAllIDs();
+ for (let id in ids) {
changes[id] = 0;
}
return changes;
},
/*
* Returns a changeset containing entries for all currently tracked items.
* The default implementation returns a changeset with timestamps indicating
* when the item was added to the tracker.
*
* @return A `Changeset` object.
*/
- pullNewChanges() {
+ async pullNewChanges() {
return this.getChangedIDs();
},
/**
* Adds all remaining changeset entries back to the tracker, typically for
* items that failed to upload. This method is called at the end of each sync.
*
*/
- trackRemainingChanges() {
+ async trackRemainingChanges() {
for (let [id, change] of this._modified.entries()) {
this._tracker.addChangedID(id, change);
}
},
/**
* Returns a map of (id, unencrypted record) that will be used to perform
* the weak reupload. Subclasses may override this to filter out items we
* shouldn't upload as part of a weak reupload (items that have changed,
* for example).
*/
- buildWeakReuploadMap(idSet) {
+ async buildWeakReuploadMap(idSet) {
let result = new Map();
+ let maybeYield = Async.jankYielder();
for (let id of idSet) {
+ await maybeYield();
try {
- result.set(id, this._createRecord(id));
+ let record = await this._createRecord(id);
+ result.set(id, record);
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.warn("createRecord failed during weak reupload", ex);
}
}
return result;
--- a/services/sync/modules/engines/addons.js
+++ b/services/sync/modules/engines/addons.js
@@ -123,20 +123,25 @@ AddonsEngine.prototype = {
_trackerObj: AddonsTracker,
_recordObj: AddonRecord,
version: 1,
syncPriority: 5,
_reconciler: null,
+ async initialize() {
+ await SyncEngine.prototype.initialize.call(this);
+ await this._reconciler.ensureStateLoaded();
+ },
+
/**
* Override parent method to find add-ons by their public ID, not Sync GUID.
*/
- _findDupe: function _findDupe(item) {
+ async _findDupe(item) {
let id = item.addonID;
// The reconciler should have been updated at the top of the sync, so we
// can assume it is up to date when this function is called.
let addons = this._reconciler.addons;
if (!(id in addons)) {
return null;
}
@@ -148,17 +153,17 @@ AddonsEngine.prototype = {
return null;
},
/**
* Override getChangedIDs to pull in tracker changes plus changes from the
* reconciler log.
*/
- getChangedIDs: function getChangedIDs() {
+ async getChangedIDs() {
let changes = {};
for (let [id, modified] of Object.entries(this._tracker.changedIDs)) {
changes[id] = modified;
}
let lastSyncDate = new Date(this.lastSync * 1000);
// The reconciler should have been refreshed at the beginning of a sync and
@@ -196,51 +201,47 @@ AddonsEngine.prototype = {
* Many functions in this class assume the reconciler is refreshed at the
* top of a sync. If this ever changes, those functions should be revisited.
*
* Technically speaking, we don't need to refresh the reconciler on every
* sync since it is installed as an AddonManager listener. However, add-ons
* are complicated and we force a full refresh, just in case the listeners
* missed something.
*/
- _syncStartup: function _syncStartup() {
+ async _syncStartup() {
// We refresh state before calling parent because syncStartup in the parent
// looks for changed IDs, which is dependent on add-on state being up to
// date.
- this._refreshReconcilerState();
-
- SyncEngine.prototype._syncStartup.call(this);
+ await this._refreshReconcilerState();
+ return SyncEngine.prototype._syncStartup.call(this);
},
/**
* Override end of sync to perform a little housekeeping on the reconciler.
*
* We prune changes to prevent the reconciler state from growing without
* bound. Even if it grows unbounded, there would have to be many add-on
* changes (thousands) for it to slow things down significantly. This is
* highly unlikely to occur. Still, we exercise defense just in case.
*/
- _syncCleanup: function _syncCleanup() {
+ async _syncCleanup() {
let ms = 1000 * this.lastSync - PRUNE_ADDON_CHANGES_THRESHOLD;
this._reconciler.pruneChangesBeforeDate(new Date(ms));
-
- SyncEngine.prototype._syncCleanup.call(this);
+ return SyncEngine.prototype._syncCleanup.call(this);
},
/**
* Helper function to ensure reconciler is up to date.
*
- * This will synchronously load the reconciler's state from the file
+ * This will load the reconciler's state from the file
* system (if needed) and refresh the state of the reconciler.
*/
- _refreshReconcilerState: function _refreshReconcilerState() {
+ async _refreshReconcilerState() {
this._log.debug("Refreshing reconciler state");
- let cb = Async.makeSpinningCallback();
- this._reconciler.refreshGlobalState(cb);
- cb.wait();
+ return this._reconciler.refreshGlobalState();
},
isAddonSyncable(addon, ignoreRepoCheck) {
return this._store.isAddonSyncable(addon, ignoreRepoCheck);
}
};
/**
@@ -262,17 +263,17 @@ AddonsStore.prototype = {
get reconciler() {
return this.engine._reconciler;
},
/**
* Override applyIncoming to filter out records we can't handle.
*/
- applyIncoming: function applyIncoming(record) {
+ async applyIncoming(record) {
// The fields we look at aren't present when the record is deleted.
if (!record.deleted) {
// Ignore records not belonging to our application ID because that is the
// current policy.
if (record.applicationID != Services.appinfo.ID) {
this._log.info("Ignoring incoming record from other App ID: " +
record.id);
return;
@@ -290,24 +291,24 @@ AddonsStore.prototype = {
// Ignore incoming records for which an existing non-syncable addon
// exists.
let existingMeta = this.reconciler.addons[record.addonID];
if (existingMeta && !this.isAddonSyncable(existingMeta)) {
this._log.info("Ignoring incoming record for an existing but non-syncable addon", record.addonID);
return;
}
- Store.prototype.applyIncoming.call(this, record);
+ await Store.prototype.applyIncoming.call(this, record);
},
/**
* Provides core Store API to create/install an add-on from a record.
*/
- create: function create(record) {
+ async create(record) {
let cb = Async.makeSpinningCallback();
AddonUtils.installAddons([{
id: record.addonID,
syncGUID: record.id,
enabled: record.enabled,
requireSecureURI: this._extensionsPrefs.get("install.requireSecureOrigin", true),
}], cb);
@@ -337,37 +338,35 @@ AddonsStore.prototype = {
}
this._log.info("Add-on installed: " + record.addonID);
},
/**
* Provides core Store API to remove/uninstall an add-on from a record.
*/
- remove: function remove(record) {
+ async remove(record) {
// If this is called, the payload is empty, so we have to find by GUID.
- let addon = this.getAddonByGUID(record.id);
+ let addon = await this.getAddonByGUID(record.id);
if (!addon) {
// We don't throw because if the add-on could not be found then we assume
// it has already been uninstalled and there is nothing for this function
// to do.
return;
}
this._log.info("Uninstalling add-on: " + addon.id);
- let cb = Async.makeSpinningCallback();
- AddonUtils.uninstallAddon(addon, cb);
- cb.wait();
+ await AddonUtils.uninstallAddon(addon);
},
/**
* Provides core Store API to update an add-on from a record.
*/
- update: function update(record) {
- let addon = this.getAddonByID(record.addonID);
+ async update(record) {
+ let addon = await this.getAddonByID(record.addonID);
// update() is called if !this.itemExists. And, since itemExists consults
// the reconciler only, we need to take care of some corner cases.
//
// First, the reconciler could know about an add-on that was uninstalled
// and no longer present in the add-ons manager.
if (!addon) {
this.create(record);
@@ -386,33 +385,33 @@ AddonsStore.prototype = {
}
this.updateUserDisabled(addon, !record.enabled);
},
/**
* Provide core Store API to determine if a record exists.
*/
- itemExists: function itemExists(guid) {
+ async itemExists(guid) {
let addon = this.reconciler.getAddonStateFromSyncGUID(guid);
return !!addon;
},
/**
* Create an add-on record from its GUID.
*
* @param guid
* Add-on GUID (from extensions DB)
* @param collection
* Collection to add record to.
*
* @return AddonRecord instance
*/
- createRecord: function createRecord(guid, collection) {
+ async createRecord(guid, collection) {
let record = new AddonRecord(collection, guid);
record.applicationID = Services.appinfo.ID;
let addon = this.reconciler.getAddonStateFromSyncGUID(guid);
// If we don't know about this GUID or if it has been uninstalled, we mark
// the record as deleted.
if (!addon || !addon.installed) {
@@ -431,43 +430,41 @@ AddonsStore.prototype = {
return record;
},
/**
* Changes the id of an add-on.
*
* This implements a core API of the store.
*/
- changeItemID: function changeItemID(oldID, newID) {
+ async changeItemID(oldID, newID) {
// We always update the GUID in the reconciler because it will be
// referenced later in the sync process.
let state = this.reconciler.getAddonStateFromSyncGUID(oldID);
if (state) {
state.guid = newID;
- let cb = Async.makeSpinningCallback();
- this.reconciler.saveState(null, cb);
- cb.wait();
+ await this.reconciler.saveState();
}
- let addon = this.getAddonByGUID(oldID);
+ let addon = await this.getAddonByGUID(oldID);
if (!addon) {
this._log.debug("Cannot change item ID (" + oldID + ") in Add-on " +
"Manager because old add-on not present: " + oldID);
return;
}
addon.syncGUID = newID;
},
/**
* Obtain the set of all syncable add-on Sync GUIDs.
*
* This implements a core Store API.
*/
- getAllIDs: function getAllIDs() {
+ async getAllIDs() {
let ids = {};
let addons = this.reconciler.addons;
for (let id in addons) {
let addon = addons[id];
if (this.isAddonSyncable(addon)) {
ids[addon.guid] = true;
}
@@ -477,64 +474,61 @@ AddonsStore.prototype = {
},
/**
* Wipe engine data.
*
* This uninstalls all syncable addons from the application. In case of
* error, it logs the error and keeps trying with other add-ons.
*/
- wipe: function wipe() {
+ async wipe() {
this._log.info("Processing wipe.");
- this.engine._refreshReconcilerState();
+ await this.engine._refreshReconcilerState();
// We only wipe syncable add-ons. Wipe is a Sync feature not a security
// feature.
- for (let guid in this.getAllIDs()) {
- let addon = this.getAddonByGUID(guid);
+ let ids = await this.getAllIDs();
+ for (let guid in ids) {
+ let addon = await this.getAddonByGUID(guid);
if (!addon) {
this._log.debug("Ignoring add-on because it couldn't be obtained: " +
guid);
continue;
}
this._log.info("Uninstalling add-on as part of wipe: " + addon.id);
- Utils.catch.call(this, () => addon.uninstall())();
+ await Utils.catch.call(this, () => addon.uninstall())();
}
},
/** *************************************************************************
* Functions below are unique to this store and not part of the Store API *
***************************************************************************/
/**
- * Synchronously obtain an add-on from its public ID.
+ * Obtain an add-on from its public ID.
*
* @param id
* Add-on ID
* @return Addon or undefined if not found
*/
- getAddonByID: function getAddonByID(id) {
- let cb = Async.makeSyncCallback();
- AddonManager.getAddonByID(id, cb);
- return Async.waitForSyncCallback(cb);
+ async getAddonByID(id) {
+ return AddonManager.getAddonByID(id);
},
/**
- * Synchronously obtain an add-on from its Sync GUID.
+ * Obtain an add-on from its Sync GUID.
*
* @param guid
* Add-on Sync GUID
* @return DBAddonInternal or null
*/
- getAddonByGUID: function getAddonByGUID(guid) {
- let cb = Async.makeSyncCallback();
- AddonManager.getAddonBySyncGUID(guid, cb);
- return Async.waitForSyncCallback(cb);
+ async getAddonByGUID(guid) {
+ return AddonManager.getAddonBySyncGUID(guid);
},
/**
* Determines whether an add-on is suitable for Sync.
*
* @param addon
* Addon instance
* @param ignoreRepoCheck
@@ -752,30 +746,27 @@ class AddonValidator extends CollectionV
"addonID",
"enabled",
"applicationID",
"source"
]);
this.engine = engine;
}
- getClientItems() {
- return Promise.all([
- AddonManager.getAllAddons(),
- AddonManager.getAddonsWithOperationsByTypes(["extension", "theme"]),
- ]).then(([installed, addonsWithPendingOperation]) => {
- // Addons pending install won't be in the first list, but addons pending
- // uninstall/enable/disable will be in both lists.
- let all = new Map(installed.map(addon => [addon.id, addon]));
- for (let addon of addonsWithPendingOperation) {
- all.set(addon.id, addon);
- }
- // Convert to an array since Map.prototype.values returns an iterable
- return [...all.values()];
- });
+ async getClientItems() {
+ const installed = await AddonManager.getAllAddons();
+ const addonsWithPendingOperation = await AddonManager.getAddonsWithOperationsByTypes(["extension", "theme"]);
+ // Addons pending install won't be in the first list, but addons pending
+ // uninstall/enable/disable will be in both lists.
+ let all = new Map(installed.map(addon => [addon.id, addon]));
+ for (let addon of addonsWithPendingOperation) {
+ all.set(addon.id, addon);
+ }
+ // Convert to an array since Map.prototype.values returns an iterable
+ return [...all.values()];
}
normalizeClientItem(item) {
let enabled = !item.userDisabled;
if (item.pendingOperations & AddonManager.PENDING_ENABLE) {
enabled = true;
} else if (item.pendingOperations & AddonManager.PENDING_DISABLE) {
enabled = false;
@@ -785,18 +776,18 @@ class AddonValidator extends CollectionV
id: item.syncGUID,
addonID: item.id,
applicationID: Services.appinfo.ID,
source: "amo", // check item.foreignInstall?
original: item
};
}
- normalizeServerItem(item) {
- let guid = this.engine._findDupe(item);
+ async normalizeServerItem(item) {
+ let guid = await this.engine._findDupe(item);
if (guid) {
item.id = guid;
}
return item;
}
clientUnderstands(item) {
return item.applicationID === Services.appinfo.ID;
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -288,47 +288,45 @@ BookmarksEngine.prototype = {
syncPriority: 4,
allowSkippedRecord: false,
emptyChangeset() {
return new BookmarksChangeset();
},
-
- _guidMapFailed: false,
- _buildGUIDMap: function _buildGUIDMap() {
- let store = this._store;
+ async _buildGUIDMap() {
let guidMap = {};
- let tree = Async.promiseSpinningly(PlacesUtils.promiseBookmarksTree(""));
+ let tree = await PlacesUtils.promiseBookmarksTree("");
function* walkBookmarksTree(tree, parent = null) {
if (tree) {
// Skip root node
if (parent) {
yield [tree, parent];
}
if (tree.children) {
for (let child of tree.children) {
- store._sleep(0); // avoid jank while looping.
yield* walkBookmarksTree(child, tree);
}
}
}
}
function* walkBookmarksRoots(tree) {
for (let child of tree.children) {
if (isSyncedRootNode(child)) {
yield* walkBookmarksTree(child, tree);
}
}
}
+ let maybeYield = Async.jankYielder();
for (let [node, parent] of walkBookmarksRoots(tree)) {
+ await maybeYield();
let {guid, type: placeType} = node;
guid = PlacesSyncUtils.bookmarks.guidToSyncId(guid);
let key;
switch (placeType) {
case PlacesUtils.TYPE_X_MOZ_PLACE:
// Bookmark
let query = null;
if (node.annos && node.uri.startsWith("place:")) {
@@ -366,17 +364,17 @@ BookmarksEngine.prototype = {
guidMap[parentName][key] = entry;
this._log.trace("Mapped: " + [parentName, key, entry, entry.hasDupe]);
}
return guidMap;
},
// Helper function to get a dupe GUID for an item.
- _mapDupe: function _mapDupe(item) {
+ async _mapDupe(item) {
// Figure out if we have something to key with.
let key;
let altKey;
switch (item.type) {
case "query":
// Prior to Bug 610501, records didn't carry their Smart Bookmark
// anno, so we won't be able to dupe them correctly. This altKey
// hack should get them to dupe correctly.
@@ -397,17 +395,17 @@ BookmarksEngine.prototype = {
key = "s" + item.pos;
break;
default:
return undefined;
}
// Figure out if we have a map to use!
// This will throw in some circumstances. That's fine.
- let guidMap = this._guidMap;
+ let guidMap = await this.getGuidMap();
// Give the GUID if we have the matching pair.
let parentName = item.parentName || "";
this._log.trace("Finding mapping: " + parentName + ", " + key);
let parent = guidMap[parentName];
if (!parent) {
this._log.trace("No parent => no dupe.");
@@ -428,97 +426,88 @@ BookmarksEngine.prototype = {
return dupe;
}
}
this._log.trace("No dupe found for key " + key + "/" + altKey + ".");
return undefined;
},
- _syncStartup: function _syncStart() {
- SyncEngine.prototype._syncStartup.call(this);
+ async _syncStartup() {
+ await SyncEngine.prototype._syncStartup.call(this);
- let cb = Async.makeSpinningCallback();
- (async () => {
+ try {
// For first-syncs, make a backup for the user to restore
if (this.lastSync == 0) {
this._log.debug("Bookmarks backup starting.");
await PlacesBackups.create(null, true);
this._log.debug("Bookmarks backup done.");
}
- })().then(
- cb, ex => {
- // Failure to create a backup is somewhat bad, but probably not bad
- // enough to prevent syncing of bookmarks - so just log the error and
- // continue.
- this._log.warn("Error while backing up bookmarks, but continuing with sync", ex);
- cb();
- }
- );
-
- cb.wait();
-
- this.__defineGetter__("_guidMap", function() {
- // Create a mapping of folder titles and separator positions to GUID.
- // We do this lazily so that we don't do any work unless we reconcile
- // incoming items.
- let guidMap;
- try {
- guidMap = this._buildGUIDMap();
- } catch (ex) {
- if (Async.isShutdownException(ex)) {
- throw ex;
- }
- this._log.warn("Error while building GUID map, skipping all other incoming items", ex);
- throw {code: Engine.prototype.eEngineAbortApplyIncoming,
- cause: ex};
- }
- delete this._guidMap;
- return this._guidMap = guidMap;
- });
+ } catch (ex) {
+ // Failure to create a backup is somewhat bad, but probably not bad
+ // enough to prevent syncing of bookmarks - so just log the error and
+ // continue.
+ this._log.warn("Error while backing up bookmarks, but continuing with sync", ex);
+ }
this._store._childrenToOrder = {};
this._store.clearPendingDeletions();
},
+ async getGuidMap() {
+ if (this._guidMap) {
+ return this._guidMap;
+ }
+ try {
+ return this._guidMap = await this._buildGUIDMap();
+ } catch (ex) {
+ if (Async.isShutdownException(ex)) {
+ throw ex;
+ }
+ this._log.warn("Error while building GUID map, skipping all other incoming items", ex);
+ throw {code: Engine.prototype.eEngineAbortApplyIncoming,
+ cause: ex};
+ }
+ },
+
async _deletePending() {
// Delete pending items -- See the comment above BookmarkStore's deletePending
let newlyModified = await this._store.deletePending();
if (newlyModified) {
this._log.debug("Deleted pending items", newlyModified);
this._modified.insert(newlyModified);
}
},
- _shouldReviveRemotelyDeletedRecord(item) {
+ async _shouldReviveRemotelyDeletedRecord(item) {
let modifiedTimestamp = this._modified.getModifiedTimestamp(item.id);
if (!modifiedTimestamp) {
// We only expect this to be called with items locally modified, so
// something strange is going on - play it safe and don't revive it.
this._log.error("_shouldReviveRemotelyDeletedRecord called on unmodified item: " + item.id);
return false;
}
// In addition to preventing the deletion of this record (handled by the caller),
// we use `touch` to mark the parent of this record for uploading next sync, in order
// to ensure its children array is accurate. If `touch` returns new change records,
// we revive the item and insert the changes into the current changeset.
- let newChanges = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.touch(item.id));
+ let newChanges = await PlacesSyncUtils.bookmarks.touch(item.id);
if (newChanges) {
this._modified.insert(newChanges);
return true;
}
return false;
},
- _processIncoming(newitems) {
+ async _processIncoming(newitems) {
try {
- SyncEngine.prototype._processIncoming.call(this, newitems);
+ await SyncEngine.prototype._processIncoming.call(this, newitems);
} finally {
- Async.promiseSpinningly(this._postProcessIncoming());
+ await this._postProcessIncoming();
}
},
// Applies pending tombstones, sets folder child order, and updates the sync
// status of all `NEW` bookmarks to `NORMAL`.
async _postProcessIncoming() {
await this._deletePending();
await this._orderChildren();
@@ -526,117 +515,118 @@ BookmarksEngine.prototype = {
await PlacesSyncUtils.bookmarks.markChangesAsSyncing(changes);
},
async _orderChildren() {
await this._store._orderChildren();
this._store._childrenToOrder = {};
},
- _syncFinish: function _syncFinish() {
- SyncEngine.prototype._syncFinish.call(this);
+ async _syncFinish() {
+ await SyncEngine.prototype._syncFinish.call(this);
this._tracker._ensureMobileQuery();
},
- _syncCleanup: function _syncCleanup() {
- SyncEngine.prototype._syncCleanup.call(this);
+ async _syncCleanup() {
+ await SyncEngine.prototype._syncCleanup.call(this);
delete this._guidMap;
},
- _createRecord: function _createRecord(id) {
+ async _createRecord(id) {
if (this._modified.isTombstone(id)) {
// If we already know a changed item is a tombstone, just create the
// record without dipping into Places.
return this._createTombstone(id);
}
// Create the record as usual, but mark it as having dupes if necessary.
- let record = SyncEngine.prototype._createRecord.call(this, id);
- let entry = this._mapDupe(record);
+ let record = await SyncEngine.prototype._createRecord.call(this, id);
+ let entry = await this._mapDupe(record);
if (entry != null && entry.hasDupe) {
record.hasDupe = true;
}
if (record.deleted) {
// Make sure deleted items are marked as tombstones. We do this here
// in addition to the `isTombstone` call above because it's possible
// a changed bookmark might be deleted during a sync (bug 1313967).
this._modified.setTombstone(record.id);
}
return record;
},
- buildWeakReuploadMap(idSet) {
+ async buildWeakReuploadMap(idSet) {
// We want to avoid uploading records which have changed, since that could
// cause an inconsistent state on the server.
//
// Strictly speaking, it would be correct to just call getChangedIds() after
// building the initial weak reupload map, however this is quite slow, since
// we might end up doing createRecord() (which runs at least one, and
// sometimes multiple database queries) for a potentially large number of
// items.
//
// Since the call to getChangedIds is relatively cheap, we do it once before
// building the weakReuploadMap (which is where the calls to createRecord()
// occur) as an optimization, and once after for correctness, to handle the
// unlikely case that a record was modified while we were building the map.
- let initialChanges = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.getChangedIds());
+ let initialChanges = await PlacesSyncUtils.bookmarks.getChangedIds();
for (let changed of initialChanges) {
idSet.delete(changed);
}
- let map = SyncEngine.prototype.buildWeakReuploadMap.call(this, idSet);
- let changes = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.getChangedIds());
+ let map = await SyncEngine.prototype.buildWeakReuploadMap.call(this, idSet);
+ let changes = await PlacesSyncUtils.bookmarks.getChangedIds();
for (let id of changes) {
map.delete(id);
}
return map;
},
- _findDupe: function _findDupe(item) {
+ async _findDupe(item) {
this._log.trace("Finding dupe for " + item.id +
" (already duped: " + item.hasDupe + ").");
// Don't bother finding a dupe if the incoming item has duplicates.
if (item.hasDupe) {
this._log.trace(item.id + " already a dupe: not finding one.");
return null;
}
- let mapped = this._mapDupe(item);
+ let mapped = await this._mapDupe(item);
this._log.debug(item.id + " mapped to " + mapped);
// We must return a string, not an object, and the entries in the GUIDMap
// are created via "new String()" making them an object.
return mapped ? mapped.toString() : mapped;
},
- pullAllChanges() {
+ async pullAllChanges() {
return this.pullNewChanges();
},
- pullNewChanges() {
- return Async.promiseSpinningly(this._tracker.promiseChangedIDs());
+ async pullNewChanges() {
+ return this._tracker.promiseChangedIDs();
},
- trackRemainingChanges() {
+ async trackRemainingChanges() {
let changes = this._modified.changes;
- Async.promiseSpinningly(PlacesSyncUtils.bookmarks.pushChanges(changes));
+ await PlacesSyncUtils.bookmarks.pushChanges(changes);
},
_deleteId(id) {
this._noteDeletedId(id);
},
- _resetClient() {
- SyncEngine.prototype._resetClient.call(this);
- Async.promiseSpinningly(PlacesSyncUtils.bookmarks.reset());
+ async _resetClient() {
+ await SyncEngine.prototype._resetClient.call(this);
+ await PlacesSyncUtils.bookmarks.reset();
},
// Called when _findDupe returns a dupe item and the engine has decided to
// switch the existing item to the new incoming item.
- _switchItemToDupe(localDupeGUID, incomingItem) {
- let newChanges = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.dedupe(
- localDupeGUID, incomingItem.id, incomingItem.parentid));
+ async _switchItemToDupe(localDupeGUID, incomingItem) {
+ let newChanges = await PlacesSyncUtils.bookmarks.dedupe(localDupeGUID,
+ incomingItem.id,
+ incomingItem.parentid);
this._modified.insert(newChanges);
},
// Cleans up the Places root, reading list items (ignored in bug 762118,
// removed in bug 1155684), and pinned sites.
_shouldDeleteRemotely(incomingItem) {
return FORBIDDEN_INCOMING_IDS.includes(incomingItem.id) ||
FORBIDDEN_INCOMING_PARENT_IDS.includes(incomingItem.parentid);
@@ -672,32 +662,32 @@ BookmarksEngine.prototype = {
function BookmarksStore(name, engine) {
Store.call(this, name, engine);
this._itemsToDelete = new Set();
}
BookmarksStore.prototype = {
__proto__: Store.prototype,
- itemExists: function BStore_itemExists(id) {
- return this.idForGUID(id) > 0;
+ async itemExists(id) {
+ return (await this.idForGUID(id)) > 0;
},
- applyIncoming: function BStore_applyIncoming(record) {
+ async applyIncoming(record) {
this._log.debug("Applying record " + record.id);
let isSpecial = PlacesSyncUtils.bookmarks.ROOTS.includes(record.id);
if (record.deleted) {
if (isSpecial) {
this._log.warn("Ignoring deletion for special record " + record.id);
return;
}
// Don't bother with pre and post-processing for deletions.
- Store.prototype.applyIncoming.call(this, record);
+ await Store.prototype.applyIncoming.call(this, record);
return;
}
// For special folders we're only interested in child ordering.
if (isSpecial && record.children) {
this._log.debug("Processing special node: " + record.id);
// Reorder children later
this._childrenToOrder[record.id] = record.children;
@@ -714,46 +704,46 @@ BookmarksStore.prototype = {
// Figure out the local id of the parent GUID if available
let parentGUID = record.parentid;
if (!parentGUID) {
throw "Record " + record.id + " has invalid parentid: " + parentGUID;
}
this._log.debug("Remote parent is " + parentGUID);
// Do the normal processing of incoming records
- Store.prototype.applyIncoming.call(this, record);
+ await Store.prototype.applyIncoming.call(this, record);
if (record.type == "folder" && record.children) {
this._childrenToOrder[record.id] = record.children;
}
},
- create: function BStore_create(record) {
+ async create(record) {
let info = record.toSyncBookmark();
// This can throw if we're inserting an invalid or incomplete bookmark.
// That's fine; the exception will be caught by `applyIncomingBatch`
// without aborting further processing.
- let item = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.insert(info));
+ let item = await PlacesSyncUtils.bookmarks.insert(info);
if (item) {
this._log.trace(`Created ${item.kind} ${item.syncId} under ${
item.parentSyncId}`, item);
if (item.dateAdded != record.dateAdded) {
this.engine._needWeakReupload.add(item.syncId);
}
}
},
- remove: function BStore_remove(record) {
+ async remove(record) {
this._log.trace(`Buffering removal of item "${record.id}".`);
this._itemsToDelete.add(record.id);
},
- update: function BStore_update(record) {
+ async update(record) {
let info = record.toSyncBookmark();
- let item = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.update(info));
+ let item = await PlacesSyncUtils.bookmarks.update(info);
if (item) {
this._log.trace(`Updated ${item.kind} ${item.syncId} under ${
item.parentSyncId}`, item);
if (item.dateAdded != record.dateAdded) {
this.engine._needWeakReupload.add(item.syncId);
}
}
},
@@ -807,79 +797,80 @@ BookmarksStore.prototype = {
return guidsToUpdate;
},
clearPendingDeletions() {
this._itemsToDelete.clear();
},
// Create a record starting from the weave id (places guid)
- createRecord: function createRecord(id, collection) {
- let item = Async.promiseSpinningly(PlacesSyncUtils.bookmarks.fetch(id));
+ async createRecord(id, collection) {
+ let item = await PlacesSyncUtils.bookmarks.fetch(id);
if (!item) { // deleted item
let record = new PlacesItem(collection, id);
record.deleted = true;
return record;
}
let recordObj = getTypeObject(item.kind);
if (!recordObj) {
this._log.warn("Unknown item type, cannot serialize: " + item.kind);
recordObj = PlacesItem;
}
let record = new recordObj(collection, id);
record.fromSyncBookmark(item);
- record.sortindex = this._calculateIndex(record);
+ record.sortindex = await this._calculateIndex(record);
return record;
},
- GUIDForId: function GUIDForId(id) {
- let guid = Async.promiseSpinningly(PlacesUtils.promiseItemGuid(id));
+ async GUIDForId(id) {
+ let guid = await PlacesUtils.promiseItemGuid(id);
return PlacesSyncUtils.bookmarks.guidToSyncId(guid);
},
- idForGUID: function idForGUID(guid) {
+ async idForGUID(guid) {
// guid might be a String object rather than a string.
guid = PlacesSyncUtils.bookmarks.syncIdToGuid(guid.toString());
- return Async.promiseSpinningly(PlacesUtils.promiseItemId(guid).catch(
- ex => -1));
+ try {
+ return await PlacesUtils.promiseItemId(guid);
+ } catch (ex) {
+ return -1;
+ }
},
- _calculateIndex: function _calculateIndex(record) {
+ async _calculateIndex(record) {
// Ensure folders have a very high sort index so they're not synced last.
if (record.type == "folder")
return FOLDER_SORTINDEX;
// For anything directly under the toolbar, give it a boost of more than an
// unvisited bookmark
let index = 0;
if (record.parentid == "toolbar")
index += 150;
// Add in the bookmark's frecency if we have something.
if (record.bmkUri != null) {
- let frecency = Async.promiseSpinningly(PlacesSyncUtils.history.fetchURLFrecency(record.bmkUri));
+ let frecency = await PlacesSyncUtils.history.fetchURLFrecency(record.bmkUri);
if (frecency != -1)
index += frecency;
}
return index;
},
- wipe: function BStore_wipe() {
+ async wipe() {
this.clearPendingDeletions();
- Async.promiseSpinningly((async () => {
- // Save a backup before clearing out all bookmarks.
- await PlacesBackups.create(null, true);
- await PlacesSyncUtils.bookmarks.wipe();
- })());
+ // Save a backup before clearing out all bookmarks.
+ await PlacesBackups.create(null, true);
+ await PlacesSyncUtils.bookmarks.wipe();
}
};
// The bookmarks tracker is a special flower. Instead of listening for changes
// via observer notifications, it queries Places for the set of items that have
// changed since the last sync. Because it's a "pull-based" tracker, it ignores
// all concepts of "add a changed ID." However, it still registers an observer
// to bump the score, so that changed bookmarks are synced immediately.
@@ -1002,20 +993,22 @@ BookmarksTracker.prototype = {
break;
case "bookmarks-restore-begin":
this._log.debug("Ignoring changes from importing bookmarks.");
break;
case "bookmarks-restore-success":
this._log.debug("Tracking all items on successful import.");
this._log.debug("Restore succeeded: wiping server and other clients.");
- this.engine.service.resetClient([this.name]);
- this.engine.service.wipeServer([this.name]);
- this.engine.service.clientsEngine.sendCommand("wipeEngine", [this.name],
- null, { reason: "bookmark-restore" });
+ Async.promiseSpinningly((async () => {
+ await this.engine.service.resetClient([this.name]);
+ await this.engine.service.wipeServer([this.name]);
+ await this.engine.service.clientsEngine.sendCommand("wipeEngine", [this.name],
+ null, { reason: "bookmark-restore" });
+ })());
break;
case "bookmarks-restore-failed":
this._log.debug("Tracking all items on failed import.");
break;
}
},
QueryInterface: XPCOMUtils.generateQI([
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -220,150 +220,148 @@ ClientEngine.prototype = {
},
isMobile: function isMobile(id) {
if (this._store._remoteClients[id])
return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
return false;
},
- _readCommands() {
- let cb = Async.makeSpinningCallback();
- Utils.jsonLoad("commands", this, commands => cb(null, commands));
- return cb.wait() || {};
+ async _readCommands() {
+ let commands = await Utils.jsonLoad("commands", this);
+ return commands || {};
},
/**
* Low level function, do not use directly (use _addClientCommand instead).
*/
- _saveCommands(commands) {
- let cb = Async.makeSpinningCallback();
- Utils.jsonSave("commands", this, commands, error => {
- if (error) {
- this._log.error("Failed to save JSON outgoing commands", error);
- }
- cb();
- });
- cb.wait();
+ async _saveCommands(commands) {
+ try {
+ await Utils.jsonSave("commands", this, commands);
+ } catch (error) {
+ this._log.error("Failed to save JSON outgoing commands", error);
+ }
},
- _prepareCommandsForUpload() {
- let cb = Async.makeSpinningCallback();
- Utils.jsonMove("commands", "commands-syncing", this).catch(() => {}) // Ignore errors
- .then(() => {
- Utils.jsonLoad("commands-syncing", this, commands => cb(null, commands));
- });
- return cb.wait() || {};
+ async _prepareCommandsForUpload() {
+ try {
+      await Utils.jsonMove("commands", "commands-syncing", this);
+ } catch (e) {
+ // Ignore errors
+ }
+ let commands = await Utils.jsonLoad("commands-syncing", this);
+ return commands || {};
},
- _deleteUploadedCommands() {
+ async _deleteUploadedCommands() {
delete this._currentlySyncingCommands;
- Async.promiseSpinningly(
- Utils.jsonRemove("commands-syncing", this).catch(err => {
- this._log.error("Failed to delete syncing-commands file", err);
- })
- );
+ try {
+ await Utils.jsonRemove("commands-syncing", this);
+ } catch (err) {
+ this._log.error("Failed to delete syncing-commands file", err);
+ }
},
// Gets commands for a client we are yet to write to the server. Doesn't
// include commands for that client which are already on the server.
// We should rename this!
- getClientCommands(clientId) {
- const allCommands = this._readCommands();
+ async getClientCommands(clientId) {
+ const allCommands = await this._readCommands();
return allCommands[clientId] || [];
},
- removeLocalCommand(command) {
+ async removeLocalCommand(command) {
// the implementation of this engine is such that adding a command to
// the local client is how commands are deleted! ¯\_(ツ)_/¯
- this._addClientCommand(this.localID, command);
+ await this._addClientCommand(this.localID, command);
},
- _addClientCommand(clientId, command) {
- const localCommands = this._readCommands();
+ async _addClientCommand(clientId, command) {
+ const localCommands = await this._readCommands();
const localClientCommands = localCommands[clientId] || [];
const remoteClient = this._store._remoteClients[clientId];
let remoteClientCommands = []
if (remoteClient && remoteClient.commands) {
remoteClientCommands = remoteClient.commands;
}
const clientCommands = localClientCommands.concat(remoteClientCommands);
if (hasDupeCommand(clientCommands, command)) {
return false;
}
localCommands[clientId] = localClientCommands.concat(command);
- this._saveCommands(localCommands);
+ await this._saveCommands(localCommands);
return true;
},
- _removeClientCommands(clientId) {
- const allCommands = this._readCommands();
+ async _removeClientCommands(clientId) {
+ const allCommands = await this._readCommands();
delete allCommands[clientId];
- this._saveCommands(allCommands);
+ await this._saveCommands(allCommands);
},
- updateKnownStaleClients() {
+ async updateKnownStaleClients() {
this._log.debug("Updating the known stale clients");
- this._refreshKnownStaleClients();
+ await this._refreshKnownStaleClients();
for (let client of Object.values(this._store._remoteClients)) {
if (client.fxaDeviceId && this._knownStaleFxADeviceIds.includes(client.fxaDeviceId)) {
this._log.info(`Hiding stale client ${client.id} - in known stale clients list`);
client.stale = true;
}
}
},
// We assume that clients not present in the FxA Device Manager list have been
// disconnected and so are stale
- _refreshKnownStaleClients() {
+ async _refreshKnownStaleClients() {
this._log.debug("Refreshing the known stale clients list");
let localClients = Object.values(this._store._remoteClients)
.filter(client => client.fxaDeviceId) // iOS client records don't have fxaDeviceId
.map(client => client.fxaDeviceId);
let fxaClients;
try {
- fxaClients = Async.promiseSpinningly(this.fxAccounts.getDeviceList()).map(device => device.id);
+ let deviceList = await this.fxAccounts.getDeviceList();
+ fxaClients = deviceList.map(device => device.id);
} catch (ex) {
this._log.error("Could not retrieve the FxA device list", ex);
this._knownStaleFxADeviceIds = [];
return;
}
this._knownStaleFxADeviceIds = Utils.arraySub(localClients, fxaClients);
},
- _syncStartup() {
+ async _syncStartup() {
this.isFirstSync = !this.lastRecordUpload;
// Reupload new client record periodically.
if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
this._tracker.addChangedID(this.localID);
this.lastRecordUpload = Date.now() / 1000;
}
- SyncEngine.prototype._syncStartup.call(this);
+ return SyncEngine.prototype._syncStartup.call(this);
},
- _processIncoming() {
+ async _processIncoming() {
// Fetch all records from the server.
this.lastSync = 0;
this._incomingClients = {};
try {
- SyncEngine.prototype._processIncoming.call(this);
+ await SyncEngine.prototype._processIncoming.call(this);
// Refresh the known stale clients list at startup and when we receive
// "device connected/disconnected" push notifications.
if (!this._knownStaleFxADeviceIds) {
- this._refreshKnownStaleClients();
+ await this._refreshKnownStaleClients();
}
// Since clients are synced unconditionally, any records in the local store
// that don't exist on the server must be for disconnected clients. Remove
// them, so that we don't upload records with commands for clients that will
// never see them. We also do this to filter out stale clients from the
// tabs collection, since showing their list of tabs is confusing.
for (let id in this._store._remoteClients) {
if (!this._incomingClients[id]) {
this._log.info(`Removing local state for deleted client ${id}`);
- this._removeRemoteClient(id);
+ await this._removeRemoteClient(id);
}
}
// Bug 1264498: Mobile clients don't remove themselves from the clients
// collection when the user disconnects Sync, so we mark as stale clients
// with the same name that haven't synced in over a week.
// (Note we can't simply delete them, or we re-apply them next sync - see
// bug 1287687)
delete this._incomingClients[this.localID];
@@ -386,35 +384,35 @@ ClientEngine.prototype = {
record.stale = true;
}
}
} finally {
this._incomingClients = null;
}
},
- _uploadOutgoing() {
- this._currentlySyncingCommands = this._prepareCommandsForUpload();
+ async _uploadOutgoing() {
+ this._currentlySyncingCommands = await this._prepareCommandsForUpload();
const clientWithPendingCommands = Object.keys(this._currentlySyncingCommands);
for (let clientId of clientWithPendingCommands) {
if (this._store._remoteClients[clientId] || this.localID == clientId) {
this._modified.set(clientId, 0);
}
}
let updatedIDs = this._modified.ids();
- SyncEngine.prototype._uploadOutgoing.call(this);
+ await SyncEngine.prototype._uploadOutgoing.call(this);
// Record the response time as the server time for each item we uploaded.
for (let id of updatedIDs) {
if (id != this.localID) {
this._store._remoteClients[id].serverLastModified = this.lastSync;
}
}
},
- _onRecordsWritten(succeeded, failed) {
+ async _onRecordsWritten(succeeded, failed) {
// Reconcile the status of the local records with what we just wrote on the
// server
for (let id of succeeded) {
const commandChanges = this._currentlySyncingCommands[id];
if (id == this.localID) {
if (this.isFirstSync) {
this._log.info("Uploaded our client record for the first time, notifying other clients.");
this._notifyCollectionChanged();
@@ -425,32 +423,32 @@ ClientEngine.prototype = {
} else {
const clientRecord = this._store._remoteClients[id];
if (!commandChanges || !clientRecord) {
// should be impossible, else we wouldn't have been writing it.
this._log.warn("No command/No record changes for a client we uploaded");
continue;
}
// fixup the client record, so our copy of _remoteClients matches what we uploaded.
- this._store._remoteClients[id] = this._store.createRecord(id);
+ this._store._remoteClients[id] = await this._store.createRecord(id);
// we could do better and pass the reference to the record we just uploaded,
// but this will do for now
}
}
// Re-add failed commands
for (let id of failed) {
const commandChanges = this._currentlySyncingCommands[id];
if (!commandChanges) {
continue;
}
- this._addClientCommand(id, commandChanges);
+ await this._addClientCommand(id, commandChanges);
}
- this._deleteUploadedCommands();
+ await this._deleteUploadedCommands();
// Notify other devices that their own client collection changed
const idsToNotify = succeeded.reduce((acc, id) => {
if (id == this.localID) {
return acc;
}
const fxaDeviceId = this.getClientFxaDeviceId(id);
return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
@@ -475,17 +473,17 @@ ClientEngine.prototype = {
}
try {
await this.fxAccounts.notifyDevices(ids, excludedIds, message, ttl);
} catch (e) {
this._log.error("Could not notify of changes in the collection", e);
}
},
- _syncFinish() {
+ async _syncFinish() {
// Record histograms for our device types, and also write them to a pref
// so non-histogram telemetry (eg, UITelemetry) and the sync scheduler
// has easy access to them, and so they are accurate even before we've
// successfully synced the first time after startup.
for (let [deviceType, count] of this.deviceTypes) {
let hid;
let prefName = this.name + ".devices.";
switch (deviceType) {
@@ -499,64 +497,69 @@ ClientEngine.prototype = {
break;
default:
this._log.warn(`Unexpected deviceType "${deviceType}" recording device telemetry.`);
continue;
}
Services.telemetry.getHistogramById(hid).add(count);
Svc.Prefs.set(prefName, count);
}
- SyncEngine.prototype._syncFinish.call(this);
+ return SyncEngine.prototype._syncFinish.call(this);
},
- _reconcile: function _reconcile(item) {
+ async _reconcile(item) {
// Every incoming record is reconciled, so we use this to track the
// contents of the collection on the server.
this._incomingClients[item.id] = item.modified;
- if (!this._store.itemExists(item.id)) {
+ if (!(await this._store.itemExists(item.id))) {
return true;
}
// Clients are synced unconditionally, so we'll always have new records.
// Unfortunately, this will cause the scheduler to use the immediate sync
// interval for the multi-device case, instead of the active interval. We
// work around this by updating the record during reconciliation, and
// returning false to indicate that the record doesn't need to be applied
// later.
- this._store.update(item);
+ await this._store.update(item);
return false;
},
// Treat reset the same as wiping for locally cached clients
- _resetClient() {
- this._wipeClient();
+ async _resetClient() {
+ await this._wipeClient();
},
- _wipeClient: function _wipeClient() {
- SyncEngine.prototype._resetClient.call(this);
+ async _wipeClient() {
+ await SyncEngine.prototype._resetClient.call(this);
this._knownStaleFxADeviceIds = null;
delete this.localCommands;
- this._store.wipe();
- const logRemoveError = err => this._log.warn("Could not delete json file", err);
- Async.promiseSpinningly(
- Utils.jsonRemove("commands", this).catch(logRemoveError)
- .then(Utils.jsonRemove("commands-syncing", this).catch(logRemoveError))
- );
+ await this._store.wipe();
+ try {
+ await Utils.jsonRemove("commands", this);
+ } catch (err) {
+ this._log.warn("Could not delete commands.json", err);
+ }
+ try {
+      await Utils.jsonRemove("commands-syncing", this);
+ } catch (err) {
+ this._log.warn("Could not delete commands-syncing.json", err);
+ }
},
async removeClientData() {
let res = this.service.resource(this.engineURL + "/" + this.localID);
await res.delete();
},
// Override the default behavior to delete bad records from the server.
- handleHMACMismatch: function handleHMACMismatch(item, mayRetry) {
+ async handleHMACMismatch(item, mayRetry) {
this._log.debug("Handling HMAC mismatch for " + item.id);
- let base = SyncEngine.prototype.handleHMACMismatch.call(this, item, mayRetry);
+ let base = await SyncEngine.prototype.handleHMACMismatch.call(this, item, mayRetry);
if (base != SyncEngine.kRecoveryStrategy.error)
return base;
// It's a bad client record. Save it to be deleted at the end of the sync.
this._log.debug("Bad client record detected. Scheduling for deletion.");
this._deleteId(item.id);
// Neither try again nor error; we're going to delete it.
@@ -581,17 +584,17 @@ ClientEngine.prototype = {
/**
* Sends a command+args pair to a specific client.
*
* @param command Command string
* @param args Array of arguments/data for command
* @param clientId Client to send command to
*/
- _sendCommandToClient(command, args, clientId, telemetryExtra) {
+ async _sendCommandToClient(command, args, clientId, telemetryExtra) {
this._log.trace("Sending " + command + " to " + clientId);
let client = this._store._remoteClients[clientId];
if (!client) {
throw new Error("Unknown remote client ID: '" + clientId + "'.");
}
if (client.stale) {
throw new Error("Stale remote client ID: '" + clientId + "'.");
@@ -600,17 +603,17 @@ ClientEngine.prototype = {
let action = {
command,
args,
// We send the flowID to the other client so *it* can report it in its
// telemetry - we record it in ours below.
flowID: telemetryExtra.flowID,
};
- if (this._addClientCommand(clientId, action)) {
+ if ((await this._addClientCommand(clientId, action))) {
this._log.trace(`Client ${clientId} got a new action`, [command, args]);
this._tracker.addChangedID(clientId);
try {
telemetryExtra.deviceID = this.service.identity.hashedDeviceID(clientId);
} catch (_) {}
this.service.recordTelemetryEvent("sendcommand", command, undefined, telemetryExtra);
} else {
@@ -618,23 +621,23 @@ ClientEngine.prototype = {
}
},
/**
* Check if the local client has any remote commands and perform them.
*
* @return false to abort sync
*/
- processIncomingCommands: function processIncomingCommands() {
- return this._notify("clients:process-commands", "", function() {
+ async processIncomingCommands() {
+ return this._notify("clients:process-commands", "", async function() {
if (!this.localCommands) {
return true;
}
- const clearedCommands = this._readCommands()[this.localID];
+ const clearedCommands = (await this._readCommands())[this.localID];
const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
let didRemoveCommand = false;
let URIsToDisplay = [];
// Process each command in order.
for (let rawCommand of commands) {
let shouldRemoveCommand = true; // most commands are auto-removed.
let {command, args, flowID} = rawCommand;
this._log.debug("Processing command " + command, args);
@@ -643,23 +646,23 @@ ClientEngine.prototype = {
{ flowID });
let engines = [args[0]];
switch (command) {
case "resetAll":
engines = null;
// Fallthrough
case "resetEngine":
- this.service.resetClient(engines);
+ await this.service.resetClient(engines);
break;
case "wipeAll":
engines = null;
// Fallthrough
case "wipeEngine":
- this.service.wipeClient(engines);
+ await this.service.wipeClient(engines);
break;
case "logout":
this.service.logout();
return false;
case "displayURI":
let [uri, clientId, title] = args;
URIsToDisplay.push({ uri, clientId, title });
break;
@@ -667,31 +670,31 @@ ClientEngine.prototype = {
// When we send a repair request to another device that understands
// it, that device will send a response indicating what it did.
let response = args[0];
let requestor = getRepairRequestor(response.collection);
if (!requestor) {
this._log.warn("repairResponse for unknown collection", response);
break;
}
- if (!requestor.continueRepairs(response)) {
+ if (!(await requestor.continueRepairs(response))) {
this._log.warn("repairResponse couldn't continue the repair", response);
}
break;
}
case "repairRequest": {
// Another device has sent us a request to make some repair.
let request = args[0];
let responder = getRepairResponder(request.collection);
if (!responder) {
this._log.warn("repairRequest for unknown collection", request);
break;
}
try {
- if (Async.promiseSpinningly(responder.repair(request, rawCommand))) {
+ if ((await responder.repair(request, rawCommand))) {
// We've started a repair - once that collection has synced it
// will write a "response" command and arrange for this repair
// request to be removed from the local command list - if we
// removed it now we might fail to write a response in cases of
// premature shutdown etc.
shouldRemoveCommand = false;
}
} catch (ex) {
@@ -710,17 +713,17 @@ ClientEngine.prototype = {
break;
}
default:
this._log.warn("Received an unknown command: " + command);
break;
}
// Add the command to the "cleared" commands list
if (shouldRemoveCommand) {
- this.removeLocalCommand(rawCommand);
+ await this.removeLocalCommand(rawCommand);
didRemoveCommand = true;
}
}
if (didRemoveCommand) {
this._tracker.addChangedID(this.localID);
}
if (URIsToDisplay.length) {
@@ -732,29 +735,30 @@ ClientEngine.prototype = {
},
/**
* Validates and sends a command to a client or all clients.
*
* Calling this does not actually sync the command data to the server. If the
* client already has the command/args pair, it won't receive a duplicate
* command.
+ * This method is async since it writes the command to a file.
*
* @param command
* Command to invoke on remote clients
* @param args
* Array of arguments to give to the command
* @param clientId
* Client ID to send command to. If undefined, send to all remote
* clients.
* @param flowID
* A unique identifier used to track success for this operation across
* devices.
*/
- sendCommand(command, args, clientId = null, telemetryExtra = {}) {
+ async sendCommand(command, args, clientId = null, telemetryExtra = {}) {
let commandData = this._commands[command];
// Don't send commands that we don't know about.
if (!commandData) {
this._log.error("Unknown command to send: " + command);
return;
} else if (!args || args.length != commandData.args) {
// Don't send a command with the wrong number of arguments.
this._log.error("Expected " + commandData.args + " args for '" +
@@ -764,21 +768,21 @@ ClientEngine.prototype = {
// We allocate a "flowID" here, so it is used for each client.
telemetryExtra = Object.assign({}, telemetryExtra); // don't clobber the caller's object
if (!telemetryExtra.flowID) {
telemetryExtra.flowID = Utils.makeGUID();
}
if (clientId) {
- this._sendCommandToClient(command, args, clientId, telemetryExtra);
+ await this._sendCommandToClient(command, args, clientId, telemetryExtra);
} else {
for (let [id, record] of Object.entries(this._store._remoteClients)) {
if (!record.stale) {
- this._sendCommandToClient(command, args, id, telemetryExtra);
+ await this._sendCommandToClient(command, args, id, telemetryExtra);
}
}
}
},
/**
* Send a URI to another client for display.
*
@@ -791,20 +795,20 @@ ClientEngine.prototype = {
* @param uri
* URI (as a string) to send and display on the remote client
* @param clientId
* ID of client to send the command to. If not defined, will be sent
* to all remote clients.
* @param title
* Title of the page being sent.
*/
- sendURIToClientForDisplay: function sendURIToClientForDisplay(uri, clientId, title) {
+ async sendURIToClientForDisplay(uri, clientId, title) {
this._log.info("Sending URI to client: " + uri + " -> " +
clientId + " (" + title + ")");
- this.sendCommand("displayURI", [uri, this.localID, title], clientId);
+ await this.sendCommand("displayURI", [uri, this.localID, title], clientId);
this._tracker.score += SCORE_INCREMENT_XLARGE;
},
/**
* Handle a bunch of received 'displayURI' commands.
*
* Interested parties should observe the "weave:engine:clients:display-uris"
@@ -826,58 +830,56 @@ ClientEngine.prototype = {
* @param uris[].title
* String title of page that URI corresponds to. Older clients may not
* send this.
*/
_handleDisplayURIs: function _handleDisplayURIs(uris) {
Svc.Obs.notify("weave:engine:clients:display-uris", uris);
},
- _removeRemoteClient(id) {
+ async _removeRemoteClient(id) {
delete this._store._remoteClients[id];
this._tracker.removeChangedID(id);
- this._removeClientCommands(id);
+ await this._removeClientCommands(id);
this._modified.delete(id);
},
};
function ClientStore(name, engine) {
Store.call(this, name, engine);
}
ClientStore.prototype = {
__proto__: Store.prototype,
_remoteClients: {},
- create(record) {
- this.update(record);
+ async create(record) {
+ await this.update(record);
},
- update: function update(record) {
+ async update(record) {
if (record.id == this.engine.localID) {
// Only grab commands from the server; local name/type always wins
this.engine.localCommands = record.commands;
} else {
this._remoteClients[record.id] = record.cleartext;
}
},
- createRecord: function createRecord(id, collection) {
+ async createRecord(id, collection) {
let record = new ClientsRec(collection, id);
const commandsChanges = this.engine._currentlySyncingCommands ?
this.engine._currentlySyncingCommands[id] :
[];
// Package the individual components into a record for the local client
if (id == this.engine.localID) {
- let cb = Async.makeSpinningCallback();
- this.engine.fxAccounts.getDeviceId().then(id => cb(null, id), cb);
try {
- record.fxaDeviceId = cb.wait();
+ record.fxaDeviceId = await this.engine.fxAccounts.getDeviceId();
} catch (error) {
this._log.warn("failed to get fxa device id", error);
}
record.name = this.engine.localName;
record.type = this.engine.localType;
record.version = Services.appinfo.version;
record.protocols = SUPPORTED_PROTOCOL_VERSIONS;
@@ -912,29 +914,29 @@ ClientStore.prototype = {
this._log.error(`Preparing to upload record ${id} that we consider stale`);
delete record.cleartext.stale;
}
}
return record;
},
- itemExists(id) {
- return id in this.getAllIDs();
+ async itemExists(id) {
+ return id in (await this.getAllIDs());
},
- getAllIDs: function getAllIDs() {
+ async getAllIDs() {
let ids = {};
ids[this.engine.localID] = true;
for (let id in this._remoteClients)
ids[id] = true;
return ids;
},
- wipe: function wipe() {
+ async wipe() {
this._remoteClients = {};
},
};
function ClientsTracker(name, engine) {
Tracker.call(this, name, engine);
Svc.Obs.add("weave:engine:start-tracking", this);
Svc.Obs.add("weave:engine:stop-tracking", this);
--- a/services/sync/modules/engines/extension-storage.js
+++ b/services/sync/modules/engines/extension-storage.js
@@ -7,17 +7,16 @@
this.EXPORTED_SYMBOLS = ["ExtensionStorageEngine"];
const {classes: Cc, interfaces: Ci, utils: Cu} = Components;
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://services-sync/constants.js");
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/util.js");
-Cu.import("resource://services-common/async.js");
XPCOMUtils.defineLazyModuleGetter(this, "extensionStorageSync",
"resource://gre/modules/ExtensionStorageSync.jsm");
/**
* The Engine that manages syncing for the web extension "storage"
* API, and in particular ext.storage.sync.
*
* ext.storage.sync is implemented using Kinto, so it has mechanisms
@@ -32,18 +31,18 @@ ExtensionStorageEngine.prototype = {
_trackerObj: ExtensionStorageTracker,
// we don't need these since we implement our own sync logic
_storeObj: undefined,
_recordObj: undefined,
syncPriority: 10,
allowSkippedRecord: false,
- _sync() {
- return Async.promiseSpinningly(extensionStorageSync.syncAll());
+ async _sync() {
+ return extensionStorageSync.syncAll();
},
get enabled() {
// By default, we sync extension storage if we sync addons. This
// lets us simplify the UX since users probably don't consider
// "extension preferences" a separate category of syncing.
// However, we also respect engine.extension-storage.force, which
// can be set to true or false, if a power user wants to customize
--- a/services/sync/modules/engines/forms.js
+++ b/services/sync/modules/engines/forms.js
@@ -6,17 +6,16 @@ this.EXPORTED_SYMBOLS = ["FormEngine", "
var Cc = Components.classes;
var Ci = Components.interfaces;
var Cu = Components.utils;
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/record.js");
-Cu.import("resource://services-common/async.js");
Cu.import("resource://services-sync/util.js");
Cu.import("resource://services-sync/constants.js");
Cu.import("resource://services-sync/collection_validator.js");
Cu.import("resource://gre/modules/Log.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "FormHistory",
"resource://gre/modules/FormHistory.jsm");
const FORMS_TTL = 3 * 365 * 24 * 60 * 60; // Three years in seconds.
@@ -34,77 +33,73 @@ Utils.deferGetSet(FormRec, "cleartext",
var FormWrapper = {
_log: Log.repository.getLogger("Sync.Engine.Forms"),
_getEntryCols: ["fieldname", "value"],
_guidCols: ["guid"],
- _promiseSearch(terms, searchData) {
+ async _search(terms, searchData) {
return new Promise(resolve => {
let results = [];
let callbacks = {
handleResult(result) {
results.push(result);
},
handleCompletion(reason) {
resolve(results);
}
};
FormHistory.search(terms, searchData, callbacks);
})
},
- // Do a "sync" search by spinning the event loop until it completes.
- _searchSpinningly(terms, searchData) {
- return Async.promiseSpinningly(this._promiseSearch(terms, searchData));
- },
-
- _updateSpinningly(changes) {
+ async _update(changes) {
if (!FormHistory.enabled) {
return; // update isn't going to do anything.
}
- let cb = Async.makeSpinningCallback();
- let callbacks = {
- handleCompletion(reason) {
- cb();
- }
- };
- FormHistory.update(changes, callbacks);
- cb.wait();
+ await new Promise(resolve => {
+ let callbacks = {
+ handleCompletion(reason) {
+ resolve();
+ }
+ };
+ FormHistory.update(changes, callbacks);
+ });
},
- getEntry(guid) {
- let results = this._searchSpinningly(this._getEntryCols, {guid});
+ async getEntry(guid) {
+ let results = await this._search(this._getEntryCols, {guid});
if (!results.length) {
return null;
}
return {name: results[0].fieldname, value: results[0].value};
},
- getGUID(name, value) {
+ async getGUID(name, value) {
// Query for the provided entry.
let query = { fieldname: name, value };
- let results = this._searchSpinningly(this._guidCols, query);
+ let results = await this._search(this._guidCols, query);
return results.length ? results[0].guid : null;
},
- hasGUID(guid) {
- // We could probably use a count function here, but searchSpinningly exists...
- return this._searchSpinningly(this._guidCols, {guid}).length != 0;
+ async hasGUID(guid) {
+ // We could probably use a count function here, but search exists...
+ let results = await this._search(this._guidCols, {guid});
+ return results.length != 0;
},
- replaceGUID(oldGUID, newGUID) {
+ async replaceGUID(oldGUID, newGUID) {
let changes = {
op: "update",
guid: oldGUID,
newGuid: newGUID,
}
- this._updateSpinningly(changes);
+ await this._update(changes);
}
};
this.FormEngine = function FormEngine(service) {
SyncEngine.call(this, "Forms", service);
}
FormEngine.prototype = {
@@ -115,107 +110,107 @@ FormEngine.prototype = {
applyIncomingBatchSize: FORMS_STORE_BATCH_SIZE,
syncPriority: 6,
get prefName() {
return "history";
},
- _findDupe: function _findDupe(item) {
+ async _findDupe(item) {
return FormWrapper.getGUID(item.name, item.value);
}
};
function FormStore(name, engine) {
Store.call(this, name, engine);
}
FormStore.prototype = {
__proto__: Store.prototype,
- _processChange(change) {
+ async _processChange(change) {
// If this._changes is defined, then we are applying a batch, so we
// can defer it.
if (this._changes) {
this._changes.push(change);
return;
}
- // Otherwise we must handle the change synchronously, right now.
- FormWrapper._updateSpinningly(change);
+ // Otherwise we must handle the change right now.
+ await FormWrapper._update(change);
},
- applyIncomingBatch(records) {
+ async applyIncomingBatch(records) {
// We collect all the changes to be made then apply them all at once.
this._changes = [];
- let failures = Store.prototype.applyIncomingBatch.call(this, records);
+ let failures = await Store.prototype.applyIncomingBatch.call(this, records);
if (this._changes.length) {
- FormWrapper._updateSpinningly(this._changes);
+ await FormWrapper._update(this._changes);
}
delete this._changes;
return failures;
},
- getAllIDs() {
- let results = FormWrapper._searchSpinningly(["guid"], [])
+ async getAllIDs() {
+ let results = await FormWrapper._search(["guid"], [])
let guids = {};
for (let result of results) {
guids[result.guid] = true;
}
return guids;
},
- changeItemID(oldID, newID) {
- FormWrapper.replaceGUID(oldID, newID);
+ async changeItemID(oldID, newID) {
+ await FormWrapper.replaceGUID(oldID, newID);
},
- itemExists(id) {
+ async itemExists(id) {
return FormWrapper.hasGUID(id);
},
- createRecord(id, collection) {
+ async createRecord(id, collection) {
let record = new FormRec(collection, id);
- let entry = FormWrapper.getEntry(id);
+ let entry = await FormWrapper.getEntry(id);
if (entry != null) {
record.name = entry.name;
record.value = entry.value;
} else {
record.deleted = true;
}
return record;
},
- create(record) {
+ async create(record) {
this._log.trace("Adding form record for " + record.name);
let change = {
op: "add",
fieldname: record.name,
value: record.value
};
- this._processChange(change);
+ await this._processChange(change);
},
- remove(record) {
+ async remove(record) {
this._log.trace("Removing form record: " + record.id);
let change = {
op: "remove",
guid: record.id
};
- this._processChange(change);
+ await this._processChange(change);
},
- update(record) {
+ async update(record) {
this._log.trace("Ignoring form record update request!");
},
- wipe() {
+ async wipe() {
let change = {
op: "remove"
};
- FormWrapper._updateSpinningly(change);
+ await FormWrapper._update(change);
}
};
function FormTracker(name, engine) {
Tracker.call(this, name, engine);
}
FormTracker.prototype = {
__proto__: Tracker.prototype,
@@ -268,40 +263,40 @@ class FormValidator extends CollectionVa
super("forms", "id", ["name", "value"]);
this.ignoresMissingClients = true;
}
emptyProblemData() {
return new FormsProblemData();
}
- getClientItems() {
- return FormWrapper._promiseSearch(["guid", "fieldname", "value"], {});
+ async getClientItems() {
+ return FormWrapper._search(["guid", "fieldname", "value"], {});
}
normalizeClientItem(item) {
return {
id: item.guid,
guid: item.guid,
name: item.fieldname,
fieldname: item.fieldname,
value: item.value,
original: item,
};
}
- normalizeServerItem(item) {
+ async normalizeServerItem(item) {
let res = Object.assign({
guid: item.id,
fieldname: item.name,
original: item,
}, item);
// Missing `name` or `value` causes the getGUID call to throw
if (item.name !== undefined && item.value !== undefined) {
- let guid = FormWrapper.getGUID(item.name, item.value);
+ let guid = await FormWrapper.getGUID(item.name, item.value);
if (guid) {
res.guid = guid;
res.id = guid;
res.duped = true;
}
}
return res;
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -41,36 +41,36 @@ HistoryEngine.prototype = {
_recordObj: HistoryRec,
_storeObj: HistoryStore,
_trackerObj: HistoryTracker,
downloadLimit: MAX_HISTORY_DOWNLOAD,
applyIncomingBatchSize: HISTORY_STORE_BATCH_SIZE,
syncPriority: 7,
- _processIncoming(newitems) {
+ async _processIncoming(newitems) {
// We want to notify history observers that a batch operation is underway
// so they don't do lots of work for each incoming record.
let observers = PlacesUtils.history.getObservers();
function notifyHistoryObservers(notification) {
for (let observer of observers) {
try {
observer[notification]();
} catch (ex) { }
}
}
notifyHistoryObservers("onBeginUpdateBatch");
try {
- return SyncEngine.prototype._processIncoming.call(this, newitems);
+ await SyncEngine.prototype._processIncoming.call(this, newitems);
} finally {
notifyHistoryObservers("onEndUpdateBatch");
}
},
- pullNewChanges() {
+ async pullNewChanges() {
let modifiedGUIDs = Object.keys(this._tracker.changedIDs);
if (!modifiedGUIDs.length) {
return {};
}
let db = PlacesUtils.history.QueryInterface(Ci.nsPIPlacesDatabase)
.DBConnection;
@@ -231,35 +231,35 @@ HistoryStore.prototype = {
},
// See bug 468732 for why we use SQL here
_findURLByGUID: function HistStore__findURLByGUID(guid) {
this._urlStm.params.guid = guid;
return Async.querySpinningly(this._urlStm, this._urlCols)[0];
},
- changeItemID: function HStore_changeItemID(oldID, newID) {
+ async changeItemID(oldID, newID) {
this.setGUID(this._findURLByGUID(oldID).url, newID);
},
- getAllIDs: function HistStore_getAllIDs() {
+ async getAllIDs() {
// Only get places visited within the last 30 days (30*24*60*60*1000ms)
this._allUrlStm.params.cutoff_date = (Date.now() - 2592000000) * 1000;
this._allUrlStm.params.max_results = MAX_HISTORY_UPLOAD;
let urls = Async.querySpinningly(this._allUrlStm, this._allUrlCols);
let self = this;
return urls.reduce(function(ids, item) {
ids[self.GUIDForUri(item.url, true)] = item.url;
return ids;
}, {});
},
- applyIncomingBatch: function applyIncomingBatch(records) {
+ async applyIncomingBatch(records) {
let failed = [];
let blockers = [];
// Convert incoming records to mozIPlaceInfo objects. Some records can be
// ignored or handled directly, so we're rewriting the array in-place.
let i, k;
for (i = 0, k = 0; i < records.length; i++) {
let record = records[k] = records[i];
@@ -285,45 +285,32 @@ HistoryStore.prototype = {
}
if (shouldApply) {
k += 1;
}
}
records.length = k; // truncate array
- let handleAsyncOperationsComplete = Async.makeSyncCallback();
-
if (records.length) {
blockers.push(new Promise(resolve => {
let updatePlacesCallback = {
handleResult: function handleResult() {},
handleError: function handleError(resultCode, placeInfo) {
failed.push(placeInfo.guid);
},
handleCompletion: resolve,
};
this._asyncHistory.updatePlaces(records, updatePlacesCallback);
}));
}
- // Since `failed` is updated asynchronously and this function is
- // synchronous, we need to spin-wait until we are sure that all
- // updates to `fail` have completed.
- Promise.all(blockers).then(
- handleAsyncOperationsComplete,
- ex => {
- // In case of error, terminate wait, but make sure that the
- // error is reported nevertheless and still causes test
- // failures.
- handleAsyncOperationsComplete();
- throw ex;
- });
- Async.waitForSyncCallback(handleAsyncOperationsComplete);
- return failed;
+ // failed is updated asynchronously, hence the await on blockers.
+ await Promise.all(blockers);
+ return failed;
},
/**
* Converts a Sync history record to a mozIPlaceInfo.
*
* Throws if an invalid record is encountered (invalid URI, etc.),
* returns true if the record is to be applied, false otherwise
* (no visits to add, etc.),
@@ -399,51 +386,47 @@ HistoryStore.prototype = {
this._log.trace("Ignoring record " + record.id + " with URI "
+ record.uri.spec + ": no visits to add.");
return false;
}
return true;
},
- remove: function HistStore_remove(record) {
+ async remove(record) {
this._log.trace("Removing page: " + record.id);
- return PlacesUtils.history.remove(record.id).then(
- (removed) => {
- if (removed) {
- this._log.trace("Removed page: " + record.id);
- } else {
- this._log.debug("Page already removed: " + record.id);
- }
- });
+ let removed = await PlacesUtils.history.remove(record.id);
+ if (removed) {
+ this._log.trace("Removed page: " + record.id);
+ } else {
+ this._log.debug("Page already removed: " + record.id);
+ }
},
- itemExists: function HistStore_itemExists(id) {
+ async itemExists(id) {
return !!this._findURLByGUID(id);
},
- createRecord: function createRecord(id, collection) {
+ async createRecord(id, collection) {
let foo = this._findURLByGUID(id);
let record = new HistoryRec(collection, id);
if (foo) {
record.histUri = foo.url;
record.title = foo.title;
record.sortindex = foo.frecency;
record.visits = this._getVisits(record.histUri);
} else {
record.deleted = true;
}
return record;
},
- wipe: function HistStore_wipe() {
- let cb = Async.makeSyncCallback();
- PlacesUtils.history.clear().then(result => { cb(null, result) }, err => { cb(err) });
- return Async.waitForSyncCallback(cb);
+ async wipe() {
+ return PlacesUtils.history.clear();
}
};
function HistoryTracker(name, engine) {
Tracker.call(this, name, engine);
}
HistoryTracker.prototype = {
__proto__: Tracker.prototype,
--- a/services/sync/modules/engines/passwords.js
+++ b/services/sync/modules/engines/passwords.js
@@ -68,32 +68,32 @@ PasswordEngine.prototype = {
_storeObj: PasswordStore,
_trackerObj: PasswordTracker,
_recordObj: LoginRec,
applyIncomingBatchSize: PASSWORDS_STORE_BATCH_SIZE,
syncPriority: 2,
- _syncFinish() {
- SyncEngine.prototype._syncFinish.call(this);
+ async _syncFinish() {
+ await SyncEngine.prototype._syncFinish.call(this);
// Delete the Weave credentials from the server once.
if (!Svc.Prefs.get("deletePwdFxA", false)) {
try {
let ids = [];
for (let host of Utils.getSyncCredentialsHosts()) {
for (let info of Services.logins.findLogins({}, host, "", "")) {
ids.push(info.QueryInterface(Components.interfaces.nsILoginMetaInfo).guid);
}
}
if (ids.length) {
let coll = new Collection(this.engineURL, null, this.service);
coll.ids = ids;
- let ret = coll.delete();
+ let ret = await coll.delete();
this._log.debug("Delete result: " + ret);
if (!ret.success && ret.status != 400) {
// A non-400 failure means try again next time.
return;
}
} else {
this._log.debug("Didn't find any passwords to delete");
}
@@ -105,39 +105,40 @@ PasswordEngine.prototype = {
if (Async.isShutdownException(ex)) {
throw ex;
}
this._log.debug("Password deletes failed", ex);
}
}
},
- _findDupe(item) {
+ async _findDupe(item) {
let login = this._store._nsLoginInfoFromRecord(item);
if (!login) {
return null;
}
let logins = Services.logins.findLogins({}, login.hostname, login.formSubmitURL, login.httpRealm);
- this._store._sleep(0); // Yield back to main thread after synchronous operation.
+ await Async.promiseYield(); // Yield back to main thread after synchronous operation.
// Look for existing logins that match the hostname, but ignore the password.
for (let local of logins) {
if (login.matches(local, true) && local instanceof Ci.nsILoginMetaInfo) {
return local.guid;
}
}
return null;
},
- pullAllChanges() {
+ async pullAllChanges() {
let changes = {};
- for (let [id, info] of Object.entries(this._store.getAllIDs())) {
+ let ids = await this._store.getAllIDs();
+ for (let [id, info] of Object.entries(ids)) {
changes[id] = info.timePasswordChanged / 1000;
}
return changes;
}
};
function PasswordStore(name, engine) {
Store.call(this, name, engine);
@@ -181,75 +182,75 @@ PasswordStore.prototype = {
}
if (record.timePasswordChanged) {
info.timePasswordChanged = record.timePasswordChanged;
}
return info;
},
- _getLoginFromGUID(id) {
+ async _getLoginFromGUID(id) {
let prop = this._newPropertyBag();
prop.setPropertyAsAUTF8String("guid", id);
let logins = Services.logins.searchLogins({}, prop);
- this._sleep(0); // Yield back to main thread after synchronous operation.
+ await Async.promiseYield(); // Yield back to main thread after synchronous operation.
if (logins.length > 0) {
this._log.trace(logins.length + " items matching " + id + " found.");
return logins[0];
}
this._log.trace("No items matching " + id + " found. Ignoring");
return null;
},
- getAllIDs() {
+ async getAllIDs() {
let items = {};
let logins = Services.logins.getAllLogins({});
for (let i = 0; i < logins.length; i++) {
// Skip over Weave password/passphrase entries.
let metaInfo = logins[i].QueryInterface(Ci.nsILoginMetaInfo);
if (Utils.getSyncCredentialsHosts().has(metaInfo.hostname)) {
continue;
}
items[metaInfo.guid] = metaInfo;
}
return items;
},
- changeItemID(oldID, newID) {
+ async changeItemID(oldID, newID) {
this._log.trace("Changing item ID: " + oldID + " to " + newID);
- let oldLogin = this._getLoginFromGUID(oldID);
+ let oldLogin = await this._getLoginFromGUID(oldID);
if (!oldLogin) {
this._log.trace("Can't change item ID: item doesn't exist");
return;
}
- if (this._getLoginFromGUID(newID)) {
+ if ((await this._getLoginFromGUID(newID))) {
this._log.trace("Can't change item ID: new ID already in use");
return;
}
let prop = this._newPropertyBag();
prop.setPropertyAsAUTF8String("guid", newID);
Services.logins.modifyLogin(oldLogin, prop);
},
- itemExists(id) {
- return !!this._getLoginFromGUID(id);
+ async itemExists(id) {
+ return !!(await this._getLoginFromGUID(id));
},
- createRecord(id, collection) {
+ async createRecord(id, collection) {
let record = new LoginRec(collection, id);
- let login = this._getLoginFromGUID(id);
+ let login = await this._getLoginFromGUID(id);
if (!login) {
record.deleted = true;
return record;
}
record.hostname = login.hostname;
record.formSubmitURL = login.formSubmitURL;
@@ -262,46 +263,46 @@ PasswordStore.prototype = {
// Optional fields.
login.QueryInterface(Ci.nsILoginMetaInfo);
record.timeCreated = login.timeCreated;
record.timePasswordChanged = login.timePasswordChanged;
return record;
},
- create(record) {
+ async create(record) {
let login = this._nsLoginInfoFromRecord(record);
if (!login) {
return;
}
this._log.debug("Adding login for " + record.hostname);
this._log.trace("httpRealm: " + JSON.stringify(login.httpRealm) + "; " +
"formSubmitURL: " + JSON.stringify(login.formSubmitURL));
try {
Services.logins.addLogin(login);
} catch (ex) {
this._log.debug(`Adding record ${record.id} resulted in exception`, ex);
}
},
- remove(record) {
+ async remove(record) {
this._log.trace("Removing login " + record.id);
- let loginItem = this._getLoginFromGUID(record.id);
+ let loginItem = await this._getLoginFromGUID(record.id);
if (!loginItem) {
this._log.trace("Asked to remove record that doesn't exist, ignoring");
return;
}
Services.logins.removeLogin(loginItem);
},
- update(record) {
- let loginItem = this._getLoginFromGUID(record.id);
+ async update(record) {
+ let loginItem = await this._getLoginFromGUID(record.id);
if (!loginItem) {
this._log.debug("Skipping update for unknown item: " + record.hostname);
return;
}
this._log.debug("Updating " + record.hostname);
let newinfo = this._nsLoginInfoFromRecord(record);
if (!newinfo) {
@@ -310,17 +311,17 @@ PasswordStore.prototype = {
try {
Services.logins.modifyLogin(loginItem, newinfo);
} catch (ex) {
this._log.debug(`Modifying record ${record.id} resulted in exception; not modifying`, ex);
}
},
- wipe() {
+ async wipe() {
Services.logins.removeAllLogins();
},
};
function PasswordTracker(name, engine) {
Tracker.call(this, name, engine);
Svc.Obs.add("weave:engine:start-tracking", this);
Svc.Obs.add("weave:engine:stop-tracking", this);
@@ -419,14 +420,14 @@ class PasswordValidator extends Collecti
password: item.password,
passwordField: item.passwordField,
username: item.username,
usernameField: item.usernameField,
original: item,
}
}
- normalizeServerItem(item) {
+ async normalizeServerItem(item) {
return Object.assign({ guid: item.id }, item);
}
}
--- a/services/sync/modules/engines/prefs.js
+++ b/services/sync/modules/engines/prefs.js
@@ -43,30 +43,30 @@ PrefsEngine.prototype = {
_storeObj: PrefStore,
_trackerObj: PrefTracker,
_recordObj: PrefRec,
version: 2,
syncPriority: 1,
allowSkippedRecord: false,
- getChangedIDs() {
+ async getChangedIDs() {
// No need for a proper timestamp (no conflict resolution needed).
let changedIDs = {};
if (this._tracker.modified)
changedIDs[PREFS_GUID] = 0;
return changedIDs;
},
- _wipeClient() {
- SyncEngine.prototype._wipeClient.call(this);
+ async _wipeClient() {
+ await SyncEngine.prototype._wipeClient.call(this);
this.justWiped = true;
},
- _reconcile(item) {
+ async _reconcile(item) {
// Apply the incoming item if we don't care about the local data
if (this.justWiped) {
this.justWiped = false;
return true;
}
return SyncEngine.prototype._reconcile.call(this, item);
}
};
@@ -160,61 +160,61 @@ PrefStore.prototype = {
}
// Notify the lightweight theme manager if the selected theme has changed.
if (selectedThemeIDBefore != selectedThemeIDAfter) {
this._updateLightWeightTheme(selectedThemeIDAfter);
}
},
- getAllIDs() {
+ async getAllIDs() {
/* We store all prefs in just one WBO, with just one GUID */
let allprefs = {};
allprefs[PREFS_GUID] = true;
return allprefs;
},
- changeItemID(oldID, newID) {
+ async changeItemID(oldID, newID) {
this._log.trace("PrefStore GUID is constant!");
},
- itemExists(id) {
+ async itemExists(id) {
return (id === PREFS_GUID);
},
- createRecord(id, collection) {
+ async createRecord(id, collection) {
let record = new PrefRec(collection, id);
if (id == PREFS_GUID) {
record.value = this._getAllPrefs();
} else {
record.deleted = true;
}
return record;
},
- create(record) {
+ async create(record) {
this._log.trace("Ignoring create request");
},
- remove(record) {
+ async remove(record) {
this._log.trace("Ignoring remove request");
},
- update(record) {
+ async update(record) {
// Silently ignore pref updates that are for other apps.
if (record.id != PREFS_GUID)
return;
this._log.trace("Received pref updates, applying...");
this._setAllPrefs(record.value);
},
- wipe() {
+ async wipe() {
this._log.trace("Ignoring wipe request");
}
};
function PrefTracker(name, engine) {
Tracker.call(this, name, engine);
Svc.Obs.add("profile-before-change", this);
Svc.Obs.add("weave:engine:start-tracking", this);
--- a/services/sync/modules/engines/tabs.js
+++ b/services/sync/modules/engines/tabs.js
@@ -30,53 +30,57 @@ TabSetRecord.prototype = {
ttl: TABS_TTL,
};
Utils.deferGetSet(TabSetRecord, "cleartext", ["clientName", "tabs"]);
this.TabEngine = function TabEngine(service) {
SyncEngine.call(this, "Tabs", service);
-
- // Reset the client on every startup so that we fetch recent tabs.
- this._resetClient();
}
TabEngine.prototype = {
__proto__: SyncEngine.prototype,
_storeObj: TabStore,
_trackerObj: TabTracker,
_recordObj: TabSetRecord,
// A flag to indicate if we have synced in this session. This is to help
// consumers of remote tabs that may want to differentiate between "I've an
// empty tab list as I haven't yet synced" vs "I've an empty tab list
// as there really are no tabs"
hasSyncedThisSession: false,
syncPriority: 3,
- getChangedIDs() {
+ async initialize() {
+ await SyncEngine.prototype.initialize.call(this);
+
+ // Reset the client on every startup so that we fetch recent tabs.
+ await this._resetClient();
+ },
+
+ async getChangedIDs() {
// No need for a proper timestamp (no conflict resolution needed).
let changedIDs = {};
if (this._tracker.modified)
changedIDs[this.service.clientsEngine.localID] = 0;
return changedIDs;
},
// API for use by Sync UI code to give user choices of tabs to open.
getAllClients() {
return this._store._remoteClients;
},
getClientById(id) {
return this._store._remoteClients[id];
},
- _resetClient() {
- SyncEngine.prototype._resetClient.call(this);
- this._store.wipe();
+ async _resetClient() {
+ await SyncEngine.prototype._resetClient.call(this);
+ await this._store.wipe();
this._tracker.modified = true;
this.hasSyncedThisSession = false;
},
async removeClientData() {
let url = this.engineURL + "/" + this.service.clientsEngine.localID;
await this.service.resource(url).delete();
},
@@ -87,41 +91,41 @@ TabEngine.prototype = {
getOpenURLs() {
let urls = new Set();
for (let entry of this._store.getAllTabs()) {
urls.add(entry.urlHistory[0]);
}
return urls;
},
- _reconcile(item) {
+ async _reconcile(item) {
// Skip our own record.
// TabStore.itemExists tests only against our local client ID.
- if (this._store.itemExists(item.id)) {
+ if ((await this._store.itemExists(item.id))) {
this._log.trace("Ignoring incoming tab item because of its id: " + item.id);
return false;
}
return SyncEngine.prototype._reconcile.call(this, item);
},
- _syncFinish() {
+ async _syncFinish() {
this.hasSyncedThisSession = true;
return SyncEngine.prototype._syncFinish.call(this);
},
};
function TabStore(name, engine) {
Store.call(this, name, engine);
}
TabStore.prototype = {
__proto__: Store.prototype,
- itemExists(id) {
+ async itemExists(id) {
return id == this.engine.service.clientsEngine.localID;
},
getWindowEnumerator() {
return Services.wm.getEnumerator("navigator:browser");
},
shouldSkipWindow(win) {
@@ -196,17 +200,17 @@ TabStore.prototype = {
lastUsed: Math.floor((tabState.lastAccessed || 0) / 1000),
});
}
}
return allTabs;
},
- createRecord(id, collection) {
+ async createRecord(id, collection) {
let record = new TabSetRecord(collection, id);
record.clientName = this.engine.service.clientsEngine.localName;
// Sort tabs in descending-used order to grab the most recently used
let tabs = this.getAllTabs(true).sort(function(a, b) {
return b.lastUsed - a.lastUsed;
});
@@ -230,17 +234,17 @@ TabStore.prototype = {
tabs.forEach(function(tab) {
this._log.trace("Wrapping tab: " + JSON.stringify(tab));
}, this);
record.tabs = tabs;
return record;
},
- getAllIDs() {
+ async getAllIDs() {
// Don't report any tabs if all windows are in private browsing for
// first syncs.
let ids = {};
let allWindowsArePrivate = false;
let wins = Services.wm.getEnumerator("navigator:browser");
while (wins.hasMoreElements()) {
if (PrivateBrowsingUtils.isWindowPrivate(wins.getNext())) {
// Ensure that at least there is a private window.
@@ -256,28 +260,28 @@ TabStore.prototype = {
!PrivateBrowsingUtils.permanentPrivateBrowsing) {
return ids;
}
ids[this.engine.service.clientsEngine.localID] = true;
return ids;
},
- wipe() {
+ async wipe() {
this._remoteClients = {};
},
- create(record) {
+ async create(record) {
this._log.debug("Adding remote tabs from " + record.clientName);
this._remoteClients[record.id] = Object.assign({}, record.cleartext, {
lastModified: record.modified
});
},
- update(record) {
+ async update(record) {
this._log.trace("Ignoring tab updates as local ones win");
},
};
function TabTracker(name, engine) {
Tracker.call(this, name, engine);
Svc.Obs.add("weave:engine:start-tracking", this);
--- a/services/sync/modules/policies.js
+++ b/services/sync/modules/policies.js
@@ -733,33 +733,31 @@ ErrorHandler.prototype = {
*/
syncAndReportErrors: function syncAndReportErrors() {
this._log.debug("Beginning user-triggered sync.");
this.dontIgnoreErrors = true;
Utils.nextTick(this.service.sync, this.service);
},
- _dumpAddons: function _dumpAddons() {
+ async _dumpAddons() {
// Just dump the items that sync may be concerned with. Specifically,
// active extensions that are not hidden.
- let addonPromise = Promise.resolve([]);
+ let addons = [];
try {
- addonPromise = AddonManager.getAddonsByTypes(["extension"]);
+ addons = await AddonManager.getAddonsByTypes(["extension"]);
} catch (e) {
this._log.warn("Failed to dump addons", e)
}
- return addonPromise.then(addons => {
- let relevantAddons = addons.filter(x => x.isActive && !x.hidden);
- this._log.debug("Addons installed", relevantAddons.length);
- for (let addon of relevantAddons) {
- this._log.debug(" - ${name}, version ${version}, id ${id}", addon);
- }
- });
+ let relevantAddons = addons.filter(x => x.isActive && !x.hidden);
+ this._log.debug("Addons installed", relevantAddons.length);
+ for (let addon of relevantAddons) {
+ this._log.debug(" - ${name}, version ${version}, id ${id}", addon);
+ }
},
/**
* Generate a log file for the sync that just completed
* and refresh the input & output streams.
*/
resetFileLog: function resetFileLog() {
let onComplete = logType => {
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -869,17 +869,17 @@ function PostQueue(poster, timestamp, co
// returned from the server.
this.batchID = undefined;
// Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
this.lastModified = timestamp;
}
PostQueue.prototype = {
- enqueue(record) {
+ async enqueue(record) {
// We want to ensure the record has a .toJSON() method defined - even
// though JSON.stringify() would implicitly call it, the stringify might
// still work even if it isn't defined, which isn't what we want.
let jsonRepr = record.toJSON();
if (!jsonRepr) {
throw new Error("You must only call this with objects that explicitly support JSON");
}
let bytes = JSON.stringify(jsonRepr);
@@ -906,27 +906,27 @@ PostQueue.prototype = {
return { enqueued: false, error: new Error("Single record too large to submit to server") };
}
// We need to write the queue out before handling this one, but we only
// commit the batch (and thus start a new one) if the batch is full.
// Note that if a single record is too big for the batch or post, then
// the batch may be empty, and so we don't flush in that case.
if (this.numQueued) {
- this.flush(batchSizeExceeded || singleRecordTooBig);
+ await this.flush(batchSizeExceeded || singleRecordTooBig);
}
}
// Either a ',' or a '[' depending on whether this is the first record.
this.queued += this.numQueued ? "," : "[";
this.queued += bytes;
this.numQueued++;
return { enqueued: true };
},
- flush(finalBatchPost) {
+ async flush(finalBatchPost) {
if (!this.queued) {
// nothing queued - we can't be in a batch, and something has gone very
// bad if we think we are.
if (this.batchID) {
throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
}
return;
}
@@ -952,42 +952,41 @@ PostQueue.prototype = {
this.bytesAlreadyBatched = 0;
this.numAlreadyBatched = 0;
} else {
this.bytesAlreadyBatched += queued.length;
this.numAlreadyBatched += this.numQueued;
}
this.queued = "";
this.numQueued = 0;
- let response = Async.promiseSpinningly(
- this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null)));
+ let response = await this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
if (!response.success) {
this.log.trace("Server error response during a batch", response);
// not clear what we should do here - we expect the consumer of this to
// abort by throwing in the postCallback below.
- this.postCallback(response, !finalBatchPost);
+ await this.postCallback(response, !finalBatchPost);
return;
}
if (finalBatchPost) {
this.log.trace("Committed batch", this.batchID);
this.batchID = undefined; // we are now in "first post for the batch" state.
this.lastModified = response.headers["x-last-modified"];
- this.postCallback(response, false);
+ await this.postCallback(response, false);
return;
}
if (response.status != 202) {
if (this.batchID) {
throw new Error("Server responded non-202 success code while a batch was in progress");
}
this.batchID = null; // no batch semantics are in place.
this.lastModified = response.headers["x-last-modified"];
- this.postCallback(response, false);
+ await this.postCallback(response, false);
return;
}
// this response is saying the server has batch semantics - we should
// always have a batch ID in the response.
let responseBatchID = response.obj.batch;
this.log.trace("Server responsed 202 with batch", responseBatchID);
if (!responseBatchID) {
@@ -1004,11 +1003,11 @@ PostQueue.prototype = {
}
}
}
if (this.batchID != responseBatchID) {
throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
}
- this.postCallback(response, true);
+ await this.postCallback(response, true);
},
}
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -105,17 +105,17 @@ Sync11Service.prototype = {
},
unlock: function unlock() {
this._locked = false;
},
// A specialized variant of Utils.catch.
// This provides a more informative error message when we're already syncing:
// see Bug 616568.
- _catch: function _catch(func) {
+ _catch(func) {
function lockExceptions(ex) {
if (Utils.isLockException(ex)) {
// This only happens if we're syncing already.
this._log.info("Cannot start sync: already syncing?");
}
}
return Utils.catch.call(this, func, lockExceptions);
@@ -186,103 +186,103 @@ Sync11Service.prototype = {
* thrashing doesn't solve anything. We keep a reasonable interval between
* these remedial actions.
*/
lastHMACEvent: 0,
/*
* Returns whether to try again.
*/
- handleHMACEvent: function handleHMACEvent() {
+ async handleHMACEvent() {
let now = Date.now();
// Leave a sizable delay between HMAC recovery attempts. This gives us
// time for another client to fix themselves if we touch the record.
if ((now - this.lastHMACEvent) < HMAC_EVENT_INTERVAL)
return false;
this._log.info("Bad HMAC event detected. Attempting recovery " +
"or signaling to other clients.");
// Set the last handled time so that we don't act again.
this.lastHMACEvent = now;
// Fetch keys.
let cryptoKeys = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
try {
- let cryptoResp = Async.promiseSpinningly(cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
+ let cryptoResp = (await cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
// Save out the ciphertext for when we reupload. If there's a bug in
// CollectionKeyManager, this will prevent us from uploading junk.
let cipherText = cryptoKeys.ciphertext;
if (!cryptoResp.success) {
this._log.warn("Failed to download keys.");
return false;
}
- let keysChanged = this.handleFetchedKeys(this.identity.syncKeyBundle,
- cryptoKeys, true);
+ let keysChanged = await this.handleFetchedKeys(this.identity.syncKeyBundle,
+ cryptoKeys, true);
if (keysChanged) {
// Did they change? If so, carry on.
this._log.info("Suggesting retry.");
return true; // Try again.
}
// If not, reupload them and continue the current sync.
cryptoKeys.ciphertext = cipherText;
cryptoKeys.cleartext = null;
- let uploadResp = this._uploadCryptoKeys(cryptoKeys, cryptoResp.obj.modified);
+ let uploadResp = await this._uploadCryptoKeys(cryptoKeys, cryptoResp.obj.modified);
if (uploadResp.success) {
this._log.info("Successfully re-uploaded keys. Continuing sync.");
} else {
this._log.warn("Got error response re-uploading keys. " +
"Continuing sync; let's try again later.");
}
return false; // Don't try again: same keys.
} catch (ex) {
this._log.warn("Got exception \"" + ex + "\" fetching and handling " +
"crypto keys. Will try again later.");
return false;
}
},
- handleFetchedKeys: function handleFetchedKeys(syncKey, cryptoKeys, skipReset) {
+ async handleFetchedKeys(syncKey, cryptoKeys, skipReset) {
// Don't want to wipe if we're just starting up!
let wasBlank = this.collectionKeys.isClear;
let keysChanged = this.collectionKeys.updateContents(syncKey, cryptoKeys);
if (keysChanged && !wasBlank) {
this._log.debug("Keys changed: " + JSON.stringify(keysChanged));
if (!skipReset) {
this._log.info("Resetting client to reflect key change.");
if (keysChanged.length) {
// Collection keys only. Reset individual engines.
- this.resetClient(keysChanged);
+ await this.resetClient(keysChanged);
} else {
// Default key changed: wipe it all.
- this.resetClient();
+ await this.resetClient();
}
this._log.info("Downloaded new keys, client reset. Proceeding.");
}
return true;
}
return false;
},
/**
* Prepare to initialize the rest of Weave after waiting a little bit
*/
- onStartup: function onStartup() {
+ async onStartup() {
this.status = Status;
this.identity = Status._authManager;
this.collectionKeys = new CollectionKeyManager();
this.errorHandler = new ErrorHandler(this);
this._log = Log.repository.getLogger("Sync.Service");
this._log.level =
@@ -290,17 +290,17 @@ Sync11Service.prototype = {
this._log.info("Loading Weave " + WEAVE_VERSION);
this._clusterManager = this.identity.createClusterManager(this);
this.recordManager = new RecordManager(this);
this.enabled = true;
- this._registerEngines();
+ await this._registerEngines();
let ua = Cc["@mozilla.org/network/protocol;1?name=http"].
getService(Ci.nsIHttpProtocolHandler).userAgent;
this._log.info(ua);
if (!this._checkCrypto()) {
this.enabled = false;
this._log.info("Could not load the Weave crypto component. Disabling " +
@@ -347,17 +347,17 @@ Sync11Service.prototype = {
return this.status.service = STATUS_DISABLED;
}
return this.status.checkSetup();
},
/**
* Register the built-in engines for certain applications
*/
- _registerEngines: function _registerEngines() {
+ async _registerEngines() {
this.engineManager = new EngineManager(this);
let engines = [];
// We allow a pref, which has no default value, to limit the engines
// which are registered. We expect only tests will use this.
if (Svc.Prefs.has("registerEngines")) {
engines = Svc.Prefs.get("registerEngines").split(",");
this._log.info("Registering custom set of engines", engines);
@@ -367,17 +367,21 @@ Sync11Service.prototype = {
}
let declined = [];
let pref = Svc.Prefs.get("declinedEngines");
if (pref) {
declined = pref.split(",");
}
- this.clientsEngine = new ClientEngine(this);
+ let clientsEngine = new ClientEngine(this);
+ // Ideally clientsEngine should not exist
+ // (or be a promise that calls initialize() before returning the engine)
+ await clientsEngine.initialize();
+ this.clientsEngine = clientsEngine;
for (let name of engines) {
if (!(name in ENGINE_MODULES)) {
this._log.info("Do not know about engine: " + name);
continue;
}
let {module, symbol} = ENGINE_MODULES[name];
if (!module.includes(":")) {
@@ -386,17 +390,17 @@ Sync11Service.prototype = {
let ns = {};
try {
Cu.import(module, ns);
if (!(symbol in ns)) {
this._log.warn("Could not find exported engine instance: " + symbol);
continue;
}
- this.engineManager.register(ns[symbol]);
+ await this.engineManager.register(ns[symbol]);
} catch (ex) {
this._log.warn("Could not register engine " + name, ex);
}
}
this.engineManager.setDeclined(declined);
},
@@ -409,23 +413,28 @@ Sync11Service.prototype = {
switch (topic) {
// Ideally this observer should be in the SyncScheduler, but it would require
// some work to know about the sync specific engines. We should move this there once it does.
case "sync:collection_changed":
// We check if we're running TPS here to avoid TPS failing because it
// couldn't get to get the sync lock, due to us currently syncing the
// clients engine.
if (data.includes("clients") && !Svc.Prefs.get("testing.tps", false)) {
- this.sync([]); // [] = clients collection only
+ // Sync in the background (it's fine not to wait on the returned promise
+ // because sync() has a lock).
+ // [] = clients collection only
+ this.sync([]).catch(e => {
+ this._log.error(e);
+ });
}
break;
case "fxaccounts:device_disconnected":
data = JSON.parse(data);
if (!data.isLocalDevice) {
- this.clientsEngine.updateKnownStaleClients();
+ Async.promiseSpinningly(this.clientsEngine.updateKnownStaleClients());
}
break;
case "weave:service:setup-complete":
let status = this._checkSetup();
if (status != STATUS_DISABLED && status != CLIENT_NOT_CONFIGURED)
Svc.Obs.notify("weave:engine:start-tracking");
break;
case "nsPref:changed":
@@ -467,51 +476,51 @@ Sync11Service.prototype = {
return request;
},
/**
* Perform the info fetch as part of a login or key fetch, or
* inside engine sync.
*/
- _fetchInfo(url) {
+ async _fetchInfo(url) {
let infoURL = url || this.infoURL;
this._log.trace("In _fetchInfo: " + infoURL);
let info;
try {
- info = Async.promiseSpinningly(this.resource(infoURL).get());
+ info = await this.resource(infoURL).get();
} catch (ex) {
this.errorHandler.checkServerError(ex);
throw ex;
}
// Always check for errors; this is also where we look for X-Weave-Alert.
this.errorHandler.checkServerError(info);
if (!info.success) {
this._log.error("Aborting sync: failed to get collections.")
throw info;
}
return info;
},
- verifyAndFetchSymmetricKeys: function verifyAndFetchSymmetricKeys(infoResponse) {
+ async verifyAndFetchSymmetricKeys(infoResponse) {
this._log.debug("Fetching and verifying -- or generating -- symmetric keys.");
let syncKeyBundle = this.identity.syncKeyBundle;
if (!syncKeyBundle) {
this.status.login = LOGIN_FAILED_NO_PASSPHRASE;
this.status.sync = CREDENTIALS_CHANGED;
return false;
}
try {
if (!infoResponse)
- infoResponse = this._fetchInfo(); // Will throw an exception on failure.
+ infoResponse = await this._fetchInfo(); // Will throw an exception on failure.
// This only applies when the server is already at version 4.
if (infoResponse.status != 200) {
this._log.warn("info/collections returned non-200 response. Failing key fetch.");
this.status.login = LOGIN_FAILED_SERVER_ERROR;
this.errorHandler.checkServerError(infoResponse);
return false;
}
@@ -526,20 +535,20 @@ Sync11Service.prototype = {
// Don't always set to CREDENTIALS_CHANGED -- we will probably take care of this.
// Fetch storage/crypto/keys.
let cryptoKeys;
if (infoCollections && (CRYPTO_COLLECTION in infoCollections)) {
try {
cryptoKeys = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
- let cryptoResp = Async.promiseSpinningly(cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
+ let cryptoResp = (await cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
if (cryptoResp.success) {
- this.handleFetchedKeys(syncKeyBundle, cryptoKeys);
+ await this.handleFetchedKeys(syncKeyBundle, cryptoKeys);
return true;
} else if (cryptoResp.status == 404) {
// On failure, ask to generate new keys and upload them.
// Fall through to the behavior below.
this._log.warn("Got 404 for crypto/keys, but 'crypto' in info/collections. Regenerating.");
cryptoKeys = null;
} else {
// Some other problem.
@@ -569,103 +578,98 @@ Sync11Service.prototype = {
if (!cryptoKeys) {
this._log.info("No keys! Generating new ones.");
// Better make some and upload them, and wipe the server to ensure
// consistency. This is all achieved via _freshStart.
// If _freshStart fails to clear the server or upload keys, it will
// throw.
- this._freshStart();
+ await this._freshStart();
return true;
}
// Last-ditch case.
return false;
}
// No update needed: we're good!
return true;
} catch (ex) {
// This means no keys are present, or there's a network error.
this._log.debug("Failed to fetch and verify keys", ex);
this.errorHandler.checkServerError(ex);
return false;
}
},
- verifyLogin: function verifyLogin(allow40XRecovery = true) {
+ async verifyLogin(allow40XRecovery = true) {
if (!this.identity.username) {
this._log.warn("No username in verifyLogin.");
this.status.login = LOGIN_FAILED_NO_USERNAME;
return false;
}
// Attaching auth credentials to a request requires access to
// passwords, which means that Resource.get can throw MP-related
// exceptions!
// So we ask the identity to verify the login state after unlocking the
// master password (ie, this call is expected to prompt for MP unlock
// if necessary) while we still have control.
- let cb = Async.makeSpinningCallback();
- this.identity.unlockAndVerifyAuthState().then(
- result => cb(null, result),
- cb
- );
- let unlockedState = cb.wait();
+ let unlockedState = await this.identity.unlockAndVerifyAuthState();
this._log.debug("Fetching unlocked auth state returned " + unlockedState);
if (unlockedState != STATUS_OK) {
this.status.login = unlockedState;
return false;
}
try {
// Make sure we have a cluster to verify against.
// This is a little weird, if we don't get a node we pretend
// to succeed, since that probably means we just don't have storage.
if (this.clusterURL == "" && !this._clusterManager.setCluster()) {
this.status.sync = NO_SYNC_NODE_FOUND;
return true;
}
// Fetch collection info on every startup.
- let test = Async.promiseSpinningly(this.resource(this.infoURL).get());
+ let test = await this.resource(this.infoURL).get();
switch (test.status) {
case 200:
// The user is authenticated.
// We have no way of verifying the passphrase right now,
// so wait until remoteSetup to do so.
// Just make the most trivial checks.
if (!this.identity.syncKeyBundle) {
this._log.warn("No passphrase in verifyLogin.");
this.status.login = LOGIN_FAILED_NO_PASSPHRASE;
return false;
}
// Go ahead and do remote setup, so that we can determine
// conclusively that our passphrase is correct.
- if (this._remoteSetup(test)) {
+ if ((await this._remoteSetup(test))) {
// Username/password verified.
this.status.login = LOGIN_SUCCEEDED;
return true;
}
this._log.warn("Remote setup failed.");
// Remote setup must have failed.
return false;
case 401:
this._log.warn("401: login failed.");
// Fall through to the 404 case.
case 404:
// Check that we're verifying with the correct cluster
if (allow40XRecovery && this._clusterManager.setCluster()) {
- return this.verifyLogin(false);
+ return await this.verifyLogin(false);
}
// We must have the right cluster, but the server doesn't expect us.
// The implications of this depend on the identity being used - for
// the legacy identity, it's an authoritatively "incorrect password",
// (ie, LOGIN_FAILED_LOGIN_REJECTED) but for FxA it probably means
// "transient error fetching auth token".
this.status.login = this.identity.loginStatusFromVerification404();
@@ -681,35 +685,35 @@ Sync11Service.prototype = {
// Must have failed on some network issue
this._log.debug("verifyLogin failed", ex);
this.status.login = LOGIN_FAILED_NETWORK_ERROR;
this.errorHandler.checkServerError(ex);
return false;
}
},
- generateNewSymmetricKeys: function generateNewSymmetricKeys() {
+ async generateNewSymmetricKeys() {
this._log.info("Generating new keys WBO...");
let wbo = this.collectionKeys.generateNewKeysWBO();
this._log.info("Encrypting new key bundle.");
wbo.encrypt(this.identity.syncKeyBundle);
- let uploadRes = this._uploadCryptoKeys(wbo, 0);
+ let uploadRes = await this._uploadCryptoKeys(wbo, 0);
if (uploadRes.status != 200) {
this._log.warn("Got status " + uploadRes.status + " uploading new keys. What to do? Throw!");
this.errorHandler.checkServerError(uploadRes);
throw new Error("Unable to upload symmetric keys.");
}
this._log.info("Got status " + uploadRes.status + " uploading keys.");
let serverModified = uploadRes.obj; // Modified timestamp according to server.
this._log.debug("Server reports crypto modified: " + serverModified);
// Now verify that info/collections shows them!
this._log.debug("Verifying server collection records.");
- let info = this._fetchInfo();
+ let info = await this._fetchInfo();
this._log.debug("info/collections is: " + info);
if (info.status != 200) {
this._log.warn("Non-200 info/collections response. Aborting.");
throw new Error("Unable to upload symmetric keys.");
}
info = info.obj;
@@ -725,43 +729,39 @@ Sync11Service.prototype = {
"is stale after successful upload.");
throw new Error("Symmetric key upload failed.");
}
// Doesn't matter if the timestamp is ahead.
// Download and install them.
let cryptoKeys = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
- let cryptoResp = Async.promiseSpinningly(cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
+ let cryptoResp = (await cryptoKeys.fetch(this.resource(this.cryptoKeysURL))).response;
if (cryptoResp.status != 200) {
this._log.warn("Failed to download keys.");
throw new Error("Symmetric key download failed.");
}
- let keysChanged = this.handleFetchedKeys(this.identity.syncKeyBundle,
- cryptoKeys, true);
+ let keysChanged = await this.handleFetchedKeys(this.identity.syncKeyBundle,
+ cryptoKeys, true);
if (keysChanged) {
this._log.info("Downloaded keys differed, as expected.");
}
},
- startOver: function startOver() {
+ async startOver() {
this._log.trace("Invoking Service.startOver.");
Svc.Obs.notify("weave:engine:stop-tracking");
this.status.resetSync();
// Deletion doesn't make sense if we aren't set up yet!
if (this.clusterURL != "") {
// Clear client-specific data from the server, including disabled engines.
for (let engine of [this.clientsEngine].concat(this.engineManager.getAll())) {
try {
- // Note the additional Promise.resolve here is to handle the fact that
- // some 3rd party engines probably don't return a promise. We can
- // probably nuke this once webextensions become mandatory as then
- // no 3rd party engines will be allowed to exist.
- Async.promiseSpinningly(Promise.resolve().then(() => engine.removeClientData()));
+ await engine.removeClientData();
} catch (ex) {
this._log.warn(`Deleting client data for ${engine.name} failed`, ex);
}
}
this._log.debug("Finished deleting client data.");
} else {
this._log.debug("Skipping client data removal: no cluster URL.");
}
@@ -771,17 +771,17 @@ Sync11Service.prototype = {
// by emptying the passphrase (we still need the password).
this._log.info("Service.startOver dropping sync key and logging out.");
this.identity.resetSyncKeyBundle();
this.status.login = LOGIN_FAILED_NO_PASSPHRASE;
this.logout();
Svc.Obs.notify("weave:service:start-over");
// Reset all engines and clear keys.
- this.resetClient();
+ await this.resetClient();
this.collectionKeys.clear();
this.status.resetBackoff();
// Reset Weave prefs.
this._ignorePrefObserver = true;
Svc.Prefs.resetBranch("");
this._ignorePrefObserver = false;
this.clusterURL = null;
@@ -799,36 +799,36 @@ Sync11Service.prototype = {
} catch (err) {
this._log.error("startOver failed to re-initialize the identity manager", err);
// Still send the observer notification so the current state is
// reflected in the UI.
Svc.Obs.notify("weave:service:start-over:finish");
}
},
- login: function login() {
- function onNotify() {
+ async login() {
+ async function onNotify() {
this._loggedIn = false;
if (Services.io.offline) {
this.status.login = LOGIN_FAILED_NETWORK_ERROR;
throw "Application is offline, login should not be called";
}
this._log.info("Logging in the user.");
// Just let any errors bubble up - they've more context than we do!
try {
- Async.promiseSpinningly(this.identity.ensureLoggedIn());
+ await this.identity.ensureLoggedIn();
} finally {
this._checkSetup(); // _checkSetup has a side effect of setting the right state.
}
this._updateCachedURLs();
this._log.info("User logged in successfully - verifying login.");
- if (!this.verifyLogin()) {
+ if (!(await this.verifyLogin())) {
// verifyLogin sets the failure states here.
throw "Login failed: " + this.status.login;
}
this._loggedIn = true;
return true;
}
@@ -845,24 +845,24 @@ Sync11Service.prototype = {
this.identity.logout();
this._loggedIn = false;
Svc.Obs.notify("weave:service:logout:finish");
},
// Note: returns false if we failed for a reason other than the server not yet
// supporting the api.
- _fetchServerConfiguration() {
+ async _fetchServerConfiguration() {
// This is similar to _fetchInfo, but with different error handling.
let infoURL = this.userBaseURL + "info/configuration";
this._log.debug("Fetching server configuration", infoURL);
let configResponse;
try {
- configResponse = Async.promiseSpinningly(this.resource(infoURL).get());
+ configResponse = await this.resource(infoURL).get();
} catch (ex) {
// This is probably a network or similar error.
this._log.warn("Failed to fetch info/configuration", ex);
this.errorHandler.checkServerError(ex);
return false;
}
if (configResponse.status == 404) {
@@ -876,51 +876,51 @@ Sync11Service.prototype = {
this.serverConfiguration = configResponse.obj;
}
this._log.trace("info/configuration for this server", this.serverConfiguration);
return true;
},
// Stuff we need to do after login, before we can really do
// anything (e.g. key setup).
- _remoteSetup: function _remoteSetup(infoResponse) {
- if (!this._fetchServerConfiguration()) {
+ async _remoteSetup(infoResponse) {
+ if (!(await this._fetchServerConfiguration())) {
return false;
}
this._log.debug("Fetching global metadata record");
- let meta = Async.promiseSpinningly(this.recordManager.get(this.metaURL));
+ let meta = await this.recordManager.get(this.metaURL);
// Checking modified time of the meta record.
if (infoResponse &&
(infoResponse.obj.meta != this.metaModified) &&
(!meta || !meta.isNew)) {
// Delete the cached meta record...
this._log.debug("Clearing cached meta record. metaModified is " +
JSON.stringify(this.metaModified) + ", setting to " +
JSON.stringify(infoResponse.obj.meta));
this.recordManager.del(this.metaURL);
// ... fetch the current record from the server, and COPY THE FLAGS.
- let newMeta = Async.promiseSpinningly(this.recordManager.get(this.metaURL));
+ let newMeta = await this.recordManager.get(this.metaURL);
// If we got a 401, we do not want to create a new meta/global - we
// should be able to get the existing meta after we get a new node.
if (this.recordManager.response.status == 401) {
this._log.debug("Fetching meta/global record on the server returned 401.");
this.errorHandler.checkServerError(this.recordManager.response);
return false;
}
if (this.recordManager.response.status == 404) {
this._log.debug("No meta/global record on the server. Creating one.");
try {
- this._uploadNewMetaGlobal();
+ await this._uploadNewMetaGlobal();
} catch (uploadRes) {
this._log.warn("Unable to upload new meta/global. Failing remote setup.");
this.errorHandler.checkServerError(uploadRes);
return false;
}
} else if (!newMeta) {
this._log.warn("Unable to get meta/global. Failing remote setup.");
this.errorHandler.checkServerError(this.recordManager.response);
@@ -960,52 +960,52 @@ Sync11Service.prototype = {
}
if (!meta)
this._log.info("No metadata record, server wipe needed");
if (meta && !meta.payload.syncID)
this._log.warn("No sync id, server wipe needed");
this._log.info("Wiping server data");
- this._freshStart();
+ await this._freshStart();
if (status == 404)
this._log.info("Metadata record not found, server was wiped to ensure " +
"consistency.");
else // 200
this._log.info("Wiped server; incompatible metadata: " + remoteVersion);
return true;
} else if (remoteVersion > STORAGE_VERSION) {
this.status.sync = VERSION_OUT_OF_DATE;
this._log.warn("Upgrade required to access newer storage version.");
return false;
} else if (meta.payload.syncID != this.syncID) {
this._log.info("Sync IDs differ. Local is " + this.syncID + ", remote is " + meta.payload.syncID);
- this.resetClient();
+ await this.resetClient();
this.collectionKeys.clear();
this.syncID = meta.payload.syncID;
this._log.debug("Clear cached values and take syncId: " + this.syncID);
- if (!this.verifyAndFetchSymmetricKeys(infoResponse)) {
+ if (!(await this.verifyAndFetchSymmetricKeys(infoResponse))) {
this._log.warn("Failed to fetch symmetric keys. Failing remote setup.");
return false;
}
// bug 545725 - re-verify creds and fail sanely
- if (!this.verifyLogin()) {
+ if (!(await this.verifyLogin())) {
this.status.sync = CREDENTIALS_CHANGED;
this._log.info("Credentials have changed, aborting sync and forcing re-login.");
return false;
}
return true;
}
- if (!this.verifyAndFetchSymmetricKeys(infoResponse)) {
+ if (!(await this.verifyAndFetchSymmetricKeys(infoResponse))) {
this._log.warn("Failed to fetch symmetric keys. Failing remote setup.");
return false;
}
return true;
},
/**
@@ -1044,175 +1044,169 @@ Sync11Service.prototype = {
reason = kFirstSyncChoiceNotMade;
if (ignore && ignore.indexOf(reason) != -1)
return "";
return reason;
},
- sync: function sync(engineNamesToSync) {
+ async sync(engineNamesToSync) {
let dateStr = Utils.formatTimestamp(new Date());
this._log.debug("User-Agent: " + Utils.userAgent);
this._log.info(`Starting sync at ${dateStr} in browser session ${browserSessionID}`);
- this._catch(function() {
+ return this._catch(async function() {
// Make sure we're logged in.
if (this._shouldLogin()) {
this._log.debug("In sync: should login.");
- if (!this.login()) {
+ if (!(await this.login())) {
this._log.debug("Not syncing: login returned false.");
return;
}
} else {
this._log.trace("In sync: no need to login.");
}
- this._lockedSync(engineNamesToSync);
+ await this._lockedSync(engineNamesToSync);
})();
},
/**
* Sync up engines with the server.
*/
- _lockedSync: function _lockedSync(engineNamesToSync) {
+ async _lockedSync(engineNamesToSync) {
return this._lock("service.js: sync",
- this._notify("sync", "", function onNotify() {
+ this._notify("sync", "", async function onNotify() {
let histogram = Services.telemetry.getHistogramById("WEAVE_START_COUNT");
histogram.add(1);
let synchronizer = new EngineSynchronizer(this);
- let cb = Async.makeSpinningCallback();
- synchronizer.onComplete = cb;
-
- synchronizer.sync(engineNamesToSync);
- // wait() throws if the first argument is truthy, which is exactly what
- // we want.
- cb.wait();
+ await synchronizer.sync(engineNamesToSync); // Might throw!
histogram = Services.telemetry.getHistogramById("WEAVE_COMPLETE_SUCCESS_COUNT");
histogram.add(1);
// We successfully synchronized.
// Check if the identity wants to pre-fetch a migration sentinel from
// the server.
// If we have no clusterURL, we are probably doing a node reassignment
// so don't attempt to get it in that case.
if (this.clusterURL) {
this.identity.prefetchMigrationSentinel(this);
}
// Now let's update our declined engines (but only if we have a metaURL;
// if Sync failed due to no node we will not have one)
if (this.metaURL) {
- let meta = Async.promiseSpinningly(this.recordManager.get(this.metaURL));
+ let meta = await this.recordManager.get(this.metaURL);
if (!meta) {
this._log.warn("No meta/global; can't update declined state.");
return;
}
let declinedEngines = new DeclinedEngines(this);
let didChange = declinedEngines.updateDeclined(meta, this.engineManager);
if (!didChange) {
this._log.info("No change to declined engines. Not reuploading meta/global.");
return;
}
- this.uploadMetaGlobal(meta);
+ await this.uploadMetaGlobal(meta);
}
}))();
},
/**
* Upload a fresh meta/global record
* @throws the response object if the upload request was not a success
*/
- _uploadNewMetaGlobal() {
+ async _uploadNewMetaGlobal() {
let meta = new WBORecord("meta", "global");
meta.payload.syncID = this.syncID;
meta.payload.storageVersion = STORAGE_VERSION;
meta.payload.declined = this.engineManager.getDeclined();
meta.modified = 0;
meta.isNew = true;
- this.uploadMetaGlobal(meta);
+ await this.uploadMetaGlobal(meta);
},
/**
* Upload meta/global, throwing the response on failure
* @param {WBORecord} meta meta/global record
* @throws the response object if the request was not a success
*/
- uploadMetaGlobal(meta) {
+ async uploadMetaGlobal(meta) {
this._log.debug("Uploading meta/global", meta);
let res = this.resource(this.metaURL);
res.setHeader("X-If-Unmodified-Since", meta.modified);
- let response = Async.promiseSpinningly(res.put(meta));
+ let response = await res.put(meta);
if (!response.success) {
throw response;
}
// From https://docs.services.mozilla.com/storage/apis-1.5.html:
// "Successful responses will return the new last-modified time for the collection."
meta.modified = response.obj;
this.recordManager.set(this.metaURL, meta);
},
/**
* Upload crypto/keys
* @param {WBORecord} cryptoKeys crypto/keys record
* @param {Number} lastModified known last modified timestamp (in decimal seconds),
* will be used to set the X-If-Unmodified-Since header
*/
- _uploadCryptoKeys(cryptoKeys, lastModified) {
+ async _uploadCryptoKeys(cryptoKeys, lastModified) {
this._log.debug(`Uploading crypto/keys (lastModified: ${lastModified})`);
let res = this.resource(this.cryptoKeysURL);
res.setHeader("X-If-Unmodified-Since", lastModified);
- return Async.promiseSpinningly(res.put(cryptoKeys));
+ return res.put(cryptoKeys);
},
- _freshStart: function _freshStart() {
+ async _freshStart() {
this._log.info("Fresh start. Resetting client.");
- this.resetClient();
+ await this.resetClient();
this.collectionKeys.clear();
// Wipe the server.
- this.wipeServer();
+ await this.wipeServer();
// Upload a new meta/global record.
// _uploadNewMetaGlobal throws on failure -- including race conditions.
// If we got into a race condition, we'll abort the sync this way, too.
// That's fine. We'll just wait till the next sync. The client that we're
// racing is probably busy uploading stuff right now anyway.
- this._uploadNewMetaGlobal();
+ await this._uploadNewMetaGlobal();
// Wipe everything we know about except meta because we just uploaded it
// TODO: there's a bug here. We should be calling resetClient, no?
// Generate, upload, and download new keys. Do this last so we don't wipe
// them...
- this.generateNewSymmetricKeys();
+ await this.generateNewSymmetricKeys();
},
/**
* Wipe user data from the server.
*
* @param collections [optional]
* Array of collections to wipe. If not given, all collections are
* wiped by issuing a DELETE request for `storageURL`.
*
* @return the server's timestamp of the (last) DELETE.
*/
- wipeServer: function wipeServer(collections) {
+ async wipeServer(collections) {
let response;
let histogram = Services.telemetry.getHistogramById("WEAVE_WIPE_SERVER_SUCCEEDED");
if (!collections) {
// Strip the trailing slash.
let res = this.resource(this.storageURL.slice(0, -1));
res.setHeader("X-Confirm-Delete", "1");
try {
- response = Async.promiseSpinningly(res.delete());
+ response = await res.delete();
} catch (ex) {
this._log.debug("Failed to wipe server", ex);
histogram.add(false);
throw ex;
}
if (response.status != 200 && response.status != 404) {
this._log.debug("Aborting wipeServer. Server responded with " +
response.status + " response for " + this.storageURL);
@@ -1222,17 +1216,17 @@ Sync11Service.prototype = {
histogram.add(true);
return response.headers["x-weave-timestamp"];
}
let timestamp;
for (let name of collections) {
let url = this.storageURL + name;
try {
- response = Async.promiseSpinningly(this.resource(url).delete());
+ response = await this.resource(url).delete();
} catch (ex) {
this._log.debug("Failed to wipe '" + name + "' collection", ex);
histogram.add(false);
throw ex;
}
if (response.status != 200 && response.status != 404) {
this._log.debug("Aborting wipeServer. Server responded with " +
@@ -1250,105 +1244,105 @@ Sync11Service.prototype = {
},
/**
* Wipe all local user data.
*
* @param engines [optional]
* Array of engine names to wipe. If not given, all engines are used.
*/
- wipeClient: function wipeClient(engines) {
+ async wipeClient(engines) {
// If we don't have any engines, reset the service and wipe all engines
if (!engines) {
// Clear out any service data
- this.resetService();
+ await this.resetService();
engines = [this.clientsEngine].concat(this.engineManager.getAll());
} else {
// Convert the array of names into engines
engines = this.engineManager.get(engines);
}
// Fully wipe each engine if it's able to decrypt data
for (let engine of engines) {
- if (engine.canDecrypt()) {
- engine.wipeClient();
+ if ((await engine.canDecrypt())) {
+ await engine.wipeClient();
}
}
},
/**
* Wipe all remote user data by wiping the server then telling each remote
* client to wipe itself.
*
* @param engines [optional]
* Array of engine names to wipe. If not given, all engines are used.
*/
- wipeRemote: function wipeRemote(engines) {
+ async wipeRemote(engines) {
try {
// Make sure stuff gets uploaded.
- this.resetClient(engines);
+ await this.resetClient(engines);
// Clear out any server data.
- this.wipeServer(engines);
+ await this.wipeServer(engines);
// Only wipe the engines provided.
let extra = { reason: "wipe-remote" };
if (engines) {
- engines.forEach(function(e) {
- this.clientsEngine.sendCommand("wipeEngine", [e], null, extra);
- }, this);
+ for (const e of engines) {
+ await this.clientsEngine.sendCommand("wipeEngine", [e], null, extra);
+ }
} else {
// Tell the remote machines to wipe themselves.
- this.clientsEngine.sendCommand("wipeAll", [], null, extra);
+ await this.clientsEngine.sendCommand("wipeAll", [], null, extra);
}
// Make sure the changed clients get updated.
- this.clientsEngine.sync();
+ await this.clientsEngine.sync();
} catch (ex) {
this.errorHandler.checkServerError(ex);
throw ex;
}
},
/**
* Reset local service information like logs, sync times, caches.
*/
- resetService: function resetService() {
- this._catch(function reset() {
+ async resetService() {
+ return this._catch(async function reset() {
this._log.info("Service reset.");
// Pretend we've never synced to the server and drop cached data
this.syncID = "";
this.recordManager.clearCache();
})();
},
/**
* Reset the client by getting rid of any local server data and client data.
*
* @param engines [optional]
* Array of engine names to reset. If not given, all engines are used.
*/
- resetClient: function resetClient(engines) {
- this._catch(function doResetClient() {
+ async resetClient(engines) {
+ return this._catch(async function doResetClient() {
// If we don't have any engines, reset everything including the service
if (!engines) {
// Clear out any service data
- this.resetService();
+ await this.resetService();
engines = [this.clientsEngine].concat(this.engineManager.getAll());
} else {
// Convert the array of names into engines
engines = this.engineManager.get(engines);
}
// Have each engine drop any temporary meta data
for (let engine of engines) {
- engine.resetClient();
+ await engine.resetClient();
}
})();
},
/**
* Fetch storage info from the server.
*
* @param type
@@ -1396,9 +1390,11 @@ Sync11Service.prototype = {
},
recordTelemetryEvent(object, method, value, extra = undefined) {
Svc.Obs.notify("weave:telemetry:event", { object, method, value, extra });
},
};
this.Service = new Sync11Service();
-this.Service.onStartup();
+this.Service.promiseInitialized = new Promise(resolve => {
+ this.Service.onStartup().then(resolve);
+});
--- a/services/sync/modules/stages/enginesync.js
+++ b/services/sync/modules/stages/enginesync.js
@@ -23,205 +23,189 @@ XPCOMUtils.defineLazyModuleGetter(this,
*
* This was originally split out of service.js. The API needs lots of love.
*/
this.EngineSynchronizer = function EngineSynchronizer(service) {
this._log = Log.repository.getLogger("Sync.Synchronizer");
this._log.level = Log.Level[Svc.Prefs.get("log.logger.synchronizer")];
this.service = service;
-
- this.onComplete = null;
}
EngineSynchronizer.prototype = {
- sync: function sync(engineNamesToSync) {
- if (!this.onComplete) {
- throw new Error("onComplete handler not installed.");
- }
-
+ async sync(engineNamesToSync) {
let startTime = Date.now();
this.service.status.resetSync();
// Make sure we should sync or record why we shouldn't.
let reason = this.service._checkSync();
if (reason) {
if (reason == kSyncNetworkOffline) {
this.service.status.sync = LOGIN_FAILED_NETWORK_ERROR;
}
// this is a purposeful abort rather than a failure, so don't set
// any status bits
reason = "Can't sync: " + reason;
- this.onComplete(new Error("Can't sync: " + reason));
- return;
+ throw new Error(reason);
}
// If we don't have a node, get one. If that fails, retry in 10 minutes.
if (!this.service.clusterURL && !this.service._clusterManager.setCluster()) {
this.service.status.sync = NO_SYNC_NODE_FOUND;
this._log.info("No cluster URL found. Cannot sync.");
- this.onComplete(null);
return;
}
// Ping the server with a special info request once a day.
let infoURL = this.service.infoURL;
let now = Math.floor(Date.now() / 1000);
let lastPing = Svc.Prefs.get("lastPing", 0);
if (now - lastPing > 86400) { // 60 * 60 * 24
infoURL += "?v=" + WEAVE_VERSION;
Svc.Prefs.set("lastPing", now);
}
let engineManager = this.service.engineManager;
// Figure out what the last modified time is for each collection
- let info = this.service._fetchInfo(infoURL);
+ let info = await this.service._fetchInfo(infoURL);
// Convert the response to an object and read out the modified times
for (let engine of [this.service.clientsEngine].concat(engineManager.getAll())) {
engine.lastModified = info.obj[engine.name] || 0;
}
- if (!(this.service._remoteSetup(info))) {
- this.onComplete(new Error("Aborting sync, remote setup failed"));
- return;
+ if (!(await this.service._remoteSetup(info))) {
+ throw new Error("Aborting sync, remote setup failed");
}
// Make sure we have an up-to-date list of clients before sending commands
this._log.debug("Refreshing client list.");
- if (!this._syncEngine(this.service.clientsEngine)) {
+ if (!(await this._syncEngine(this.service.clientsEngine))) {
// Clients is an engine like any other; it can fail with a 401,
// and we can elect to abort the sync.
this._log.warn("Client engine sync failed. Aborting.");
- this.onComplete(null);
return;
}
// We only honor the "hint" of what engines to Sync if this isn't
// a first sync.
let allowEnginesHint = false;
// Wipe data in the desired direction if necessary
switch (Svc.Prefs.get("firstSync")) {
case "resetClient":
- this.service.resetClient(engineManager.enabledEngineNames);
+ await this.service.resetClient(engineManager.enabledEngineNames);
break;
case "wipeClient":
- this.service.wipeClient(engineManager.enabledEngineNames);
+ await this.service.wipeClient(engineManager.enabledEngineNames);
break;
case "wipeRemote":
- this.service.wipeRemote(engineManager.enabledEngineNames);
+ await this.service.wipeRemote(engineManager.enabledEngineNames);
break;
default:
allowEnginesHint = true;
break;
}
if (this.service.clientsEngine.localCommands) {
try {
- if (!(this.service.clientsEngine.processIncomingCommands())) {
+ if (!(await this.service.clientsEngine.processIncomingCommands())) {
this.service.status.sync = ABORT_SYNC_COMMAND;
- this.onComplete(new Error("Processed command aborted sync."));
- return;
+ throw new Error("Processed command aborted sync.");
}
// Repeat remoteSetup in-case the commands forced us to reset
- if (!(this.service._remoteSetup(info))) {
- this.onComplete(new Error("Remote setup failed after processing commands."));
- return;
+ if (!(await this.service._remoteSetup(info))) {
+ throw new Error("Remote setup failed after processing commands.");
}
} finally {
// Always immediately attempt to push back the local client (now
// without commands).
// Note that we don't abort here; if there's a 401 because we've
// been reassigned, we'll handle it around another engine.
- this._syncEngine(this.service.clientsEngine);
+ await this._syncEngine(this.service.clientsEngine);
}
}
// Update engines because it might change what we sync.
try {
- this._updateEnabledEngines();
+ await this._updateEnabledEngines();
} catch (ex) {
this._log.debug("Updating enabled engines failed", ex);
this.service.errorHandler.checkServerError(ex);
- this.onComplete(ex);
- return;
+ throw ex;
}
// If the engines to sync has been specified, we sync in the order specified.
let enginesToSync;
if (allowEnginesHint && engineNamesToSync) {
this._log.info("Syncing specified engines", engineNamesToSync);
enginesToSync = engineManager.get(engineNamesToSync).filter(e => e.enabled);
} else {
this._log.info("Syncing all enabled engines.");
enginesToSync = engineManager.getEnabled();
}
try {
// We don't bother validating engines that failed to sync.
let enginesToValidate = [];
for (let engine of enginesToSync) {
// If there's any problems with syncing the engine, report the failure
- if (!(this._syncEngine(engine)) || this.service.status.enforceBackoff) {
+ if (!(await this._syncEngine(engine)) || this.service.status.enforceBackoff) {
this._log.info("Aborting sync for failure in " + engine.name);
break;
}
enginesToValidate.push(engine);
}
// If _syncEngine fails for a 401, we might not have a cluster URL here.
// If that's the case, break out of this immediately, rather than
// throwing an exception when trying to fetch metaURL.
if (!this.service.clusterURL) {
this._log.debug("Aborting sync, no cluster URL: " +
"not uploading new meta/global.");
- this.onComplete(null);
return;
}
// Upload meta/global if any engines changed anything.
- let meta = Async.promiseSpinningly(this.service.recordManager.get(this.service.metaURL));
+ let meta = await this.service.recordManager.get(this.service.metaURL);
if (meta.isNew || meta.changed) {
this._log.info("meta/global changed locally: reuploading.");
try {
- this.service.uploadMetaGlobal(meta);
+ await this.service.uploadMetaGlobal(meta);
delete meta.isNew;
delete meta.changed;
} catch (error) {
this._log.error("Unable to upload meta/global. Leaving marked as new.");
}
}
- Async.promiseSpinningly(Doctor.consult(enginesToValidate));
+ await Doctor.consult(enginesToValidate);
// If there were no sync engine failures
if (this.service.status.service != SYNC_FAILED_PARTIAL) {
Svc.Prefs.set("lastSync", new Date().toString());
this.service.status.sync = SYNC_SUCCEEDED;
}
} finally {
Svc.Prefs.reset("firstSync");
let syncTime = ((Date.now() - startTime) / 1000).toFixed(2);
let dateStr = Utils.formatTimestamp(new Date());
this._log.info("Sync completed at " + dateStr
+ " after " + syncTime + " secs.");
}
-
- this.onComplete(null);
},
// Returns true if sync should proceed.
// false / no return value means sync should be aborted.
- _syncEngine: function _syncEngine(engine) {
+ async _syncEngine(engine) {
try {
- engine.sync();
+ await engine.sync();
} catch (e) {
if (e.status == 401) {
// Maybe a 401, cluster update perhaps needed?
// We rely on ErrorHandler observing the sync failure notification to
// schedule another sync and clear node assignment values.
// Here we simply want to muffle the exception and return an
// appropriate value.
return false;
@@ -233,17 +217,17 @@ EngineSynchronizer.prototype = {
this._log.info(`${engine.name} was interrupted by shutdown; no other engines will sync`);
return false;
}
}
return true;
},
- _updateEnabledFromMeta(meta, numClients, engineManager = this.service.engineManager) {
+ async _updateEnabledFromMeta(meta, numClients, engineManager = this.service.engineManager) {
this._log.info("Updating enabled engines: " +
numClients + " clients.");
if (meta.isNew || !meta.payload.engines) {
this._log.debug("meta/global isn't new, or is missing engines. Not updating enabled state.");
return;
}
@@ -302,17 +286,17 @@ EngineSynchronizer.prototype = {
attemptedEnable = true;
}
// If either the engine was disabled locally or enabling the engine
// failed (see above re master-password) then wipe server data and
// disable it everywhere.
if (!engine.enabled) {
this._log.trace("Wiping data for " + engineName + " engine.");
- engine.wipeServer();
+ await engine.wipeServer();
delete meta.payload.engines[engineName];
meta.changed = true; // the new enabled state must propagate
// We also here mark the engine as declined, because the pref
// was explicitly changed to false - unless we tried, and failed,
// to enable it - in which case we leave the declined state alone.
if (!attemptedEnable) {
// This will be reflected in meta/global in the next stage.
this._log.trace("Engine " + engineName + " was disabled locally. Marking as declined.");
@@ -337,17 +321,17 @@ EngineSynchronizer.prototype = {
engineManager.decline(toDecline);
engineManager.undecline(toUndecline);
Svc.Prefs.resetBranch("engineStatusChanged.");
this.service._ignorePrefObserver = false;
},
- _updateEnabledEngines() {
- let meta = Async.promiseSpinningly(this.service.recordManager.get(this.service.metaURL));
+ async _updateEnabledEngines() {
+ let meta = await this.service.recordManager.get(this.service.metaURL);
let numClients = this.service.scheduler.numClients;
let engineManager = this.service.engineManager;
- this._updateEnabledFromMeta(meta, numClients, engineManager);
+ await this._updateEnabledFromMeta(meta, numClients, engineManager);
},
};
Object.freeze(EngineSynchronizer.prototype);
--- a/services/sync/modules/util.js
+++ b/services/sync/modules/util.js
@@ -71,98 +71,99 @@ this.Utils = {
" (" + hph.oscpu + ")" + // (oscpu)
" FxSync/" + WEAVE_VERSION + "." + // Sync.
Services.appinfo.appBuildID + "."; // Build.
}
return this._userAgent + Svc.Prefs.get("client.type", "desktop");
},
/**
- * Wrap a function to catch all exceptions and log them
+ * Wrap a [promise-returning] function to catch all exceptions and log them.
*
* @usage MyObj._catch = Utils.catch;
* MyObj.foo = function() { this._catch(func)(); }
*
* Optionally pass a function which will be called if an
* exception occurs.
*/
- catch: function Utils_catch(func, exceptionCallback) {
+ catch(func, exceptionCallback) {
let thisArg = this;
- return function WrappedCatch() {
+ return async function WrappedCatch() {
try {
- return func.call(thisArg);
+ return await func.call(thisArg);
} catch (ex) {
thisArg._log.debug("Exception calling " + (func.name || "anonymous function"), ex);
if (exceptionCallback) {
return exceptionCallback.call(thisArg, ex);
}
return null;
}
};
},
/**
- * Wrap a function to call lock before calling the function then unlock.
+ * Wrap a [promise-returning] function to call lock before calling the function
+ * then unlock when it finishes executing or if it threw an error.
*
* @usage MyObj._lock = Utils.lock;
- * MyObj.foo = function() { this._lock(func)(); }
+ * MyObj.foo = async function() { await this._lock(func)(); }
*/
- lock: function lock(label, func) {
+ lock(label, func) {
let thisArg = this;
- return function WrappedLock() {
+ return async function WrappedLock() {
if (!thisArg.lock()) {
throw "Could not acquire lock. Label: \"" + label + "\".";
}
try {
- return func.call(thisArg);
+ return await func.call(thisArg);
} finally {
thisArg.unlock();
}
};
},
isLockException: function isLockException(ex) {
return ex && ex.indexOf && ex.indexOf("Could not acquire lock.") == 0;
},
/**
- * Wrap functions to notify when it starts and finishes executing or if it
- * threw an error.
+ * Wrap [promise-returning] functions to notify when it starts and
+ * finishes executing or if it threw an error.
*
* The message is a combination of a provided prefix, the local name, and
* the event. Possible events are: "start", "finish", "error". The subject
* is the function's return value on "finish" or the caught exception on
* "error". The data argument is the predefined data value.
*
* Example:
*
* @usage function MyObj(name) {
* this.name = name;
* this._notify = Utils.notify("obj:");
* }
* MyObj.prototype = {
- * foo: function() this._notify("func", "data-arg", function () {
+ * foo: function() this._notify("func", "data-arg", async function () {
* //...
* }(),
* };
*/
- notify: function Utils_notify(prefix) {
+ notify(prefix) {
return function NotifyMaker(name, data, func) {
let thisArg = this;
let notify = function(state, subject) {
let mesg = prefix + name + ":" + state;
thisArg._log.trace("Event: " + mesg);
Observers.notify(mesg, subject, data);
};
- return function WrappedNotify() {
+ return async function WrappedNotify() {
+ notify("start", null);
try {
- notify("start", null);
- let ret = func.call(thisArg);
+ let ret = await func.call(thisArg);
notify("finish", ret);
return ret;
} catch (ex) {
notify("error", ex);
throw ex;
}
};
};
@@ -294,86 +295,69 @@ this.Utils = {
/**
* Load a JSON file from disk in the profile directory.
*
* @param filePath
* JSON file path load from profile. Loaded file will be
* <profile>/<filePath>.json. i.e. Do not specify the ".json"
* extension.
* @param that
- * Object to use for logging and "this" for callback.
- * @param callback
- * Function to process json object as its first argument. If the file
- * could not be loaded, the first argument will be undefined.
+ * Object to use for logging.
+ *
+ * @return Promise<>
+ * Promise resolved with the parsed json object, or null if the file could not be loaded.
*/
- async jsonLoad(filePath, that, callback) {
+ async jsonLoad(filePath, that) {
let path = Utils.jsonFilePath(filePath);
- if (that._log) {
+ if (that._log && that._log.trace) {
that._log.trace("Loading json from disk: " + filePath);
}
- let json;
-
try {
- json = await CommonUtils.readJSON(path);
+ return await CommonUtils.readJSON(path);
} catch (e) {
- if (e instanceof OS.File.Error && e.becauseNoSuchFile) {
- // Ignore non-existent files, but explicitly return null.
- json = null;
- } else if (that._log) {
+ if (!(e instanceof OS.File.Error && e.becauseNoSuchFile)) {
+ if (that._log) {
that._log.debug("Failed to load json", e);
}
+ }
+ return null;
}
-
- if (callback) {
- callback.call(that, json);
- }
- return json;
},
/**
* Save a json-able object to disk in the profile directory.
*
* @param filePath
* JSON file path save to <filePath>.json
* @param that
- * Object to use for logging and "this" for callback
+ * Object to use for logging.
* @param obj
* Function to provide json-able object to save. If this isn't a
- * function, it'll be used as the object to make a json string.
- * @param callback
+ * function, it'll be used as the object to make a json string.
* Function called when the write has been performed. Optional.
- * The first argument will be a Components.results error
- * constant on error or null if no error was encountered (and
- * the file saved successfully).
+ *
+ * @return Promise<>
+ * Promise resolved when the write has been performed.
*/
- async jsonSave(filePath, that, obj, callback) {
+ async jsonSave(filePath, that, obj) {
let path = OS.Path.join(OS.Constants.Path.profileDir, "weave",
...(filePath + ".json").split("/"));
let dir = OS.Path.dirname(path);
- let error = null;
- try {
- await OS.File.makeDir(dir, { from: OS.Constants.Path.profileDir });
+ await OS.File.makeDir(dir, { from: OS.Constants.Path.profileDir });
- if (that._log) {
- that._log.trace("Saving json to disk: " + path);
- }
-
- let json = typeof obj == "function" ? obj.call(that) : obj;
-
- await CommonUtils.writeJSON(json, path);
- } catch (e) {
- error = e
+ if (that._log) {
+ that._log.trace("Saving json to disk: " + path);
}
- if (typeof callback == "function") {
- callback.call(that, error);
- }
+ let json = typeof obj == "function" ? obj.call(that) : obj;
+
+ return CommonUtils.writeJSON(json, path);
},
/**
* Move a json file in the profile directory. Will fail if a file exists at the
* destination.
*
* @returns a promise that resolves to undefined on success, or rejects on failure
*