--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -135,54 +135,114 @@ this.PushServiceWebSocket = {
return "WebSocket";
},
disconnect: function() {
this._shutdownWS();
},
observe: function(aSubject, aTopic, aData) {
+ if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
+ this._onUAIDChanged();
+ } else if (aTopic == "timer-callback") {
+ this._onTimerFired(aSubject);
+ }
+ },
- switch (aTopic) {
- case "nsPref:changed":
- if (aData == "dom.push.userAgentID") {
- this._shutdownWS();
- this._reconnectAfterBackoff();
+ /**
+ * Handles a UAID change. Unlike reconnects, we cancel all pending requests
+ * after disconnecting. Existing subscriptions stored in IndexedDB will be
+ * dropped on reconnect.
+ */
+ _onUAIDChanged() {
+ console.debug("onUAIDChanged()");
+ this._shutdownWS();
+ this._startBackoffTimer();
+ },
+
+ /** Handles a ping, backoff, or request timer event. */
+ _onTimerFired(timer) {
+ console.debug("onTimerFired()");
+ if (timer == this._pingTimer) {
+ this._sendPing();
+ return;
+ }
+ if (timer == this._backoffTimer) {
+ if (this._reconnectTestCallback) {
+ // Notify the test callback once the client reconnects.
+ let actualRetryTimeout = Date.now() - this._lastDisconnect;
+ this._reconnectTestCallback(actualRetryTimeout);
}
- break;
- case "timer-callback":
- if (aSubject == this._requestTimeoutTimer) {
- if (Object.keys(this._registerRequests).length === 0) {
- this._requestTimeoutTimer.cancel();
- }
+ this._beginWSSetup();
+ return;
+ }
+ if (timer == this._requestTimeoutTimer) {
+ this._timeOutRequests();
+ return;
+ }
+ },
- // Set to true if at least one request timed out.
- let requestTimedOut = false;
- for (let channelID in this._registerRequests) {
- let duration = Date.now() - this._registerRequests[channelID].ctime;
- // If any of the registration requests time out, all the ones after it
- // also made to fail, since we are going to be disconnecting the
- // socket.
- if (requestTimedOut || duration > this._requestTimeout) {
- requestTimedOut = true;
- this._registerRequests[channelID]
- .reject(new Error("Register request timed out for channel ID " +
- channelID));
+ /**
+ * Sends a ping to the server. Skips the request queue, but starts the
+ * request timer. If the socket is already closed, or the server does not
+ * respond within the timeout, the client will back off and reconnect.
+ */
+ _sendPing() {
+ console.debug("sendPing()");
+
+ this._startRequestTimeoutTimer();
+ try {
+ this._wsSendMessage({});
+ this._lastPingTime = Date.now();
+ } catch (e) {
+ console.debug("sendPing: Error sending ping", e);
+ this._reconnect();
+ }
+ },
+
+ /** Times out any pending requests. */
+ _timeOutRequests() {
+ console.debug("timeOutRequests()");
- delete this._registerRequests[channelID];
- }
- }
+ if (!this._hasPendingRequests()) {
+ // Cancel the repeating timer and exit early if we aren't waiting for
+ // pongs or requests.
+ this._requestTimeoutTimer.cancel();
+ return;
+ }
+
+ let now = Date.now();
+
+ // Set to true if at least one request timed out, or we're still waiting
+ // for a pong after the request timeout.
+ let requestTimedOut = false;
- // The most likely reason for a registration request timing out is
- // that the socket has disconnected. Best to reconnect.
+ if (this._lastPingTime > 0 &&
+ now - this._lastPingTime >= this._requestTimeout) {
+ requestTimedOut = true;
+ } else {
+ for (let [channelID, request] of this._registerRequests) {
+ let duration = now - request.ctime;
+ // If any of the registration requests time out, all the ones after it
+ // also made to fail, since we are going to be disconnecting the
+ // socket.
+ requestTimedOut |= duration > this._requestTimeout;
if (requestTimedOut) {
- this._reconnect();
+ request.reject(new Error(
+ "Register request timed out for channel ID " + channelID));
+
+ this._registerRequests.delete(channelID);
}
}
- break;
+ }
+
+ // The most likely reason for a pong or registration request timing out is
+ // that the socket has disconnected. Best to reconnect.
+ if (requestTimedOut) {
+ this._reconnect();
}
},
validServerURI: function(serverURI) {
return serverURI.scheme == "ws" || serverURI.scheme == "wss";
},
get _UAID() {
@@ -195,17 +255,17 @@ this.PushServiceWebSocket = {
"Not updating userAgentID");
return;
}
console.debug("New _UAID", newID);
prefs.set("userAgentID", newID);
},
_ws: null,
- _registerRequests: {},
+ _registerRequests: new Map(),
_currentState: STATE_SHUT_DOWN,
_requestTimeout: 0,
_requestTimeoutTimer: null,
_retryFailCount: 0,
/**
* According to the WS spec, servers should immediately close the underlying
* TCP connection after they close a WebSocket. This causes wsOnStop to be
@@ -251,16 +311,40 @@ this.PushServiceWebSocket = {
/**
* Maximum ping interval that we can reach.
*/
_upperLimit: 0,
/** Indicates whether the server supports Web Push-style message delivery. */
_dataEnabled: false,
+ /** The last time the client sent a ping to the server. */
+ _lastPingTime: 0,
+
+ /** The last time the connection was closed. */
+ _lastDisconnect: 0,
+
+ /**
+ * A one-shot timer used to periodically ping the server, to avoid timing out
+ * idle connections.
+ */
+ _pingTimer: null,
+
+ /** A one-shot timer fired after the reconnect backoff period. */
+ _backoffTimer: null,
+
+ /**
+ * A function called when the client reconnects after backing off.
+ *
+ * @param {Number} actualRetryTimeout The time elapsed between the last
+ * disconnect and reconnect time. This should be >= the backoff delay for
+ * that attempt.
+ */
+ _reconnectTestCallback: null,
+
/**
* Sends a message to the Push Server through an open websocket.
* typeof(msg) shall be an object
*/
_wsSendMessage: function(msg) {
if (!this._ws) {
console.warn("wsSendMessage: No WebSocket initialized.",
"Cannot send a message");
@@ -301,17 +385,17 @@ this.PushServiceWebSocket = {
this._upperLimit = prefs.get('adaptive.upperLimit');
return Promise.resolve();
},
_reconnect: function () {
console.debug("reconnect()");
this._shutdownWS(false);
- this._reconnectAfterBackoff();
+ this._startBackoffTimer();
},
_shutdownWS: function(shouldCancelPending = true) {
console.debug("shutdownWS()");
this._currentState = STATE_SHUT_DOWN;
this._willBeWokenUpByUDP = false;
prefs.ignore("userAgentID", this);
@@ -319,89 +403,124 @@ this.PushServiceWebSocket = {
if (this._wsListener) {
this._wsListener._pushService = null;
}
try {
this._ws.close(0, null);
} catch (e) {}
this._ws = null;
- this._waitingForPong = false;
- if (this._mainPushService) {
- this._mainPushService.stopAlarm();
- } else {
- console.error("shutdownWS: Uninitialized push service");
+ this._lastPingTime = 0;
+
+ if (this._pingTimer) {
+ this._pingTimer.cancel();
}
if (shouldCancelPending) {
this._cancelRegisterRequests();
}
if (this._notifyRequestQueue) {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
+
+ this._lastDisconnect = Date.now();
},
uninit: function() {
if (this._udpServer) {
this._udpServer.close();
this._udpServer = null;
}
// All pending requests (ideally none) are dropped at this point. We
// shouldn't have any applications performing registration/unregistration
// or receiving notifications.
this._shutdownWS();
+ if (this._backoffTimer) {
+ this._backoffTimer.cancel();
+ }
if (this._requestTimeoutTimer) {
this._requestTimeoutTimer.cancel();
}
this._mainPushService = null;
this._dataEnabled = false;
},
/**
* How retries work: The goal is to ensure websocket is always up on
* networks not supporting UDP. So the websocket should only be shutdown if
* onServerClose indicates UDP wakeup. If WS is closed due to socket error,
- * _reconnectAfterBackoff() is called. The retry alarm is started and when
+ * _startBackoffTimer() is called. The retry timer is started and when
* it times out, beginWSSetup() is called again.
*
- * On a successful connection, the alarm is cancelled in
- * wsOnMessageAvailable() when the ping alarm is started.
- *
* If we are in the middle of a timeout (i.e. waiting), but
* a register/unregister is called, we don't want to wait around anymore.
* _sendRequest will automatically call beginWSSetup(), which will cancel the
* timer. In addition since the state will have changed, even if a pending
* timer event comes in (because the timer fired the event before it was
* cancelled), so the connection won't be reset.
*/
- _reconnectAfterBackoff: function() {
- console.debug("reconnectAfterBackoff()");
+ _startBackoffTimer() {
+ console.debug("startBackoffTimer()");
//Calculate new ping interval
this._calculateAdaptivePing(true /* wsWentDown */);
// Calculate new timeout, but cap it to pingInterval.
let retryTimeout = prefs.get("retryBaseInterval") *
Math.pow(2, this._retryFailCount);
retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
this._retryFailCount++;
- console.debug("reconnectAfterBackoff: Retry in", retryTimeout,
+ console.debug("startBackoffTimer: Retry in", retryTimeout,
"Try number", this._retryFailCount);
- if (this._mainPushService) {
- this._mainPushService.setAlarm(retryTimeout);
- } else {
- console.error("reconnectAfterBackoff: Uninitialized push service");
+
+ if (!this._backoffTimer) {
+ this._backoffTimer = Cc["@mozilla.org/timer;1"]
+ .createInstance(Ci.nsITimer);
}
+ this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
+ },
+
+ /** Indicates whether we're waiting for pongs or requests. */
+ _hasPendingRequests() {
+ return this._lastPingTime > 0 || this._registerRequests.size > 0;
+ },
+
+ /**
+ * Starts the request timeout timer unless we're already waiting for a pong
+ * or register request.
+ */
+ _startRequestTimeoutTimer() {
+ if (this._hasPendingRequests()) {
+ return;
+ }
+ // start the timer since we now have at least one request
+ if (!this._requestTimeoutTimer) {
+ this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
+ .createInstance(Ci.nsITimer);
+ }
+ this._requestTimeoutTimer.init(this,
+ this._requestTimeout,
+ Ci.nsITimer.TYPE_REPEATING_SLACK);
+ },
+
+ /** Unconditionally starts the ping timer. */
+ _startPingTimer() {
+ if (!this._pingTimer) {
+ this._pingTimer = Cc["@mozilla.org/timer;1"]
+ .createInstance(Ci.nsITimer);
+ }
+ this._pingTimer.init(this, prefs.get("pingInterval"),
+ Ci.nsITimer.TYPE_ONE_SHOT);
},
/**
* We need to calculate a new ping based on:
* 1) Latest good ping
* 2) A safe gap between 1) and the calculated new ping (which is
* by default, 1 minute)
*
@@ -575,18 +694,18 @@ this.PushServiceWebSocket = {
console.debug("beginWSSetup()");
if (this._currentState != STATE_SHUT_DOWN) {
console.error("_beginWSSetup: Not in shutdown state! Current state",
this._currentState);
return;
}
// Stop any pending reconnects scheduled for the near future.
- if (this._mainPushService) {
- this._mainPushService.stopAlarm();
+ if (this._backoffTimer) {
+ this._backoffTimer.cancel();
}
let uri = this._serverURI;
if (!uri) {
return;
}
let socket = this._makeWebSocket(uri);
if (!socket) {
@@ -618,84 +737,16 @@ this.PushServiceWebSocket = {
this._beginWSSetup();
}
},
isConnected: function() {
return !!this._ws;
},
- /**
- * There is only one alarm active at any time. This alarm has 3 intervals
- * corresponding to 3 tasks.
- *
- * 1) Reconnect on ping timeout.
- * If we haven't received any messages from the server by the time this
- * alarm fires, the connection is closed and PushService tries to
- * reconnect, repurposing the alarm for (3).
- *
- * 2) Send a ping.
- * The protocol sends a ping ({}) on the wire every pingInterval ms. Once
- * it sends the ping, the alarm goes to task (1) which is waiting for
- * a pong. If data is received after the ping is sent,
- * _wsOnMessageAvailable() will reset the ping alarm (which cancels
- * waiting for the pong). So as long as the connection is fine, pong alarm
- * never fires.
- *
- * 3) Reconnect after backoff.
- * The alarm is set by _reconnectAfterBackoff() and increases in duration
- * every time we try and fail to connect. When it triggers, websocket
- * setup begins again. On successful socket setup, the socket starts
- * receiving messages. The alarm now goes to (2) where it monitors the
- * WebSocket by sending a ping. Since incoming data is a sign of the
- * connection being up, the ping alarm is reset every time data is
- * received.
- */
- onAlarmFired: function() {
- // Conditions are arranged in decreasing specificity.
- // i.e. when _waitingForPong is true, other conditions are also true.
- if (this._waitingForPong) {
- console.debug("onAlarmFired: Did not receive pong in time.",
- "Reconnecting WebSocket");
- this._reconnect();
- }
- else if (this._currentState == STATE_READY) {
- // Send a ping.
- // Bypass the queue; we don't want this to be kept pending.
- // Watch out for exception in case the socket has disconnected.
- // When this happens, we pretend the ping was sent and don't specially
- // handle the exception, as the lack of a pong will lead to the socket
- // being reset.
- try {
- this._wsSendMessage({});
- } catch (e) {
- }
-
- this._waitingForPong = true;
- this._mainPushService.setAlarm(prefs.get("requestTimeout"));
- }
- else if (this._mainPushService && this._mainPushService._alarmID !== null) {
- console.debug("onAlarmFired: reconnect alarm fired");
- // Reconnect after back-off.
- // The check for a non-null _alarmID prevents a situation where the alarm
- // fires, but _shutdownWS() is called from another code-path (e.g.
- // network state change) and we don't want to reconnect.
- //
- // It also handles the case where _beginWSSetup() is called from another
- // code-path.
- //
- // alarmID will be non-null only when no shutdown/connect is
- // called between _reconnectAfterBackoff() setting the alarm and the
- // alarm firing.
-
- // Websocket is shut down. Backoff interval expired, try to connect.
- this._beginWSSetup();
- }
- },
-
_acquireWakeLock: function() {
if (!AppConstants.MOZ_B2G) {
return;
}
// Disable the wake lock on non-B2G platforms to work around bug 1154492.
if (!this._socketWakeLock) {
console.debug("acquireWakeLock: Acquiring Socket Wakelock");
@@ -813,24 +864,23 @@ this.PushServiceWebSocket = {
},
/**
* Protocol handler invoked by server message.
*/
_handleRegisterReply: function(reply) {
console.debug("handleRegisterReply()");
if (typeof reply.channelID !== "string" ||
- typeof this._registerRequests[reply.channelID] !== "object") {
+ !this._registerRequests.has(reply.channelID)) {
return;
}
- let tmp = this._registerRequests[reply.channelID];
- delete this._registerRequests[reply.channelID];
- if (Object.keys(this._registerRequests).length === 0 &&
- this._requestTimeoutTimer) {
+ let tmp = this._registerRequests.get(reply.channelID);
+ this._registerRequests.delete(reply.channelID);
+ if (!this._hasPendingRequests()) {
this._requestTimeoutTimer.cancel();
}
if (reply.status == 200) {
try {
Services.io.newURI(reply.pushEndpoint, null, null);
}
catch (e) {
@@ -959,37 +1009,29 @@ this.PushServiceWebSocket = {
.getService(Ci.nsIUUIDGenerator);
// generateUUID() gives a UUID surrounded by {...}, slice them off.
return uuidGenerator.generateUUID().toString().slice(1, -1);
},
request: function(action, record) {
console.debug("request() ", action);
- if (Object.keys(this._registerRequests).length === 0) {
- // start the timer since we now have at least one request
- if (!this._requestTimeoutTimer) {
- this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
- .createInstance(Ci.nsITimer);
- }
- this._requestTimeoutTimer.init(this,
- this._requestTimeout,
- Ci.nsITimer.TYPE_REPEATING_SLACK);
- }
+ this._startRequestTimeoutTimer();
if (action == "register") {
let data = {channelID: this._generateID(),
messageType: action};
return new Promise((resolve, reject) => {
- this._registerRequests[data.channelID] = {record: record,
- resolve: resolve,
- reject: reject,
- ctime: Date.now()
- };
+ this._registerRequests.set(data.channelID, {
+ record: record,
+ resolve: resolve,
+ reject: reject,
+ ctime: Date.now(),
+ });
this._queueRequest(data);
}).then(record => {
if (!this._dataEnabled) {
return record;
}
return PushCrypto.generateKeys()
.then(([publicKey, privateKey]) => {
record.p256dhPublicKey = publicKey;
@@ -1016,31 +1058,33 @@ this.PushServiceWebSocket = {
this._queue = this._queue
.then(op)
.catch(_ => {});
},
_send(data) {
if (this._currentState == STATE_READY) {
if (data.messageType != "register" ||
- typeof this._registerRequests[data.channelID] == "object") {
+ this._registerRequests.has(data.channelID)) {
// check if request has not been cancelled
this._wsSendMessage(data);
}
}
},
_sendRegisterRequests() {
- this._enqueue(_ => Promise.all(Object.keys(this._registerRequests).map(channelID =>
- this._send({
- messageType: "register",
- channelID: channelID,
- })
- )));
+ this._enqueue(_ => {
+ for (let channelID of this._registerRequests.keys()) {
+ this._send({
+ messageType: "register",
+ channelID: channelID,
+ });
+ }
+ });
},
_queueRequest(data) {
if (data.messageType != "register") {
if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
let promise = new Promise((resolve, reject) => {
this._notifyRequestQueue = resolve;
});
@@ -1143,17 +1187,17 @@ this.PushServiceWebSocket = {
}
this._shutdownWS();
},
_wsOnMessageAvailable: function(context, message) {
console.debug("wsOnMessageAvailable()", message);
- this._waitingForPong = false;
+ this._lastPingTime = 0;
let reply;
try {
reply = JSON.parse(message);
} catch(e) {
console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
return;
}
@@ -1169,18 +1213,18 @@ this.PushServiceWebSocket = {
(reply.messageType === "ping") ||
(typeof reply.messageType != "string")) {
console.debug("wsOnMessageAvailable: Pong received");
this._calculateAdaptivePing(false);
doNotHandle = true;
}
// Reset the ping timer. Note: This path is executed at every step of the
- // handshake, so this alarm does not need to be set explicitly at startup.
- this._mainPushService.setAlarm(prefs.get("pingInterval"));
+ // handshake, so this timer does not need to be set explicitly at startup.
+ this._startPingTimer();
// If it is a ping, do not handle the message.
if (doNotHandle) {
return;
}
// A whitelist of protocol handlers. Add to these if new messages are added
// in the protocol.
@@ -1227,21 +1271,20 @@ this.PushServiceWebSocket = {
// TODO: there should be no pending requests
}
},
/**
* Rejects all pending register requests with errors.
*/
_cancelRegisterRequests: function() {
- for (let channelID in this._registerRequests) {
- let request = this._registerRequests[channelID];
- delete this._registerRequests[channelID];
+ for (let request of this._registerRequests.values()) {
request.reject(new Error("Register request aborted"));
}
+ this._registerRequests.clear();
},
_makeUDPSocket: function() {
return Cc["@mozilla.org/network/udp-socket;1"]
.createInstance(Ci.nsIUDPSocket);
},
/**
--- a/dom/push/test/xpcshell/test_retry_ws.js
+++ b/dom/push/test/xpcshell/test_retry_ws.js
@@ -26,32 +26,29 @@ add_task(function* test_ws_retry() {
pushEndpoint: 'https://example.org/push/1',
scope: 'https://example.net/push/1',
version: 1,
originAttributes: '',
quota: Infinity,
});
let alarmDelays = [];
- let setAlarm = PushService.setAlarm;
- PushService.setAlarm = function(delay) {
+ PushServiceWebSocket._reconnectTestCallback = function(delay) {
alarmDelays.push(delay);
- setAlarm.apply(this, arguments);
};
let handshakeDone;
let handshakePromise = new Promise(resolve => handshakeDone = resolve);
PushService.init({
serverURI: "wss://push.example.org/",
networkInfo: new MockDesktopNetworkInfo(),
makeWebSocket(uri) {
return new MockWebSocket(uri, {
onHello(request) {
if (alarmDelays.length == 10) {
- PushService.setAlarm = setAlarm;
this.serverSendMsg(JSON.stringify({
messageType: 'hello',
status: 200,
uaid: userAgentID,
}));
handshakeDone();
return;
}
@@ -61,11 +58,13 @@ add_task(function* test_ws_retry() {
},
});
yield waitForPromise(
handshakePromise,
45000,
'Timed out waiting for successful handshake'
);
- deepEqual(alarmDelays, [25, 50, 100, 200, 400, 800, 1600, 3200, 6400, 10000],
- 'Wrong reconnect alarm delays');
+ [25, 50, 100, 200, 400, 800, 1600, 3200, 6400, 10000].forEach(function(minDelay, index) {
+ ok(alarmDelays[index] >= minDelay, `Should wait at least ${
+ minDelay}ms before attempt ${index + 1}`);
+ });
});