Bug 1255894: Part 4 - Implement StreamFilter IPC protocol. r?baku,mixedpuppy draft
authorKris Maglione <maglione.k@gmail.com>
Sun, 27 Aug 2017 15:19:58 -0700
changeset 653832 964a930f0bc4bd6afc37298c1cc21076713e87db
parent 653831 61b6b114bcb472b1f413cae657cc960eb7667c87
child 653833 431559c1a595ccc78884c1ab989fe3963744194f
push id76425
push usermaglione.k@gmail.com
push dateMon, 28 Aug 2017 04:09:59 +0000
reviewersbaku, mixedpuppy
bugs1255894
milestone57.0a1
Bug 1255894: Part 4 - Implement StreamFilter IPC protocol. r?baku,mixedpuppy This part implements the IPC state logic for the stream filters. Bill, can you please review the IPC and thread sanity, and Shane, the state logic? MozReview-Commit-ID: KrVOrdnuwC5
toolkit/components/extensions/webrequest/PStreamFilter.ipdl
toolkit/components/extensions/webrequest/StreamFilterBase.h
toolkit/components/extensions/webrequest/StreamFilterChild.cpp
toolkit/components/extensions/webrequest/StreamFilterChild.h
toolkit/components/extensions/webrequest/StreamFilterParent.cpp
toolkit/components/extensions/webrequest/StreamFilterParent.h
toolkit/components/extensions/webrequest/moz.build
--- a/toolkit/components/extensions/webrequest/PStreamFilter.ipdl
+++ b/toolkit/components/extensions/webrequest/PStreamFilter.ipdl
@@ -6,15 +6,36 @@ include protocol PBackground;
 
 namespace mozilla {
 namespace extensions {
 
 protocol PStreamFilter
 {
   manager PBackground;
 
+parent:
+  async Write(uint8_t[] data);
+
+  async FlushedData();
+
+  async Suspend();
+  async Resume();
+  async Close();
+  async Disconnect();
+
 child:
+  async Initialized(bool aSuccess);
+  async Resumed();
+  async Suspended();
+  async Closed();
+
+  async FlushData();
+
+  async StartRequest();
+  async Data(uint8_t[] data);
+  async StopRequest(nsresult aStatus);
+
   async __delete__();
 };
 
 } // namespace extensions
 } // namespace mozilla
 
new file mode 100644
--- /dev/null
+++ b/toolkit/components/extensions/webrequest/StreamFilterBase.h
@@ -0,0 +1,40 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_extensions_StreamFilterBase_h
+#define mozilla_extensions_StreamFilterBase_h
+
+#include "mozilla/LinkedList.h"
+#include "nsTArray.h"
+
+namespace mozilla {
+namespace extensions {
+
+class StreamFilterBase
+{
+public:
+  typedef nsTArray<uint8_t> Data;
+
+protected:
+  class BufferedData : public LinkedListElement<BufferedData> {
+  public:
+    explicit BufferedData(Data&& aData) : mData(Move(aData)) {}
+
+    Data mData;
+  };
+
+  LinkedList<BufferedData> mBufferedData;
+
+  inline void
+  BufferData(Data&& aData) {
+    mBufferedData.insertBack(new BufferedData(Move(aData)));
+  };
+};
+
+} // namespace extensions
+} // namespace mozilla
+
+#endif // mozilla_extensions_StreamFilterBase_h
--- a/toolkit/components/extensions/webrequest/StreamFilterChild.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterChild.cpp
@@ -1,18 +1,431 @@
 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "StreamFilterChild.h"
 
