Bug 1246341 - Include status codes in "ack" and "unregister" requests. r=dragana draft
authorKit Cambridge <kcambridge@mozilla.com>
Mon, 22 Feb 2016 20:37:46 -0800
changeset 335538 faf193bf2da05e7dcb458ab2c61b67104e95258a
parent 335537 03d35dddb9733204dfe758d2416b1db822b95c0d
child 335539 658ee86e5af8d10781b89de96b5a22097c8d927d
push id11804
push userkcambridge@mozilla.com
push dateMon, 29 Feb 2016 19:29:36 +0000
reviewersdragana
bugs1246341
milestone47.0a1
Bug 1246341 - Include status codes in "ack" and "unregister" requests. r=dragana MozReview-Commit-ID: Gsh3FhTfvkX
dom/push/PushComponents.js
dom/push/PushService.jsm
dom/push/PushServiceHttp2.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/test/xpcshell/test_clear_origin_data.js
dom/push/test/xpcshell/test_notification_ack.js
dom/push/test/xpcshell/test_notification_data.js
dom/push/test/xpcshell/test_permissions.js
dom/push/test/xpcshell/test_quota_exceeded.js
dom/push/test/xpcshell/test_quota_observer.js
dom/push/test/xpcshell/test_register_rollback.js
dom/push/test/xpcshell/test_unregister_success.js
--- a/dom/push/PushComponents.js
+++ b/dom/push/PushComponents.js
@@ -103,16 +103,17 @@ Object.assign(PushServiceParent.prototyp
 
   _messages: [
     "Push:Register",
     "Push:Registration",
     "Push:Unregister",
     "Push:Clear",
     "Push:NotificationForOriginShown",
     "Push:NotificationForOriginClosed",
+    "Push:ReportError",
   ],
 
   // nsIPushService methods
 
   subscribe(scope, principal, callback) {
     return this._handleRequest("Push:Register", principal, {
       scope: scope,
     }).then(result => {
@@ -157,32 +158,42 @@ Object.assign(PushServiceParent.prototyp
   notificationForOriginShown(origin) {
     this._service.notificationForOriginShown(origin);
   },
 
   notificationForOriginClosed(origin) {
     this._service.notificationForOriginClosed(origin);
   },
 
+  // nsIPushErrorReporter methods
+
+  reportDeliveryError(messageId, reason) {
+    this._service.reportDeliveryError(messageId, reason);
+  },
+
   receiveMessage(message) {
     if (!this._isValidMessage(message)) {
       return;
     }
     let {name, principal, target, data} = message;
     if (name === "Push:NotificationForOriginShown") {
       this.notificationForOriginShown(data);
       return;
     }
     if (name === "Push:NotificationForOriginClosed") {
       this.notificationForOriginClosed(data);
       return;
     }
     if (!target.assertPermission("push")) {
       return;
     }
+    if (name === "Push:ReportError") {
+      this.reportDeliveryError(data.messageId, data.reason);
+      return;
+    }
     let sender = target.QueryInterface(Ci.nsIMessageSender);
     return this._handleRequest(name, principal, data).then(result => {
       sender.sendAsyncMessage(this._getResponseName(name, "OK"), {
         requestID: data.requestID,
         result: result
       });
     }, error => {
       sender.sendAsyncMessage(this._getResponseName(name, "KO"), {
@@ -319,16 +330,25 @@ Object.assign(PushServiceContent.prototy
   notificationForOriginShown(origin) {
     this._mm.sendAsyncMessage("Push:NotificationForOriginShown", origin);
   },
 
   notificationForOriginClosed(origin) {
     this._mm.sendAsyncMessage("Push:NotificationForOriginClosed", origin);
   },
 
+  // nsIPushErrorReporter methods
+
+  reportDeliveryError(messageId, reason) {
+    this._mm.sendAsyncMessage("Push:ReportError", {
+      messageId: messageId,
+      reason: reason,
+    });
+  },
+
   _addRequest(data) {
     let id = ++this._requestId;
     this._requests.set(id, data);
     return id;
   },
 
   _takeRequest(requestId) {
     let d = this._requests.get(requestId);
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -22,16 +22,19 @@ const {PushServiceHttp2} = Cu.import("re
 const {PushCrypto} = Cu.import("resource://gre/modules/PushCrypto.jsm");
 
 // Currently supported protocols: WebSocket.
 const CONNECTION_PROTOCOLS = [PushServiceWebSocket, PushServiceHttp2];
 
 XPCOMUtils.defineLazyModuleGetter(this, "AlarmService",
                                   "resource://gre/modules/AlarmService.jsm");
 
+XPCOMUtils.defineLazyModuleGetter(this, "Task",
+                                  "resource://gre/modules/Task.jsm");
+
 XPCOMUtils.defineLazyServiceGetter(this, "gContentSecurityManager",
                                    "@mozilla.org/contentsecuritymanager;1",
                                    "nsIContentSecurityManager");
 
 XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
                                    "@mozilla.org/push/Notifier;1",
                                    "nsIPushNotifier");
 
@@ -288,36 +291,39 @@ this.PushService = {
       return Promise.resolve();
     }
 
     let pattern = JSON.parse(data);
     return this._db.clearIf(record => {
       if (!record.matchesOriginAttributes(pattern)) {
         return false;
       }
-      this._backgroundUnregister(record);
+      this._backgroundUnregister(record,
+                                 Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL);
       return true;
     });
   },
 
   /**
    * Sends an unregister request to the server in the background. If the
    * service is not connected, this function is a no-op.
    *
    * @param {PushRecord} record The record to unregister.
+   * @param {Number} reason An `nsIPushErrorReporter` unsubscribe reason,
+   *  indicating why this record was removed.
    */
-  _backgroundUnregister: function(record) {
+  _backgroundUnregister: function(record, reason) {
     console.debug("backgroundUnregister()");
 
     if (!this._service.isConnected() || !record) {
       return;
     }
 
     console.debug("backgroundUnregister: Notifying server", record);
-    this._sendUnregister(record).catch(e => {
+    this._sendUnregister(record, reason).catch(e => {
       console.error("backgroundUnregister: Error notifying server", e);
     });
   },
 
   // utility function used to add/remove observers in startObservers() and
   // stopObservers()
   getNetworkStateChangeEventName: function() {
     try {
@@ -777,95 +783,127 @@ this.PushService = {
 
   /**
    * Dispatches an incoming message to a service worker, recalculating the
    * quota for the associated push registration. If the quota is exceeded,
    * the registration and message will be dropped, and the worker will not
    * be notified.
    *
    * @param {String} keyID The push registration ID.
-   * @param {String} message The message contents.
+   * @param {String} messageID The message ID, used to report service worker
+   *  delivery failures. For Web Push messages, this is the version. If empty,
+   *  failures will not be reported.
+   * @param {ArrayBuffer|Uint8Array} data The encrypted message data.
    * @param {Object} cryptoParams The message encryption settings.
    * @param {Function} updateFunc A function that receives the existing
    *  registration record as its argument, and returns a new record. If the
    *  function returns `null` or `undefined`, the record will not be updated.
    *  `PushServiceWebSocket` uses this to drop incoming updates with older
    *  versions.
+   * @returns {Promise} Resolves with an `nsIPushErrorReporter` ack status
+   *  code, indicating whether the message was delivered successfully.
    */
-  receivedPushMessage: function(keyID, message, cryptoParams, updateFunc) {
+  receivedPushMessage: Task.async(function* (keyID, messageID, data, cryptoParams, updateFunc) {
     console.debug("receivedPushMessage()");
     Services.telemetry.getHistogramById("PUSH_API_NOTIFICATION_RECEIVED").add();
 
-    let shouldNotify = false;
-    return this.getByKeyID(keyID).then(record => {
+    let status = Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
+    try {
+      let record = yield this._updateRecordAfterPush(keyID, updateFunc);
       if (!record) {
-        this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_KEY_NOT_FOUND);
-        throw new Error("No record for key ID " + keyID);
-      }
-      return record.getLastVisit();
-    }).then(lastVisit => {
-      // As a special case, don't notify the service worker if the user
-      // cleared their history.
-      shouldNotify = isFinite(lastVisit);
-      if (!shouldNotify) {
-          this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_NO_HISTORY);
+        throw new Error("Ignoring update for key ID " + keyID);
       }
-      return this._db.update(keyID, record => {
-        let newRecord = updateFunc(record);
-        if (!newRecord) {
-          this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_NO_VERSION_INCREMENT);
-          return null;
-        }
-        // Because `unregister` is advisory only, we can still receive messages
-        // for stale Simple Push registrations from the server. To work around
-        // this, we check if the record has expired before *and* after updating
-        // the quota.
-        if (newRecord.isExpired()) {
-          console.error("receivedPushMessage: Ignoring update for expired key ID",
-            keyID);
-          return null;
-        }
-        newRecord.receivedPush(lastVisit);
-        return newRecord;
-      });
-    }).then(record => {
-      var notified = false;
-      if (!record) {
-        return notified;
+      status = yield this._decryptAndNotifyApp(record, messageID, data, cryptoParams);
+      // Update quota after the delay, at which point
+      // we check for visible notifications.
+      setTimeout(() => this._updateQuota(keyID),
+        prefs.get("quotaUpdateDelay"));
+    } catch (error) {
+      console.error("receivedPushMessage: Error notifying app", error);
+    }
+    return status;
+  }),
+
+  /**
+   * Updates a registration record after receiving a push message.
+   *
+   * @param {String} keyID The push registration ID.
+   * @param {Function} updateFunc The function passed to `receivedPushMessage`.
+   * @returns {Promise} Resolves with the updated record, or `null` if the
+   *  record was not updated.
+   */
+  _updateRecordAfterPush: Task.async(function* (keyID, updateFunc) {
+    let record = yield this.getByKeyID(keyID);
+    if (!record) {
+      this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_KEY_NOT_FOUND);
+      throw new Error("No record for key ID " + keyID);
+    }
+
+    let lastVisit = yield record.getLastVisit();
+
+    // As a special case, don't notify the service worker if the user
+    // cleared their history.
+    if (!isFinite(lastVisit)) {
+      this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_NO_HISTORY);
+      throw new Error("Ignoring message sent to unvisited origin");
+    }
+
+    // Update the record, resetting the quota if the user has visited the
+    // site since the last push.
+    let newRecord = yield this._db.update(keyID, record => {
+      let newRecord = updateFunc(record);
+      if (!newRecord) {
+        this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_NO_VERSION_INCREMENT);
+        return null;
       }
-      let decodedPromise;
-      if (cryptoParams) {
-        decodedPromise = PushCrypto.decodeMsg(
-          message,
+      // Because `unregister` is advisory only, we can still receive messages
+      // for stale Simple Push registrations from the server. To work around
+      // this, we check if the record has expired before *and* after updating
+      // the quota.
+      if (newRecord.isExpired()) {
+        return null;
+      }
+      newRecord.receivedPush(lastVisit);
+      return newRecord;
+    });
+
+    return newRecord;
+  }),
+
+  /**
+   * Decrypts an incoming message and notifies the associated service worker.
+   *
+   * @param {PushRecord} record The receiving registration.
+   * @param {String} messageID The message ID.
+   * @param {ArrayBuffer|Uint8Array} data The encrypted message data.
+   * @param {Object} cryptoParams The message encryption settings.
+   * @returns {Promise} Resolves with an ack status code.
+   */
+  _decryptAndNotifyApp: Task.async(function* (record, messageID, data, cryptoParams) {
+    let message = null;
+    if (cryptoParams) {
+      try {
+        message = yield PushCrypto.decodeMsg(
+          data,
           record.p256dhPrivateKey,
           record.p256dhPublicKey,
           cryptoParams.dh,
           cryptoParams.salt,
           cryptoParams.rs,
           cryptoParams.auth ? record.authenticationSecret : null
         );
-      } else {
-        decodedPromise = Promise.resolve(null);
+      } catch (error) {
+        console.error("receivedPushMessage: Error decrypting message", error);
+        return Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR;
       }
-      return decodedPromise.then(message => {
-        if (shouldNotify) {
-          notified = this._notifyApp(record, message);
-        }
-        // Update quota after the delay, at which point
-        // we check for visible notifications.
-        setTimeout(() => this._updateQuota(keyID),
-          prefs.get("quotaUpdateDelay"));
-        return notified;
-      }, error => {
-        console.error("receivedPushMessage: Error decrypting message", error);
-      });
-    }).catch(error => {
-      console.error("receivedPushMessage: Error notifying app", error);
-    });
-  },
+    }
+    return this._notifyApp(record, messageID, message) ?
+           Ci.nsIPushErrorReporter.ACK_DELIVERED :
+           Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
+  }),
 
   _updateQuota: function(keyID) {
     console.debug("updateQuota()");
 
     this._db.update(keyID, record => {
       // Record may have expired from an earlier quota update.
       if (record.isExpired()) {
         console.debug(
@@ -879,17 +917,18 @@ this.PushService = {
       }
       return record;
     }).then(record => {
       if (record && record.isExpired()) {
         this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_EXPIRED);
         // Drop the registration in the background. If the user returns to the
         // site, the service worker will be notified on the next `idle-daily`
         // event.
-        this._backgroundUnregister(record);
+        this._backgroundUnregister(record,
+          Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED);
       }
       if (this._updateQuotaTestCallback) {
         // Callback so that test may be notified when the quota update is complete.
         this._updateQuotaTestCallback();
       }
     }).catch(error => {
       console.debug("updateQuota: Error while trying to update quota", error);
     });
@@ -917,17 +956,27 @@ this.PushService = {
     }
     if (count > 1) {
       this._visibleNotifications.set(origin, count - 1);
     } else {
       this._visibleNotifications.delete(origin);
     }
   },
 
-  _notifyApp: function(aPushRecord, message) {
+  reportDeliveryError(messageID, reason) {
+    console.debug("reportDeliveryError()", messageID, reason);
+    if (this._state == PUSH_SERVICE_RUNNING &&
+        this._service.isConnected()) {
+
+      // Only report errors if we're initialized and connected.
+      this._service.reportDeliveryError(messageID, reason);
+    }
+  },
+
+  _notifyApp: function(aPushRecord, messageID, message) {
     if (!aPushRecord || !aPushRecord.scope ||
         aPushRecord.originAttributes === undefined) {
       console.error("notifyApp: Invalid record", aPushRecord);
       return false;
     }
 
     console.debug("notifyApp()", aPushRecord.scope);
 
@@ -943,42 +992,49 @@ this.PushService = {
     if (aPushRecord.quotaApplies()) {
       // Don't record telemetry for chrome push messages.
       Services.telemetry.getHistogramById("PUSH_API_NOTIFY").add();
     }
 
     if (payload) {
       gPushNotifier.notifyPushWithData(aPushRecord.scope,
                                        aPushRecord.principal,
-                                       payload.length, payload);
+                                       messageID, payload.length, payload);
     } else {
-      gPushNotifier.notifyPush(aPushRecord.scope, aPushRecord.principal);
+      gPushNotifier.notifyPush(aPushRecord.scope, aPushRecord.principal,
+                               messageID);
     }
 
     return true;
   },
 
   getByKeyID: function(aKeyID) {
     return this._db.getByKeyID(aKeyID);
   },
 
   getAllUnexpired: function() {
     return this._db.getAllUnexpired();
   },
 
-  _sendRequest: function(action, aRecord) {
+  _sendRequest: function(action, ...params) {
     if (this._state == PUSH_SERVICE_CONNECTION_DISABLE) {
       return Promise.reject(new Error("Push service disabled"));
     } else if (this._state == PUSH_SERVICE_ACTIVE_OFFLINE) {
       if (this._service.serviceType() == "WebSocket" && action == "unregister") {
         return Promise.resolve();
       }
       return Promise.reject(new Error("Push service offline"));
     }
-    return this._service.request(action, aRecord);
+    switch (action) {
+      case "register":
+        return this._service.register(...params);
+      case "unregister":
+        return this._service.unregister(...params);
+    }
+    return Promise.reject(new Error("Unknown request type: " + action));
   },
 
   /**
    * Called on message from the child process. aPageRecord is an object sent by
    * navigator.push, identifying the sending page and other fields.
    */
   _registerWithServer: function(aPageRecord) {
     console.debug("registerWithServer()", aPageRecord);
@@ -991,19 +1047,19 @@ this.PushService = {
         this._deletePendingRequest(aPageRecord);
         return record.toSubscription();
       }, err => {
         this._deletePendingRequest(aPageRecord);
         throw err;
      });
   },
 
-  _sendUnregister: function(aRecord) {
+  _sendUnregister: function(aRecord, aReason) {
     Services.telemetry.getHistogramById("PUSH_API_UNSUBSCRIBE_ATTEMPT").add();
-    return this._sendRequest("unregister", aRecord).then(function(v) {
+    return this._sendRequest("unregister", aRecord, aReason).then(function(v) {
       Services.telemetry.getHistogramById("PUSH_API_UNSUBSCRIBE_SUCCEEDED").add();
       return v;
     }).catch(function(e) {
       Services.telemetry.getHistogramById("PUSH_API_UNSUBSCRIBE_FAILED").add();
       return Promise.reject(e);
     });
   },
 
@@ -1017,17 +1073,18 @@ this.PushService = {
     return this._db.put(aRecord)
       .then(record => {
         Services.telemetry.getHistogramById("PUSH_API_SUBSCRIBE_SUCCEEDED").add();
         return record;
       })
       .catch(error => {
         Services.telemetry.getHistogramById("PUSH_API_SUBSCRIBE_FAILED").add()
         // Unable to save. Destroy the subscription in the background.
-        this._backgroundUnregister(aRecord);
+        this._backgroundUnregister(aRecord,
+                                   Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL);
         throw error;
       });
   },
 
   /**
    * Exceptions thrown in _onRegisterError are caught by the promise obtained
    * from _service.request, causing the promise to be rejected instead.
    */
@@ -1104,17 +1161,18 @@ this.PushService = {
 
     return this._getByPageRecord(aPageRecord)
       .then(record => {
         if (record === undefined) {
           return false;
         }
 
         return Promise.all([
-          this._sendUnregister(record),
+          this._sendUnregister(record,
+                               Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL),
           this._db.delete(record.keyID),
         ]).then(() => true);
       });
   },
 
   clear: function(info) {
     if (info.domain == "*") {
       return this._clearAll();
@@ -1214,17 +1272,18 @@ this.PushService = {
 
     if (data == "cleared") {
       // If the permission list was cleared, drop all registrations
       // that are subject to quota.
       return this._db.clearIf(record => {
         if (record.quotaApplies()) {
           if (!record.isExpired()) {
             // Drop the registration in the background.
-            this._backgroundUnregister(record);
+            this._backgroundUnregister(record,
+              Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED);
           }
           return true;
         }
         return false;
       });
     }
 
     let permission = subject.QueryInterface(Ci.nsIPermission);
@@ -1290,17 +1349,18 @@ this.PushService = {
   _permissionDenied: function(record, cursor) {
     console.debug("permissionDenied()");
 
     if (!record.quotaApplies() || record.isExpired()) {
       // Ignore already-expired records.
       return;
     }
     // Drop the registration in the background.
-    this._backgroundUnregister(record);
+    this._backgroundUnregister(record,
+      Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED);
     record.setQuota(0);
     cursor.update(record);
   },
 
   /**
    * The update function called for each registration record if the push
    * permission is granted. If the record has expired, it will be dropped;
    * otherwise, its quota will be reset to the default value.
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -442,17 +442,17 @@ this.PushServiceHttp2 = {
                       .createInstance(Ci.nsILoadGroup);
     chan.loadGroup = loadGroup;
     return chan;
   },
 
   /**
    * Subscribe new resource.
    */
-  _subscribeResource: function(aRecord) {
+  register: function(aRecord) {
     console.debug("subscribeResource()");
 
     return this._subscribeResourceInternal({
       record: aRecord,
       retries: 0
     })
     .then(result =>
       PushCrypto.generateKeys()
@@ -695,38 +695,30 @@ this.PushServiceHttp2 = {
   },
 
   uninit: function() {
     console.debug("uninit()");
     this._shutdownConnections(true);
     this._mainPushService = null;
   },
 
-
-  request: function(action, aRecord) {
-    switch (action) {
-      case "register":
-        return this._subscribeResource(aRecord);
-     case "unregister":
-        this._shutdownSubscription(aRecord.subscriptionUri);
-        return this._unsubscribeResource(aRecord.subscriptionUri);
-      default:
-        return Promise.reject(new Error("Unknown request type: " + action));
-    }
+  unregister: function(aRecord) {
+    this._shutdownSubscription(aRecord.subscriptionUri);
+    return this._unsubscribeResource(aRecord.subscriptionUri);
   },
 
   /** Push server has deleted subscription.
    *  Re-subscribe - if it succeeds send update db record and send
    *                 pushsubscriptionchange,
    *               - on error delete record and send pushsubscriptionchange
    *  TODO: maybe pushsubscriptionerror will be included.
    */
   _resubscribe: function(aSubscriptionUri) {
     this._mainPushService.getByKeyID(aSubscriptionUri)
-      .then(record => this._subscribeResource(record)
+      .then(record => this.register(record)
         .then(recordNew => {
           if (this._mainPushService) {
             this._mainPushService
                 .updateRegistrationAndNotifyApp(aSubscriptionUri, recordNew)
                 .catch(Cu.reportError);
           }
         }, error => {
           if (this._mainPushService) {
@@ -769,17 +761,17 @@ this.PushServiceHttp2 = {
       this._retryAfterBackoff(aSubscriptionUri, -1);
     }
   },
 
   _pushChannelOnStop: function(aUri, aAckUri, aMessage, cryptoParams) {
     console.debug("pushChannelOnStop()");
 
     this._mainPushService.receivedPushMessage(
-      aUri, aMessage, cryptoParams, record => {
+      aUri, "", aMessage, cryptoParams, record => {
         // Always update the stored record.
         return record;
       }
     )
     .then(_ => this._ackMsgRecv(aAckUri))
     .catch(err => {
       console.error("pushChannelOnStop: Error receiving message",
         err);
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -858,48 +858,43 @@ this.PushServiceWebSocket = {
 
   _handleDataUpdate: function(update) {
     let promise;
     if (typeof update.channelID != "string") {
       console.warn("handleDataUpdate: Discarding update without channel ID",
         update);
       return;
     }
-    // Unconditionally ack the update. This is important because the Push
-    // server requires the client to ack all outstanding updates before
-    // resuming delivery. However, the server doesn't verify the encryption
-    // params, and can't ensure that an update is encrypted correctly because
-    // it doesn't have the private key. Thus, if we only acked valid updates,
-    // it would be possible for a single invalid one to block delivery of all
-    // subsequent updates. A nack would be more appropriate for this case, but
-    // the protocol doesn't currently support them.
-    this._sendAck(update.channelID, update.version);
     if (typeof update.data != "string") {
       promise = this._mainPushService.receivedPushMessage(
         update.channelID,
+        update.version,
         null,
         null,
         record => record
       );
     } else {
       let params = getCryptoParams(update.headers);
       if (!params) {
         console.warn("handleDataUpdate: Discarding invalid encrypted message",
           update);
         return;
       }
       let message = base64UrlDecode(update.data);
       promise = this._mainPushService.receivedPushMessage(
         update.channelID,
+        update.version,
         message,
         params,
         record => record
       );
     }
-    promise.catch(err => {
+    promise.then(status => {
+      this._sendAck(update.channelID, update.version, status);
+    }).catch(err => {
       console.error("handleDataUpdate: Error delivering message", err);
     });
   },
 
   /**
    * Protocol handler invoked by server message.
    */
   _handleNotificationReply: function(reply) {
@@ -934,79 +929,121 @@ this.PushServiceWebSocket = {
       if (typeof version === "string") {
         version = parseInt(version, 10);
       }
 
       if (typeof version === "number" && version >= 0) {
         // FIXME(nsm): this relies on app update notification being infallible!
         // eventually fix this
         this._receivedUpdate(update.channelID, version);
-        this._sendAck(update.channelID, version);
       }
     }
   },
 
-  // FIXME(nsm): batch acks for efficiency reasons.
-  _sendAck: function(channelID, version) {
+  reportDeliveryError(messageID, reason) {
+    console.debug("reportDeliveryError()");
+    var data = {messageType: 'nack',
+                version: messageID};
+    switch (reason) {
+      case Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION:
+        data.code = 301;
+        break;
+      case Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION:
+        data.code = 302;
+        break;
+      case Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR:
+        data.code = 303;
+        break;
+      default:
+        throw new Error('Invalid delivery error reason');
+    }
+    this._queueRequest(data);
+  },
+
+  _sendAck: function(channelID, version, status) {
     console.debug("sendAck()");
     var data = {messageType: 'ack',
                 updates: [{channelID: channelID,
-                           version: version}]
-               };
+                           version: version}]};
+    switch (status) {
+      case Ci.nsIPushErrorReporter.ACK_DELIVERED:
+        data.code = 100;
+        break;
+      case Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR:
+        data.code = 101;
+        break;
+      case Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED:
+        data.code = 102;
+        break;
+      default:
+        throw new Error('Invalid ack status');
+    }
     this._queueRequest(data);
   },
 
   _generateID: function() {
     let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"]
                           .getService(Ci.nsIUUIDGenerator);
     // generateUUID() gives a UUID surrounded by {...}, slice them off.
     return uuidGenerator.generateUUID().toString().slice(1, -1);
   },
 
-  request: function(action, record) {
-    console.debug("request() ", action);
-
+  register(record) {
     if (Object.keys(this._registerRequests).length === 0) {
       // start the timer since we now have at least one request
       if (!this._requestTimeoutTimer) {
         this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
                                       .createInstance(Ci.nsITimer);
       }
       this._requestTimeoutTimer.init(this,
                                      this._requestTimeout,
                                      Ci.nsITimer.TYPE_REPEATING_SLACK);
     }
 
-    if (action == "register") {
-      let data = {channelID: this._generateID(),
-                  messageType: action};
+    let data = {channelID: this._generateID(),
+                messageType: "register"};
 
-      return new Promise((resolve, reject) => {
-        this._registerRequests[data.channelID] = {record: record,
-                                                 resolve: resolve,
-                                                 reject: reject,
-                                                 ctime: Date.now()
-                                                };
-        this._queueRequest(data);
-      }).then(record => {
-        if (!this._dataEnabled) {
+    return new Promise((resolve, reject) => {
+      this._registerRequests[data.channelID] = {record: record,
+                                               resolve: resolve,
+                                               reject: reject,
+                                               ctime: Date.now()
+                                              };
+      this._queueRequest(data);
+    }).then(record => {
+      if (!this._dataEnabled) {
+        return record;
+      }
+      return PushCrypto.generateKeys()
+        .then(([publicKey, privateKey]) => {
+          record.p256dhPublicKey = publicKey;
+          record.p256dhPrivateKey = privateKey;
+          record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
           return record;
-        }
-        return PushCrypto.generateKeys()
-          .then(([publicKey, privateKey]) => {
-            record.p256dhPublicKey = publicKey;
-            record.p256dhPrivateKey = privateKey;
-            record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
-            return record;
-          });
-      });
+        });
+    });
+  },
+
+  unregister(record, status) {
+    let data = {channelID: record.channelID,
+                messageType: "unregister"};
+    switch (status) {
+      case Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL:
+        data.code = 200;
+        break;
+      case Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED:
+        data.code = 201;
+        break;
+      case Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED:
+        data.code = 202;
+        break;
+      default:
+        return Promise.reject(new Error('Invalid unregister reason'));
     }
-
-    this._queueRequest({channelID: record.channelID,
-                        messageType: action});
+    this._queueRequest(data);
     return Promise.resolve();
   },
 
   _queueStart: Promise.resolve(),
   _notifyRequestQueue: null,
   _queue: null,
   _enqueue: function(op) {
     console.debug("enqueue()");
@@ -1062,27 +1099,31 @@ this.PushServiceWebSocket = {
         this._notifyRequestQueue = null;
       }
     }
   },
 
   _receivedUpdate: function(aChannelID, aLatestVersion) {
     console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion);
 
-    this._mainPushService.receivedPushMessage(aChannelID, null, null, record => {
+    this._mainPushService.receivedPushMessage(aChannelID, "", null, null, record => {
       if (record.version === null ||
           record.version < aLatestVersion) {
         console.debug("receivedUpdate: Version changed for", aChannelID,
           aLatestVersion);
         record.version = aLatestVersion;
         return record;
       }
       console.debug("receivedUpdate: No significant version change for",
         aChannelID, aLatestVersion);
       return null;
+    }).then(status => {
+      this._sendAck(aChannelID, aLatestVersion, status);
+    }).catch(err => {
+      console.error("receivedUpdate: Error delivering message", err);
     });
   },
 
   // begin Push protocol handshake
   _wsOnStart: function(context) {
     console.debug("wsOnStart()");
     this._releaseWakeLock();
 
--- a/dom/push/test/xpcshell/test_clear_origin_data.js
+++ b/dom/push/test/xpcshell/test_clear_origin_data.js
@@ -103,16 +103,17 @@ add_task(function* test_webapps_cleardat
             messageType: 'register',
             status: 200,
             channelID: data.channelID,
             uaid: userAgentID,
             pushEndpoint: 'https://example.com/update/' + Math.random(),
           }));
         },
         onUnregister(data) {
+          equal(data.code, 200, 'Expected manual unregister reason');
           unregisterDone();
         },
       });
     }
   });
 
   yield Promise.all(testRecords.map(test =>
     PushService.register({
--- a/dom/push/test/xpcshell/test_notification_ack.js
+++ b/dom/push/test/xpcshell/test_notification_ack.js
@@ -71,16 +71,17 @@ add_task(function* test_notification_ack
             updates: [{
               channelID: '21668e05-6da8-42c9-b8ab-9cc3f4d5630c',
               version: 2
             }]
           }));
         },
         onACK(request) {
           equal(request.messageType, 'ack', 'Should send acknowledgements');
+          equal(request.code, 100, 'Should ack successful delivery');
           let updates = request.updates;
           ok(Array.isArray(updates),
             'Should send an array of acknowledged updates');
           equal(updates.length, 1,
             'Should send one acknowledged update per packet');
           switch (++acks) {
           case 1:
             this.serverSendMsg(JSON.stringify({
--- a/dom/push/test/xpcshell/test_notification_data.js
+++ b/dom/push/test/xpcshell/test_notification_data.js
@@ -103,17 +103,17 @@ add_task(function* test_notification_ack
             status: 200,
             use_webpush: true,
           }));
           server = this;
           setupDone();
         },
         onACK(request) {
           if (ackDone) {
-            ackDone(request.updates);
+            ackDone(request);
           }
         }
       });
     }
   });
   yield waitForPromise(setupDonePromise, DEFAULT_TIMEOUT,
                        'Timed out waiting for notifications');
 });
@@ -126,46 +126,49 @@ add_task(function* test_notification_ack
       send: {
         headers: {
           encryption_key: 'keyid="notification1"; dh="BO_tgGm-yvYAGLeRe16AvhzaUcpYRiqgsGOlXpt0DRWDRGGdzVLGlEVJMygqAUECarLnxCiAOHTP_znkedrlWoU"',
           encryption: 'keyid="notification1";salt="uAZaiXpOSfOLJxtOCZ09dA"',
         },
         data: 'NwrrOWPxLE8Sv5Rr0Kep7n0-r_j3rsYrUw_CXPo',
         version: 'v1',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/1',
         data: 'Some message'
       }
     },
     {
       channelID: 'subscription2',
       version: 'v2',
       send: {
         headers: {
           encryption_key: 'keyid="notification2"; dh="BKVdQcgfncpNyNWsGrbecX0zq3eHIlHu5XbCGmVcxPnRSbhjrA6GyBIeGdqsUL69j5Z2CvbZd-9z1UBH0akUnGQ"',
           encryption: 'keyid="notification2";salt="vFn3t3M_k42zHBdpch3VRw"',
         },
         data: 'Zt9dEdqgHlyAL_l83385aEtb98ZBilz5tgnGgmwEsl5AOCNgesUUJ4p9qUU',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/2',
         data: 'Some message'
       }
     },
     {
       channelID: 'subscription3',
       version: 'v3',
       send: {
         headers: {
           encryption_key: 'keyid="notification3";dh="BD3xV_ACT8r6hdIYES3BJj1qhz9wyv7MBrG9vM2UCnjPzwE_YFVpkD-SGqE-BR2--0M-Yf31wctwNsO1qjBUeMg"',
           encryption: 'keyid="notification3"; salt="DFq188piWU7osPBgqn4Nlg"; rs=24',
         },
         data: 'LKru3ZzxBZuAxYtsaCfaj_fehkrIvqbVd1iSwnwAUgnL-cTeDD-83blxHXTq7r0z9ydTdMtC3UjAcWi8LMnfY-BFzi0qJAjGYIikDA',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/3',
         data: 'Some message'
       }
     },
     // This message uses the newer, authenticated form based on the crypto-key
     // header field.  No padding or record size changes.
     {
@@ -173,71 +176,92 @@ add_task(function* test_notification_ack
       version: 'v4',
       send: {
         headers: {
           crypto_key: 'keyid=v4;dh="BHqG01j7rOfp12BEDzxWXxlCaU4cdOx2DZAwCt3QuzEsnXN9lCna9QmZCkVpXsx7sAlaEmtl_VfF1lHlFS7XWcA"',
           encryption: 'keyid="v4";salt="X5-iy5rzhm4naNmMHdSYJw"',
         },
         data: '7YlxyNlZsNX4UNknHxzTqFrcrzz58W95uXBa0iY',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/1',
         data: 'Some message'
       }
     },
     // A message with 17 bytes of padding and rs of 24
     {
       channelID: 'subscription2',
       version: 'v5',
       send: {
         headers: {
           crypto_key: 'keyid="v5"; dh="BJhyKIH5P30YUKn1bolj_LMnael1-KZT_aGXgD2CRspBfv9gcUhVAmpxToZrw7QQEKl9K83b3zcqNY6G_dFhEsI"',
           encryption: 'keyid=v5;salt="bLmqCy550eK1Ao41tD7orA";rs=24',
         },
         data: 'SQDlDg1ftLkM_ruZlmyB2bk9L78HYtkcbA-y4-uAxwL-G4KtOA-J-A_rJ007Vi6NUkQe9K4kSZeIBrIUpmGv',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/2',
         data: 'Some message'
       }
     },
     // A message without key identifiers.
     {
       channelID: 'subscription3',
       version: 'v6',
       send: {
         headers: {
           crypto_key: 'dh="BEgnDmVw9Gcn1fWA5t53Jtpsgfewk_pzsjSc_PBPpPmROWGQA2v8ESrSsQgosNXx0o-uMMhi9tDAUeks3380kd8"',
           encryption: 'salt=T9DM8bNxuMHRVTn4LzkJDQ',
         },
         data: '7KUCi0dBBJbWmsYTqEqhFrgTv4ZOo_BmQRQ_2kY',
       },
+      ackCode: 100,
       receive: {
         scope: 'https://example.com/page/3',
         data: 'Some message'
       }
     },
+    // A malformed encrypted message.
+    {
+      channelID: 'subscription3',
+      version: 'v7',
+      send: {
+        headers: {
+          crypto_key: 'dh=AAAAAAAA',
+          encryption: 'salt=AAAAAAAA',
+        },
+        data: 'AAAAAAAA',
+      },
+      ackCode: 101,
+      receive: false,
+    },
   ];
 
   let sendAndReceive = testData => {
-    let messageReceived = promiseObserverNotification('push-message', (subject, data) => {
+    let messageReceived = testData.receive ? promiseObserverNotification('push-message', (subject, data) => {
       let notification = subject.QueryInterface(Ci.nsIPushMessage);
       equal(notification.text(), testData.receive.data,
             'Check data for notification ' + testData.version);
       equal(data, testData.receive.scope,
             'Check scope for notification ' + testData.version);
       return true;
-    });
+    }) : Promise.resolve();
 
     let ackReceived = new Promise(resolve => ackDone = resolve)
         .then(ackData => {
-          deepEqual([{
-            channelID: testData.channelID,
-            version: testData.version
-          }], ackData, 'Check updates for acknowledgment ' + testData.version);
+          deepEqual({
+            messageType: 'ack',
+            updates: [{
+              channelID: testData.channelID,
+              version: testData.version,
+            }],
+            code: testData.ackCode,
+          }, ackData, 'Check updates for acknowledgment ' + testData.version);
         });
 
     let msg = JSON.parse(JSON.stringify(testData.send));
     msg.messageType = 'notification';
     msg.channelID = testData.channelID;
     msg.version = testData.version;
     server.serverSendMsg(JSON.stringify(msg));
 
--- a/dom/push/test/xpcshell/test_permissions.js
+++ b/dom/push/test/xpcshell/test_permissions.js
@@ -113,16 +113,18 @@ add_task(function* setUp() {
           }));
           handshakeDone();
         },
         onUnregister(request) {
           let resolve = unregisterDefers[request.channelID];
           equal(typeof resolve, 'function',
             'Dropped unexpected channel ID ' + request.channelID);
           delete unregisterDefers[request.channelID];
+          equal(request.code, 202,
+            'Expected permission revoked unregister reason');
           resolve();
         },
         onACK(request) {},
       });
     }
   });
   yield waitForPromise(handshakePromise, DEFAULT_TIMEOUT,
     'Timed out waiting for handshake');
--- a/dom/push/test/xpcshell/test_quota_exceeded.js
+++ b/dom/push/test/xpcshell/test_quota_exceeded.js
@@ -118,16 +118,17 @@ add_task(function* test_expiration_origi
             updates: [{
               channelID: '46cc6f6a-c106-4ffa-bb7c-55c60bd50c41',
               version: 1,
             }],
           }));
         },
         onUnregister(request) {
           equal(request.channelID, 'eb33fc90-c883-4267-b5cb-613969e8e349', 'Unregistered wrong channel ID');
+          equal(request.code, 201, 'Expected quota exceeded unregister reason');
           unregisterDone();
         },
         // We expect to receive acks, but don't care about their
         // contents.
         onACK(request) {},
       });
     },
   });
--- a/dom/push/test/xpcshell/test_quota_observer.js
+++ b/dom/push/test/xpcshell/test_quota_observer.js
@@ -86,16 +86,17 @@ add_task(function* test_expiration_histo
             updates: [{
               channelID: '379c0668-8323-44d2-a315-4ee83f1a9ee9',
               version: 2,
             }],
           }));
         },
         onUnregister(request) {
           equal(request.channelID, '379c0668-8323-44d2-a315-4ee83f1a9ee9', 'Dropped wrong channel ID');
+          equal(request.code, 201, 'Expected quota exceeded unregister reason');
           unregisterDone();
         },
         onACK(request) {},
       });
     }
   });
 
   yield waitForPromise(subChangePromise, DEFAULT_TIMEOUT,
--- a/dom/push/test/xpcshell/test_register_rollback.js
+++ b/dom/push/test/xpcshell/test_register_rollback.js
@@ -54,16 +54,17 @@ add_task(function* test_register_rollbac
             status: 200,
             uaid: userAgentID,
             channelID,
             pushEndpoint: 'https://example.com/update/rollback'
           }));
         },
         onUnregister(request) {
           equal(request.channelID, channelID, 'Unregister: wrong channel ID');
+          equal(request.code, 200, 'Expected manual unregister reason');
           this.serverSendMsg(JSON.stringify({
             messageType: 'unregister',
             status: 200,
             channelID
           }));
           unregisterDone();
         }
       });
--- a/dom/push/test/xpcshell/test_unregister_success.js
+++ b/dom/push/test/xpcshell/test_unregister_success.js
@@ -37,16 +37,17 @@ add_task(function* test_unregister_succe
           this.serverSendMsg(JSON.stringify({
             messageType: 'hello',
             status: 200,
             uaid: 'fbe865a6-aeb8-446f-873c-aeebdb8d493c'
           }));
         },
         onUnregister(request) {
           equal(request.channelID, channelID, 'Should include the channel ID');
+          equal(request.code, 200, 'Expected manual unregister reason');
           this.serverSendMsg(JSON.stringify({
             messageType: 'unregister',
             status: 200,
             channelID
           }));
           unregisterDone();
         }
       });