--- a/devtools/shared/webconsole/moz.build
+++ b/devtools/shared/webconsole/moz.build
@@ -10,11 +10,12 @@ if CONFIG['OS_TARGET'] != 'Android':
DevToolsModules(
'client.js',
'js-property-provider.js',
'network-helper.js',
'network-monitor.js',
'server-logger-monitor.js',
'server-logger.js',
+ 'throttle.js',
'utils.js',
'worker-utils.js',
)
--- a/devtools/shared/webconsole/network-monitor.js
+++ b/devtools/shared/webconsole/network-monitor.js
@@ -13,16 +13,17 @@ const { XPCOMUtils } = require("resource
loader.lazyRequireGetter(this, "NetworkHelper",
"devtools/shared/webconsole/network-helper");
loader.lazyRequireGetter(this, "DevToolsUtils",
"devtools/shared/DevToolsUtils");
loader.lazyImporter(this, "NetUtil", "resource://gre/modules/NetUtil.jsm");
loader.lazyServiceGetter(this, "gActivityDistributor",
"@mozilla.org/network/http-activity-distributor;1",
"nsIHttpActivityDistributor");
+const {NetworkThrottleManager} = require("devtools/shared/webconsole/throttle");
// /////////////////////////////////////////////////////////////////////////////
// Network logging
// /////////////////////////////////////////////////////////////////////////////
// The maximum uint32 value.
const PR_UINT32_MAX = 4294967295;
@@ -662,17 +663,21 @@ NetworkResponseListener.prototype = {
*/
function NetworkMonitor(filters, owner) {
this.filters = filters;
this.owner = owner;
this.openRequests = {};
this.openResponses = {};
this._httpResponseExaminer =
DevToolsUtils.makeInfallible(this._httpResponseExaminer).bind(this);
+ this._httpModifyExaminer =
+ DevToolsUtils.makeInfallible(this._httpModifyExaminer).bind(this);
this._serviceWorkerRequest = this._serviceWorkerRequest.bind(this);
+ this.throttleData = null;
+ this._throttler = null;
}
exports.NetworkMonitor = NetworkMonitor;
NetworkMonitor.prototype = {
filters: null,
httpTransactionCodes: {
@@ -723,23 +728,32 @@ NetworkMonitor.prototype = {
this.interceptedChannels = new Set();
if (Services.appinfo.processType != Ci.nsIXULRuntime.PROCESS_TYPE_CONTENT) {
gActivityDistributor.addObserver(this);
Services.obs.addObserver(this._httpResponseExaminer,
"http-on-examine-response", false);
Services.obs.addObserver(this._httpResponseExaminer,
"http-on-examine-cached-response", false);
+ Services.obs.addObserver(this._httpModifyExaminer,
+ "http-on-modify-request", false);
}
// In child processes, only watch for service worker requests
// everything else only happens in the parent process
Services.obs.addObserver(this._serviceWorkerRequest,
"service-worker-synthesized-response", false);
},
+ _getThrottler: function () {
+ if (this.throttleData !== null && this._throttler === null) {
+ this._throttler = new NetworkThrottleManager(this.throttleData);
+ }
+ return this._throttler;
+ },
+
_serviceWorkerRequest: function (subject, topic, data) {
let channel = subject.QueryInterface(Ci.nsIHttpChannel);
if (!matchRequest(channel, this.filters)) {
return;
}
this.interceptedChannels.add(subject);
@@ -845,16 +859,34 @@ NetworkMonitor.prototype = {
// There also is never any timing events, so we can fire this
// event with zeroed out values.
let timings = this._setupHarTimings(httpActivity, true);
httpActivity.owner.addEventTimings(timings.total, timings.timings);
}
},
/**
+ * Observe notifications for the http-on-modify-request topic, coming from
+ * the nsIObserverService.
+ *
+ * @private
+ * @param nsIHttpChannel aSubject
+ * @returns void
+ */
+ _httpModifyExaminer: function (subject) {
+ let throttler = this._getThrottler();
+ if (throttler) {
+ let channel = subject.QueryInterface(Ci.nsIHttpChannel);
+ if (matchRequest(channel, this.filters)) {
+ throttler.manageUpload(channel);
+ }
+ }
+ },
+
+ /**
* Begin observing HTTP traffic that originates inside the current tab.
*
* @see https://developer.mozilla.org/en/XPCOM_Interface_Reference/nsIHttpActivityObserver
*
* @param nsIHttpChannel channel
* @param number activityType
* @param number activitySubtype
* @param number timestamp
@@ -1014,17 +1046,17 @@ NetworkMonitor.prototype = {
});
if (cookieHeader) {
cookies = NetworkHelper.parseCookieHeader(cookieHeader);
}
httpActivity.owner = this.owner.onNetworkEvent(event, channel);
- this._setupResponseListener(httpActivity);
+ this._setupResponseListener(httpActivity, fromCache);
httpActivity.owner.addRequestHeaders(headers, extraStringData);
httpActivity.owner.addRequestCookies(cookies);
this.openRequests[httpActivity.id] = httpActivity;
return httpActivity;
},
@@ -1084,20 +1116,27 @@ NetworkMonitor.prototype = {
/**
* Setup the network response listener for the given HTTP activity. The
* NetworkResponseListener is responsible for storing the response body.
*
* @private
* @param object httpActivity
* The HTTP activity object we are tracking.
*/
- _setupResponseListener: function (httpActivity) {
+ _setupResponseListener: function (httpActivity, fromCache) {
let channel = httpActivity.channel;
channel.QueryInterface(Ci.nsITraceableChannel);
+ if (!fromCache) {
+ let throttler = this._getThrottler();
+ if (throttler) {
+ httpActivity.downloadThrottle = throttler.manage(channel);
+ }
+ }
+
// The response will be written into the outputStream of this pipe.
// This allows us to buffer the data we are receiving and read it
// asynchronously.
// Both ends of the pipe must be blocking.
let sink = Cc["@mozilla.org/pipe;1"].createInstance(Ci.nsIPipe);
// The streams need to be blocking because this is required by the
// stream tee.
@@ -1321,26 +1360,29 @@ NetworkMonitor.prototype = {
*/
destroy: function () {
if (Services.appinfo.processType != Ci.nsIXULRuntime.PROCESS_TYPE_CONTENT) {
gActivityDistributor.removeObserver(this);
Services.obs.removeObserver(this._httpResponseExaminer,
"http-on-examine-response");
Services.obs.removeObserver(this._httpResponseExaminer,
"http-on-examine-cached-response");
+ Services.obs.removeObserver(this._httpModifyExaminer,
+ "http-on-modify-request", false);
}
Services.obs.removeObserver(this._serviceWorkerRequest,
"service-worker-synthesized-response");
this.interceptedChannels.clear();
this.openRequests = {};
this.openResponses = {};
this.owner = null;
this.filters = null;
+ this._throttler = null;
},
};
/**
* The NetworkMonitorChild is used to proxy all of the network activity of the
* child app process from the main process. The child WebConsoleActor creates an
* instance of this object.
*
new file mode 100644
--- /dev/null
+++ b/devtools/shared/webconsole/test/unit/test_throttle.js
@@ -0,0 +1,140 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+const Cu = Components.utils;
+const Cc = Components.classes;
+const Ci = Components.interfaces;
+const { require } = Cu.import("resource://devtools/shared/Loader.jsm", {});
+const promise = require("promise");
+const { NetworkThrottleManager } =
+ require("devtools/shared/webconsole/throttle");
+const nsIScriptableInputStream = Ci.nsIScriptableInputStream;
+
+function TestStreamListener() {
+ this.state = "initial";
+}
+TestStreamListener.prototype = {
+ onStartRequest: function() {
+ this.setState("start");
+ },
+
+ onStopRequest: function() {
+ this.setState("stop");
+ },
+
+ onDataAvailable: function(request, context, inputStream, offset, count) {
+ const sin = Components.classes["@mozilla.org/scriptableinputstream;1"]
+ .createInstance(nsIScriptableInputStream);
+ sin.init(inputStream);
+ this.data = sin.read(count);
+ this.setState("data");
+ },
+
+ setState: function(state) {
+ this.state = state;
+ if (this._deferred) {
+ this._deferred.resolve(state);
+ this._deferred = null;
+ }
+ },
+
+ onStateChanged: function() {
+ if (!this._deferred) {
+ this._deferred = promise.defer();
+ }
+ return this._deferred.promise;
+ }
+};
+
+function TestChannel() {
+ this.state = "initial";
+ this.testListener = new TestStreamListener();
+ this._throttleQueue = null;
+}
+TestChannel.prototype = {
+ QueryInterface: function() {
+ return this;
+ },
+
+ get throttleQueue() {
+ return this._throttleQueue;
+ },
+
+ set throttleQueue(q) {
+ this._throttleQueue = q;
+ this.state = "throttled";
+ },
+
+ setNewListener: function(listener) {
+ this.listener = listener;
+ this.state = "listener";
+ return this.testListener;
+ },
+};
+
+add_task(function*() {
+ let throttler = new NetworkThrottleManager({
+ roundTripTimeMean: 1,
+ roundTripTimeMax: 1,
+ downloadBPSMean: 500,
+ downloadBPSMax: 500,
+ uploadBPSMean: 500,
+ uploadBPSMax: 500,
+ });
+
+ let uploadChannel = new TestChannel();
+ throttler.manageUpload(uploadChannel);
+ equal(uploadChannel.state, "throttled",
+ "NetworkThrottleManager set throttleQueue");
+
+ let downloadChannel = new TestChannel();
+ let testListener = downloadChannel.testListener;
+
+ let listener = throttler.manage(downloadChannel);
+ equal(downloadChannel.state, "listener",
+ "NetworkThrottleManager called setNewListener");
+
+ equal(testListener.state, "initial", "test listener in initial state");
+
+ // This method must be passed through immediately.
+ listener.onStartRequest(null, null);
+ equal(testListener.state, "start", "test listener started");
+
+ const TEST_INPUT = "hi bob";
+
+ let testStream = Cc["@mozilla.org/storagestream;1"]
+ .createInstance(Ci.nsIStorageStream);
+ testStream.init(512, 512);
+ let out = testStream.getOutputStream(0);
+ out.write(TEST_INPUT, TEST_INPUT.length);
+ out.close();
+ let testInputStream = testStream.newInputStream(0);
+
+ let activityDistributor =
+ Cc["@mozilla.org/network/http-activity-distributor;1"]
+ .getService(Ci.nsIHttpActivityDistributor);
+ let activitySeen = false;
+ listener.addActivityCallback(() => activitySeen = true, null, null, null,
+ activityDistributor
+ .ACTIVITY_SUBTYPE_RESPONSE_COMPLETE,
+ null, TEST_INPUT.length, null);
+
+ // onDataAvailable is required to immediately read the data.
+ listener.onDataAvailable(null, null, testInputStream, 0, 6);
+ equal(testInputStream.available(), 0, "no more data should be available");
+ equal(testListener.state, "start",
+ "test listener should not have received data");
+ equal(activitySeen, false, "activity not distributed yet");
+
+ let newState = yield testListener.onStateChanged();
+ equal(newState, "data", "test listener received data");
+ equal(testListener.data, TEST_INPUT, "test listener received all the data");
+ equal(activitySeen, true, "activity has been distributed");
+
+ let onChange = testListener.onStateChanged();
+ listener.onStopRequest(null, null, null);
+ newState = yield onChange;
+ equal(newState, "stop", "onStateChanged reported");
+});
--- a/devtools/shared/webconsole/test/unit/xpcshell.ini
+++ b/devtools/shared/webconsole/test/unit/xpcshell.ini
@@ -9,8 +9,9 @@ support-files =
[test_js_property_provider.js]
[test_network_helper.js]
[test_security-info-certificate.js]
[test_security-info-parser.js]
[test_security-info-protocol-version.js]
[test_security-info-state.js]
[test_security-info-static-hpkp.js]
[test_security-info-weakness-reasons.js]
+[test_throttle.js]
new file mode 100644
--- /dev/null
+++ b/devtools/shared/webconsole/throttle.js
@@ -0,0 +1,325 @@
+/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set ft= javascript ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const {CC, Ci, Cu, Cc} = require("chrome");
+
+const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
+ "nsIArrayBufferInputStream");
+const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
+ "nsIBinaryInputStream", "setInputStream");
+
+const {XPCOMUtils} = require("resource://gre/modules/XPCOMUtils.jsm");
+const {setTimeout} = Cu.import("resource://gre/modules/Timer.jsm", {});
+
+/**
+ * Construct a new nsIStreamListener that buffers data and provides a
+ * method to notify another listener when data is available. This is
+ * used to throttle network data on a per-channel basis.
+ *
+ * After construction, @see setOriginalListener must be called on the
+ * new object.
+ *
+ * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
+ * which status changes should be reported
+ */
+function NetworkThrottleListener(queue) {
+ this.queue = queue;
+ this.pendingData = [];
+ this.pendingException = null;
+ this.offset = 0;
+}
+
+NetworkThrottleListener.prototype = {
+ QueryInterface:
+ XPCOMUtils.generateQI([Ci.nsIStreamListener, Ci.nsIInterfaceRequestor,
+ Ci.nsISupports]),
+
+ /**
+ * Set the original listener for this object. The original listener
+ * will receive requests from this object when the queue allows data
+ * through.
+ *
+ * @param {nsIStreamListener} originalListener the original listener
+ * for the channel, to which all requests will be sent
+ */
+ setOriginalListener: function (originalListener) {
+ this.originalListener = originalListener;
+ },
+
+ /**
+ * @see nsIStreamListener.onStartRequest.
+ */
+ onStartRequest: function (request, context) {
+ this.originalListener.onStartRequest(request, context);
+ this.queue.start(this);
+ },
+
+ /**
+ * @see nsIStreamListener.onStopRequest.
+ */
+ onStopRequest: function (request, context, statusCode) {
+ this.pendingData.push({request, context, statusCode});
+ this.queue.dataAvailable(this);
+ },
+
+ /**
+ * @see nsIStreamListener.onDataAvailable.
+ */
+ onDataAvailable: function (request, context, inputStream, offset, count) {
+ if (this.pendingException) {
+ throw this.pendingException;
+ }
+
+ const bin = new BinaryInputStream(inputStream);
+ const bytes = new ArrayBuffer(count);
+ bin.readArrayBuffer(count, bytes);
+
+ const stream = new ArrayBufferInputStream();
+ stream.setData(bytes, 0, count);
+
+ this.pendingData.push({request, context, stream, count});
+ this.queue.dataAvailable(this);
+ },
+
+ /**
+ * Allow some buffered data from this object to be forwarded to this
+ * object's originalListener.
+ *
+ * @param {Number} bytesPermitted The maximum number of bytes
+ * permitted to be sent.
+ * @return {Object} an object of the form {length, done}, where
+ * |length| is the number of bytes actually forwarded, and
+ * |done| is a boolean indicating whether this particular
+ * request has been completed. (A NetworkThrottleListener
+ * may be queued multiple times, so this does not mean that
+ * all available data has been sent.)
+ */
+ sendSomeData: function (bytesPermitted) {
+ if (this.pendingData.length === 0) {
+ // Shouldn't happen.
+ return {length: 0, done: true};
+ }
+
+ const {request, context, stream, count, statusCode} = this.pendingData[0];
+
+ if (statusCode !== undefined) {
+ this.pendingData.shift();
+ this.originalListener.onStopRequest(request, context, statusCode);
+ return {length: 0, done: true};
+ }
+
+ if (bytesPermitted > count) {
+ bytesPermitted = count;
+ }
+
+ try {
+ this.originalListener.onDataAvailable(request, context, stream,
+ this.offset, bytesPermitted);
+ } catch (e) {
+ this.pendingException = e;
+ }
+
+ let done = false;
+ if (bytesPermitted === count) {
+ this.pendingData.shift();
+ done = true;
+ } else {
+ this.pendingData[0].count -= bytesPermitted;
+ }
+
+ this.offset += bytesPermitted;
+ return {length: bytesPermitted, done};
+ },
+
+ /**
+ * Return the number of pending data requests available for this
+ * listener.
+ */
+ pendingCount: function () {
+ return this.pendingData.length;
+ },
+};
+
+/**
+ * Construct a new queue that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * meanBPS {Number} Mean bytes per second.
+ * maxBPS {Number} Maximum bytes per second.
+ * roundTripTimeMean {Number} Mean round trip time in milliseconds.
+ * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
+ */
+function NetworkThrottleQueue(meanBPS, maxBPS,
+ roundTripTimeMean, roundTripTimeMax) {
+ this.meanBPS = meanBPS;
+ this.maxBPS = maxBPS;
+ this.roundTripTimeMean = roundTripTimeMean;
+ this.roundTripTimeMax = roundTripTimeMax;
+
+ this.pendingRequests = new Set();
+ this.downloadQueue = [];
+ this.previousReads = [];
+
+ this.pumping = false;
+}
+
+NetworkThrottleQueue.prototype = {
+ /**
+ * A helper function that, given a mean and a maximum, returns a
+ * random integer between (mean - (max - mean)) and max.
+ */
+ random: function (mean, max) {
+ return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
+ },
+
+ /**
+ * A helper function that lets the indicating listener start sending
+ * data. This is called after the initial round trip time for the
+ * listener has elapsed.
+ */
+ allowDataFrom: function (throttleListener) {
+ this.pendingRequests.delete(throttleListener);
+ const count = throttleListener.pendingCount();
+ for (let i = 0; i < count; ++i) {
+ this.downloadQueue.push(throttleListener);
+ }
+ this.pump();
+ },
+
+ /**
+ * Notice a new listener object. This is called by the
+ * NetworkThrottleListener when the request has started. Initially
+ * a new listener object is put into a "pending" state, until the
+ * round-trip time has elapsed. This is used to simulate latency.
+ *
+ * @param {NetworkThrottleListener} throttleListener the new listener
+ */
+ start: function (throttleListener) {
+ this.pendingRequests.add(throttleListener);
+ let delay = this.random(this.roundTripTimeMean, this.roundTripTimeMax);
+ if (delay > 0) {
+ setTimeout(() => this.allowDataFrom(throttleListener), delay);
+ } else {
+ this.allowDataFrom(throttleListener);
+ }
+ },
+
+ /**
+ * Note that new data is available for a given listener. Each time
+ * data is available, the listener will be re-queued.
+ *
+ * @param {NetworkThrottleListener} throttleListener the listener
+ * which has data available.
+ */
+ dataAvailable: function (throttleListener) {
+ if (!this.pendingRequests.has(throttleListener)) {
+ this.downloadQueue.push(throttleListener);
+ this.pump();
+ }
+ },
+
+ /**
+ * An internal function that permits individual listeners to send
+ * data.
+ */
+ pump: function () {
+ // A redirect will cause two NetworkThrottleListeners to be on a
+ // listener chain. In this case, we might recursively call into
+ // this method. Avoid infinite recursion here.
+ if (this.pumping) {
+ return;
+ }
+ this.pumping = true;
+
+ const now = Date.now();
+ const oneSecondAgo = now - 1000;
+
+ while (this.previousReads.length &&
+ this.previousReads[0].when < oneSecondAgo) {
+ this.previousReads.shift();
+ }
+
+ const totalBytes = this.previousReads.reduce((sum, elt) => {
+ return sum + elt.numBytes;
+ }, 0);
+
+ let thisSliceBytes = this.random(this.meanBPS, this.maxBPS);
+ if (totalBytes < thisSliceBytes) {
+ thisSliceBytes -= totalBytes;
+ let readThisTime = 0;
+ while (thisSliceBytes > 0 && this.downloadQueue.length) {
+ let {length, done} = this.downloadQueue[0].sendSomeData(thisSliceBytes);
+ thisSliceBytes -= length;
+ readThisTime += length;
+ if (done) {
+ this.downloadQueue.shift();
+ }
+ }
+ this.previousReads.push({when: now, numBytes: readThisTime});
+ }
+
+ // If there is more data to download, then schedule ourselves for
+ // one second after the oldest previous read.
+ if (this.downloadQueue.length) {
+ const when = this.previousReads[0].when + 1000;
+ setTimeout(this.pump.bind(this), when - now);
+ }
+
+ this.pumping = false;
+ },
+};
+
+/**
+ * Construct a new object that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * @param {Object} An object with the following attributes:
+ * roundTripTimeMean {Number} Mean round trip time in milliseconds.
+ * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
+ * downloadBPSMean {Number} Mean bytes per second for downloads.
+ * downloadBPSMax {Number} Maximum bytes per second for downloads.
+ * uploadBPSMean {Number} Mean bytes per second for uploads.
+ * uploadBPSMax {Number} Maximum bytes per second for uploads.
+ */
+function NetworkThrottleManager({roundTripTimeMean, roundTripTimeMax,
+ downloadBPSMean, downloadBPSMax,
+ uploadBPSMean, uploadBPSMax}) {
+ this.downloadQueue =
+ new NetworkThrottleQueue(downloadBPSMean, downloadBPSMax,
+ roundTripTimeMean, roundTripTimeMax);
+ this.uploadQueue = Cc["@mozilla.org/network/throttlequeue;1"]
+ .createInstance(Ci.nsIInputChannelThrottleQueue);
+ this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
+}
+exports.NetworkThrottleManager = NetworkThrottleManager;
+
+NetworkThrottleManager.prototype = {
+ /**
+ * Create a new NetworkThrottleListener for a given channel and
+ * install it using |setNewListener|.
+ *
+ * @param {nsITraceableChannel} channel the channel to manage
+ * @return {NetworkThrottleListener} the new listener
+ */
+ manage: function (channel) {
+ let listener = new NetworkThrottleListener(this.downloadQueue);
+ let originalListener = channel.setNewListener(listener);
+ listener.setOriginalListener(originalListener);
+ return listener;
+ },
+
+ /**
+ * Throttle uploads taking place on the given channel.
+ *
+ * @param {nsITraceableChannel} channel the channel to manage
+ */
+ manageUpload: function (channel) {
+ channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+ channel.throttleQueue = this.uploadQueue;
+ },
+};