Bug 1356237 - Ship a copy of transport module in marionette. r=ato draft
authorAlexandre Poirot <poirot.alex@gmail.com>
Mon, 24 Apr 2017 20:23:04 +0200
changeset 567537 fb5713f11e9e4b02f6bc0fff4998e259cc99d3d5
parent 566881 73752931e273091185e1e4b5231c28beed657cc8
child 625692 a2cc60aa914875399366ce4e9e79e97c237161d5
push id55618
push userbmo:poirot.alex@gmail.com
push dateTue, 25 Apr 2017 08:02:06 +0000
reviewersato
bugs1356237
milestone55.0a1
Bug 1356237 - Ship a copy of transport module in marionette. r=ato DevTools is about to move out of mozilla-central and be released as an add-on. So that these protocol files are going to be missing from the tree. To accommodate that, we are doing a copy of them next to marionette. MozReview-Commit-ID: 9PyhuwyZyXI
testing/marionette/jar.mn
testing/marionette/packets.js
testing/marionette/server.js
testing/marionette/stream-utils.js
testing/marionette/transport.js
--- a/testing/marionette/jar.mn
+++ b/testing/marionette/jar.mn
@@ -27,16 +27,19 @@ marionette.jar:
   content/atom.js (atom.js)
   content/evaluate.js (evaluate.js)
   content/logging.js (logging.js)
   content/navigate.js (navigate.js)
   content/l10n.js (l10n.js)
   content/assert.js (assert.js)
   content/addon.js (addon.js)
   content/session.js (session.js)
+  content/transport.js (transport.js)
+  content/packets.js (packets.js)
+  content/stream-utils.js (stream-utils.js)
 #ifdef ENABLE_TESTS
   content/test.xul (chrome/test.xul)
   content/test2.xul (chrome/test2.xul)
   content/test_dialog.dtd (chrome/test_dialog.dtd)
   content/test_dialog.properties (chrome/test_dialog.properties)
   content/test_dialog.xul (chrome/test_dialog.xul)
   content/test_nested_iframe.xul (chrome/test_nested_iframe.xul)
   content/test_anonymous_content.xul (chrome/test_anonymous_content.xul)
