Bug 1243856 - Remove alarms from the Push WebSocket backend. r?dragana draft
author: Kit Cambridge <kcambridge@mozilla.com>
Wed, 03 Feb 2016 15:27:34 -0800
changeset 328749 8fa37b11a5ec555d713213becee1b8705da4d570
parent 327605 d07dbd40dcd209124149f331f60dd55c8da33fbe
child 328750 252d3125e4fb49ae4fd5c5a2d3184b28cf2daad1
push id: 10417
push user: kcambridge@mozilla.com
push date: Thu, 04 Feb 2016 02:20:26 +0000
reviewers: dragana
bugs: 1243856
milestone: 47.0a1
Bug 1243856 - Remove alarms from the Push WebSocket backend. r?dragana
dom/push/PushServiceWebSocket.jsm
dom/push/test/xpcshell/test_retry_ws.js
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -135,54 +135,114 @@ this.PushServiceWebSocket = {
     return "WebSocket";
   },
 
   disconnect: function() {
     this._shutdownWS();
   },
 
   observe: function(aSubject, aTopic, aData) {
+    if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
+      this._onUAIDChanged();
+    } else if (aTopic == "timer-callback") {
+      this._onTimerFired(aSubject);
+    }
+  },
 
-    switch (aTopic) {
-    case "nsPref:changed":
-      if (aData == "dom.push.userAgentID") {
-        this._shutdownWS();
-        this._reconnectAfterBackoff();
+  /**
+   * Handles a UAID change. Unlike reconnects, we cancel all pending requests
+   * after disconnecting. Existing subscriptions stored in IndexedDB will be
+   * dropped on reconnect.
+   */
+  _onUAIDChanged() {
+    console.debug("onUAIDChanged()");
+    this._shutdownWS();
+    this._startBackoffTimer();
+  },
+
+  /** Handles a ping, backoff, or request timer event. */
+  _onTimerFired(timer) {
+    console.debug("onTimerFired()");
+    if (timer == this._pingTimer) {
+      this._sendPing();
+      return;
+    }
+    if (timer == this._backoffTimer) {
+      if (this._reconnectTestCallback) {
+        // Notify the test callback once the client reconnects.
+        let actualRetryTimeout = Date.now() - this._lastDisconnect;
+        this._reconnectTestCallback(actualRetryTimeout);
       }
-      break;
-    case "timer-callback":
-      if (aSubject == this._requestTimeoutTimer) {
-        if (Object.keys(this._registerRequests).length === 0) {
-          this._requestTimeoutTimer.cancel();
-        }
+      this._beginWSSetup();
+      return;
+    }
+    if (timer == this._requestTimeoutTimer) {
+      this._timeOutRequests();
+      return;
+    }
+  },
 
-        // Set to true if at least one request timed out.
-        let requestTimedOut = false;
-        for (let channelID in this._registerRequests) {
-          let duration = Date.now() - this._registerRequests[channelID].ctime;
-          // If any of the registration requests time out, all the ones after it
-          // also made to fail, since we are going to be disconnecting the
-          // socket.
-          if (requestTimedOut || duration > this._requestTimeout) {
-            requestTimedOut = true;
-            this._registerRequests[channelID]
-              .reject(new Error("Register request timed out for channel ID " +
-                  channelID));
+  /**
+   * Sends a ping to the server. Skips the request queue, but starts the
+   * request timer. If the socket is already closed, or the server does not
+   * respond within the timeout, the client will back off and reconnect.
+   */
+  _sendPing() {
+    console.debug("sendPing()");
+
+    this._startRequestTimeoutTimer();
+    try {
+      this._wsSendMessage({});
+      this._lastPingTime = Date.now();
+    } catch (e) {
+      console.debug("sendPing: Error sending ping", e);
+      this._reconnect();
+    }
+  },
+
+  /** Times out any pending requests. */
+  _timeOutRequests() {
+    console.debug("timeOutRequests()");
 
-            delete this._registerRequests[channelID];
-          }
-        }
+    if (!this._hasPendingRequests()) {
+      // Cancel the repeating timer and exit early if we aren't waiting for
+      // pongs or requests.
+      this._requestTimeoutTimer.cancel();
+      return;
+    }
+
+    let now = Date.now();
+
+    // Set to true if at least one request timed out, or we're still waiting
+    // for a pong after the request timeout.
+    let requestTimedOut = false;
 
-        // The most likely reason for a registration request timing out is
-        // that the socket has disconnected. Best to reconnect.
+    if (this._lastPingTime > 0 &&
+        now - this._lastPingTime >= this._requestTimeout) {
+      requestTimedOut = true;
+    } else {
+      for (let [channelID, request] of this._registerRequests) {
+        let duration = now - request.ctime;
+        // If any of the registration requests time out, all the ones after it
+        // also made to fail, since we are going to be disconnecting the
+        // socket.
+        requestTimedOut |= duration > this._requestTimeout;
         if (requestTimedOut) {
-          this._reconnect();
+          request.reject(new Error(
+            "Register request timed out for channel ID " + channelID));
+
+          this._registerRequests.delete(channelID);
         }
       }
-      break;
+    }
+
+    // The most likely reason for a pong or registration request timing out is
+    // that the socket has disconnected. Best to reconnect.
+    if (requestTimedOut) {
+      this._reconnect();
     }
   },
 
   validServerURI: function(serverURI) {
     return serverURI.scheme == "ws" || serverURI.scheme == "wss";
   },
 
   get _UAID() {
@@ -195,17 +255,17 @@ this.PushServiceWebSocket = {
         "Not updating userAgentID");
       return;
     }
     console.debug("New _UAID", newID);
     prefs.set("userAgentID", newID);
   },
 
   _ws: null,
-  _registerRequests: {},
+  _registerRequests: new Map(),
   _currentState: STATE_SHUT_DOWN,
   _requestTimeout: 0,
   _requestTimeoutTimer: null,
   _retryFailCount: 0,
 
   /**
    * According to the WS spec, servers should immediately close the underlying
    * TCP connection after they close a WebSocket. This causes wsOnStop to be
@@ -251,16 +311,40 @@ this.PushServiceWebSocket = {
   /**
    * Maximum ping interval that we can reach.
    */
   _upperLimit: 0,
 
   /** Indicates whether the server supports Web Push-style message delivery. */
   _dataEnabled: false,
 
+  /** The last time the client sent a ping to the server. */
+  _lastPingTime: 0,
+
+  /** The last time the connection was closed. */
+  _lastDisconnect: 0,
+
+  /**
+   * A one-shot timer used to periodically ping the server, to avoid timing out
+   * idle connections.
+   */
+  _pingTimer: null,
+
+  /** A one-shot timer fired after the reconnect backoff period. */
+  _backoffTimer: null,
+
+  /**
+   * A function called when the client reconnects after backing off.
+   *
+   * @param {Number} actualRetryTimeout The time elapsed between the last
+   *  disconnect and reconnect time. This should be >= the backoff delay for
+   *  that attempt.
+   */
+  _reconnectTestCallback: null,
+
   /**
    * Sends a message to the Push Server through an open websocket.
    * typeof(msg) shall be an object
    */
   _wsSendMessage: function(msg) {
     if (!this._ws) {
       console.warn("wsSendMessage: No WebSocket initialized.",
         "Cannot send a message");
@@ -301,17 +385,17 @@ this.PushServiceWebSocket = {
     this._upperLimit = prefs.get('adaptive.upperLimit');
 
     return Promise.resolve();
   },
 
   _reconnect: function () {
     console.debug("reconnect()");
     this._shutdownWS(false);
-    this._reconnectAfterBackoff();
+    this._startBackoffTimer();
   },
 
   _shutdownWS: function(shouldCancelPending = true) {
     console.debug("shutdownWS()");
     this._currentState = STATE_SHUT_DOWN;
     this._willBeWokenUpByUDP = false;
 
     prefs.ignore("userAgentID", this);
@@ -319,89 +403,124 @@ this.PushServiceWebSocket = {
     if (this._wsListener) {
       this._wsListener._pushService = null;
     }
     try {
         this._ws.close(0, null);
     } catch (e) {}
     this._ws = null;
 
-    this._waitingForPong = false;
-    if (this._mainPushService) {
-      this._mainPushService.stopAlarm();
-    } else {
-      console.error("shutdownWS: Uninitialized push service");
+    this._lastPingTime = 0;
+
+    if (this._pingTimer) {
+      this._pingTimer.cancel();
     }
 
     if (shouldCancelPending) {
       this._cancelRegisterRequests();
     }
 
     if (this._notifyRequestQueue) {
       this._notifyRequestQueue();
       this._notifyRequestQueue = null;
     }
+
+    this._lastDisconnect = Date.now();
   },
 
   uninit: function() {
     if (this._udpServer) {
       this._udpServer.close();
       this._udpServer = null;
     }
 
     // All pending requests (ideally none) are dropped at this point. We
     // shouldn't have any applications performing registration/unregistration
     // or receiving notifications.
     this._shutdownWS();
 
+    if (this._backoffTimer) {
+      this._backoffTimer.cancel();
+    }
     if (this._requestTimeoutTimer) {
       this._requestTimeoutTimer.cancel();
     }
 
     this._mainPushService = null;
 
     this._dataEnabled = false;
   },
 
   /**
    * How retries work:  The goal is to ensure websocket is always up on
    * networks not supporting UDP. So the websocket should only be shutdown if
    * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
-   * _reconnectAfterBackoff() is called.  The retry alarm is started and when
+   * _startBackoffTimer() is called. The retry timer is started and when
    * it times out, beginWSSetup() is called again.
    *
-   * On a successful connection, the alarm is cancelled in
-   * wsOnMessageAvailable() when the ping alarm is started.
-   *
    * If we are in the middle of a timeout (i.e. waiting), but
    * a register/unregister is called, we don't want to wait around anymore.
    * _sendRequest will automatically call beginWSSetup(), which will cancel the
    * timer. In addition since the state will have changed, even if a pending
    * timer event comes in (because the timer fired the event before it was
    * cancelled), so the connection won't be reset.
    */
-  _reconnectAfterBackoff: function() {
-    console.debug("reconnectAfterBackoff()");
+  _startBackoffTimer() {
+    console.debug("startBackoffTimer()");
     //Calculate new ping interval
     this._calculateAdaptivePing(true /* wsWentDown */);
 
     // Calculate new timeout, but cap it to pingInterval.
     let retryTimeout = prefs.get("retryBaseInterval") *
                        Math.pow(2, this._retryFailCount);
     retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
 
     this._retryFailCount++;
 
-    console.debug("reconnectAfterBackoff: Retry in", retryTimeout,
+    console.debug("startBackoffTimer: Retry in", retryTimeout,
       "Try number", this._retryFailCount);
-    if (this._mainPushService) {
-      this._mainPushService.setAlarm(retryTimeout);
-    } else {
-      console.error("reconnectAfterBackoff: Uninitialized push service");
+
+    if (!this._backoffTimer) {
+      this._backoffTimer = Cc["@mozilla.org/timer;1"]
+                               .createInstance(Ci.nsITimer);
     }
+    this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
+  },
+
+  /** Indicates whether we're waiting for pongs or requests. */
+  _hasPendingRequests() {
+    return this._lastPingTime > 0 || this._registerRequests.size > 0;
+  },
+
+  /**
+   * Starts the request timeout timer unless we're already waiting for a pong
+   * or register request.
+   */
+  _startRequestTimeoutTimer() {
+    if (this._hasPendingRequests()) {
+      return;
+    }
+    // start the timer since we now have at least one request
+    if (!this._requestTimeoutTimer) {
+      this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
+                                    .createInstance(Ci.nsITimer);
+    }
+    this._requestTimeoutTimer.init(this,
+                                   this._requestTimeout,
+                                   Ci.nsITimer.TYPE_REPEATING_SLACK);
+  },
+
+  /** Unconditionally starts the ping timer. */
+  _startPingTimer() {
+    if (!this._pingTimer) {
+      this._pingTimer = Cc["@mozilla.org/timer;1"]
+                          .createInstance(Ci.nsITimer);
+    }
+    this._pingTimer.init(this, prefs.get("pingInterval"),
+                         Ci.nsITimer.TYPE_ONE_SHOT);
   },
 
   /**
    * We need to calculate a new ping based on:
    *  1) Latest good ping
    *  2) A safe gap between 1) and the calculated new ping (which is
    *  by default, 1 minute)
    *
@@ -575,18 +694,18 @@ this.PushServiceWebSocket = {
     console.debug("beginWSSetup()");
     if (this._currentState != STATE_SHUT_DOWN) {
       console.error("_beginWSSetup: Not in shutdown state! Current state",
         this._currentState);
       return;
     }
 
     // Stop any pending reconnects scheduled for the near future.
-    if (this._mainPushService) {
-      this._mainPushService.stopAlarm();
+    if (this._backoffTimer) {
+      this._backoffTimer.cancel();
     }
 
     let uri = this._serverURI;
     if (!uri) {
       return;
     }
     let socket = this._makeWebSocket(uri);
     if (!socket) {
@@ -618,84 +737,16 @@ this.PushServiceWebSocket = {
       this._beginWSSetup();
     }
   },
 
   isConnected: function() {
     return !!this._ws;
   },
 
-  /**
-   * There is only one alarm active at any time. This alarm has 3 intervals
-   * corresponding to 3 tasks.
-   *
-   * 1) Reconnect on ping timeout.
-   *    If we haven't received any messages from the server by the time this
-   *    alarm fires, the connection is closed and PushService tries to
-   *    reconnect, repurposing the alarm for (3).
-   *
-   * 2) Send a ping.
-   *    The protocol sends a ping ({}) on the wire every pingInterval ms. Once
-   *    it sends the ping, the alarm goes to task (1) which is waiting for
-   *    a pong. If data is received after the ping is sent,
-   *    _wsOnMessageAvailable() will reset the ping alarm (which cancels
-   *    waiting for the pong). So as long as the connection is fine, pong alarm
-   *    never fires.
-   *
-   * 3) Reconnect after backoff.
-   *    The alarm is set by _reconnectAfterBackoff() and increases in duration
-   *    every time we try and fail to connect.  When it triggers, websocket
-   *    setup begins again. On successful socket setup, the socket starts
-   *    receiving messages. The alarm now goes to (2) where it monitors the
-   *    WebSocket by sending a ping.  Since incoming data is a sign of the
-   *    connection being up, the ping alarm is reset every time data is
-   *    received.
-   */
-  onAlarmFired: function() {
-    // Conditions are arranged in decreasing specificity.
-    // i.e. when _waitingForPong is true, other conditions are also true.
-    if (this._waitingForPong) {
-      console.debug("onAlarmFired: Did not receive pong in time.",
-        "Reconnecting WebSocket");
-      this._reconnect();
-    }
-    else if (this._currentState == STATE_READY) {
-      // Send a ping.
-      // Bypass the queue; we don't want this to be kept pending.
-      // Watch out for exception in case the socket has disconnected.
-      // When this happens, we pretend the ping was sent and don't specially
-      // handle the exception, as the lack of a pong will lead to the socket
-      // being reset.
-      try {
-        this._wsSendMessage({});
-      } catch (e) {
-      }
-
-      this._waitingForPong = true;
-      this._mainPushService.setAlarm(prefs.get("requestTimeout"));
-    }
-    else if (this._mainPushService && this._mainPushService._alarmID !== null) {
-      console.debug("onAlarmFired: reconnect alarm fired");
-      // Reconnect after back-off.
-      // The check for a non-null _alarmID prevents a situation where the alarm
-      // fires, but _shutdownWS() is called from another code-path (e.g.
-      // network state change) and we don't want to reconnect.
-      //
-      // It also handles the case where _beginWSSetup() is called from another
-      // code-path.
-      //
-      // alarmID will be non-null only when no shutdown/connect is
-      // called between _reconnectAfterBackoff() setting the alarm and the
-      // alarm firing.
-
-      // Websocket is shut down. Backoff interval expired, try to connect.
-      this._beginWSSetup();
-    }
-  },
-
   _acquireWakeLock: function() {
     if (!AppConstants.MOZ_B2G) {
       return;
     }
 
     // Disable the wake lock on non-B2G platforms to work around bug 1154492.
     if (!this._socketWakeLock) {
       console.debug("acquireWakeLock: Acquiring Socket Wakelock");
@@ -813,24 +864,23 @@ this.PushServiceWebSocket = {
   },
 
   /**
    * Protocol handler invoked by server message.
    */
   _handleRegisterReply: function(reply) {
     console.debug("handleRegisterReply()");
     if (typeof reply.channelID !== "string" ||
-        typeof this._registerRequests[reply.channelID] !== "object") {
+        !this._registerRequests.has(reply.channelID)) {
       return;
     }
 
-    let tmp = this._registerRequests[reply.channelID];
-    delete this._registerRequests[reply.channelID];
-    if (Object.keys(this._registerRequests).length === 0 &&
-        this._requestTimeoutTimer) {
+    let tmp = this._registerRequests.get(reply.channelID);
+    this._registerRequests.delete(reply.channelID);
+    if (!this._hasPendingRequests()) {
       this._requestTimeoutTimer.cancel();
     }
 
     if (reply.status == 200) {
       try {
         Services.io.newURI(reply.pushEndpoint, null, null);
       }
       catch (e) {
@@ -959,37 +1009,29 @@ this.PushServiceWebSocket = {
                           .getService(Ci.nsIUUIDGenerator);
     // generateUUID() gives a UUID surrounded by {...}, slice them off.
     return uuidGenerator.generateUUID().toString().slice(1, -1);
   },
 
   request: function(action, record) {
     console.debug("request() ", action);
 
-    if (Object.keys(this._registerRequests).length === 0) {
-      // start the timer since we now have at least one request
-      if (!this._requestTimeoutTimer) {
-        this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
-                                      .createInstance(Ci.nsITimer);
-      }
-      this._requestTimeoutTimer.init(this,
-                                     this._requestTimeout,
-                                     Ci.nsITimer.TYPE_REPEATING_SLACK);
-    }
+    this._startRequestTimeoutTimer();
 
     if (action == "register") {
       let data = {channelID: this._generateID(),
                   messageType: action};
 
       return new Promise((resolve, reject) => {
-        this._registerRequests[data.channelID] = {record: record,
-                                                 resolve: resolve,
-                                                 reject: reject,
-                                                 ctime: Date.now()
-                                                };
+        this._registerRequests.set(data.channelID, {
+          record: record,
+          resolve: resolve,
+          reject: reject,
+          ctime: Date.now(),
+        });
         this._queueRequest(data);
       }).then(record => {
         if (!this._dataEnabled) {
           return record;
         }
         return PushCrypto.generateKeys()
           .then(([publicKey, privateKey]) => {
             record.p256dhPublicKey = publicKey;
@@ -1016,31 +1058,33 @@ this.PushServiceWebSocket = {
     this._queue = this._queue
                     .then(op)
                     .catch(_ => {});
   },
 
   _send(data) {
     if (this._currentState == STATE_READY) {
       if (data.messageType != "register" ||
-        typeof this._registerRequests[data.channelID] == "object") {
+          this._registerRequests.has(data.channelID)) {
 
         // check if request has not been cancelled
         this._wsSendMessage(data);
       }
     }
   },
 
   _sendRegisterRequests() {
-    this._enqueue(_ => Promise.all(Object.keys(this._registerRequests).map(channelID =>
-      this._send({
-        messageType: "register",
-        channelID: channelID,
-      })
-    )));
+    this._enqueue(_ => {
+      for (let channelID of this._registerRequests.keys()) {
+        this._send({
+          messageType: "register",
+          channelID: channelID,
+        });
+      }
+    });
   },
 
   _queueRequest(data) {
     if (data.messageType != "register") {
       if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
         let promise = new Promise((resolve, reject) => {
           this._notifyRequestQueue = resolve;
         });
@@ -1143,17 +1187,17 @@ this.PushServiceWebSocket = {
     }
 
     this._shutdownWS();
   },
 
   _wsOnMessageAvailable: function(context, message) {
     console.debug("wsOnMessageAvailable()", message);
 
-    this._waitingForPong = false;
+    this._lastPingTime = 0;
 
     let reply;
     try {
       reply = JSON.parse(message);
     } catch(e) {
       console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
       return;
     }
@@ -1169,18 +1213,18 @@ this.PushServiceWebSocket = {
         (reply.messageType === "ping") ||
         (typeof reply.messageType != "string")) {
       console.debug("wsOnMessageAvailable: Pong received");
       this._calculateAdaptivePing(false);
       doNotHandle = true;
     }
 
     // Reset the ping timer.  Note: This path is executed at every step of the
-    // handshake, so this alarm does not need to be set explicitly at startup.
-    this._mainPushService.setAlarm(prefs.get("pingInterval"));
+    // handshake, so this timer does not need to be set explicitly at startup.
+    this._startPingTimer();
 
     // If it is a ping, do not handle the message.
     if (doNotHandle) {
       return;
     }
 
     // A whitelist of protocol handlers. Add to these if new messages are added
     // in the protocol.
@@ -1227,21 +1271,20 @@ this.PushServiceWebSocket = {
       // TODO: there should be no pending requests
     }
   },
 
   /**
    * Rejects all pending register requests with errors.
    */
   _cancelRegisterRequests: function() {
-    for (let channelID in this._registerRequests) {
-      let request = this._registerRequests[channelID];
-      delete this._registerRequests[channelID];
+    for (let request of this._registerRequests.values()) {
       request.reject(new Error("Register request aborted"));
     }
+    this._registerRequests.clear();
   },
 
   _makeUDPSocket: function() {
     return Cc["@mozilla.org/network/udp-socket;1"]
              .createInstance(Ci.nsIUDPSocket);
   },
 
   /**
--- a/dom/push/test/xpcshell/test_retry_ws.js
+++ b/dom/push/test/xpcshell/test_retry_ws.js
@@ -26,32 +26,29 @@ add_task(function* test_ws_retry() {
     pushEndpoint: 'https://example.org/push/1',
     scope: 'https://example.net/push/1',
     version: 1,
     originAttributes: '',
     quota: Infinity,
   });
 
   let alarmDelays = [];
-  let setAlarm = PushService.setAlarm;
-  PushService.setAlarm = function(delay) {
+  PushServiceWebSocket._reconnectTestCallback = function(delay) {
     alarmDelays.push(delay);
-    setAlarm.apply(this, arguments);
   };
 
   let handshakeDone;
   let handshakePromise = new Promise(resolve => handshakeDone = resolve);
   PushService.init({
     serverURI: "wss://push.example.org/",
     networkInfo: new MockDesktopNetworkInfo(),
     makeWebSocket(uri) {
       return new MockWebSocket(uri, {
         onHello(request) {
           if (alarmDelays.length == 10) {
-            PushService.setAlarm = setAlarm;
             this.serverSendMsg(JSON.stringify({
               messageType: 'hello',
               status: 200,
               uaid: userAgentID,
             }));
             handshakeDone();
             return;
           }
@@ -61,11 +58,13 @@ add_task(function* test_ws_retry() {
     },
   });
 
   yield waitForPromise(
     handshakePromise,
     45000,
     'Timed out waiting for successful handshake'
   );
-  deepEqual(alarmDelays, [25, 50, 100, 200, 400, 800, 1600, 3200, 6400, 10000],
-    'Wrong reconnect alarm delays');
+  [25, 50, 100, 200, 400, 800, 1600, 3200, 6400, 10000].forEach(function(minDelay, index) {
+    ok(alarmDelays[index] >= minDelay, `Should wait at least ${
+      minDelay}ms before attempt ${index + 1}`);
+  });
 });