Bug 1244227 - add nsIThrottledInputChannel.idl and implement; r=mcmanus draft
authorTom Tromey <tom@tromey.com>
Tue, 23 Feb 2016 14:26:45 -0700
changeset 390764 b55c62dc23cf79457ad0612159c0189e099e4e5a
parent 390691 f5154aaeaec4546dd04c66ac61a0d0ee69f4dfdf
child 390765 841550a1c6d69b7853ac2e4af8fc91c84c1fe97e
push id23743
push userbmo:ttromey@mozilla.com
push dateThu, 21 Jul 2016 18:21:08 +0000
reviewersmcmanus
bugs1244227
milestone50.0a1
Bug 1244227 - add nsIThrottledInputChannel.idl and implement; r=mcmanus MozReview-Commit-ID: JVIjxEO901W
netwerk/base/ThrottleQueue.cpp
netwerk/base/ThrottleQueue.h
netwerk/base/moz.build
netwerk/base/nsIThrottledInputChannel.idl
netwerk/build/nsNetCID.h
netwerk/build/nsNetModule.cpp
netwerk/protocol/http/HttpBaseChannel.cpp
netwerk/protocol/http/HttpBaseChannel.h
netwerk/protocol/http/nsHttpTransaction.cpp
netwerk/test/unit/test_throttlechannel.js
netwerk/test/unit/test_throttlequeue.js
netwerk/test/unit/test_throttling.js
netwerk/test/unit/xpcshell.ini
new file mode 100644
--- /dev/null
+++ b/netwerk/base/ThrottleQueue.cpp
@@ -0,0 +1,392 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "ThrottleQueue.h"
+#include "nsISeekableStream.h"
+#include "nsIAsyncInputStream.h"
+#include "nsStreamUtils.h"
+#include "nsNetUtil.h"
+
+namespace mozilla {
+namespace net {
+
+//-----------------------------------------------------------------------------
+
+class ThrottleInputStream final
+  : public nsIAsyncInputStream
+  , public nsISeekableStream
+{
+public:
+
+  ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIINPUTSTREAM
+  NS_DECL_NSISEEKABLESTREAM
+  NS_DECL_NSIASYNCINPUTSTREAM
+
+  void AllowInput();
+
+private:
+
+  ~ThrottleInputStream();
+
+  nsCOMPtr<nsIInputStream> mStream;
+  RefPtr<ThrottleQueue> mQueue;
+  nsresult mClosedStatus;
+
+  nsCOMPtr<nsIInputStreamCallback> mCallback;
+  nsCOMPtr<nsIEventTarget> mEventTarget;
+};
+
+NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
+
+ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
+  : mStream(aStream)
+  , mQueue(aQueue)
+  , mClosedStatus(NS_OK)
+{
+  MOZ_ASSERT(aQueue != nullptr);
+}
+
+ThrottleInputStream::~ThrottleInputStream()
+{
+  Close();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Close()
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  if (mQueue) {
+    mQueue->DequeueStream(this);
+    mQueue = nullptr;
+    mClosedStatus = NS_BASE_STREAM_CLOSED;
+  }
+  return mStream->Close();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Available(uint64_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  return mStream->Available(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  uint32_t realCount;
+  nsresult rv = mQueue->Available(aCount, &realCount);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  if (realCount == 0) {
+    return NS_BASE_STREAM_WOULD_BLOCK;
+  }
+
+  rv = mStream->Read(aBuf, realCount, aResult);
+  if (NS_SUCCEEDED(rv) && *aResult > 0) {
+    mQueue->RecordRead(*aResult);
+  }
+  return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+                                  uint32_t aCount, uint32_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  uint32_t realCount;
+  nsresult rv = mQueue->Available(aCount, &realCount);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  if (realCount == 0) {
+    return NS_BASE_STREAM_WOULD_BLOCK;
+  }
+
+  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
+  if (NS_SUCCEEDED(rv) && *aResult > 0) {
+    mQueue->RecordRead(*aResult);
+  }
+  return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
+{
+  *aNonBlocking = true;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->Seek(aWhence, aOffset);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Tell(int64_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->Tell(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::SetEOF()
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->SetEOF();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::CloseWithStatus(nsresult aStatus)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    // Already closed, ignore.
+    return NS_OK;
+  }
+  if (NS_SUCCEEDED(aStatus)) {
+    aStatus = NS_BASE_STREAM_CLOSED;
+  }
+
+  mClosedStatus = Close();
+  if (NS_SUCCEEDED(mClosedStatus)) {
+    mClosedStatus = aStatus;
+  }
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
+                               uint32_t aFlags,
+                               uint32_t aRequestedCount,
+                               nsIEventTarget *aEventTarget)
+{
+  if (aFlags != 0) {
+    return NS_ERROR_ILLEGAL_VALUE;
+  }
+
+  mCallback = aCallback;
+  mEventTarget = aEventTarget;
+  if (mCallback) {
+    mQueue->QueueStream(this);
+  } else {
+    mQueue->DequeueStream(this);
+  }
+  return NS_OK;
+}
+
+void
+ThrottleInputStream::AllowInput()
+{
+  MOZ_ASSERT(mCallback);
+  nsCOMPtr<nsIInputStreamCallback> callbackEvent =
+    NS_NewInputStreamReadyEvent(mCallback, mEventTarget);
+  mCallback = nullptr;
+  mEventTarget = nullptr;
+  callbackEvent->OnInputStreamReady(this);
+}
+
+//-----------------------------------------------------------------------------
+
+NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
+
+ThrottleQueue::ThrottleQueue()
+  : mMeanBytesPerSecond(0)
+  , mMaxBytesPerSecond(0)
+  , mBytesProcessed(0)
+  , mTimerArmed(false)
+{
+  nsresult rv;
+  nsCOMPtr<nsIEventTarget> sts;
+  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
+  if (NS_SUCCEEDED(rv))
+    sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
+  if (NS_SUCCEEDED(rv))
+    mTimer = do_CreateInstance("@mozilla.org/timer;1");
+  if (mTimer)
+    mTimer->SetTarget(sts);
+}
+
+ThrottleQueue::~ThrottleQueue()
+{
+  if (mTimer && mTimerArmed) {
+    mTimer->Cancel();
+  }
+  mTimer = nullptr;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::RecordRead(uint32_t aBytesRead)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  ThrottleEntry entry;
+  entry.mTime = TimeStamp::Now();
+  entry.mBytesRead = aBytesRead;
+  mReadEvents.AppendElement(entry);
+  mBytesProcessed += aBytesRead;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  TimeStamp now = TimeStamp::Now();
+  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
+  size_t i;
+
+  // Remove all stale events.
+  for (i = 0; i < mReadEvents.Length(); ++i) {
+    if (mReadEvents[i].mTime >= oneSecondAgo) {
+      break;
+    }
+  }
+  mReadEvents.RemoveElementsAt(0, i);
+
+  uint32_t totalBytes = 0;
+  for (i = 0; i < mReadEvents.Length(); ++i) {
+    totalBytes += mReadEvents[i].mBytesRead;
+  }
+
+  uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
+  double prob = static_cast<double>(rand()) / RAND_MAX;
+  uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
+    static_cast<uint32_t>(2 * spread * prob);
+
+  if (totalBytes >= thisSliceBytes) {
+    *aAvailable = 0;
+  } else {
+    *aAvailable = thisSliceBytes;
+  }
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
+{
+  // Can be called on any thread.
+  if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
+    return NS_ERROR_ILLEGAL_VALUE;
+  }
+
+  mMeanBytesPerSecond = aMeanBytesPerSecond;
+  mMaxBytesPerSecond = aMaxBytesPerSecond;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::BytesProcessed(uint64_t* aResult)
+{
+  *aResult = mBytesProcessed;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
+{
+  nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
+  result.forget(aResult);
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Notify(nsITimer* aTimer)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  // A notified reader may need to push itself back on the queue.
+  // Swap out the list of readers so that this works properly.
+  nsTArray<RefPtr<ThrottleInputStream>> events;
+  events.SwapElements(mAsyncEvents);
+
+  // Optimistically notify all the waiting readers, and then let them
+  // requeue if there isn't enough bandwidth.
+  for (size_t i = 0; i < events.Length(); ++i) {
+    events[i]->AllowInput();
+  }
+
+  mTimerArmed = false;
+  return NS_OK;
+}
+
+void
+ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
+    mAsyncEvents.AppendElement(aStream);
+
+    if (!mTimerArmed) {
+      uint32_t ms = 1000;
+      if (mReadEvents.Length() > 0) {
+        TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
+        TimeStamp now = TimeStamp::Now();
+
+        if (t > now) {
+          ms = static_cast<uint32_t>((t - now).ToMilliseconds());
+        } else {
+          ms = 1;
+        }
+      }
+
+      if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
+        mTimerArmed = true;
+      }
+    }
+  }
+}
+
+void
+ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  mAsyncEvents.RemoveElement(aStream);
+}
+
+}
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/base/ThrottleQueue.h
@@ -0,0 +1,65 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_net_ThrottleQueue_h
+#define mozilla_net_ThrottleQueue_h
+
+#include "mozilla/TimeStamp.h"
+#include "nsIThrottledInputChannel.h"
+#include "nsITimer.h"
+
+namespace mozilla {
+namespace net {
+
+class ThrottleInputStream;
+
+/**
+ * An implementation of nsIInputChannelThrottleQueue that can be used
+ * to throttle uploads.  This class is not thread-safe.
+ * Initialization and calls to WrapStream may be done on any thread;
+ * but otherwise, after creation, it can only be used on the socket
+ * thread.  It currently throttles with a one second granularity, so
+ * may be a bit choppy.
+ */
+
+class ThrottleQueue final
+  : public nsIInputChannelThrottleQueue
+  , public nsITimerCallback
+{
+public:
+
+  ThrottleQueue();
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIINPUTCHANNELTHROTTLEQUEUE
+  NS_DECL_NSITIMERCALLBACK
+
+  void QueueStream(ThrottleInputStream* aStream);
+  void DequeueStream(ThrottleInputStream* aStream);
+
+private:
+
+  ~ThrottleQueue();
+
+  struct ThrottleEntry {
+    TimeStamp mTime;
+    uint32_t mBytesRead;
+  };
+
+  nsTArray<ThrottleEntry> mReadEvents;
+  uint32_t mMeanBytesPerSecond;
+  uint32_t mMaxBytesPerSecond;
+  uint64_t mBytesProcessed;
+
+  nsTArray<RefPtr<ThrottleInputStream>> mAsyncEvents;
+  nsCOMPtr<nsITimer> mTimer;
+  bool mTimerArmed;
+};
+
+}
+}
+
+#endif //  mozilla_net_ThrottleQueue_h
--- a/netwerk/base/moz.build
+++ b/netwerk/base/moz.build
@@ -120,16 +120,17 @@ XPIDL_SOURCES += [
     'nsIStreamListener.idl',
     'nsIStreamListenerTee.idl',
     'nsIStreamLoader.idl',
     'nsIStreamTransportService.idl',
     'nsISyncStreamListener.idl',
     'nsISystemProxySettings.idl',
     'nsIThreadRetargetableRequest.idl',
     'nsIThreadRetargetableStreamListener.idl',
+    'nsIThrottledInputChannel.idl',
     'nsITimedChannel.idl',
     'nsITLSServerSocket.idl',
     'nsITraceableChannel.idl',
     'nsITransport.idl',
     'nsIUDPSocket.idl',
     'nsIUnicharStreamLoader.idl',
     'nsIUploadChannel.idl',
     'nsIUploadChannel2.idl',
@@ -255,16 +256,17 @@ UNIFIED_SOURCES += [
     'OfflineObserver.cpp',
     'PollableEvent.cpp',
     'Predictor.cpp',
     'ProxyAutoConfig.cpp',
     'RedirectChannelRegistrar.cpp',
     'RequestContextService.cpp',
     'SimpleBuffer.cpp',
     'StreamingProtocolService.cpp',
+    'ThrottleQueue.cpp',
     'Tickler.cpp',
     'TLSServerSocket.cpp',
 ]
 
 if CONFIG['MOZ_WIDGET_TOOLKIT'] == 'windows':
     SOURCES += [
         'nsURLHelperWin.cpp',
         'ShutdownLayer.cpp',
new file mode 100644
--- /dev/null
+++ b/netwerk/base/nsIThrottledInputChannel.idl
@@ -0,0 +1,80 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "nsISupports.idl"
+
+interface nsIInputStream;
+interface nsIAsyncInputStream;
+
+/**
+ * An instance of this interface can be used to throttle the uploads
+ * of a group of associated channels.
+ */
+[scriptable, uuid(6b4b96fe-3c67-4587-af7b-58b6b17da411)]
+interface nsIInputChannelThrottleQueue : nsISupports
+{
+    /**
+     * Initialize this object with the mean and maximum bytes per
+     * second that will be allowed.  Neither value may be zero, and
+     * the maximum must not be less than the mean.
+     *
+     * @param aMeanBytesPerSecond
+     *        Mean number of bytes per second.
+     * @param aMaxBytesPerSecond
+     *        Maximum number of bytes per second.
+     */
+    void init(in unsigned long aMeanBytesPerSecond, in unsigned long aMaxBytesPerSecond);
+
+    /**
+     * Return the number of bytes that are available to the caller in
+     * this time slice.
+     *
+     * @param aRemaining
+     *        The number of bytes available to be processed
+     * @return the number of bytes allowed to be processed during this
+     *        time slice; this will never be greater than aRemaining.
+     */
+    unsigned long available(in unsigned long aRemaining);
+
+    /**
+     * Record a successful read.
+     *
+     * @param aBytesRead
+     *        The number of bytes actually read.
+     */
+    void recordRead(in unsigned long aBytesRead);
+
+    /**
+     * Return the number of bytes allowed through this queue.  This is
+     * the sum of all the values passed to recordRead.  This method is
+     * primarily useful for testing.
+     */
+    unsigned long long bytesProcessed();
+
+    /**
+     * Wrap the given input stream in a new input stream which
+     * throttles the incoming data.
+     *
+     * @param aInputStream the input stream to wrap
+     * @return a new input stream that throttles the data.
+     */
+    nsIAsyncInputStream wrapStream(in nsIInputStream aInputStream);
+};
+
+/**
+ * A throttled input channel can be managed by an
+ * nsIInputChannelThrottleQueue to limit how much data is sent during
+ * a given time slice.
+ */
+[scriptable, uuid(0a32a100-c031-45b6-9e8b-0444c7d4a143)]
+interface nsIThrottledInputChannel : nsISupports
+{
+    /**
+     * The queue that manages this channel.  Multiple channels can
+     * share a single queue.  A null value means that no throttling
+     * will be done.
+     */
+    attribute nsIInputChannelThrottleQueue throttleQueue;
+};
--- a/netwerk/build/nsNetCID.h
+++ b/netwerk/build/nsNetCID.h
@@ -620,16 +620,26 @@
 #define NS_HTTPACTIVITYDISTRIBUTOR_CID \
 { /* 15629ada-a41c-4a09-961f-6553cd60b1a2 */         \
     0x15629ada,                                      \
     0xa41c,                                          \
     0x4a09,                                          \
     {0x96, 0x1f, 0x65, 0x53, 0xcd, 0x60, 0xb1, 0xa2} \
 }
 
+#define NS_THROTTLEQUEUE_CONTRACTID \
+    "@mozilla.org/network/throttlequeue;1"
+#define NS_THROTTLEQUEUE_CID                            \
+{ /* 4c39159c-cd90-4dd3-97a7-06af5e6d84c4 */            \
+    0x4c39159c,                                         \
+    0xcd90,                                             \
+    0x4dd3,                                             \
+    {0x97, 0xa7, 0x06, 0xaf, 0x5e, 0x6d, 0x84, 0xc4}    \
+}
+
 /******************************************************************************
  * netwerk/protocol/ftp/ classes
  */
 
 #define NS_FTPPROTOCOLHANDLER_CID \
 { /* 25029490-F132-11d2-9588-00805F369F95 */         \
     0x25029490,                                      \
     0xf132,                                          \
--- a/netwerk/build/nsNetModule.cpp
+++ b/netwerk/build/nsNetModule.cpp
@@ -264,28 +264,30 @@ NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsFt
 #undef LOG
 #undef LOG_ENABLED
 #include "nsHttpAuthManager.h"
 #include "nsHttpChannelAuthProvider.h"
 #include "nsHttpBasicAuth.h"
 #include "nsHttpDigestAuth.h"
 #include "nsHttpNTLMAuth.h"
 #include "nsHttpActivityDistributor.h"
+#include "ThrottleQueue.h"
 #undef LOG
 #undef LOG_ENABLED
 namespace mozilla {
 namespace net {
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpNTLMAuth)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpHandler, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpsHandler, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpAuthManager, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpChannelAuthProvider)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpActivityDistributor)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpBasicAuth)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpDigestAuth)
+NS_GENERIC_FACTORY_CONSTRUCTOR(ThrottleQueue)
 } // namespace net
 } // namespace mozilla
 #endif // !NECKO_PROTOCOL_http
 
 #include "mozilla/net/Dashboard.h"
 #include "mozilla/net/PackagedAppService.h"
 #include "mozilla/net/PackagedAppVerifier.h"
 namespace mozilla {
@@ -789,16 +791,17 @@ NS_DEFINE_NAMED_CID(NS_FILEPROTOCOLHANDL
 NS_DEFINE_NAMED_CID(NS_HTTPPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPSPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPBASICAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPDIGESTAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPNTLMAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPAUTHMANAGER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPCHANNELAUTHPROVIDER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPACTIVITYDISTRIBUTOR_CID);
+NS_DEFINE_NAMED_CID(NS_THROTTLEQUEUE_CID);
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
 NS_DEFINE_NAMED_CID(NS_FTPPROTOCOLHANDLER_CID);
 #endif
 #ifdef NECKO_PROTOCOL_res
 NS_DEFINE_NAMED_CID(NS_RESPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_EXTENSIONPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_SUBSTITUTINGURL_CID);
@@ -939,16 +942,17 @@ static const mozilla::Module::CIDEntry k
     { &kNS_HTTPPROTOCOLHANDLER_CID, false, nullptr, mozilla::net::nsHttpHandlerConstructor },
     { &kNS_HTTPSPROTOCOLHANDLER_CID, false, nullptr, mozilla::net::nsHttpsHandlerConstructor },
     { &kNS_HTTPBASICAUTH_CID, false, nullptr, mozilla::net::nsHttpBasicAuthConstructor },
     { &kNS_HTTPDIGESTAUTH_CID, false, nullptr, mozilla::net::nsHttpDigestAuthConstructor },
     { &kNS_HTTPNTLMAUTH_CID, false, nullptr, mozilla::net::nsHttpNTLMAuthConstructor },
     { &kNS_HTTPAUTHMANAGER_CID, false, nullptr, mozilla::net::nsHttpAuthManagerConstructor },
     { &kNS_HTTPCHANNELAUTHPROVIDER_CID, false, nullptr, mozilla::net::nsHttpChannelAuthProviderConstructor },
     { &kNS_HTTPACTIVITYDISTRIBUTOR_CID, false, nullptr, mozilla::net::nsHttpActivityDistributorConstructor },
+    { &kNS_THROTTLEQUEUE_CID, false, nullptr, mozilla::net::ThrottleQueueConstructor },
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
     { &kNS_FTPPROTOCOLHANDLER_CID, false, nullptr, nsFtpProtocolHandlerConstructor },
 #endif
 #ifdef NECKO_PROTOCOL_res
     { &kNS_RESPROTOCOLHANDLER_CID, false, nullptr, nsResProtocolHandlerConstructor },
     { &kNS_EXTENSIONPROTOCOLHANDLER_CID, false, nullptr, mozilla::ExtensionProtocolHandlerConstructor },
     { &kNS_SUBSTITUTINGURL_CID, false, nullptr, mozilla::SubstitutingURLConstructor },
@@ -1100,16 +1104,17 @@ static const mozilla::Module::ContractID
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "http", &kNS_HTTPPROTOCOLHANDLER_CID },
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "https", &kNS_HTTPSPROTOCOLHANDLER_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "basic", &kNS_HTTPBASICAUTH_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "digest", &kNS_HTTPDIGESTAUTH_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "ntlm", &kNS_HTTPNTLMAUTH_CID },
     { NS_HTTPAUTHMANAGER_CONTRACTID, &kNS_HTTPAUTHMANAGER_CID },
     { NS_HTTPCHANNELAUTHPROVIDER_CONTRACTID, &kNS_HTTPCHANNELAUTHPROVIDER_CID },
     { NS_HTTPACTIVITYDISTRIBUTOR_CONTRACTID, &kNS_HTTPACTIVITYDISTRIBUTOR_CID },
+    { NS_THROTTLEQUEUE_CONTRACTID, &kNS_THROTTLEQUEUE_CID },
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "ftp", &kNS_FTPPROTOCOLHANDLER_CID },
 #endif
 #ifdef NECKO_PROTOCOL_res
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "resource", &kNS_RESPROTOCOLHANDLER_CID },
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "moz-extension", &kNS_EXTENSIONPROTOCOLHANDLER_CID },
 #endif