new file mode 100644
--- /dev/null
+++ b/testing/marionette/packets.js
@@ -0,0 +1,397 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+/**
+ * Packets contain read / write functionality for the different packet types
+ * supported by the debugging protocol, so that a transport can focus on
+ * delivery and queue management without worrying too much about the specific
+ * packet types.
+ *
+ * They are intended to be "one use only", so a new packet should be
+ * instantiated for each incoming or outgoing packet.
+ *
+ * A complete Packet type should expose at least the following:
+ *   * read(stream, scriptableStream)
+ *     Called when the input stream has data to read
+ *   * write(stream)
+ *     Called when the output stream is ready to write
+ *   * get done()
+ *     Returns true once the packet is done being read / written
+ *   * destroy()
+ *     Called to clean up at the end of use
+ */
+
+const {classes: Cc, interfaces: Ci, utils: Cu} = Components;
+const { StreamUtils } = Cu.import("chrome://marionette/content/stream-utils.js");
+
+const unicodeConverter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
+                           .createInstance(Ci.nsIScriptableUnicodeConverter);
+unicodeConverter.charset = "UTF-8";
+
+const defer = function () {
+  let deferred = {
+    promise: new Promise((resolve, reject) => {
+      deferred.resolve = resolve;
+      deferred.reject = reject;
+    })
+  };
+  return deferred;
+};
+
+this.EXPORTED_SYMBOLS = ["RawPacket", "Packet", "JSONPacket", "BulkPacket"];
+
+// The transport's previous check ensured the header length did not exceed 20
+// characters.  Here, we opt for the somewhat smaller, but still large limit of
+// 1 TiB.
+const PACKET_LENGTH_MAX = Math.pow(2, 40);
+
+/**
+ * A generic Packet processing object (extended by two subtypes below).
+ */
+function Packet(transport) {
+  this._transport = transport;
+  this._length = 0;
+}
+
+/**
+ * Attempt to initialize a new Packet based on the incoming packet header we've
+ * received so far.  We try each of the types in succession, trying JSON packets
+ * first since they are much more common.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return Packet
+ *         The parsed packet of the matching type, or null if no types matched.
+ */
+Packet.fromHeader = function (header, transport) {
+  return JSONPacket.fromHeader(header, transport) ||
+         BulkPacket.fromHeader(header, transport);
+};
+
+Packet.prototype = {
+
+  get length() {
+    return this._length;
+  },
+
+  set length(length) {
+    if (length > PACKET_LENGTH_MAX) {
+      throw Error("Packet length " + length + " exceeds the max length of " +
+                  PACKET_LENGTH_MAX);
+    }
+    this._length = length;
+  },
+
+  destroy: function () {
+    this._transport = null;
+  }
+
+};
+
+/**
+ * With a JSON packet (the typical packet type sent via the transport), data is
+ * transferred as a JSON packet serialized into a string, with the string length
+ * prepended to the packet, followed by a colon ([length]:[packet]). The
+ * contents of the JSON packet are specified in the Remote Debugging Protocol
+ * specification.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ */
+function JSONPacket(transport) {
+  Packet.call(this, transport);
+  this._data = "";
+  this._done = false;
+}
+
+/**
+ * Attempt to initialize a new JSONPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return JSONPacket
+ *         The parsed packet, or null if it's not a match.
+ */
+JSONPacket.fromHeader = function (header, transport) {
+  let match = this.HEADER_PATTERN.exec(header);
+
+  if (!match) {
+    return null;
+  }
+
+  let packet = new JSONPacket(transport);
+  packet.length = +match[1];
+  return packet;
+};
+
+JSONPacket.HEADER_PATTERN = /^(\d+):$/;
+
+JSONPacket.prototype = Object.create(Packet.prototype);
+
+Object.defineProperty(JSONPacket.prototype, "object", {
+  /**
+   * Gets the object (not the serialized string) being read or written.
+   */
+  get: function () {
+    return this._object;
+  },
+
+  /**
+   * Sets the object to be sent when write() is called.
+   */
+  set: function (object) {
+    this._object = object;
+    let data = JSON.stringify(object);
+    this._data = unicodeConverter.ConvertFromUnicode(data);
+    this.length = this._data.length;
+  }
+});
+
+JSONPacket.prototype.read = function (stream, scriptableStream) {
+
+  // Read in more packet data.
+  this._readData(stream, scriptableStream);
+
+  if (!this.done) {
+    // Don't have a complete packet yet.
+    return;
+  }
+
+  let json = this._data;
+  try {
+    json = unicodeConverter.ConvertToUnicode(json);
+    this._object = JSON.parse(json);
+  } catch (e) {
+    let msg = "Error parsing incoming packet: " + json + " (" + e +
+              " - " + e.stack + ")";
+    console.error(msg);
+    dump(msg + "\n");
+    return;
+  }
+
+  this._transport._onJSONObjectReady(this._object);
+};
+
+JSONPacket.prototype._readData = function (stream, scriptableStream) {
+  let bytesToRead = Math.min(this.length - this._data.length,
+                             stream.available());
+  this._data += scriptableStream.readBytes(bytesToRead);
+  this._done = this._data.length === this.length;
+};
+
+JSONPacket.prototype.write = function (stream) {
+
+  if (this._outgoing === undefined) {
+    // Format the serialized packet to a buffer
+    this._outgoing = this.length + ":" + this._data;
+  }
+
+  let written = stream.write(this._outgoing, this._outgoing.length);
+  this._outgoing = this._outgoing.slice(written);
+  this._done = !this._outgoing.length;
+};
+
+Object.defineProperty(JSONPacket.prototype, "done", {
+  get: function () {
+    return this._done;
+  }
+});
+
+JSONPacket.prototype.toString = function () {
+  return JSON.stringify(this._object, null, 2);
+};
+
+/**
+ * With a bulk packet, data is transferred by temporarily handing over the
+ * transport's input or output stream to the application layer for writing data
+ * directly.  This can be much faster for large data sets, and avoids various
+ * stages of copies and data duplication inherent in the JSON packet type.  The
+ * bulk packet looks like:
+ *
+ * bulk [actor] [type] [length]:[data]
+ *
+ * The interpretation of the data portion depends on the kind of actor and the
+ * packet's type.  See the Remote Debugging Protocol Stream Transport spec for
+ * more details.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ */
+function BulkPacket(transport) {
+  Packet.call(this, transport);
+  this._done = false;
+  this._readyForWriting = defer();
+}
+
+/**
+ * Attempt to initialize a new BulkPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return BulkPacket
+ *         The parsed packet, or null if it's not a match.
+ */
+BulkPacket.fromHeader = function (header, transport) {
+  let match = this.HEADER_PATTERN.exec(header);
+
+  if (!match) {
+    return null;
+  }
+
+  let packet = new BulkPacket(transport);
+  packet.header = {
+    actor: match[1],
+    type: match[2],
+    length: +match[3]
+  };
+  return packet;
+};
+
+BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/;
+
+BulkPacket.prototype = Object.create(Packet.prototype);
+
+BulkPacket.prototype.read = function (stream) {
+  // Temporarily pause monitoring of the input stream
+  this._transport.pauseIncoming();
+
+  let deferred = defer();
+
+  this._transport._onBulkReadReady({
+    actor: this.actor,
+    type: this.type,
+    length: this.length,
+    copyTo: (output) => {
+      let copying = StreamUtils.copyStream(stream, output, this.length);
+      deferred.resolve(copying);
+      return copying;
+    },
+    stream: stream,
+    done: deferred
+  });
+
+  // Await the result of reading from the stream
+  deferred.promise.then(() => {
+    this._done = true;
+    this._transport.resumeIncoming();
+  }, this._transport.close);
+
+  // Ensure this is only done once
+  this.read = () => {
+    throw new Error("Tried to read() a BulkPacket's stream multiple times.");
+  };
+};
+
+BulkPacket.prototype.write = function (stream) {
+  if (this._outgoingHeader === undefined) {
+    // Format the serialized packet header to a buffer
+    this._outgoingHeader = "bulk " + this.actor + " " + this.type + " " +
+                           this.length + ":";
+  }
+
+  // Write the header, or whatever's left of it to write.
+  if (this._outgoingHeader.length) {
+    let written = stream.write(this._outgoingHeader,
+                               this._outgoingHeader.length);
+    this._outgoingHeader = this._outgoingHeader.slice(written);
+    return;
+  }
+
+  // Temporarily pause the monitoring of the output stream
+  this._transport.pauseOutgoing();
+
+  let deferred = defer();
+
+  this._readyForWriting.resolve({
+    copyFrom: (input) => {
+      let copying = StreamUtils.copyStream(input, stream, this.length);
+      deferred.resolve(copying);
+      return copying;
+    },
+    stream: stream,
+    done: deferred
+  });
+
+  // Await the result of writing to the stream
+  deferred.promise.then(() => {
+    this._done = true;
+    this._transport.resumeOutgoing();
+  }, this._transport.close);
+
+  // Ensure this is only done once
+  this.write = () => {
+    throw new Error("Tried to write() a BulkPacket's stream multiple times.");
+  };
+};
+
+Object.defineProperty(BulkPacket.prototype, "streamReadyForWriting", {
+  get: function () {
+    return this._readyForWriting.promise;
+  }
+});
+
+Object.defineProperty(BulkPacket.prototype, "header", {
+  get: function () {
+    return {
+      actor: this.actor,
+      type: this.type,
+      length: this.length
+    };
+  },
+
+  set: function (header) {
+    this.actor = header.actor;
+    this.type = header.type;
+    this.length = header.length;
+  },
+});
+
+Object.defineProperty(BulkPacket.prototype, "done", {
+  get: function () {
+    return this._done;
+  },
+});
+
+BulkPacket.prototype.toString = function () {
+  return "Bulk: " + JSON.stringify(this.header, null, 2);
+};
+
+/**
+ * RawPacket is used to test the transport's error handling of malformed
+ * packets, by writing data directly onto the stream.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @param data string
+ *        The raw string to send out onto the stream.
+ */
+function RawPacket(transport, data) {
+  Packet.call(this, transport);
+  this._data = data;
+  this.length = data.length;
+  this._done = false;
+}
+
+RawPacket.prototype = Object.create(Packet.prototype);
+
+RawPacket.prototype.read = function (stream) {
+  // This hasn't yet been needed for testing.
+  throw Error("Not implmented.");
+};
+
+RawPacket.prototype.write = function (stream) {
+  let written = stream.write(this._data, this._data.length);
+  this._data = this._data.slice(written);
+  this._done = !this._data.length;
+};
+
+Object.defineProperty(RawPacket.prototype, "done", {
+  get: function () {
+    return this._done;
+  }
+});
--- a/testing/marionette/server.js
+++ b/testing/marionette/server.js
@@ -14,18 +14,17 @@ Cu.import("resource://gre/modules/Prefer
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 
 Cu.import("chrome://marionette/content/assert.js");
 Cu.import("chrome://marionette/content/driver.js");
 Cu.import("chrome://marionette/content/error.js");
 Cu.import("chrome://marionette/content/message.js");
 
-// Bug 1083711: Load transport.js as an SDK module instead of subscript
-loader.loadSubScript("resource://devtools/shared/transport/transport.js");
+Cu.import("chrome://marionette/content/transport.js");
 
 const logger = Log.repository.getLogger("Marionette");
 
 const {KeepWhenOffline, LoopbackOnly} = Ci.nsIServerSocket;
 
 this.EXPORTED_SYMBOLS = ["server"];
 this.server = {};
 
new file mode 100644
--- /dev/null
+++ b/testing/marionette/stream-utils.js
@@ -0,0 +1,242 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const {Constructor: CC, classes: Cc, interfaces: Ci, utils: Cu} = Components;
+
+Cu.import("resource://devtools/shared/event-emitter.js");
+Cu.import("resource://gre/modules/Services.jsm");
+
+const IOUtil = Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
+const ScriptableInputStream =
+  CC("@mozilla.org/scriptableinputstream;1",
+     "nsIScriptableInputStream", "init");
+
+this.EXPORTED_SYMBOLS = ["StreamUtils"];
+
+const BUFFER_SIZE = 0x8000;
+
+/**
+ * This helper function (and its companion object) are used by bulk senders and
+ * receivers to read and write data in and out of other streams.  Functions that
+ * make use of this tool are passed to callers when it is time to read or write
+ * bulk data.  It is highly recommended to use these copier functions instead of
+ * the stream directly because the copier enforces the agreed upon length.
+ * Since bulk mode reuses an existing stream, the sender and receiver must write
+ * and read exactly the agreed upon amount of data, or else the entire transport
+ * will be left in a invalid state.  Additionally, other methods of stream
+ * copying (such as NetUtil.asyncCopy) close the streams involved, which would
+ * terminate the debugging transport, and so it is avoided here.
+ *
+ * Overall, this *works*, but clearly the optimal solution would be able to just
+ * use the streams directly.  If it were possible to fully implement
+ * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
+ * enforce the length and avoid closing, and consumers could use familiar stream
+ * utilities like NetUtil.asyncCopy.
+ *
+ * The function takes two async streams and copies a precise number of bytes
+ * from one to the other.  Copying begins immediately, but may complete at some
+ * future time depending on data size.  Use the returned promise to know when
+ * it's complete.
+ *
+ * @param input nsIAsyncInputStream
+ *        The stream to copy from.
+ * @param output nsIAsyncOutputStream
+ *        The stream to copy to.
+ * @param length Integer
+ *        The amount of data that needs to be copied.
+ * @return Promise
+ *         The promise is resolved when copying completes or rejected if any
+ *         (unexpected) errors occur.
+ */
+function copyStream(input, output, length) {
+  let copier = new StreamCopier(input, output, length);
+  return copier.copy();
+}
+
+function StreamCopier(input, output, length) {
+  EventEmitter.decorate(this);
+  this._id = StreamCopier._nextId++;
+  this.input = input;
+  // Save off the base output stream, since we know it's async as we've required
+  this.baseAsyncOutput = output;
+  if (IOUtil.outputStreamIsBuffered(output)) {
+    this.output = output;
+  } else {
+    this.output = Cc["@mozilla.org/network/buffered-output-stream;1"]
+                  .createInstance(Ci.nsIBufferedOutputStream);
+    this.output.init(output, BUFFER_SIZE);
+  }
+  this._length = length;
+  this._amountLeft = length;
+  this._deferred = {
+    promise: new Promise((resolve, reject) => {
+      this._deferred.resolve = resolve;
+      this._deferred.reject = reject;
+    })
+  };
+
+  this._copy = this._copy.bind(this);
+  this._flush = this._flush.bind(this);
+  this._destroy = this._destroy.bind(this);
+
+  // Copy promise's then method up to this object.
+  // Allows the copier to offer a promise interface for the simple succeed or
+  // fail scenarios, but also emit events (due to the EventEmitter) for other
+  // states, like progress.
+  this.then = this._deferred.promise.then.bind(this._deferred.promise);
+  this.then(this._destroy, this._destroy);
+
+  // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
+  // if flushing would block the output stream.
+  this._streamReadyCallback = this._copy;
+}
+StreamCopier._nextId = 0;
+
+StreamCopier.prototype = {
+
+  copy: function () {
+    // Dispatch to the next tick so that it's possible to attach a progress
+    // event listener, even for extremely fast copies (like when testing).
+    Services.tm.currentThread.dispatch(() => {
+      try {
+        this._copy();
+      } catch (e) {
+        this._deferred.reject(e);
+      }
+    }, 0);
+    return this;
+  },
+
+  _copy: function () {
+    let bytesAvailable = this.input.available();
+    let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
+    this._debug("Trying to copy: " + amountToCopy);
+
+    let bytesCopied;
+    try {
+      bytesCopied = this.output.writeFrom(this.input, amountToCopy);
+    } catch (e) {
+      if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+        this._debug("Base stream would block, will retry");
+        this._debug("Waiting for output stream");
+        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+        return;
+      }
+      throw e;
+    }
+
+    this._amountLeft -= bytesCopied;
+    this._debug("Copied: " + bytesCopied +
+                ", Left: " + this._amountLeft);
+    this._emitProgress();
+
+    if (this._amountLeft === 0) {
+      this._debug("Copy done!");
+      this._flush();
+      return;
+    }
+
+    this._debug("Waiting for input stream");
+    this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
+  },
+
+  _emitProgress: function () {
+    this.emit("progress", {
+      bytesSent: this._length - this._amountLeft,
+      totalBytes: this._length
+    });
+  },
+
+  _flush: function () {
+    try {
+      this.output.flush();
+    } catch (e) {
+      if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
+          e.result == Cr.NS_ERROR_FAILURE) {
+        this._debug("Flush would block, will retry");
+        this._streamReadyCallback = this._flush;
+        this._debug("Waiting for output stream");
+        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+        return;
+      }
+      throw e;
+    }
+    this._deferred.resolve();
+  },
+
+  _destroy: function () {
+    this._destroy = null;
+    this._copy = null;
+    this._flush = null;
+    this.input = null;
+    this.output = null;
+  },
+
+  // nsIInputStreamCallback
+  onInputStreamReady: function () {
+    this._streamReadyCallback();
+  },
+
+  // nsIOutputStreamCallback
+  onOutputStreamReady: function () {
+    this._streamReadyCallback();
+  },
+
+  _debug: function (msg) {
+  }
+
+};
+
+/**
+ * Read from a stream, one byte at a time, up to the next |delimiter|
+ * character, but stopping if we've read |count| without finding it.  Reading
+ * also terminates early if there are less than |count| bytes available on the
+ * stream.  In that case, we only read as many bytes as the stream currently has
+ * to offer.
+ * TODO: This implementation could be removed if bug 984651 is fixed, which
+ *       provides a native version of the same idea.
+ * @param stream nsIInputStream
+ *        The input stream to read from.
+ * @param delimiter string
+ *        The character we're trying to find.
+ * @param count integer
+ *        The max number of characters to read while searching.
+ * @return string
+ *         The data collected.  If the delimiter was found, this string will
+ *         end with it.
+ */
+function delimitedRead(stream, delimiter, count) {
+  let scriptableStream;
+  if (stream instanceof Ci.nsIScriptableInputStream) {
+    scriptableStream = stream;
+  } else {
+    scriptableStream = new ScriptableInputStream(stream);
+  }
+
+  let data = "";
+
+  // Don't exceed what's available on the stream
+  count = Math.min(count, stream.available());
+
+  if (count <= 0) {
+    return data;
+  }
+
+  let char;
+  while (char !== delimiter && count > 0) {
+    char = scriptableStream.readBytes(1);
+    count--;
+    data += char;
+  }
+
+  return data;
+}
+
+const StreamUtils = {
+  copyStream,
+  delimitedRead
+};
+
new file mode 100644
--- /dev/null
+++ b/testing/marionette/transport.js
@@ -0,0 +1,896 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+/* global Pipe, ScriptableInputStream, uneval */
+
+const {Constructor: CC, classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+Cu.import("resource://gre/modules/Services.jsm");
+Cu.import("resource://devtools/shared/event-emitter.js");
+Cu.import("chrome://marionette/content/stream-utils.js");
+const { Packet, JSONPacket, BulkPacket } =
+  Cu.import("chrome://marionette/content/packets.js");
+const defer = function () {
+  let deferred = {
+    promise: new Promise((resolve, reject) => {
+      deferred.resolve = resolve;
+      deferred.reject = reject;
+    })
+  };
+  return deferred;
+};
+const executeSoon = function (func) {
+  Services.tm.mainThread.dispatch(func, Ci.nsIThread.DISPATCH_NORMAL);
+};
+const flags = { wantVerbose: false, wantLogging: false };
+
+const dumpv =
+  flags.wantVerbose ?
+  function (msg) {dump(msg + "\n");} :
+  function () {};
+
+const Pipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init");
+
+const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
+  "nsIScriptableInputStream", "init");
+
+this.EXPORTED_SYMBOLS = ["DebuggerTransport"];
+
+const PACKET_HEADER_MAX = 200;
+
+/**
+ * An adapter that handles data transfers between the debugger client and
+ * server. It can work with both nsIPipe and nsIServerSocket transports so
+ * long as the properly created input and output streams are specified.
+ * (However, for intra-process connections, LocalDebuggerTransport, below,
+ * is more efficient than using an nsIPipe pair with DebuggerTransport.)
+ *
+ * @param input nsIAsyncInputStream
+ *        The input stream.
+ * @param output nsIAsyncOutputStream
+ *        The output stream.
+ *
+ * Given a DebuggerTransport instance dt:
+ * 1) Set dt.hooks to a packet handler object (described below).
+ * 2) Call dt.ready() to begin watching for input packets.
+ * 3) Call dt.send() / dt.startBulkSend() to send packets.
+ * 4) Call dt.close() to close the connection, and disengage from the event
+ *    loop.
+ *
+ * A packet handler is an object with the following methods:
+ *
+ * - onPacket(packet) - called when we have received a complete packet.
+ *   |packet| is the parsed form of the packet --- a JavaScript value, not
+ *   a JSON-syntax string.
+ *
+ * - onBulkPacket(packet) - called when we have switched to bulk packet
+ *   receiving mode. |packet| is an object containing:
+ *   * actor:  Name of actor that will receive the packet
+ *   * type:   Name of actor's method that should be called on receipt
+ *   * length: Size of the data to be read
+ *   * stream: This input stream should only be used directly if you can ensure
+ *             that you will read exactly |length| bytes and will not close the
+ *             stream when reading is complete
+ *   * done:   If you use the stream directly (instead of |copyTo| below), you
+ *             must signal completion by resolving / rejecting this deferred.
+ *             If it's rejected, the transport will be closed.  If an Error is
+ *             supplied as a rejection value, it will be logged via |dump|.
+ *             If you do use |copyTo|, resolving is taken care of for you when
+ *             copying completes.
+ *   * copyTo: A helper function for getting your data out of the stream that
+ *             meets the stream handling requirements above, and has the
+ *             following signature:
+ *     @param  output nsIAsyncOutputStream
+ *             The stream to copy to.
+ *     @return Promise
+ *             The promise is resolved when copying completes or rejected if any
+ *             (unexpected) errors occur.
+ *             This object also emits "progress" events for each chunk that is
+ *             copied.  See stream-utils.js.
+ *
+ * - onClosed(reason) - called when the connection is closed. |reason| is
+ *   an optional nsresult or object, typically passed when the transport is
+ *   closed due to some error in a underlying stream.
+ *
+ * See ./packets.js and the Remote Debugging Protocol specification for more
+ * details on the format of these packets.
+ */
+function DebuggerTransport(input, output) {
+  EventEmitter.decorate(this);
+
+  this._input = input;
+  this._scriptableInput = new ScriptableInputStream(input);
+  this._output = output;
+
+  // The current incoming (possibly partial) header, which will determine which
+  // type of Packet |_incoming| below will become.
+  this._incomingHeader = "";
+  // The current incoming Packet object
+  this._incoming = null;
+  // A queue of outgoing Packet objects
+  this._outgoing = [];
+
+  this.hooks = null;
+  this.active = false;
+
+  this._incomingEnabled = true;
+  this._outgoingEnabled = true;
+
+  this.close = this.close.bind(this);
+}
+
+DebuggerTransport.prototype = {
+  /**
+   * Transmit an object as a JSON packet.
+   *
+   * This method returns immediately, without waiting for the entire
+   * packet to be transmitted, registering event handlers as needed to
+   * transmit the entire packet. Packets are transmitted in the order
+   * they are passed to this method.
+   */
+  send: function (object) {
+    this.emit("send", object);
+
+    let packet = new JSONPacket(this);
+    packet.object = object;
+    this._outgoing.push(packet);
+    this._flushOutgoing();
+  },
+
+  /**
+   * Transmit streaming data via a bulk packet.
+   *
+   * This method initiates the bulk send process by queuing up the header data.
+   * The caller receives eventual access to a stream for writing.
+   *
+   * N.B.: Do *not* attempt to close the stream handed to you, as it will
+   * continue to be used by this transport afterwards.  Most users should
+   * instead use the provided |copyFrom| function instead.
+   *
+   * @param header Object
+   *        This is modeled after the format of JSON packets above, but does not
+   *        actually contain the data, but is instead just a routing header:
+   *          * actor:  Name of actor that will receive the packet
+   *          * type:   Name of actor's method that should be called on receipt
+   *          * length: Size of the data to be sent
+   * @return Promise
+   *         The promise will be resolved when you are allowed to write to the
+   *         stream with an object containing:
+   *           * stream:   This output stream should only be used directly if
+   *                       you can ensure that you will write exactly |length|
+   *                       bytes and will not close the stream when writing is
+   *                       complete
+   *           * done:     If you use the stream directly (instead of |copyFrom|
+   *                       below), you must signal completion by resolving /
+   *                       rejecting this deferred.  If it's rejected, the
+   *                       transport will be closed.  If an Error is supplied as
+   *                       a rejection value, it will be logged via |dump|.  If
+   *                       you do use |copyFrom|, resolving is taken care of for
+   *                       you when copying completes.
+   *           * copyFrom: A helper function for getting your data onto the
+   *                       stream that meets the stream handling requirements
+   *                       above, and has the following signature:
+   *             @param  input nsIAsyncInputStream
+   *                     The stream to copy from.
+   *             @return Promise
+   *                     The promise is resolved when copying completes or
+   *                     rejected if any (unexpected) errors occur.
+   *                     This object also emits "progress" events for each chunk
+   *                     that is copied.  See stream-utils.js.
+   */
+  startBulkSend: function (header) {
+    this.emit("startbulksend", header);
+
+    let packet = new BulkPacket(this);
+    packet.header = header;
+    this._outgoing.push(packet);
+    this._flushOutgoing();
+    return packet.streamReadyForWriting;
+  },
+
+  /**
+   * Close the transport.
+   * @param reason nsresult / object (optional)
+   *        The status code or error message that corresponds to the reason for
+   *        closing the transport (likely because a stream closed or failed).
+   */
+  close: function (reason) {
+    this.emit("close", reason);
+
+    this.active = false;
+    this._input.close();
+    this._scriptableInput.close();
+    this._output.close();
+    this._destroyIncoming();
+    this._destroyAllOutgoing();
+    if (this.hooks) {
+      this.hooks.onClosed(reason);
+      this.hooks = null;
+    }
+    if (reason) {
+      dumpv("Transport closed: " + reason);
+    } else {
+      dumpv("Transport closed.");
+    }
+  },
+
+  /**
+   * The currently outgoing packet (at the top of the queue).
+   */
+  get _currentOutgoing() {
+    return this._outgoing[0];
+  },
+
+  /**
+   * Flush data to the outgoing stream.  Waits until the output stream notifies
+   * us that it is ready to be written to (via onOutputStreamReady).
+   */
+  _flushOutgoing: function () {
+    if (!this._outgoingEnabled || this._outgoing.length === 0) {
+      return;
+    }
+
+    // If the top of the packet queue has nothing more to send, remove it.
+    if (this._currentOutgoing.done) {
+      this._finishCurrentOutgoing();
+    }
+
+    if (this._outgoing.length > 0) {
+      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
+      this._output.asyncWait(this, 0, 0, threadManager.currentThread);
+    }
+  },
+
+  /**
+   * Pause this transport's attempts to write to the output stream.  This is
+   * used when we've temporarily handed off our output stream for writing bulk
+   * data.
+   */
+  pauseOutgoing: function () {
+    this._outgoingEnabled = false;
+  },
+
+  /**
+   * Resume this transport's attempts to write to the output stream.
+   */
+  resumeOutgoing: function () {
+    this._outgoingEnabled = true;
+    this._flushOutgoing();
+  },
+
+  // nsIOutputStreamCallback
+  /**
+   * This is called when the output stream is ready for more data to be written.
+   * The current outgoing packet will attempt to write some amount of data, but
+   * may not complete.
+   */
+  onOutputStreamReady: function (stream) {
+    if (!this._outgoingEnabled || this._outgoing.length === 0) {
+      return;
+    }
+
+    try {
+      this._currentOutgoing.write(stream);
+    } catch (e) {
+      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+        this.close(e.result);
+        return;
+      }
+      throw e;
+    }
+
+    this._flushOutgoing();
+  },
+
+  /**
+   * Remove the current outgoing packet from the queue upon completion.
+   */
+  _finishCurrentOutgoing: function () {
+    if (this._currentOutgoing) {
+      this._currentOutgoing.destroy();
+      this._outgoing.shift();
+    }
+  },
+
+  /**
+   * Clear the entire outgoing queue.
+   */
+  _destroyAllOutgoing: function () {
+    for (let packet of this._outgoing) {
+      packet.destroy();
+    }
+    this._outgoing = [];
+  },
+
+  /**
+   * Initialize the input stream for reading. Once this method has been called,
+   * we watch for packets on the input stream, and pass them to the appropriate
+   * handlers via this.hooks.
+   */
+  ready: function () {
+    this.active = true;
+    this._waitForIncoming();
+  },
+
+  /**
+   * Asks the input stream to notify us (via onInputStreamReady) when it is
+   * ready for reading.
+   */
+  _waitForIncoming: function () {
+    if (this._incomingEnabled) {
+      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
+      this._input.asyncWait(this, 0, 0, threadManager.currentThread);
+    }
+  },
+
+  /**
+   * Pause this transport's attempts to read from the input stream.  This is
+   * used when we've temporarily handed off our input stream for reading bulk
+   * data.
+   */
+  pauseIncoming: function () {
+    this._incomingEnabled = false;
+  },
+
+  /**
+   * Resume this transport's attempts to read from the input stream.
+   */
+  resumeIncoming: function () {
+    this._incomingEnabled = true;
+    this._flushIncoming();
+    this._waitForIncoming();
+  },
+
+  // nsIInputStreamCallback
+  /**
+   * Called when the stream is either readable or closed.
+   */
+  onInputStreamReady: function (stream) {
+    try {
+      while (stream.available() && this._incomingEnabled &&
+             this._processIncoming(stream, stream.available())) {
+         // Loop until there is nothing more to process
+      }
+      this._waitForIncoming();
+    } catch (e) {
+      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+        this.close(e.result);
+      } else {
+        throw e;
+      }
+    }
+  },
+
+  /**
+   * Process the incoming data.  Will create a new currently incoming Packet if
+   * needed.  Tells the incoming Packet to read as much data as it can, but
+   * reading may not complete.  The Packet signals that its data is ready for
+   * delivery by calling one of this transport's _on*Ready methods (see
+   * ./packets.js and the _on*Ready methods below).
+   * @return boolean
+   *         Whether incoming stream processing should continue for any
+   *         remaining data.
+   */
+  _processIncoming: function (stream, count) {
+    dumpv("Data available: " + count);
+
+    if (!count) {
+      dumpv("Nothing to read, skipping");
+      return false;
+    }
+
+    try {
+      if (!this._incoming) {
+        dumpv("Creating a new packet from incoming");
+
+        if (!this._readHeader(stream)) {
+          // Not enough data to read packet type
+          return false;
+        }
+
+        // Attempt to create a new Packet by trying to parse each possible
+        // header pattern.
+        this._incoming = Packet.fromHeader(this._incomingHeader, this);
+        if (!this._incoming) {
+          throw new Error("No packet types for header: " +
+                        this._incomingHeader);
+        }
+      }
+
+      if (!this._incoming.done) {
+        // We have an incomplete packet, keep reading it.
+        dumpv("Existing packet incomplete, keep reading");
+        this._incoming.read(stream, this._scriptableInput);
+      }
+    } catch (e) {
+      let msg = "Error reading incoming packet: (" + e + " - " + e.stack + ")";
+      dump(msg + "\n");
+
+      // Now in an invalid state, shut down the transport.
+      this.close();
+      return false;
+    }
+
+    if (!this._incoming.done) {
+      // Still not complete, we'll wait for more data.
+      dumpv("Packet not done, wait for more");
+      return true;
+    }
+
+    // Ready for next packet
+    this._flushIncoming();
+    return true;
+  },
+
+  /**
+   * Read as far as we can into the incoming data, attempting to build up a
+   * complete packet header (which terminates with ":").  We'll only read up to
+   * PACKET_HEADER_MAX characters.
+   * @return boolean
+   *         True if we now have a complete header.
+   */
+  _readHeader: function () {
+    let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
+    this._incomingHeader +=
+    StreamUtils.delimitedRead(this._scriptableInput, ":", amountToRead);
+    if (flags.wantVerbose) {
+      dumpv("Header read: " + this._incomingHeader);
+    }
+
+    if (this._incomingHeader.endsWith(":")) {
+      if (flags.wantVerbose) {
+        dumpv("Found packet header successfully: " + this._incomingHeader);
+      }
+      return true;
+    }
+
+    if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
+      throw new Error("Failed to parse packet header!");
+    }
+
+    // Not enough data yet.
+    return false;
+  },
+
+  /**
+   * If the incoming packet is done, log it as needed and clear the buffer.
+   */
+  _flushIncoming: function () {
+    if (!this._incoming.done) {
+      return;
+    }
+    if (flags.wantLogging) {
+      dumpv("Got: " + this._incoming);
+    }
+    this._destroyIncoming();
+  },
+
+  /**
+   * Handler triggered by an incoming JSONPacket completing it's |read| method.
+   * Delivers the packet to this.hooks.onPacket.
+   */
+  _onJSONObjectReady: function (object) {
+    executeSoon(() => {
+    // Ensure the transport is still alive by the time this runs.
+      if (this.active) {
+        this.emit("packet", object);
+        this.hooks.onPacket(object);
+      }
+    });
+  },
+
+  /**
+   * Handler triggered by an incoming BulkPacket entering the |read| phase for
+   * the stream portion of the packet.  Delivers info about the incoming
+   * streaming data to this.hooks.onBulkPacket.  See the main comment on the
+   * transport at the top of this file for more details.
+   */
+  _onBulkReadReady: function (...args) {
+    executeSoon(() => {
+    // Ensure the transport is still alive by the time this runs.
+      if (this.active) {
+        this.emit("bulkpacket", ...args);
+        this.hooks.onBulkPacket(...args);
+      }
+    });
+  },
+
+  /**
+   * Remove all handlers and references related to the current incoming packet,
+   * either because it is now complete or because the transport is closing.
+   */
+  _destroyIncoming: function () {
+    if (this._incoming) {
+      this._incoming.destroy();
+    }
+    this._incomingHeader = "";
+    this._incoming = null;
+  }
+
+};
+
+/**
+ * An adapter that handles data transfers between the debugger client and
+ * server when they both run in the same process. It presents the same API as
+ * DebuggerTransport, but instead of transmitting serialized messages across a
+ * connection it merely calls the packet dispatcher of the other side.
+ *
+ * @param other LocalDebuggerTransport
+ *        The other endpoint for this debugger connection.
+ *
+ * @see DebuggerTransport
+ */
+function LocalDebuggerTransport(other) {
+  EventEmitter.decorate(this);
+
+  this.other = other;
+  this.hooks = null;
+
+  // A packet number, shared between this and this.other. This isn't used by the
+  // protocol at all, but it makes the packet traces a lot easier to follow.
+  this._serial = this.other ? this.other._serial : { count: 0 };
+  this.close = this.close.bind(this);
+}
+
+LocalDebuggerTransport.prototype = {
+  /**
+   * Transmit a message by directly calling the onPacket handler of the other
+   * endpoint.
+   */
+  send: function (packet) {
+    this.emit("send", packet);
+
+    let serial = this._serial.count++;
+    if (flags.wantLogging) {
+      // Check 'from' first, as 'echo' packets have both.
+      if (packet.from) {
+        dumpv("Packet " + serial + " sent from " + uneval(packet.from));
+      } else if (packet.to) {
+        dumpv("Packet " + serial + " sent to " + uneval(packet.to));
+      }
+    }
+    this._deepFreeze(packet);
+    let other = this.other;
+    if (other) {
+      executeSoon(() => {
+        // Avoid the cost of JSON.stringify() when logging is disabled.
+        if (flags.wantLogging) {
+          dumpv("Received packet " + serial + ": " + JSON.stringify(packet, null, 2));
+        }
+        if (other.hooks) {
+          other.emit("packet", packet);
+          other.hooks.onPacket(packet);
+        }
+      });
+    }
+  },
+
+  /**
+   * Send a streaming bulk packet directly to the onBulkPacket handler of the
+   * other endpoint.
+   *
+   * This case is much simpler than the full DebuggerTransport, since there is
+   * no primary stream we have to worry about managing while we hand it off to
+   * others temporarily.  Instead, we can just make a single use pipe and be
+   * done with it.
+   */
+  startBulkSend: function ({actor, type, length}) {
+    this.emit("startbulksend", {actor, type, length});
+
+    let serial = this._serial.count++;
+
+    dumpv("Sent bulk packet " + serial + " for actor " + actor);
+    if (!this.other) {
+      let error = new Error("startBulkSend: other side of transport missing");
+      return Promise.reject(error);
+    }
+
+    let pipe = new Pipe(true, true, 0, 0, null);
+
+    executeSoon(() => {
+      dumpv("Received bulk packet " + serial);
+      if (!this.other.hooks) {
+        return;
+      }
+
+      // Receiver
+      let deferred = defer();
+      let packet = {
+        actor: actor,
+        type: type,
+        length: length,
+        copyTo: (output) => {
+          let copying =
+          StreamUtils.copyStream(pipe.inputStream, output, length);
+          deferred.resolve(copying);
+          return copying;
+        },
+        stream: pipe.inputStream,
+        done: deferred
+      };
+
+      this.other.emit("bulkpacket", packet);
+      this.other.hooks.onBulkPacket(packet);
+
+      // Await the result of reading from the stream
+      deferred.promise.then(() => pipe.inputStream.close(), this.close);
+    });
+
+    // Sender
+    let sendDeferred = defer();
+
+    // The remote transport is not capable of resolving immediately here, so we
+    // shouldn't be able to either.
+    executeSoon(() => {
+      let copyDeferred = defer();
+
+      sendDeferred.resolve({
+        copyFrom: (input) => {
+          let copying =
+          StreamUtils.copyStream(input, pipe.outputStream, length);
+          copyDeferred.resolve(copying);
+          return copying;
+        },
+        stream: pipe.outputStream,
+        done: copyDeferred
+      });
+
+      // Await the result of writing to the stream
+      copyDeferred.promise.then(() => pipe.outputStream.close(), this.close);
+    });
+
+    return sendDeferred.promise;
+  },
+
+  /**
+   * Close the transport.
+   */
+  close: function () {
+    this.emit("close");
+
+    if (this.other) {
+      // Remove the reference to the other endpoint before calling close(), to
+      // avoid infinite recursion.
+      let other = this.other;
+      this.other = null;
+      other.close();
+    }
+    if (this.hooks) {
+      try {
+        this.hooks.onClosed();
+      } catch (ex) {
+        console.error(ex);
+      }
+      this.hooks = null;
+    }
+  },
+
+  /**
+   * An empty method for emulating the DebuggerTransport API.
+   */
+  ready: function () {},
+
+  /**
+   * Helper function that makes an object fully immutable.
+   */
+  _deepFreeze: function (object) {
+    Object.freeze(object);
+    for (let prop in object) {
+      // Freeze the properties that are objects, not on the prototype, and not
+      // already frozen. Note that this might leave an unfrozen reference
+      // somewhere in the object if there is an already frozen object containing
+      // an unfrozen object.
+      if (object.hasOwnProperty(prop) && typeof object === "object" &&
+          !Object.isFrozen(object)) {
+        this._deepFreeze(object[prop]);
+      }
+    }
+  }
+};
+
+/**
+ * A transport for the debugging protocol that uses nsIMessageManagers to
+ * exchange packets with servers running in child processes.
+ *
+ * In the parent process, |mm| should be the nsIMessageSender for the
+ * child process. In a child process, |mm| should be the child process
+ * message manager, which sends packets to the parent.
+ *
+ * |prefix| is a string included in the message names, to distinguish
+ * multiple servers running in the same child process.
+ *
+ * This transport exchanges messages named 'debug:<prefix>:packet', where
+ * <prefix> is |prefix|, whose data is the protocol packet.
+ */
+function ChildDebuggerTransport(mm, prefix) {
+  EventEmitter.decorate(this);
+
+  this._mm = mm;
+  this._messageName = "debug:" + prefix + ":packet";
+}
+
+/*
+ * To avoid confusion, we use 'message' to mean something that
+ * nsIMessageSender conveys, and 'packet' to mean a remote debugging
+ * protocol packet.
+ */
+ChildDebuggerTransport.prototype = {
+  constructor: ChildDebuggerTransport,
+
+  hooks: null,
+
+  _addListener() {
+    this._mm.addMessageListener(this._messageName, this);
+  },
+
+  _removeListener() {
+    try {
+      this._mm.removeMessageListener(this._messageName, this);
+    } catch (e) {
+      if (e.result != Cr.NS_ERROR_NULL_POINTER) {
+        throw e;
+      }
+      // In some cases, especially when using messageManagers in non-e10s mode, we reach
+      // this point with a dead messageManager which only throws errors but does not
+      // seem to indicate in any other way that it is dead.
+    }
+  },
+
+  ready: function () {
+    this._addListener();
+  },
+
+  close: function () {
+    this._removeListener();
+    this.emit("close");
+    this.hooks.onClosed();
+  },
+
+  receiveMessage: function ({data}) {
+    this.emit("packet", data);
+    this.hooks.onPacket(data);
+  },
+
+  send: function (packet) {
+    this.emit("send", packet);
+    try {
+      this._mm.sendAsyncMessage(this._messageName, packet);
+    } catch (e) {
+      if (e.result != Cr.NS_ERROR_NULL_POINTER) {
+        throw e;
+      }
+      // In some cases, especially when using messageManagers in non-e10s mode, we reach
+      // this point with a dead messageManager which only throws errors but does not
+      // seem to indicate in any other way that it is dead.
+    }
+  },
+
+  startBulkSend: function () {
+    throw new Error("Can't send bulk data to child processes.");
+  },
+
+  swapBrowser(mm) {
+    this._removeListener();
+    this._mm = mm;
+    this._addListener();
+  },
+};
+
+// WorkerDebuggerTransport is defined differently depending on whether we are
+// on the main thread or a worker thread. In the former case, we are required
+// by the devtools loader, and isWorker will be false. Otherwise, we are
+// required by the worker loader, and isWorker will be true.
+//
+// Each worker debugger supports only a single connection to the main thread.
+// However, its theoretically possible for multiple servers to connect to the
+// same worker. Consequently, each transport has a connection id, to allow
+// messages from multiple connections to be multiplexed on a single channel.
+
+if (!this.isWorker) {
+  // Main thread
+  (function () {
+    /**
+     * A transport that uses a WorkerDebugger to send packets from the main
+     * thread to a worker thread.
+     */
+    function WorkerDebuggerTransport(dbg, id) {
+      this._dbg = dbg;
+      this._id = id;
+      this.onMessage = this._onMessage.bind(this);
+    }
+
+    WorkerDebuggerTransport.prototype = {
+      constructor: WorkerDebuggerTransport,
+
+      ready: function () {
+        this._dbg.addListener(this);
+      },
+
+      close: function () {
+        this._dbg.removeListener(this);
+        if (this.hooks) {
+          this.hooks.onClosed();
+        }
+      },
+
+      send: function (packet) {
+        this._dbg.postMessage(JSON.stringify({
+          type: "message",
+          id: this._id,
+          message: packet
+        }));
+      },
+
+      startBulkSend: function () {
+        throw new Error("Can't send bulk data from worker threads!");
+      },
+
+      _onMessage: function (message) {
+        let packet = JSON.parse(message);
+        if (packet.type !== "message" || packet.id !== this._id) {
+          return;
+        }
+
+        if (this.hooks) {
+          this.hooks.onPacket(packet.message);
+        }
+      }
+    };
+
+  }).call(this);
+} else {
+  // Worker thread
+  (function () {
+    /**
+     * A transport that uses a WorkerDebuggerGlobalScope to send packets from a
+     * worker thread to the main thread.
+     */
+    function WorkerDebuggerTransport(scope, id) {
+      this._scope = scope;
+      this._id = id;
+      this._onMessage = this._onMessage.bind(this);
+    }
+
+    WorkerDebuggerTransport.prototype = {
+      constructor: WorkerDebuggerTransport,
+
+      ready: function () {
+        this._scope.addEventListener("message", this._onMessage);
+      },
+
+      close: function () {
+        this._scope.removeEventListener("message", this._onMessage);
+        if (this.hooks) {
+          this.hooks.onClosed();
+        }
+      },
+
+      send: function (packet) {
+        this._scope.postMessage(JSON.stringify({
+          type: "message",
+          id: this._id,
+          message: packet
+        }));
+      },
+
+      startBulkSend: function () {
+        throw new Error("Can't send bulk data from worker threads!");
+      },
+
+      _onMessage: function (event) {
+        let packet = JSON.parse(event.data);
+        if (packet.type !== "message" || packet.id !== this._id) {
+          return;
+        }
+
+        if (this.hooks) {
+          this.hooks.onPacket(packet.message);
+        }
+      }
+    };
+
+  }).call(this);
+}