Bug 1440022: initial implementation r?lina draft
authorEthan Glasser-Camp <ethan@betacantrips.com>
Wed, 11 Apr 2018 10:53:20 -0400
changeset 808030 cebb80f0b578e51e77f9dbf11634175d02b107c9
parent 808029 e664b15cda8566145b24f2ee9f6c0c50525f04b6
child 808031 f200df30a8b3a94f75bb38461c7d3e05c0755e3b
push id113255
push userbmo:eglassercamp@mozilla.com
push dateSun, 17 Jun 2018 05:13:12 +0000
reviewerslina
bugs1440022
milestone62.0a1
Bug 1440022: initial implementation r?lina MozReview-Commit-ID: GMnGfpUSnox
dom/push/PushBroadcastService.jsm
dom/push/PushService.jsm
dom/push/PushServiceAndroidGCM.jsm
dom/push/PushServiceHttp2.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/moz.build
new file mode 100644
--- /dev/null
+++ b/dom/push/PushBroadcastService.jsm
@@ -0,0 +1,216 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+ChromeUtils.import("resource://gre/modules/osfile.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
+ChromeUtils.defineModuleGetter(this, "JSONFile", "resource://gre/modules/JSONFile.jsm");
+
+var EXPORTED_SYMBOLS = ["pushBroadcastService"];
+
+// We are supposed to ignore any updates with this version.
+// FIXME: what is the actual "dummy" version?
+// See bug 1467550.
+const DUMMY_VERSION_STRING = "dummy";
+
+XPCOMUtils.defineLazyGetter(this, "console", () => {
+  let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
+  return new ConsoleAPI({
+    maxLogLevelPref: "dom.push.loglevel",
+    prefix: "BroadcastService",
+  });
+});
+ChromeUtils.defineModuleGetter(this, "PushService", "resource://gre/modules/PushService.jsm");
+
+class InvalidSourceInfo extends Error {
+  constructor(message) {
+    super(message);
+    this.name = 'InvalidSourceInfo';
+  }
+}
+
+const BROADCAST_SERVICE_VERSION = 1;
+
+var BroadcastService = class {
+  constructor(pushService, path) {
+    this.pushService = pushService;
+    this.jsonFile = new JSONFile({
+      path,
+      dataPostProcessor: this._initializeJSONFile,
+    });
+    this.initializePromise = this.jsonFile.load();
+  }
+
+  /**
+   * Convert the listeners from our on-disk format to the format
+   * needed by a hello message.
+   */
+  async getListeners() {
+    await this.initializePromise;
+    return Object.entries(this.jsonFile.data.listeners).reduce((acc, [k, v]) => {
+      acc[k] = v.version;
+      return acc;
+    }, {});
+  }
+
+  _initializeJSONFile(data) {
+    if (!data.version) {
+      data.version = BROADCAST_SERVICE_VERSION;
+    }
+    if (!data.hasOwnProperty("listeners")) {
+      data.listeners = {};
+    }
+    return data;
+  }
+
+  /**
+   * Reset to a state akin to what you would get in a new profile.
+   * In particular, wipe anything from storage.
+   *
+   * Used mainly for testing.
+   */
+  async _resetListeners() {
+    await this.initializePromise;
+    this.jsonFile.data = this._initializeJSONFile({});
+    this.initializePromise = Promise.resolve();
+  }
+
+  /**
+   * Ensure that a sourceInfo is correct (has the expected fields).
+   */
+  _validateSourceInfo(sourceInfo) {
+    const {moduleURI, symbolName} = sourceInfo;
+    if (typeof moduleURI !== "string") {
+      throw new InvalidSourceInfo(`moduleURI must be a string (got ${typeof moduleURI})`);
+    }
+    if (typeof symbolName !== "string") {
+      throw new InvalidSourceInfo(`symbolName must be a string (got ${typeof symbolName})`);
+    }
+  }
+
+  /**
+   * Add an entry for a given listener if it isn't present, or update
+   * one if it is already present.
+   *
+   * Note that this means only a single listener can be set for a
+   * given subscription. This is a limitation in the current API that
+   * stems from the fact that there can only be one source of truth
+   * for the subscriber's version. As a workaround, you can define a
+   * listener which calls multiple other listeners.
+   *
+   * @param {string} broadcastId The broadcastID to listen for
+   * @param {string} version The most recent version we have for
+   *   updates from this broadcastID
+   * @param {Object} sourceInfo A description of the handler for
+   *   updates on this broadcastID
+   */
+  async addListener(broadcastId, version, sourceInfo) {
+    console.info("addListener: adding listener", broadcastId, version, sourceInfo);
+    await this.initializePromise;
+    this._validateSourceInfo(sourceInfo);
+    if (typeof version !== "string") {
+      throw new TypeError("version should be a string");
+    }
+    const isNew = !this.jsonFile.data.listeners.hasOwnProperty(broadcastId);
+
+    // Update listeners before telling the pushService to subscribe,
+    // in case it would disregard the update in the small window
+    // between getting listeners and setting state to RUNNING.
+    this.jsonFile.data.listeners[broadcastId] = {version, sourceInfo};
+    this.jsonFile.saveSoon();
+
+    if (isNew) {
+      await this.pushService.subscribeBroadcast(broadcastId, version);
+    }
+  }
+
+  async receivedBroadcastMessage(broadcasts) {
+    console.info("receivedBroadcastMessage:", broadcasts);
+    await this.initializePromise;
+    for (const broadcastId in broadcasts) {
+      const version = broadcasts[broadcastId];
+      if (version === DUMMY_VERSION_STRING) {
+        console.info("Ignoring", version, "because it's the dummy version");
+        continue;
+      }
+      // We don't know this broadcastID. This is probably a bug?
+      if (!this.jsonFile.data.listeners.hasOwnProperty(broadcastId)) {
+        console.warn("receivedBroadcastMessage: unknown broadcastId", broadcastId);
+        continue;
+      }
+
+      const {sourceInfo} = this.jsonFile.data.listeners[broadcastId];
+      try {
+        this._validateSourceInfo(sourceInfo);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: malformed sourceInfo", sourceInfo, e);
+        continue;
+      }
+
+      const {moduleURI, symbolName} = sourceInfo;
+
+      const module = {};
+      try {
+        ChromeUtils.import(moduleURI, module);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because import of module", moduleURI,
+                      "failed", e);
+        continue;
+      }
+
+      if (!module[symbolName]) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because module", moduleName, "missing attribute", symbolName);
+        continue;
+      }
+
+      const handler = module[symbolName];
+
+      if (!handler.receivedBroadcastMessage) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because handler returned by", `${moduleURI}.${symbolName}`,
+                      "has no receivedBroadcastMessage method");
+        continue;
+      }
+
+      try {
+        await handler.receivedBroadcastMessage(version, broadcastId);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: handler for", broadcastId,
+                      "threw error:", e);
+        continue;
+      }
+
+      // Broadcast message applied successfully. Update the version we
+      // received if it's different than the one we had.  We don't
+      // enforce an ordering here (i.e. we use != instead of <)
+      // because we don't know what the ordering of the service's
+      // versions is going to be.
+      if (this.jsonFile.data.listeners[broadcastId].version != version) {
+        this.jsonFile.data.listeners[broadcastId].version = version;
+        this.jsonFile.saveSoon();
+      }
+    }
+  }
+
+  // For test only.
+  _saveImmediately() {
+    return this.jsonFile._save();
+  }
+}
+
+function initializeBroadcastService() {
+  // Fallback path for xpcshell tests.
+  let path = "broadcast-listeners.json";
+  if (OS.Constants.Path.profileDir) {
+    // Real path for use in a real profile.
+    path = OS.Path.join(OS.Constants.Path.profileDir, path);
+  }
+  return new BroadcastService(PushService, path);
+};
+
+var pushBroadcastService = initializeBroadcastService();
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -30,16 +30,17 @@ const CONNECTION_PROTOCOLS = (function()
 })();
 
 XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
                                    "@mozilla.org/push/Notifier;1",
                                    "nsIPushNotifier");
 XPCOMUtils.defineLazyServiceGetter(this, "eTLDService",
                                    "@mozilla.org/network/effective-tld-service;1",
                                    "nsIEffectiveTLDService");