--- a/netwerk/protocol/http/HttpBaseChannel.cpp
+++ b/netwerk/protocol/http/HttpBaseChannel.cpp
@@ -222,16 +222,17 @@ NS_INTERFACE_MAP_BEGIN(HttpBaseChannel)
   NS_INTERFACE_MAP_ENTRY(nsIUploadChannel)
   NS_INTERFACE_MAP_ENTRY(nsIFormPOSTActionChannel)
   NS_INTERFACE_MAP_ENTRY(nsIUploadChannel2)
   NS_INTERFACE_MAP_ENTRY(nsISupportsPriority)
   NS_INTERFACE_MAP_ENTRY(nsITraceableChannel)
   NS_INTERFACE_MAP_ENTRY(nsIPrivateBrowsingChannel)
   NS_INTERFACE_MAP_ENTRY(nsITimedChannel)
   NS_INTERFACE_MAP_ENTRY(nsIConsoleReportCollector)
+  NS_INTERFACE_MAP_ENTRY(nsIThrottledInputChannel)
 NS_INTERFACE_MAP_END_INHERITING(nsHashPropertyBag)
 
 //-----------------------------------------------------------------------------
 // HttpBaseChannel::nsIRequest
 //-----------------------------------------------------------------------------
 
 NS_IMETHODIMP
 HttpBaseChannel::GetName(nsACString& aName)