+#include "mozilla/Assertions.h"
+#include "mozilla/UniquePtr.h"
+
 namespace mozilla {
 namespace extensions {
 
+using mozilla::ipc::IPCResult;
+
+/*****************************************************************************
+ * Initialization and cleanup
+ *****************************************************************************/
+
+void
+StreamFilterChild::Cleanup()
+{
+  switch (mState) {
+  case State::Closing:
+  case State::Closed:
+  case State::Error:
+  case State::Disconnecting:
+  case State::Disconnected:
+    break;
+
+  default:
+    ErrorResult rv;
+    Disconnect(rv);
+    break;
+  }
+}
+
+/*****************************************************************************
+ * State change methods
+ *****************************************************************************/
+
+void
+StreamFilterChild::Suspend(ErrorResult& aRv)
+{
+  switch (mState) {
+  case State::TransferringData:
+    mState = State::Suspending;
+    mNextState = State::Suspended;
+
+    SendSuspend();
+    break;
+
+  case State::Suspending:
+    switch (mNextState) {
+    case State::Suspended:
+    case State::Resuming:
+      mNextState = State::Suspended;
+      break;
+
+    default:
+      aRv.Throw(NS_ERROR_FAILURE);
+      return;
+    }
+    break;
+
+  case State::Resuming:
+    switch (mNextState) {
+    case State::TransferringData:
+    case State::Suspending:
+      mNextState = State::Suspending;
+      break;
+
+    default:
+      aRv.Throw(NS_ERROR_FAILURE);
+      return;
+    }
+    break;
+
+  case State::Suspended:
+    break;
+
+  default:
+    aRv.Throw(NS_ERROR_FAILURE);
+    break;
+  }
+}
+
+void
+StreamFilterChild::Resume(ErrorResult& aRv)
+{
+  switch (mState) {
+  case State::Suspended:
+    mState = State::Resuming;
+    mNextState = State::TransferringData;
+
+    SendResume();
+    break;
+
+  case State::Suspending:
+    switch (mNextState) {
+    case State::Suspended:
+    case State::Resuming:
+      mNextState = State::Resuming;
+      break;
+
+    default:
+      aRv.Throw(NS_ERROR_FAILURE);
+      return;
+    }
+    break;
+
+  case State::Resuming:
+  case State::TransferringData:
+    break;
+
+  default:
+    aRv.Throw(NS_ERROR_FAILURE);
+    return;
+  }
+
+  FlushBufferedData();
+}
+
+void
+StreamFilterChild::Disconnect(ErrorResult& aRv)
+{
+  switch (mState) {
+  case State::Suspended:
+  case State::TransferringData:
+  case State::FinishedTransferringData:
+    mState = State::Disconnecting;
+    mNextState = State::Disconnected;
+
+    SendDisconnect();
+    break;
+
+  case State::Suspending:
+  case State::Resuming:
+    switch (mNextState) {
+    case State::Suspended:
+    case State::Resuming:
+    case State::Disconnecting:
+      mNextState = State::Disconnecting;
+      break;
+
+    default:
+      aRv.Throw(NS_ERROR_FAILURE);
+      return;
+    }
+    break;
+
+  case State::Disconnecting:
+  case State::Disconnected:
+    break;
+
+  default:
+    aRv.Throw(NS_ERROR_FAILURE);
+    return;
+  }
+}
+
+void
+StreamFilterChild::Close(ErrorResult& aRv)
+{
+  switch (mState) {
+  case State::Suspended:
+  case State::TransferringData:
+  case State::FinishedTransferringData:
+    mState = State::Closing;
+    mNextState = State::Closed;
+
+    SendClose();
+    break;
+
+  case State::Suspending:
+  case State::Resuming:
+    mNextState = State::Closing;
+    break;
+
+  case State::Closing:
+    MOZ_DIAGNOSTIC_ASSERT(mNextState == State::Closed);
+    break;
+
+  case State::Closed:
+    break;
+
+  default:
+    aRv.Throw(NS_ERROR_FAILURE);
+    return;
+  }
+
+  mBufferedData.clear();
+}
+
+/*****************************************************************************
+ * Internal state management
+ *****************************************************************************/
+
+void
+StreamFilterChild::SetNextState()
+{
+  mState = mNextState;
+
+  switch (mNextState) {
+  case State::Suspending:
+    mNextState = State::Suspended;
+    SendSuspend();
+    break;
+
+  case State::Resuming:
+    mNextState = State::TransferringData;
+    SendResume();
+    break;
+
+  case State::Closing:
+    mNextState = State::Closed;
+    SendClose();
+    break;
+
+  case State::Disconnecting:
+    mNextState = State::Disconnected;
+    SendDisconnect();
+    break;
+
+  case State::FinishedTransferringData:
+    break;
+
+  case State::TransferringData:
+    FlushBufferedData();
+    break;
+
+  default:
+    break;
+  }
+}
+
+void
+StreamFilterChild::MaybeStopRequest()
+{
+  if (!mReceivedOnStop || !mBufferedData.isEmpty()) {
+    return;
+  }
+
+  switch (mState) {
+  case State::Suspending:
+  case State::Resuming:
+    mNextState = State::FinishedTransferringData;
+    return;
+
+  default:
+    mState = State::FinishedTransferringData;
+    break;
+  }
+}
+
+/*****************************************************************************
+ * State change acknowledgment callbacks
+ *****************************************************************************/
+
+IPCResult
+StreamFilterChild::RecvInitialized(const bool& aSuccess)
+{
+  MOZ_ASSERT(mState == State::Uninitialized);
+
+  if (aSuccess) {
+    mState = State::Initialized;
+  } else {
+    mState = State::Error;
+  }
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterChild::RecvClosed() {
+  MOZ_DIAGNOSTIC_ASSERT(mState == State::Closing);
+
+  SetNextState();
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterChild::RecvSuspended() {
+  MOZ_DIAGNOSTIC_ASSERT(mState == State::Suspending);
+
+  SetNextState();
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterChild::RecvResumed() {
+  MOZ_DIAGNOSTIC_ASSERT(mState == State::Resuming);
+
+  SetNextState();
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterChild::RecvFlushData() {
+  MOZ_DIAGNOSTIC_ASSERT(mState == State::Disconnecting);
+
+  SendFlushedData();
+  SetNextState();
+  return IPC_OK();
+}
+
+/*****************************************************************************
+ * Other binding methods
+ *****************************************************************************/
+
+void
+StreamFilterChild::Write(Data&& aData, ErrorResult& aRv)
+{
+  switch (mState) {
+  case State::Suspending:
+  case State::Resuming:
+    switch (mNextState) {
+    case State::Suspended:
+    case State::TransferringData:
+      break;
+
+    default:
+      aRv.Throw(NS_ERROR_FAILURE);
+      return;
+    }
+    break;
+
+  case State::Suspended:
+  case State::TransferringData:
+  case State::FinishedTransferringData:
+    break;
+
+  default:
+    aRv.Throw(NS_ERROR_FAILURE);
+    return;
+  }
+
+  SendWrite(Move(aData));
+}
+
+/*****************************************************************************
+ * Request state notifications
+ *****************************************************************************/
+
+IPCResult
+StreamFilterChild::RecvStartRequest()
+{
+  MOZ_ASSERT(mState == State::Initialized);
+
+  mState = State::TransferringData;
+
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterChild::RecvStopRequest(const nsresult& aStatus)
+{
+  mReceivedOnStop = true;
+  MaybeStopRequest();
+  return IPC_OK();
+}
+
+/*****************************************************************************
+ * Incoming request data handling
+ *****************************************************************************/
+
+void
+StreamFilterChild::EmitData(const Data& aData)
+{
+  MOZ_ASSERT(CanFlushData());
+
+  MaybeStopRequest();
+}
+
+void
+StreamFilterChild::FlushBufferedData()
+{
+  while (!mBufferedData.isEmpty() && CanFlushData()) {
+    UniquePtr<BufferedData> data(mBufferedData.popFirst());
+
+    EmitData(data->mData);
+  }
+}
+
+IPCResult
+StreamFilterChild::RecvData(Data&& aData)
+{
+  MOZ_ASSERT(!mReceivedOnStop);
+
+  switch (mState) {
+  case State::TransferringData:
+  case State::Resuming:
+    EmitData(aData);
+    break;
+
+  case State::FinishedTransferringData:
+    MOZ_ASSERT_UNREACHABLE("Received data in unexpected state");
+    EmitData(aData);
+    break;
+
+  case State::Suspending:
+  case State::Suspended:
+    BufferData(Move(aData));
+    break;
+
+  case State::Disconnecting:
+    SendWrite(Move(aData));
+    break;
+
+  case State::Closing:
+    break;
+
+  default:
+    MOZ_ASSERT_UNREACHABLE("Received data in unexpected state");
+    return IPC_FAIL_NO_REASON(this);
+  }
+
+  return IPC_OK();
+}
+
+/*****************************************************************************
+ * Glue
+ *****************************************************************************/
+
 void
 StreamFilterChild::ActorDestroy(ActorDestroyReason aWhy)
 {
 }
 
 } // namespace extensions
 } // namespace mozilla
--- a/toolkit/components/extensions/webrequest/StreamFilterChild.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterChild.h
@@ -2,36 +2,129 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  * You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_extensions_StreamFilterChild_h
 #define mozilla_extensions_StreamFilterChild_h
 
+#include "StreamFilterBase.h"
 #include "mozilla/extensions/PStreamFilterChild.h"
+
+#include "mozilla/ErrorResult.h"
+#include "mozilla/LinkedList.h"
 #include "nsISupportsImpl.h"
 
 namespace mozilla {
 namespace extensions {
 
 using mozilla::ipc::IPCResult;
 
+class StreamFilter;
 class StreamFilterChild final : public PStreamFilterChild
+                              , public StreamFilterBase
 {
 public:
   NS_INLINE_DECL_REFCOUNTING(StreamFilterChild)
 
-  StreamFilterChild() {}
+  StreamFilterChild()
+    : mState(State::Uninitialized)
+    , mReceivedOnStop(false)
+  {}
+
+  enum class State {
+    // Uninitialized, waiting for constructor response from parent.
+    Uninitialized,
+    // Initialized, but channel has not begun transferring data.
+    Initialized,
+    // The stream's OnStartRequest event has been dispatched, and the channel is
+    // transferring data.
+    TransferringData,
+    // The channel's OnStopRequest event has been dispatched, and the channel is
+    // no longer transferring data. Data may still be written to the output
+    // stream listener.
+    FinishedTransferringData,
+    // The channel is being suspended, and we're waiting for confirmation of
+    // suspension from the parent.
+    Suspending,
+    // The channel has been suspended in the parent. Data may still be written
+    // to the output stream listener in this state.
+    Suspended,
+    // The channel is suspended. Resume has been called, and we are waiting for
+    // confirmation of resumption from the parent.
+    Resuming,
+    // The close() method has been called, and no further output may be written.
+    // We are waiting for confirmation from the parent.
+    Closing,
+    // The close() method has been called, and we have been disconnected from
+    // our parent.
+    Closed,
+    // The channel is being disconnected from the parent, and all further events
+    // and data will pass unfiltered. Data received by the child in this state
+    // will be automatically written ot the output stream listener. No data may
+    // be explicitly written.
+    Disconnecting,
+    // The channel has been disconnected from the parent, and all further data
+    // and events will be transparently passed to the output stream listener
+    // without passing through the child.
+    Disconnected,
+    // An error has occurred and the child is disconnected from the parent.
+    Error,
+  };
+
+  void Suspend(ErrorResult& aRv);
+  void Resume(ErrorResult& aRv);
+  void Disconnect(ErrorResult& aRv);
+  void Close(ErrorResult& aRv);
+  void Cleanup();
+
+  void Write(Data&& aData, ErrorResult& aRv);
+
+  State GetState() const
+  {
+    return mState;
+  }
 
 protected:
+  virtual IPCResult RecvInitialized(const bool& aSuccess) override;
+
+  virtual IPCResult RecvStartRequest() override;
+  virtual IPCResult RecvData(Data&& data) override;
+  virtual IPCResult RecvStopRequest(const nsresult& aStatus) override;
+
+  virtual IPCResult RecvClosed() override;
+  virtual IPCResult RecvSuspended() override;
+  virtual IPCResult RecvResumed() override;
+  virtual IPCResult RecvFlushData() override;
+
   virtual IPCResult Recv__delete__() override { return IPC_OK(); }
 
 private:
   ~StreamFilterChild() {}
 
+  void SetNextState();
+
+  void MaybeStopRequest();
+
+  void EmitData(const Data& aData);
+
+  bool
+  CanFlushData()
+  {
+    return (mState == State::TransferringData ||
+            mState == State::Resuming);
+  }
+
+  void FlushBufferedData();
+
   virtual void ActorDestroy(ActorDestroyReason aWhy) override;
+
+
+  State mState;
+  State mNextState;
+  bool mReceivedOnStop;
 };
 
 } // namespace extensions
 } // namespace mozilla
 
 #endif // mozilla_extensions_StreamFilterChild_h
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
@@ -1,44 +1,291 @@
 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "StreamFilterParent.h"
 
+#include "mozilla/ScopeExit.h"
 #include "mozilla/Unused.h"
 #include "mozilla/dom/nsIContentParent.h"
+#include "nsIChannel.h"
+#include "nsITraceableChannel.h"
 #include "nsProxyRelease.h"
+#include "nsStringStream.h"
 
 namespace mozilla {
 namespace extensions {
 
-using namespace mozilla::dom;
+/*****************************************************************************
+ * Initialization
+ *****************************************************************************/
 
 StreamFilterParent::StreamFilterParent(uint64_t aChannelId, const nsAString& aAddonId)
   : mChannelId(aChannelId)
   , mAddonId(NS_Atomize(aAddonId))
+  , mPBackgroundThread(NS_GetCurrentThread())
+  , mIOThread(do_GetMainThread())
+  , mBufferMutex("StreamFilter buffer mutex")
+  , mReceivedStop(false)
+  , mSentStop(false)
+  , mState(State::Uninitialized)
 {
 }
 
 StreamFilterParent::~StreamFilterParent()
 {
 }
 
 void
 StreamFilterParent::Init(already_AddRefed<nsIContentParent> aContentParent)
 {
+  AssertIsPBackgroundThread();
+
   nsCOMPtr<nsIContentParent> contentParent = aContentParent;
-  NS_ReleaseOnMainThread(contentParent.forget());
+
+  SystemGroup::Dispatch(
+    TaskCategory::Network,
+    NewRunnableMethod<nsCOMPtr<nsIContentParent>&&>(
+        "StreamFilterParent::DoInit",
+        this, &StreamFilterParent::DoInit, contentParent));
+}
+
+void
+StreamFilterParent::DoInit(nsCOMPtr<nsIContentParent>&& aContentParent)
+{
+  AssertIsMainThread();
+
+  bool success = false;
+  auto guard = MakeScopeExit([&] {
+    RefPtr<StreamFilterParent> self(this);
+
+    RunOnPBackgroundThread(FUNC, [=] {
+      if (self->IPCActive()) {
+        self->mState = State::Initialized;
+        self->CheckResult(self->SendInitialized(success));
+      }
+    });
+  });
+
+  auto webreq = WebRequestService::GetInstance();
+
+  mChannel = webreq->GetTraceableChannel(mChannelId, mAddonId, aContentParent);
+  if (NS_WARN_IF(!mChannel)) {
+    return;
+  }
+
+  nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(mChannel);
+  MOZ_RELEASE_ASSERT(traceable);
+}
+
+/*****************************************************************************
+ * Error handling
+ *****************************************************************************/
+
+void
+StreamFilterParent::Broken()
+{
+  AssertIsPBackgroundThread();
+
+  mState = State::Disconnecting;
+
+  RefPtr<StreamFilterParent> self(this);
+  RunOnIOThread(FUNC, [=] {
+    self->FlushBufferedData();
+
+    RunOnPBackgroundThread(FUNC, [=] {
+      if (self->IPCActive()) {
+        self->mState = State::Disconnected;
+      }
+    });
+  });
+}
+
+/*****************************************************************************
+ * State change requests
+ *****************************************************************************/
+
+IPCResult
+StreamFilterParent::RecvClose()
+{
+  AssertIsPBackgroundThread();
+
+  mState = State::Closed;
+
+  Unused << Send__delete__(this);
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterParent::RecvSuspend()
+{
+  AssertIsPBackgroundThread();
+
+  if (mState == State::TransferringData) {
+    RefPtr<StreamFilterParent> self(this);
+    RunOnMainThread(FUNC, [=] {
+      self->mChannel->Suspend();
+
+      RunOnPBackgroundThread(FUNC, [=] {
+        if (IPCActive()) {
+          mState = State::Suspended;
+          CheckResult(SendSuspended());
+        }
+      });
+    });
+  }
+  return IPC_OK();
 }
 
+IPCResult
+StreamFilterParent::RecvResume()
+{
+  AssertIsPBackgroundThread();
+
+  if (mState == State::Suspended) {
+    // Change state before resuming so incoming data is handled correctly
+    // immediately after resuming.
+    mState = State::TransferringData;
+
+    RefPtr<StreamFilterParent> self(this);
+    RunOnMainThread(FUNC, [=] {
+      self->mChannel->Resume();
+
+      RunOnPBackgroundThread(FUNC, [=] {
+        if (self->IPCActive()) {
+          self->CheckResult(self->SendResumed());
+        }
+      });
+    });
+  }
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterParent::RecvDisconnect()
+{
+  AssertIsPBackgroundThread();
+
+  if (mState == State::Suspended) {
+  RefPtr<StreamFilterParent> self(this);
+    RunOnMainThread(FUNC, [=] {
+      self->mChannel->Resume();
+    });
+  } else if (mState != State::TransferringData) {
+    return IPC_OK();
+  }
+
+  mState = State::Disconnecting;
+  CheckResult(SendFlushData());
+  return IPC_OK();
+}
+
+IPCResult
+StreamFilterParent::RecvFlushedData()
+{
+  AssertIsPBackgroundThread();
+
+  MOZ_ASSERT(mState == State::Disconnecting);
+
+  Unused << Send__delete__(this);
+
+  RefPtr<StreamFilterParent> self(this);
+  RunOnIOThread(FUNC, [=] {
+    self->FlushBufferedData();
+
+    RunOnPBackgroundThread(FUNC, [=] {
+      self->mState = State::Disconnected;
+    });
+  });
+  return IPC_OK();
+}
+
+/*****************************************************************************
+ * Data output
+ *****************************************************************************/
+
+IPCResult
+StreamFilterParent::RecvWrite(Data&& aData)
+{
+  AssertIsPBackgroundThread();
+
+  mIOThread->Dispatch(
+    NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove",
+                              this,
+                              &StreamFilterParent::WriteMove,
+                              Move(aData)),
+    NS_DISPATCH_NORMAL);
+  return IPC_OK();
+}
+
+void
+StreamFilterParent::WriteMove(Data&& aData)
+{
+  nsresult rv = Write(aData);
+  Unused << NS_WARN_IF(NS_FAILED(rv));
+}
+
+nsresult
+StreamFilterParent::Write(Data& aData)
+{
+  AssertIsIOThread();
+
+  return NS_OK;
+}
+
+/*****************************************************************************
+ * Incoming data handling
+ *****************************************************************************/
+
+void
+StreamFilterParent::DoSendData(Data&& aData)
+{
+  AssertIsPBackgroundThread();
+
+  if (mState == State::TransferringData) {
+  }
+}
+
+nsresult
+StreamFilterParent::FlushBufferedData()
+{
+  AssertIsIOThread();
+
+  // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
+  // to always run in the same thread, so it's possible for this function to
+  // run in parallel with OnDataAvailable.
+  MutexAutoLock al(mBufferMutex);
+
+  while (!mBufferedData.isEmpty()) {
+    UniquePtr<BufferedData> data(mBufferedData.popFirst());
+
+    nsresult rv = Write(data->mData);
+    NS_ENSURE_SUCCESS(rv, rv);
+  }
+
+  if (mReceivedStop && !mSentStop) {
+  }
+
+  return NS_OK;
+}
+
+/*****************************************************************************
+ * Glue
+ *****************************************************************************/
+
 void
 StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy)
 {
+  AssertIsPBackgroundThread();
+
+  if (mState != State::Disconnected && mState != State::Closed) {
+    Broken();
+  }
 }
 
 NS_IMPL_ISUPPORTS0(StreamFilterParent)
 
 } // namespace extensions
 } // namespace mozilla
 
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.h
@@ -2,51 +2,178 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  * You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_extensions_StreamFilterParent_h
 #define mozilla_extensions_StreamFilterParent_h
 
+#include "StreamFilterBase.h"
 #include "mozilla/extensions/PStreamFilterParent.h"
 
+#include "mozilla/LinkedList.h"
+#include "mozilla/Mutex.h"
+#include "mozilla/SystemGroup.h"
+#include "mozilla/WebRequestService.h"
+#include "nsIThread.h"
+#include "nsThreadUtils.h"
+
+#if defined(_MSC_VER)
+#  define FUNC __FUNCSIG__
+#else
+#  define FUNC __PRETTY_FUNCTION__
+#endif
+
 namespace mozilla {
 namespace dom {
   class nsIContentParent;
 }
 
 namespace extensions {
 
 using namespace mozilla::dom;
 using mozilla::ipc::IPCResult;
 
-class StreamFilterParent final : public PStreamFilterParent
-                               , public nsISupports
+class StreamFilterParent final
+  : public PStreamFilterParent
+  , public nsISupports
+  , public StreamFilterBase
 {
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
 
   explicit StreamFilterParent(uint64_t aChannelId, const nsAString& aAddonId);
 
+  enum class State
+  {
+    // The parent has been created, but not yet constructed by the child.
+    Uninitialized,
+    // The parent has been successfully constructed.
+    Initialized,
+    // The OnRequestStarted event has been received, and data is being
+    // transferred to the child.
+    TransferringData,
+    // The channel is suspended.
+    Suspended,
+    // The channel has been closed by the child, and will send or receive data.
+    Closed,
+    // The channel is being disconnected from the child, so that all further
+    // data and events pass unfiltered to the output listener. Any data
+    // currnetly in transit to, or buffered by, the child will be written to the
+    // output listener before we enter the Disconnected atate.
+    Disconnecting,
+    // The channel has been disconnected from the child, and all further data
+    // and events will be passed directly to the output listener.
+    Disconnected,
+  };
+
   static already_AddRefed<StreamFilterParent>
   Create(uint64_t aChannelId, const nsAString& aAddonId)
   {
     RefPtr<StreamFilterParent> filter = new StreamFilterParent(aChannelId, aAddonId);
     return filter.forget();
   }
 
   void Init(already_AddRefed<nsIContentParent> aContentParent);
 
 protected:
   virtual ~StreamFilterParent();
 
+  virtual IPCResult RecvWrite(Data&& aData) override;
+  virtual IPCResult RecvFlushedData() override;
+  virtual IPCResult RecvSuspend() override;
+  virtual IPCResult RecvResume() override;
+  virtual IPCResult RecvClose() override;
+  virtual IPCResult RecvDisconnect() override;
+
 private:
+  bool IPCActive()
+  {
+    return (mState != State::Closed &&
+            mState != State::Disconnecting &&
+            mState != State::Disconnected);
+  }
+
+  void DoInit(nsCOMPtr<nsIContentParent>&& aContentParent);
+
+  nsresult FlushBufferedData();
+
+  nsresult Write(Data& aData);
+
+  void WriteMove(Data&& aData);
+
+  void DoSendData(Data&& aData);
+
   virtual void ActorDestroy(ActorDestroyReason aWhy) override;
 
+  void Broken();
+
+  void
+  CheckResult(bool aResult)
+  {
+    if (NS_WARN_IF(!aResult)) {
+      Broken();
+    }
+  }
+
+  void
+  AssertIsPBackgroundThread()
+  {
+    MOZ_ASSERT(NS_GetCurrentThread() == mPBackgroundThread);
+  }
+
+  void
+  AssertIsIOThread()
+  {
+    MOZ_ASSERT(NS_GetCurrentThread() == mIOThread);
+  }
+
+  void
+  AssertIsMainThread()
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+  }
+
+  template<typename Function>
+  void
+  RunOnMainThread(const char* aName, Function&& aFunc)
+  {
+    SystemGroup::Dispatch(TaskCategory::Network,
+                          Move(NS_NewRunnableFunction(aName, aFunc)));
+  }
+
+  template<typename Function>
+  void
+  RunOnPBackgroundThread(const char* aName, Function&& aFunc)
+  {
+    mPBackgroundThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
+                                 NS_DISPATCH_NORMAL);
+  }
+
+  template<typename Function>
+  void
+  RunOnIOThread(const char* aName, Function&& aFunc)
+  {
+    mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
+                        NS_DISPATCH_NORMAL);
+  }
+
   const uint64_t mChannelId;
   const nsCOMPtr<nsIAtom> mAddonId;
+
+  nsCOMPtr<nsIChannel> mChannel;
+
+  nsCOMPtr<nsIThread> mPBackgroundThread;
+  nsCOMPtr<nsIThread> mIOThread;
+
+  Mutex mBufferMutex;
+
+  bool mReceivedStop;
+  bool mSentStop;
+
+  volatile State mState;
 };
 
 } // namespace extensions
 } // namespace mozilla
 
 #endif // mozilla_extensions_StreamFilterParent_h
--- a/toolkit/components/extensions/webrequest/moz.build
+++ b/toolkit/components/extensions/webrequest/moz.build
@@ -26,16 +26,17 @@ EXPORTS += [
     'nsWebRequestListener.h',
 ]
 
 EXPORTS.mozilla += [
     'WebRequestService.h',
 ]
 
 EXPORTS.mozilla.extensions += [
+    'StreamFilterBase.h',
     'StreamFilterChild.h',
     'StreamFilterParent.h',
 ]
 
 include('/ipc/chromium/chromium-config.mozbuild')
 
 FINAL_LIBRARY = 'xul'