+ChromeUtils.defineModuleGetter(this, "pushBroadcastService", "resource://gre/modules/PushBroadcastService.jsm");
 
 var EXPORTED_SYMBOLS = ["PushService"];
 
 XPCOMUtils.defineLazyGetter(this, "console", () => {
   let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
   return new ConsoleAPI({
     maxLogLevelPref: "dom.push.loglevel",
     prefix: "PushService",
@@ -211,23 +212,34 @@ var PushService = {
     if (this._state == PUSH_SERVICE_RUNNING) {
       // PushService was not in the offline state, but got notification to
       // go online (a offline notification has not been sent).
       // Disconnect first.
       this._service.disconnect();
     }
 
     let records = await this.getAllUnexpired();
+    let broadcastListeners = await pushBroadcastService.getListeners();
 
+    // In principle, a listener could be added to the
+    // pushBroadcastService here, after we have gotten listeners and
+    // before we're RUNNING, but this can't happen in practice because
+    // the only caller that can add listeners is PushBroadcastService,
+    // and it waits on the same promise we are before it can add
+    // listeners. If PushBroadcastService gets woken first, it will
+    // update the value that is eventually returned from
+    // getListeners.
     this._setState(PUSH_SERVICE_RUNNING);
 
     if (records.length > 0 || prefs.get("alwaysConnect")) {
       // Connect if we have existing subscriptions, or if the always-on pref
-      // is set.
-      this._service.connect(records);
+      // is set. We gate on the pref to let us do load testing before
+      // turning it on for everyone, but if the user has push
+      // subscriptions, we need to connect them anyhow.
+      this._service.connect(records, broadcastListeners);
     }
   },
 
   _changeStateConnectionEnabledEvent: function(enabled) {
     console.debug("changeStateConnectionEnabledEvent()", enabled);
 
     if (this._state < PUSH_SERVICE_CONNECTION_DISABLE &&
         this._state != PUSH_SERVICE_ACTIVATING) {
@@ -457,23 +469,23 @@ var PushService = {
     this._setState(PUSH_SERVICE_ACTIVATING);
 
     prefs.observe("serverURL", this);
     Services.obs.addObserver(this, "quit-application");
 
     if (options.serverURI) {
       // this is use for xpcshell test.
 
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(options.serverURI, STARTING_SERVICE_EVENT, options));
 
     } else {
       // This is only used for testing. Different tests require connecting to
       // slightly different URLs.
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(prefs.get("serverURL"), STARTING_SERVICE_EVENT));
     }
   },
 
   _startObservers: function() {
     console.debug("startObservers()");
 
     if (this._state != PUSH_SERVICE_ACTIVATING) {
@@ -736,16 +748,26 @@ var PushService = {
       return this._decryptAndNotifyApp(record, messageID, headers, data);
     }).catch(error => {
       console.error("receivedPushMessage: Error notifying app", error);
       return Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
     });
   },
 
   /**
+   * Dispatches a broadcast notification to the BroadcastService.
+   */
+  receivedBroadcastMessage(message) {
+    pushBroadcastService.receivedBroadcastMessage(message.broadcasts)
+      .catch(e => {
+        console.error(e);
+      });;
+  },
+
+  /**
    * Updates a registration record after receiving a push message.
    *
    * @param {String} keyID The push registration ID.
    * @param {Function} updateFunc The function passed to `receivedPushMessage`.
    * @returns {Promise} Resolves with the updated record, or rejects if the
    *  record was not updated.
    */
   _updateRecordAfterPush(keyID, updateFunc) {
@@ -1053,16 +1075,31 @@ var PushService = {
           }
           throw new Error("Push subscription expired");
         }).then(_ => this._lookupOrPutPendingRequest(aPageRecord));
       }
       return record.toSubscription();
     });
   },
 
