Bug 1255894: Part 11 - Move StreamFilterParent to STS thread. r?dragana
MozReview-Commit-ID: L5aPENDjVB3
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
@@ -11,28 +11,28 @@
#include "mozilla/dom/ContentParent.h"
#include "nsHttpChannel.h"
#include "nsIChannel.h"
#include "nsIHttpChannelInternal.h"
#include "nsIInputStream.h"
#include "nsITraceableChannel.h"
#include "nsProxyRelease.h"
#include "nsQueryObject.h"
+#include "nsSocketTransportService2.h"
#include "nsStringStream.h"
namespace mozilla {
namespace extensions {
/*****************************************************************************
* Initialization
*****************************************************************************/
StreamFilterParent::StreamFilterParent()
- : mActorThread(NS_GetCurrentThread())
- , mIOThread(do_GetMainThread())
+ : mIOThread(do_GetMainThread())
, mBufferMutex("StreamFilter buffer mutex")
, mReceivedStop(false)
, mSentStop(false)
, mContext(nullptr)
, mOffset(0)
, mState(State::Uninitialized)
{
}
@@ -71,29 +71,40 @@ StreamFilterParent::Create(dom::ContentP
return false;
}
aEndpoint = Move(child);
return true;
}
/* static */ void
-StreamFilterParent::Attach(nsIChannel* aChannel, mozilla::ipc::Endpoint<PStreamFilterParent>&& aEndpoint)
+StreamFilterParent::Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint)
{
auto self = MakeRefPtr<StreamFilterParent>();
- if (!aEndpoint.Bind(self)) {
- MOZ_CRASH("Failed to attach StreamFilter endpoint");
- }
+
+ self->ActorThread()->Dispatch(
+ NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind",
+ self,
+ &StreamFilterParent::Bind,
+ Move(aEndpoint)),
+ NS_DISPATCH_NORMAL);
self->Init(aChannel);
+ // IPC owns this reference now.
Unused << self.forget();
}
void
+StreamFilterParent::Bind(ParentEndpoint&& aEndpoint)
+{
+ aEndpoint.Bind(this);
+}
+
+void
StreamFilterParent::Init(nsIChannel* aChannel)
{
mChannel = aChannel;
nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
MOZ_RELEASE_ASSERT(traceable);
nsresult rv = traceable->SetNewListener(this, getter_AddRefs(mOrigListener));
@@ -154,20 +165,32 @@ StreamFilterParent::RecvClose()
if (!mSentStop) {
RefPtr<StreamFilterParent> self(this);
RunOnMainThread(FUNC, [=] {
nsresult rv = self->EmitStopRequest(NS_OK);
Unused << NS_WARN_IF(NS_FAILED(rv));
});
}
- Close();
+ Destroy();
return IPC_OK();
}
+void
+StreamFilterParent::Destroy()
+{
+ // Close the channel asynchronously so the actor is never destroyed before
+ // this message is fully processed.
+ ActorThread()->Dispatch(
+ NewRunnableMethod("StreamFilterParent::Close",
+ this,
+ &StreamFilterParent::Close),
+ NS_DISPATCH_NORMAL);
+}
+
IPCResult
StreamFilterParent::RecvSuspend()
{
AssertIsActorThread();
if (mState == State::TransferringData) {
RefPtr<StreamFilterParent> self(this);
RunOnMainThread(FUNC, [=] {
@@ -229,17 +252,17 @@ StreamFilterParent::RecvDisconnect()
IPCResult
StreamFilterParent::RecvFlushedData()
{
AssertIsActorThread();
MOZ_ASSERT(mState == State::Disconnecting);
- Close();
+ Destroy();
RefPtr<StreamFilterParent> self(this);
RunOnIOThread(FUNC, [=] {
self->FlushBufferedData();
RunOnActorThread(FUNC, [=] {
self->mState = State::Disconnected;
});
@@ -251,22 +274,26 @@ StreamFilterParent::RecvFlushedData()
* Data output
*****************************************************************************/
IPCResult
StreamFilterParent::RecvWrite(Data&& aData)
{
AssertIsActorThread();
- mIOThread->Dispatch(
- NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove",
- this,
- &StreamFilterParent::WriteMove,
- Move(aData)),
- NS_DISPATCH_NORMAL);
+ if (IsIOThread()) {
+ Write(aData);
+ } else {
+ IOThread()->Dispatch(
+ NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove",
+ this,
+ &StreamFilterParent::WriteMove,
+ Move(aData)),
+ NS_DISPATCH_NORMAL);
+ }
return IPC_OK();
}
void
StreamFilterParent::WriteMove(Data&& aData)
{
nsresult rv = Write(aData);
Unused << NS_WARN_IF(NS_FAILED(rv));
@@ -364,17 +391,21 @@ NS_IMETHODIMP
StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
nsISupports* aContext,
nsIInputStream* aInputStream,
uint64_t aOffset,
uint32_t aCount)
{
// Note: No AssertIsIOThread here. Whatever thread we're on now is, by
// definition, the IO thread.
- mIOThread = NS_GetCurrentThread();
+ if (OnSocketThread()) {
+ mIOThread = nullptr;
+ } else {
+ mIOThread = NS_GetCurrentThread();
+ }
if (mState == State::Disconnected) {
// If we're offloading data in a thread pool, it's possible that we'll
// have buffered some additional data while waiting for the buffer to
// flush. So, if there's any buffered data left, flush that before we
// flush this incoming data.
//
// Note: When in the eDisconnected state, the buffer list is guaranteed
@@ -398,17 +429,17 @@ StreamFilterParent::OnDataAvailable(nsIR
NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);
if (mState == State::Disconnecting) {
MutexAutoLock al(mBufferMutex);
BufferData(Move(data));
} else if (mState == State::Closed) {
return NS_ERROR_FAILURE;
} else {
- mActorThread->Dispatch(
+ ActorThread()->Dispatch(
NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData",
this,
&StreamFilterParent::DoSendData,
Move(data)),
NS_DISPATCH_NORMAL);
}
return NS_OK;
}
@@ -437,16 +468,79 @@ StreamFilterParent::FlushBufferedData()
Unused << NS_WARN_IF(NS_FAILED(rv));
});
}
return NS_OK;
}
/*****************************************************************************
+ * Thread helpers
+ *****************************************************************************/
+
+void
+StreamFilterParent::AssertIsActorThread()
+{
+ MOZ_ASSERT(OnSocketThread());
+}
+
+nsIEventTarget*
+StreamFilterParent::ActorThread()
+{
+ return gSocketTransportService;
+}
+
+nsIEventTarget*
+StreamFilterParent::IOThread()
+{
+ if (mIOThread) {
+ return mIOThread;
+ }
+ return gSocketTransportService;
+}
+
+bool
+StreamFilterParent::IsIOThread()
+{
+ return (mIOThread ? NS_GetCurrentThread() == mIOThread
+ : OnSocketThread());
+}
+
+void
+StreamFilterParent::AssertIsIOThread()
+{
+ MOZ_ASSERT(IsIOThread());
+}
+
+template<typename Function>
+void
+StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc)
+{
+ if (OnSocketThread()) {
+ aFunc();
+ } else {
+ gSocketTransportService->Dispatch(
+ Move(NS_NewRunnableFunction(aName, aFunc)),
+ NS_DISPATCH_NORMAL);
+ }
+}
+
+template<typename Function>
+void
+StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc)
+{
+ if (mIOThread) {
+ mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
+ NS_DISPATCH_NORMAL);
+ } else {
+ RunOnActorThread(aName, Move(aFunc));
+ }
+}
+
+/*****************************************************************************
* Glue
*****************************************************************************/
void
StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy)
{
AssertIsActorThread();
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.h
@@ -47,21 +47,23 @@ class StreamFilterParent final
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSISTREAMLISTENER
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
StreamFilterParent();
+ using ParentEndpoint = mozilla::ipc::Endpoint<PStreamFilterParent>;
+
static bool Create(ContentParent* aContentParent,
uint64_t aChannelId, const nsAString& aAddonId,
mozilla::ipc::Endpoint<PStreamFilterChild>& aEndpoint);
- static void Attach(nsIChannel* aChannel, mozilla::ipc::Endpoint<PStreamFilterParent>&& aEndpoint);
+ static void Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint);
enum class State
{
// The parent has been created, but not yet constructed by the child.
Uninitialized,
// The parent has been successfully constructed.
Initialized,
// The OnRequestStarted event has been received, and data is being
@@ -98,16 +100,20 @@ private:
{
return (mState != State::Closed &&
mState != State::Disconnecting &&
mState != State::Disconnected);
}
void Init(nsIChannel* aChannel);
+ void Bind(ParentEndpoint&& aEndpoint);
+
+ void Destroy();
+
nsresult FlushBufferedData();
nsresult Write(Data& aData);
void WriteMove(Data&& aData);
void DoSendData(Data&& aData);
@@ -120,62 +126,49 @@ private:
void
CheckResult(bool aResult)
{
if (NS_WARN_IF(!aResult)) {
Broken();
}
}
- void
- AssertIsActorThread()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mActorThread);
- }
+ inline nsIEventTarget* ActorThread();
+
+ inline nsIEventTarget* IOThread();
- void
- AssertIsIOThread()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mIOThread);
- }
+ inline bool IsIOThread();
+
+ inline void AssertIsActorThread();
+
+ inline void AssertIsIOThread();
static void
AssertIsMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
}
template<typename Function>
void
RunOnMainThread(const char* aName, Function&& aFunc)
{
SystemGroup::Dispatch(TaskCategory::Network,
Move(NS_NewRunnableFunction(aName, aFunc)));
}
template<typename Function>
- void
- RunOnActorThread(const char* aName, Function&& aFunc)
- {
- mActorThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
- NS_DISPATCH_NORMAL);
- }
+ void RunOnActorThread(const char* aName, Function&& aFunc);
template<typename Function>
- void
- RunOnIOThread(const char* aName, Function&& aFunc)
- {
- mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
- NS_DISPATCH_NORMAL);
- }
+ void RunOnIOThread(const char* aName, Function&& aFunc);
nsCOMPtr<nsIChannel> mChannel;
nsCOMPtr<nsIStreamListener> mOrigListener;
- nsCOMPtr<nsIThread> mActorThread;
nsCOMPtr<nsIThread> mIOThread;
Mutex mBufferMutex;
bool mReceivedStop;
bool mSentStop;
nsCOMPtr<nsISupports> mContext;