new file mode 100644
--- /dev/null
+++ b/dom/push/PushBroadcastService.jsm
@@ -0,0 +1,216 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+ChromeUtils.import("resource://gre/modules/osfile.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
+ChromeUtils.defineModuleGetter(this, "JSONFile", "resource://gre/modules/JSONFile.jsm");
+
+var EXPORTED_SYMBOLS = ["pushBroadcastService"];
+
+// We are supposed to ignore any updates with this version.
+// FIXME: what is the actual "dummy" version?
+// See bug 1467550.
+const DUMMY_VERSION_STRING = "dummy";
+
+XPCOMUtils.defineLazyGetter(this, "console", () => {
+ let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
+ return new ConsoleAPI({
+ maxLogLevelPref: "dom.push.loglevel",
+ prefix: "BroadcastService",
+ });
+});
+ChromeUtils.defineModuleGetter(this, "PushService", "resource://gre/modules/PushService.jsm");
+
+class InvalidSourceInfo extends Error {
+ constructor(message) {
+ super(message);
+ this.name = 'InvalidSourceInfo';
+ }
+}
+
+const BROADCAST_SERVICE_VERSION = 1;
+
+var BroadcastService = class {
+ constructor(pushService, path) {
+ this.pushService = pushService;
+ this.jsonFile = new JSONFile({
+ path,
+ dataPostProcessor: this._initializeJSONFile,
+ });
+ this.initializePromise = this.jsonFile.load();
+ }
+
+ /**
+ * Convert the listeners from our on-disk format to the format
+ * needed by a hello message.
+ */
+ async getListeners() {
+ await this.initializePromise;
+ return Object.entries(this.jsonFile.data.listeners).reduce((acc, [k, v]) => {
+ acc[k] = v.version;
+ return acc;
+ }, {});
+ }
+
+ _initializeJSONFile(data) {
+ if (!data.version) {
+ data.version = BROADCAST_SERVICE_VERSION;
+ }
+ if (!data.hasOwnProperty("listeners")) {
+ data.listeners = {};
+ }
+ return data;
+ }
+
+ /**
+ * Reset to a state akin to what you would get in a new profile.
+ * In particular, wipe anything from storage.
+ *
+ * Used mainly for testing.
+ */
+ async _resetListeners() {
+ await this.initializePromise;
+ this.jsonFile.data = this._initializeJSONFile({});
+ this.initializePromise = Promise.resolve();
+ }
+
+ /**
+ * Ensure that a sourceInfo is correct (has the expected fields).
+ */
+ _validateSourceInfo(sourceInfo) {
+ const {moduleURI, symbolName} = sourceInfo;
+ if (typeof moduleURI !== "string") {
+ throw new InvalidSourceInfo(`moduleURI must be a string (got ${typeof moduleURI})`);
+ }
+ if (typeof symbolName !== "string") {
+ throw new InvalidSourceInfo(`symbolName must be a string (got ${typeof symbolName})`);
+ }
+ }
+
+ /**
+ * Add an entry for a given listener if it isn't present, or update
+ * one if it is already present.
+ *
+ * Note that this means only a single listener can be set for a
+ * given subscription. This is a limitation in the current API that
+ * stems from the fact that there can only be one source of truth
+ * for the subscriber's version. As a workaround, you can define a
+ * listener which calls multiple other listeners.
+ *
+ * @param {string} broadcastId The broadcastID to listen for
+ * @param {string} version The most recent version we have for
+ * updates from this broadcastID
+ * @param {Object} sourceInfo A description of the handler for
+ * updates on this broadcastID
+ */
+ async addListener(broadcastId, version, sourceInfo) {
+ console.info("addListener: adding listener", broadcastId, version, sourceInfo);
+ await this.initializePromise;
+ this._validateSourceInfo(sourceInfo);
+ if (typeof version !== "string") {
+ throw new TypeError("version should be a string");
+ }
+ const isNew = !this.jsonFile.data.listeners.hasOwnProperty(broadcastId);
+
+ // Update listeners before telling the pushService to subscribe,
+ // in case it would disregard the update in the small window
+ // between getting listeners and setting state to RUNNING.
+ this.jsonFile.data.listeners[broadcastId] = {version, sourceInfo};
+ this.jsonFile.saveSoon();
+
+ if (isNew) {
+ await this.pushService.subscribeBroadcast(broadcastId, version);
+ }
+ }
+
+ async receivedBroadcastMessage(broadcasts) {
+ console.info("receivedBroadcastMessage:", broadcasts);
+ await this.initializePromise;
+ for (const broadcastId in broadcasts) {
+ const version = broadcasts[broadcastId];
+ if (version === DUMMY_VERSION_STRING) {
+ console.info("Ignoring", version, "because it's the dummy version");
+ continue;
+ }
+ // We don't know this broadcastID. This is probably a bug?
+ if (!this.jsonFile.data.listeners.hasOwnProperty(broadcastId)) {
+ console.warn("receivedBroadcastMessage: unknown broadcastId", broadcastId);
+ continue;
+ }
+
+ const {sourceInfo} = this.jsonFile.data.listeners[broadcastId];
+ try {
+ this._validateSourceInfo(sourceInfo);
+ } catch (e) {
+ console.error("receivedBroadcastMessage: malformed sourceInfo", sourceInfo, e);
+ continue;
+ }
+
+ const {moduleURI, symbolName} = sourceInfo;
+
+ const module = {};
+ try {
+ ChromeUtils.import(moduleURI, module);
+ } catch (e) {
+ console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+ "because import of module", moduleURI,
+ "failed", e);
+ continue;
+ }
+
+ if (!module[symbolName]) {
+ console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+ "because module", moduleName, "missing attribute", symbolName);
+ continue;
+ }
+
+ const handler = module[symbolName];
+
+ if (!handler.receivedBroadcastMessage) {
+ console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+ "because handler returned by", `${moduleURI}.${symbolName}`,
+ "has no receivedBroadcastMessage method");
+ continue;
+ }
+
+ try {
+ await handler.receivedBroadcastMessage(version, broadcastId);
+ } catch (e) {
+ console.error("receivedBroadcastMessage: handler for", broadcastId,
+ "threw error:", e);
+ continue;
+ }
+
+ // Broadcast message applied successfully. Update the version we
+ // received if it's different than the one we had. We don't
+ // enforce an ordering here (i.e. we use != instead of <)
+ // because we don't know what the ordering of the service's
+ // versions is going to be.
+ if (this.jsonFile.data.listeners[broadcastId].version != version) {
+ this.jsonFile.data.listeners[broadcastId].version = version;
+ this.jsonFile.saveSoon();
+ }
+ }
+ }
+
+ // For test only.
+ _saveImmediately() {
+ return this.jsonFile._save();
+ }
+}
+
+function initializeBroadcastService() {
+ // Fallback path for xpcshell tests.
+ let path = "broadcast-listeners.json";
+ if (OS.Constants.Path.profileDir) {
+ // Real path for use in a real profile.
+ path = OS.Path.join(OS.Constants.Path.profileDir, path);
+ }
+ return new BroadcastService(PushService, path);
+};
+
+var pushBroadcastService = initializeBroadcastService();
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -30,16 +30,17 @@ const CONNECTION_PROTOCOLS = (function()
})();
XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
"@mozilla.org/push/Notifier;1",
"nsIPushNotifier");
XPCOMUtils.defineLazyServiceGetter(this, "eTLDService",
"@mozilla.org/network/effective-tld-service;1",
"nsIEffectiveTLDService");
+ChromeUtils.defineModuleGetter(this, "pushBroadcastService", "resource://gre/modules/PushBroadcastService.jsm");
var EXPORTED_SYMBOLS = ["PushService"];
XPCOMUtils.defineLazyGetter(this, "console", () => {
let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
return new ConsoleAPI({
maxLogLevelPref: "dom.push.loglevel",
prefix: "PushService",
@@ -211,23 +212,34 @@ var PushService = {
if (this._state == PUSH_SERVICE_RUNNING) {
// PushService was not in the offline state, but got notification to
// go online (a offline notification has not been sent).
// Disconnect first.
this._service.disconnect();
}
let records = await this.getAllUnexpired();
+ let broadcastListeners = await pushBroadcastService.getListeners();
+ // In principle, a listener could be added to the
+ // pushBroadcastService here, after we have gotten listeners and
+ // before we're RUNNING, but this can't happen in practice because
+ // the only caller that can add listeners is PushBroadcastService,
+ // and it waits on the same promise we are before it can add
+ // listeners. If PushBroadcastService gets woken first, it will
+ // update the value that is eventually returned from
+ // getListeners.
this._setState(PUSH_SERVICE_RUNNING);
if (records.length > 0 || prefs.get("alwaysConnect")) {
// Connect if we have existing subscriptions, or if the always-on pref
- // is set.
- this._service.connect(records);
+ // is set. We gate on the pref to let us do load testing before
+ // turning it on for everyone, but if the user has push
+ // subscriptions, we need to connect them anyhow.
+ this._service.connect(records, broadcastListeners);
}
},
_changeStateConnectionEnabledEvent: function(enabled) {
console.debug("changeStateConnectionEnabledEvent()", enabled);
if (this._state < PUSH_SERVICE_CONNECTION_DISABLE &&
this._state != PUSH_SERVICE_ACTIVATING) {
@@ -457,23 +469,23 @@ var PushService = {
this._setState(PUSH_SERVICE_ACTIVATING);
prefs.observe("serverURL", this);
Services.obs.addObserver(this, "quit-application");
if (options.serverURI) {
// this is use for xpcshell test.
- this._stateChangeProcessEnqueue(_ =>
+ return this._stateChangeProcessEnqueue(_ =>
this._changeServerURL(options.serverURI, STARTING_SERVICE_EVENT, options));
} else {
// This is only used for testing. Different tests require connecting to
// slightly different URLs.
- this._stateChangeProcessEnqueue(_ =>
+ return this._stateChangeProcessEnqueue(_ =>
this._changeServerURL(prefs.get("serverURL"), STARTING_SERVICE_EVENT));
}
},
_startObservers: function() {
console.debug("startObservers()");
if (this._state != PUSH_SERVICE_ACTIVATING) {
@@ -736,16 +748,26 @@ var PushService = {
return this._decryptAndNotifyApp(record, messageID, headers, data);
}).catch(error => {
console.error("receivedPushMessage: Error notifying app", error);
return Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
});
},
/**
+ * Dispatches a broadcast notification to the BroadcastService.
+ */
+ receivedBroadcastMessage(message) {
+ pushBroadcastService.receivedBroadcastMessage(message.broadcasts)
+ .catch(e => {
+ console.error(e);
+ });;
+ },
+
+ /**
* Updates a registration record after receiving a push message.
*
* @param {String} keyID The push registration ID.
* @param {Function} updateFunc The function passed to `receivedPushMessage`.
* @returns {Promise} Resolves with the updated record, or rejects if the
* record was not updated.
*/
_updateRecordAfterPush(keyID, updateFunc) {
@@ -1053,16 +1075,31 @@ var PushService = {
}
throw new Error("Push subscription expired");
}).then(_ => this._lookupOrPutPendingRequest(aPageRecord));
}
return record.toSubscription();
});
},
+ /*
+ * Called only by the PushBroadcastService on the receipt of a new
+ * subscription. Don't call this directly. Go through PushBroadcastService.
+ */
+ async subscribeBroadcast(broadcastId, version) {
+ if (this._state != PUSH_SERVICE_RUNNING) {
+ // Ignore any request to subscribe before we send a hello.
+ // We'll send all the broadcast listeners as part of the hello
+ // anyhow.
+ return;
+ }
+
+ await this._service.sendSubscribeBroadcast(broadcastId, version);
+ },
+
/**
* Called on message from the child process.
*
* Why is the record being deleted from the local database before the server
* is told?
*
* Unregistration is for the benefit of the app and the AppServer
* so that the AppServer does not keep pinging a channel the UserAgent isn't
--- a/dom/push/PushServiceAndroidGCM.jsm
+++ b/dom/push/PushServiceAndroidGCM.jsm
@@ -162,17 +162,17 @@ var PushServiceAndroidGCM = {
Services.obs.removeObserver(this, "PushServiceAndroidGCM:ReceivedPushMessage");
prefs.ignore("debug", this);
},
onAlarmFired: function() {
// No action required.
},
- connect: function(records) {
+ connect: function(records, broadcastListeners) {
console.debug("connect:", records);
// It's possible for the registration or subscriptions backing the
// PushService to not be registered with the underlying AndroidPushService.
// Expire those that are unrecognized.
return EventDispatcher.instance.sendRequestForResult({
type: "PushServiceAndroidGCM:DumpSubscriptions",
})
.then(subscriptions => {
@@ -189,16 +189,20 @@ var PushServiceAndroidGCM = {
return this._mainPushService.dropRegistrationAndNotifyApp(record.keyID)
.catch(error => {
console.error("connect: Error dropping registration", record.keyID, error);
});
}));
});
},
+ sendSubscribeBroadcast: async function(serviceId, version) {
+ // Not implemented yet
+ },
+
isConnected: function() {
return this._mainPushService != null;
},
disconnect: function() {
console.debug("disconnect");
},
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -421,20 +421,24 @@ var PushServiceHttp2 = {
validServerURI: function(serverURI) {
if (serverURI.scheme == "http") {
return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
}
return serverURI.scheme == "https";
},
- connect: function(subscriptions) {
+ connect: function(subscriptions, broadcastListeners) {
this.startConnections(subscriptions);
},
+ sendSubscribeBroadcast: async function(serviceId, version) {
+ // Not implemented yet
+ },
+
isConnected: function() {
return this._mainPushService != null;
},
disconnect: function() {
this._shutdownConnections(false);
},
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -309,16 +309,18 @@ var PushServiceWebSocket = {
this._ws.sendMsg(msg);
},
init: function(options, mainPushService, serverURI) {
console.debug("init()");
this._mainPushService = mainPushService;
this._serverURI = serverURI;
+ // Filled in at connect() time
+ this._broadcastListeners = null;
// Override the default WebSocket factory function. The returned object
// must be null or satisfy the nsIWebSocketChannel interface. Used by
// the tests to provide a mock WebSocket implementation.
if (options.makeWebSocket) {
this._makeWebSocket = options.makeWebSocket;
}
@@ -507,18 +509,19 @@ var PushServiceWebSocket = {
this._currentState = STATE_WAITING_FOR_WS_START;
} catch(e) {
console.error("beginWSSetup: Error opening websocket.",
"asyncOpen failed", e);
this._reconnect();
}
},
- connect: function(records) {
- console.debug("connect()");
+ connect: function(records, broadcastListeners) {
+ console.debug("connect()", broadcastListeners);
+ this._broadcastListeners = broadcastListeners;
this._beginWSSetup();
},
isConnected: function() {
return !!this._ws;
},
/**
@@ -561,16 +564,23 @@ var PushServiceWebSocket = {
this._sendPendingRequests();
};
function finishHandshake() {
this._UAID = reply.uaid;
this._currentState = STATE_READY;
prefs.observe("userAgentID", this);
+ // Handle broadcasts received in response to the "hello" message.
+ if (reply.broadcasts) {
+ // The reply isn't technically a broadcast message, but it has
+ // the shape of a broadcast message (it has a broadcasts field).
+ this._mainPushService.receivedBroadcastMessage(reply);
+ }
+
this._dataEnabled = !!reply.use_webpush;
if (this._dataEnabled) {
this._mainPushService.getAllUnexpired().then(records =>
Promise.all(records.map(record =>
this._mainPushService.ensureCrypto(record).catch(error => {
console.error("finishHandshake: Error updating record",
record.keyID, error);
})
@@ -742,16 +752,20 @@ var PushServiceWebSocket = {
if (typeof version === "number" && version >= 0) {
// FIXME(nsm): this relies on app update notification being infallible!
// eventually fix this
this._receivedUpdate(update.channelID, version);
}
}
},
+ _handleBroadcastReply: function(reply) {
+ this._mainPushService.receivedBroadcastMessage(reply);
+ },
+
reportDeliveryError(messageID, reason) {
console.debug("reportDeliveryError()");
let code = kDELIVERY_REASON_TO_CODE[reason];
if (!code) {
throw new Error('Invalid delivery error reason');
}
let data = {messageType: 'nack',
version: messageID,
@@ -940,16 +954,17 @@ var PushServiceWebSocket = {
if (this._currentState != STATE_WAITING_FOR_WS_START) {
console.error("wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
"state", this._currentState, "Skipping");
return;
}
let data = {
messageType: "hello",
+ broadcasts: this._broadcastListeners,
use_webpush: true,
};
if (this._UAID) {
data.uaid = this._UAID;
}
this._wsSendMessage(data);
@@ -1008,17 +1023,17 @@ var PushServiceWebSocket = {
// If it is a ping, do not handle the message.
if (doNotHandle) {
return;
}
// A whitelist of protocol handlers. Add to these if new messages are added
// in the protocol.
- let handlers = ["Hello", "Register", "Unregister", "Notification"];
+ let handlers = ["Hello", "Register", "Unregister", "Notification", "Broadcast"];
// Build up the handler name to call from messageType.
// e.g. messageType == "register" -> _handleRegisterReply.
let handlerName = reply.messageType[0].toUpperCase() +
reply.messageType.slice(1).toLowerCase();
if (!handlers.includes(handlerName)) {
console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
@@ -1107,16 +1122,27 @@ var PushServiceWebSocket = {
return null;
}
this._pendingRequests.delete(key);
if (!this._hasPendingRequests()) {
this._requestTimeoutTimer.cancel();
}
return request;
},
+
+ sendSubscribeBroadcast(serviceId, version) {
+ let data = {
+ messageType: "broadcast_subscribe",
+ broadcasts: {
+ [serviceId]: version
+ },
+ };
+
+ this._queueRequest(data);
+ },
};
function PushRecordWebSocket(record) {
PushRecord.call(this, record);
this.channelID = record.channelID;
this.version = record.version;
}
--- a/dom/push/moz.build
+++ b/dom/push/moz.build
@@ -8,16 +8,17 @@ with Files("**"):
EXTRA_COMPONENTS += [
'Push.js',
'Push.manifest',
'PushComponents.js',
]
EXTRA_JS_MODULES += [
+ 'PushBroadcastService.jsm',
'PushCrypto.jsm',
'PushDB.jsm',
'PushRecord.jsm',
'PushService.jsm',
]
if CONFIG['MOZ_BUILD_APP'] != 'mobile/android':
# Everything but Fennec.