+  /*
+   * Called only by the PushBroadcastService on the receipt of a new
+   * subscription. Don't call this directly. Go through PushBroadcastService.
+   */
+  async subscribeBroadcast(broadcastId, version) {
+    if (this._state != PUSH_SERVICE_RUNNING) {
+      // Ignore any request to subscribe before we send a hello.
+      // We'll send all the broadcast listeners as part of the hello
+      // anyhow.
+      return;
+    }
+
+    await this._service.sendSubscribeBroadcast(broadcastId, version);
+  },
+
   /**
    * Called on message from the child process.
    *
    * Why is the record being deleted from the local database before the server
    * is told?
    *
    * Unregistration is for the benefit of the app and the AppServer
    * so that the AppServer does not keep pinging a channel the UserAgent isn't
--- a/dom/push/PushServiceAndroidGCM.jsm
+++ b/dom/push/PushServiceAndroidGCM.jsm
@@ -162,17 +162,17 @@ var PushServiceAndroidGCM = {
     Services.obs.removeObserver(this, "PushServiceAndroidGCM:ReceivedPushMessage");
     prefs.ignore("debug", this);
   },
 
   onAlarmFired: function() {
     // No action required.
   },
 
-  connect: function(records) {
+  connect: function(records, broadcastListeners) {
     console.debug("connect:", records);
     // It's possible for the registration or subscriptions backing the
     // PushService to not be registered with the underlying AndroidPushService.
     // Expire those that are unrecognized.
     return EventDispatcher.instance.sendRequestForResult({
       type: "PushServiceAndroidGCM:DumpSubscriptions",
     })
     .then(subscriptions => {
@@ -189,16 +189,20 @@ var PushServiceAndroidGCM = {
         return this._mainPushService.dropRegistrationAndNotifyApp(record.keyID)
           .catch(error => {
             console.error("connect: Error dropping registration", record.keyID, error);
           });
       }));
     });
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     console.debug("disconnect");
   },
 
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -421,20 +421,24 @@ var PushServiceHttp2 = {
 
   validServerURI: function(serverURI) {
     if (serverURI.scheme == "http") {
       return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
     }
     return serverURI.scheme == "https";
   },
 
-  connect: function(subscriptions) {
+  connect: function(subscriptions, broadcastListeners) {
     this.startConnections(subscriptions);
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     this._shutdownConnections(false);
   },
 
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -309,16 +309,18 @@ var PushServiceWebSocket = {
     this._ws.sendMsg(msg);
   },
 
   init: function(options, mainPushService, serverURI) {
     console.debug("init()");
 
     this._mainPushService = mainPushService;
     this._serverURI = serverURI;
+    // Filled in at connect() time
+    this._broadcastListeners = null;
 
     // Override the default WebSocket factory function. The returned object
     // must be null or satisfy the nsIWebSocketChannel interface. Used by
     // the tests to provide a mock WebSocket implementation.
     if (options.makeWebSocket) {
       this._makeWebSocket = options.makeWebSocket;
     }
 
@@ -507,18 +509,19 @@ var PushServiceWebSocket = {
       this._currentState = STATE_WAITING_FOR_WS_START;
     } catch(e) {
       console.error("beginWSSetup: Error opening websocket.",
         "asyncOpen failed", e);
       this._reconnect();
     }
   },
 
-  connect: function(records) {
-    console.debug("connect()");
+  connect: function(records, broadcastListeners) {
+    console.debug("connect()", broadcastListeners);
+    this._broadcastListeners = broadcastListeners;
     this._beginWSSetup();
   },
 
   isConnected: function() {
     return !!this._ws;
   },
 
   /**
@@ -561,16 +564,23 @@ var PushServiceWebSocket = {
       this._sendPendingRequests();
     };
 
     function finishHandshake() {
       this._UAID = reply.uaid;
       this._currentState = STATE_READY;
       prefs.observe("userAgentID", this);
 
+      // Handle broadcasts received in response to the "hello" message.
+      if (reply.broadcasts) {
+        // The reply isn't technically a broadcast message, but it has
+        // the shape of a broadcast message (it has a broadcasts field).
+        this._mainPushService.receivedBroadcastMessage(reply);
+      }
+
       this._dataEnabled = !!reply.use_webpush;
       if (this._dataEnabled) {
         this._mainPushService.getAllUnexpired().then(records =>
           Promise.all(records.map(record =>
             this._mainPushService.ensureCrypto(record).catch(error => {
               console.error("finishHandshake: Error updating record",
                 record.keyID, error);
             })
@@ -742,16 +752,20 @@ var PushServiceWebSocket = {
       if (typeof version === "number" && version >= 0) {
         // FIXME(nsm): this relies on app update notification being infallible!
         // eventually fix this
         this._receivedUpdate(update.channelID, version);
       }
     }
   },
 
+  _handleBroadcastReply: function(reply) {
+    this._mainPushService.receivedBroadcastMessage(reply);
+  },
+
   reportDeliveryError(messageID, reason) {
     console.debug("reportDeliveryError()");
     let code = kDELIVERY_REASON_TO_CODE[reason];
     if (!code) {
       throw new Error('Invalid delivery error reason');
     }
     let data = {messageType: 'nack',
                 version: messageID,
@@ -940,16 +954,17 @@ var PushServiceWebSocket = {
     if (this._currentState != STATE_WAITING_FOR_WS_START) {
       console.error("wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
         "state", this._currentState, "Skipping");
       return;
     }
 
     let data = {
       messageType: "hello",
+      broadcasts: this._broadcastListeners,
       use_webpush: true,
     };
 
     if (this._UAID) {
       data.uaid = this._UAID;
     }
 
     this._wsSendMessage(data);
@@ -1008,17 +1023,17 @@ var PushServiceWebSocket = {
 
     // If it is a ping, do not handle the message.
     if (doNotHandle) {
       return;
     }
 
     // A whitelist of protocol handlers. Add to these if new messages are added
     // in the protocol.
-    let handlers = ["Hello", "Register", "Unregister", "Notification"];
+    let handlers = ["Hello", "Register", "Unregister", "Notification", "Broadcast"];
 
     // Build up the handler name to call from messageType.
     // e.g. messageType == "register" -> _handleRegisterReply.
     let handlerName = reply.messageType[0].toUpperCase() +
                       reply.messageType.slice(1).toLowerCase();
 
     if (!handlers.includes(handlerName)) {
       console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
@@ -1107,16 +1122,27 @@ var PushServiceWebSocket = {
       return null;
     }
     this._pendingRequests.delete(key);
     if (!this._hasPendingRequests()) {
       this._requestTimeoutTimer.cancel();
     }
     return request;
   },
+
+  sendSubscribeBroadcast(serviceId, version) {
+    let data = {
+      messageType: "broadcast_subscribe",
+      broadcasts: {
+        [serviceId]: version
+      },
+    };
+
+    this._queueRequest(data);
+  },
 };
 
 function PushRecordWebSocket(record) {
   PushRecord.call(this, record);
   this.channelID = record.channelID;
   this.version = record.version;
 }
 
--- a/dom/push/moz.build
+++ b/dom/push/moz.build
@@ -8,16 +8,17 @@ with Files("**"):
 
 EXTRA_COMPONENTS += [
     'Push.js',
     'Push.manifest',
     'PushComponents.js',
 ]
 
 EXTRA_JS_MODULES += [
+    'PushBroadcastService.jsm',
     'PushCrypto.jsm',
     'PushDB.jsm',
     'PushRecord.jsm',
     'PushService.jsm',
 ]
 
 if CONFIG['MOZ_BUILD_APP'] != 'mobile/android':
     # Everything but Fennec.