Bug 633062 p1 - Introduce AsyncQueueCaller and AsyncObserver. r?markh
MozReview-Commit-ID: GGuSkA5DZUQ
--- a/services/common/async.js
+++ b/services/common/async.js
@@ -9,16 +9,17 @@ var {classes: Cc, interfaces: Ci, result
// Constants for makeSyncCallback, waitForSyncCallback.
const CB_READY = {};
const CB_COMPLETE = {};
const CB_FAIL = {};
const REASON_ERROR = Ci.mozIStorageStatementCallback.REASON_ERROR;
ChromeUtils.import("resource://gre/modules/Services.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
/*
* Helpers for various async operations.
*/
this.Async = {
/**
* Execute an arbitrary number of asynchronous functions one after the
@@ -198,10 +199,70 @@ this.Async = {
jankYielder(yieldEvery = 50) {
let iterations = 0;
return async () => {
Async.checkAppReady(); // Let it throw!
if (++iterations % yieldEvery === 0) {
await Async.promiseYield();
}
};
+ },
+
+ asyncQueueCaller(log) {
+ return new AsyncQueueCaller(log);
+ },
+
+ asyncObserver(log, obj) {
+ return new AsyncObserver(log, obj);
}
};
+
+/**
+ * Allows consumers to enqueue asynchronous callbacks to be called in order.
+ * Typically this is used when providing a callback to a caller that doesn't
+ * await on promises.
+ */
+class AsyncQueueCaller {
+ constructor(log) {
+ this._log = log;
+ this._queue = Promise.resolve();
+ this.QueryInterface = XPCOMUtils.generateQI([Ci.nsIObserver, Ci.nsISupportsWeakReference]);
+ }
+
+ /**
+ * /!\ Never await on another function that calls enqueueCall /!\
+ * on the same queue or we will deadlock.
+ */
+ enqueueCall(func) {
+ this._queue = (async () => {
+ await this._queue;
+ try {
+ await func();
+ } catch (e) {
+ this._log.error(e);
+ }
+ })();
+ }
+
+ promiseCallsComplete() {
+ return this._queue;
+ }
+}
+
+/*
+ * Subclass of AsyncQueueCaller that can be used with Services.obs directly.
+ * When this observe() is called, it will enqueue a call to the consumers's
+ * observe().
+ */
+class AsyncObserver extends AsyncQueueCaller {
+ constructor(obj, log) {
+ super(log);
+ this.obj = obj;
+ }
+
+ observe(subject, topic, data) {
+ this.enqueueCall(() => this.obj.observe(subject, topic, data));
+ }
+
+ promiseObserversComplete() {
+ return this.promiseCallsComplete();
+ }
+}