@@ -3436,16 +3437,38 @@ HttpBaseChannel::GetInnerDOMWindow()
     nsCOMPtr<nsPIDOMWindowInner> innerWindow = pDomWindow->GetCurrentInnerWindow();
     if (!innerWindow) {
       return nullptr;
     }
 
     return innerWindow;
 }
 
+//-----------------------------------------------------------------------------
+// HttpBaseChannel::nsIThrottledInputChannel
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+HttpBaseChannel::SetThrottleQueue(nsIInputChannelThrottleQueue* aQueue)
+{
+  if (!XRE_IsParentProcess()) {
+    return NS_ERROR_FAILURE;
+  }
+
+  mThrottleQueue = aQueue;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+HttpBaseChannel::GetThrottleQueue(nsIInputChannelThrottleQueue** aQueue)
+{
+  *aQueue = mThrottleQueue;
+  return NS_OK;
+}
+
 //------------------------------------------------------------------------------
 
 bool
 HttpBaseChannel::EnsureRequestContextID()
 {
     nsID nullID;
     nullID.Clear();
     if (!mRequestContextID.Equals(nullID)) {
--- a/netwerk/protocol/http/HttpBaseChannel.h
+++ b/netwerk/protocol/http/HttpBaseChannel.h
@@ -38,16 +38,17 @@
 #include "nsThreadUtils.h"
 #include "PrivateBrowsingChannel.h"
 #include "mozilla/net/DNS.h"
 #include "nsITimedChannel.h"
 #include "nsIHttpChannel.h"
 #include "nsISecurityConsoleMessage.h"
 #include "nsCOMArray.h"
 #include "mozilla/net/ChannelEventQueue.h"
+#include "nsIThrottledInputChannel.h"
 
 class nsISecurityConsoleMessage;
 class nsIPrincipal;
 
 namespace mozilla {
 
 namespace dom {
 class Performance;
@@ -74,27 +75,29 @@ class HttpBaseChannel : public nsHashPro
                       , public nsISupportsPriority
                       , public nsIClassOfService
                       , public nsIResumableChannel
                       , public nsITraceableChannel
                       , public PrivateBrowsingChannel<HttpBaseChannel>
                       , public nsITimedChannel
                       , public nsIForcePendingChannel
                       , public nsIConsoleReportCollector
+                      , public nsIThrottledInputChannel
 {
 protected:
   virtual ~HttpBaseChannel();
 
 public:
   NS_DECL_ISUPPORTS_INHERITED
   NS_DECL_NSIUPLOADCHANNEL
   NS_DECL_NSIFORMPOSTACTIONCHANNEL
   NS_DECL_NSIUPLOADCHANNEL2
   NS_DECL_NSITRACEABLECHANNEL
   NS_DECL_NSITIMEDCHANNEL
+  NS_DECL_NSITHROTTLEDINPUTCHANNEL
 
   HttpBaseChannel();
 
   virtual nsresult Init(nsIURI *aURI, uint32_t aCaps, nsProxyInfo *aProxyInfo,
                         uint32_t aProxyResolveFlags,
                         nsIURI *aProxyURI,
                         const nsID& aChannelId);
 
@@ -379,16 +382,18 @@ protected:
   nsCOMPtr<nsIProgressEventSink>    mProgressSink;
   nsCOMPtr<nsIURI>                  mReferrer;
   nsCOMPtr<nsIApplicationCache>     mApplicationCache;
 
   // An instance of nsHTTPCompressConv
   nsCOMPtr<nsIStreamListener>       mCompressListener;
 
   nsHttpRequestHead                 mRequestHead;
+  // Upload throttling.
+  nsCOMPtr<nsIInputChannelThrottleQueue> mThrottleQueue;
   nsCOMPtr<nsIInputStream>          mUploadStream;
   nsCOMPtr<nsIRunnable>             mUploadCloneableCallback;
   nsAutoPtr<nsHttpResponseHead>     mResponseHead;
   RefPtr<nsHttpConnectionInfo>    mConnectionInfo;
   nsCOMPtr<nsIProxyInfo>            mProxyInfo;
   nsCOMPtr<nsISupports>             mSecurityInfo;
 
   nsCString                         mSpec; // ASCII encoded URL spec
--- a/netwerk/protocol/http/nsHttpTransaction.cpp
+++ b/netwerk/protocol/http/nsHttpTransaction.cpp
@@ -28,16 +28,17 @@
 #include "nsComponentManagerUtils.h" // do_CreateInstance
 #include "nsServiceManagerUtils.h"   // do_GetService
 #include "nsIHttpActivityObserver.h"
 #include "nsSocketTransportService2.h"
 #include "nsICancelable.h"
 #include "nsIEventTarget.h"
 #include "nsIHttpChannelInternal.h"
 #include "nsIInputStream.h"
+#include "nsIThrottledInputChannel.h"
 #include "nsITransport.h"
 #include "nsIOService.h"
 #include "nsIRequestContext.h"
 #include <algorithm>
 
 #ifdef MOZ_WIDGET_GONK
 #include "NetStatistics.h"
 #endif
@@ -227,16 +228,17 @@ nsHttpTransaction::Init(uint32_t caps,
 {
     nsresult rv;
 
     LOG(("nsHttpTransaction::Init [this=%p caps=%x]\n", this, caps));
 
     MOZ_ASSERT(cinfo);
     MOZ_ASSERT(requestHead);
     MOZ_ASSERT(target);
+    MOZ_ASSERT(NS_IsMainThread());
 
     mActivityDistributor = do_GetService(NS_HTTPACTIVITYDISTRIBUTOR_CONTRACTID, &rv);
     if (NS_FAILED(rv)) return rv;
 
     bool activityDistributorActive;
     rv = mActivityDistributor->GetIsActive(&activityDistributorActive);
     if (NS_SUCCEEDED(rv) && activityDistributorActive) {
         // there are some observers registered at activity distributor, gather
@@ -374,16 +376,35 @@ nsHttpTransaction::Init(uint32_t caps,
         // necessary to workaround some common server bugs (see bug 137155).
         rv = NS_NewBufferedInputStream(getter_AddRefs(mRequestStream), multi,
                                        nsIOService::gDefaultSegmentSize);
         if (NS_FAILED(rv)) return rv;
     }
     else
         mRequestStream = headers;
 
+    nsCOMPtr<nsIThrottledInputChannel> throttled = do_QueryInterface(mChannel);
+    nsIInputChannelThrottleQueue* queue;
+    if (throttled) {
+        rv = throttled->GetThrottleQueue(&queue);
+        // In case of failure, just carry on without throttling.
+        if (NS_SUCCEEDED(rv) && queue) {
+            nsCOMPtr<nsIAsyncInputStream> wrappedStream;
+            rv = queue->WrapStream(mRequestStream, getter_AddRefs(wrappedStream));
+            // Failure to throttle isn't sufficient reason to fail
+            // initialization
+            if (NS_SUCCEEDED(rv)) {
+                MOZ_ASSERT(wrappedStream != nullptr);
+                LOG(("nsHttpTransaction::Init %p wrapping input stream using throttle queue %p\n",
+                     this, queue));
+                mRequestStream = do_QueryInterface(wrappedStream);
+            }
+        }
+    }
+
     uint64_t size_u64;
     rv = mRequestStream->Available(&size_u64);
     if (NS_FAILED(rv)) {
         return rv;
     }
 
     // make sure it fits within js MAX_SAFE_INTEGER
     mRequestSize = InScriptableRange(size_u64) ? static_cast<int64_t>(size_u64) : -1;
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttlechannel.js
@@ -0,0 +1,41 @@
+// Test nsIThrottledInputChannel interface.
+
+Cu.import("resource://testing-common/httpd.js");
+Cu.import("resource://gre/modules/NetUtil.jsm");
+
+function test_handler(metadata, response) {
+  const originalBody = "the response";
+  response.setHeader("Content-Type", "text/html", false);
+  response.setStatusLine(metadata.httpVersion, 200, "OK");
+  response.bodyOutputStream.write(originalBody, originalBody.length);
+}
+
+function make_channel(url) {
+  return NetUtil.newChannel({uri: url, loadUsingSystemPrincipal: true})
+                .QueryInterface(Components.interfaces.nsIHttpChannel);
+}
+
+function run_test() {
+  let httpserver = new HttpServer();
+  httpserver.start(-1);
+  const PORT = httpserver.identity.primaryPort;
+
+  httpserver.registerPathHandler("/testdir", test_handler);
+
+  let channel = make_channel("http://localhost:" + PORT + "/testdir");
+
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+  tq.init(1000, 1000);
+
+  let tic = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+  tic.throttleQueue = tq;
+
+  channel.asyncOpen2(new ChannelListener(() => {
+    ok(tq.bytesProcessed() > 0, "throttled queue processed some bytes");
+
+    httpserver.stop(do_test_finished);
+  }));
+
+  do_test_pending();
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttlequeue.js
@@ -0,0 +1,23 @@
+// Test ThrottleQueue initialization.
+
+function init(tq, mean, max) {
+  let threw = false;
+  try {
+    tq.init(mean, max);
+  } catch (e) {
+    threw = true;
+  }
+  return !threw;
+}
+
+function run_test() {
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+
+  ok(!init(tq, 0, 50), "mean bytes cannot be 0");
+  ok(!init(tq, 50, 0), "max bytes cannot be 0");
+  ok(!init(tq, 0, 0), "mean and max bytes cannot be 0");
+  ok(!init(tq, 70, 20), "max cannot be less than mean");
+
+  ok(init(tq, 2, 2), "valid initialization");
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttling.js
@@ -0,0 +1,57 @@
+// Test nsIThrottledInputChannel interface.
+
+Cu.import("resource://testing-common/httpd.js");
+Cu.import("resource://gre/modules/NetUtil.jsm");
+
+function test_handler(metadata, response) {
+  const originalBody = "the response";
+  response.setHeader("Content-Type", "text/html", false);
+  response.setStatusLine(metadata.httpVersion, 200, "OK");
+  response.bodyOutputStream.write(originalBody, originalBody.length);
+}
+
+function make_channel(url) {
+  return NetUtil.newChannel({uri: url, loadUsingSystemPrincipal: true})
+                .QueryInterface(Ci.nsIHttpChannel);
+}
+
+function run_test() {
+  let httpserver = new HttpServer();
+  httpserver.registerPathHandler("/testdir", test_handler);
+  httpserver.start(-1);
+
+  const PORT = httpserver.identity.primaryPort;
+  const size = 4096;
+
+  let sstream = Cc["@mozilla.org/io/string-input-stream;1"].
+                  createInstance(Ci.nsIStringInputStream);
+  sstream.data = 'x'.repeat(size);
+
+  let mime = Cc["@mozilla.org/network/mime-input-stream;1"].
+               createInstance(Ci.nsIMIMEInputStream);
+  mime.addHeader("Content-Type", "multipart/form-data; boundary=zzzzz");
+  mime.setData(sstream);
+  mime.addContentLength = true;
+
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+  // Make sure the request takes more than one read.
+  tq.init(100 + size / 2, 100 + size / 2);
+
+  let channel = make_channel("http://localhost:" + PORT + "/testdir");
+  channel.QueryInterface(Ci.nsIUploadChannel)
+         .setUploadStream(mime, "", mime.available());
+  channel.requestMethod = "POST";
+
+  let tic = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+  tic.throttleQueue = tq;
+
+  let startTime = Date.now();
+  channel.asyncOpen2(new ChannelListener(() => {
+    ok(Date.now() - startTime > 1000, "request took more than one second");
+
+    httpserver.stop(do_test_finished);
+  }));
+
+  do_test_pending();
+}
--- a/netwerk/test/unit/xpcshell.ini
+++ b/netwerk/test/unit/xpcshell.ini
@@ -354,8 +354,11 @@ skip-if = os == "android"
 [test_bug1195415.js]
 [test_cookie_blacklist.js]
 [test_getHost.js]
 [test_packaged_app_bug1214079.js]
 [test_bug412457.js]
 [test_bug464591.js]
 [test_cache-control_request.js]
 [test_bug1279246.js]
+[test_throttlequeue.js]
+[test_throttlechannel.js]
+[test_throttling.js]