Bug 1364857 - Reject pending promises for actor when it's going to be destroyed.
The lifetime of async IPDL returned promise may be longer than its actor.
That is, the handler (receiver) may have not resolve/reject the promise when the actor
is destroyed. In this case, we have to reject all the pending promises before
ActorDestroy() is called on the "sender" side.
Besides, the handler (receiver) can reject with reason "ActorDestroyed" to silently
cancel the promise without trying to reply to the remote actor which may
have died. The sender-side promise is responsible for rejecting the pending promises,
which will be done in MessageChannel::RejectPendingPromisesForActor().
MozReview-Commit-ID: 4XjmquZzDBO
--- a/ipc/glue/MessageChannel.cpp
+++ b/ipc/glue/MessageChannel.cpp
@@ -712,17 +712,19 @@ MessageChannel::Clear()
}
if (mWorkerLoop) {
mWorkerLoop->RemoveDestructionObserver(this);
}
gUnresolvedPromises -= mPendingPromises.size();
for (auto& pair : mPendingPromises) {
- pair.second.mRejectFunction(pair.second.mPromise, __func__);
+ pair.second.mRejectFunction(pair.second.mPromise,
+ PromiseRejectReason::ChannelClosed,
+ __func__);
}
mPendingPromises.clear();
mWorkerLoop = nullptr;
delete mLink;
mLink = nullptr;
mOnChannelConnectedTask->Cancel();
@@ -939,16 +941,36 @@ MessageChannel::PopPromise(const Message
PromiseHolder ret = iter->second;
mPendingPromises.erase(iter);
gUnresolvedPromises--;
return ret.mPromise.forget();
}
return nullptr;
}
+void
+MessageChannel::RejectPendingPromisesForActor(ActorIdType aActorId)
+{
+ auto itr = mPendingPromises.begin();
+ while (itr != mPendingPromises.end()) {
+ if (itr->second.mActorId != aActorId) {
+ ++itr;
+ continue;
+ }
+ auto& promise = itr->second.mPromise;
+ itr->second.mRejectFunction(promise,
+ PromiseRejectReason::ActorDestroyed,
+ __func__);
+ // Take special care of advancing the iterator since we are
+ // removing it while iterating.
+ itr = mPendingPromises.erase(itr);
+ gUnresolvedPromises--;
+ }
+}
+
class BuildIDMessage : public IPC::Message
{
public:
BuildIDMessage()
: IPC::Message(MSG_ROUTING_NONE, BUILD_ID_MESSAGE_TYPE)
{
}
void Log(const std::string& aPrefix, FILE* aOutf) const
--- a/ipc/glue/MessageChannel.h
+++ b/ipc/glue/MessageChannel.h
@@ -64,16 +64,17 @@ enum class SyncSendError {
TimedOut,
ReplyError,
};
enum class PromiseRejectReason {
SendError,
ChannelClosed,
HandlerRejected,
+ ActorDestroyed,
EndGuard_,
};
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelConnected,
ChannelTimeout,
@@ -88,20 +89,32 @@ class MessageChannel : HasResultCodes, M
friend class ProcessLink;
friend class ThreadLink;
class CxxStackFrame;
class InterruptFrame;
typedef mozilla::Monitor Monitor;
+ // We could templatize the actor type but it would unnecessarily
+ // expand the code size. Using the actor address as the
+ // identifier is already good enough.
+ typedef void* ActorIdType;
+
struct PromiseHolder
{
RefPtr<MozPromiseRefcountable> mPromise;
- std::function<void(MozPromiseRefcountable*, const char*)> mRejectFunction;
+
+ // For rejecting and removing the pending promises when a
+ // subprotocol is destoryed.
+ ActorIdType mActorId;
+
+ std::function<void(MozPromiseRefcountable*,
+ PromiseRejectReason,
+ const char*)> mRejectFunction;
};
static Atomic<size_t> gUnresolvedPromises;
friend class PromiseReporter;
public:
static const int32_t kNoTimeout;
typedef IPC::Message Message;
@@ -171,28 +184,29 @@ class MessageChannel : HasResultCodes, M
ChannelFlags GetChannelFlags() { return mFlags; }
// Asynchronously send a message to the other side of the channel
bool Send(Message* aMsg);
// Asynchronously send a message to the other side of the channel
// and wait for asynchronous reply
template<typename Promise>
- bool Send(Message* aMsg, Promise* aPromise) {
+ bool Send(Message* aMsg, Promise* aPromise, ActorIdType aActorId) {
int32_t seqno = NextSeqno();
aMsg->set_seqno(seqno);
if (!Send(aMsg)) {
return false;
}
PromiseHolder holder;
holder.mPromise = aPromise;
+ holder.mActorId = aActorId;
holder.mRejectFunction = [](MozPromiseRefcountable* aRejectPromise,
+ PromiseRejectReason aReason,
const char* aRejectSite) {
- static_cast<Promise*>(aRejectPromise)->Reject(
- PromiseRejectReason::ChannelClosed, aRejectSite);
+ static_cast<Promise*>(aRejectPromise)->Reject(aReason, aRejectSite);
};
mPendingPromises.insert(std::make_pair(seqno, Move(holder)));
gUnresolvedPromises++;
return true;
}
void SendBuildID();
@@ -209,16 +223,20 @@ class MessageChannel : HasResultCodes, M
// Wait until a message is received
bool WaitForIncomingMessage();
bool CanSend() const;
// Remove and return a promise that needs reply
already_AddRefed<MozPromiseRefcountable> PopPromise(const Message& aMsg);
+ // Used to reject and remove pending promises owned by the given
+ // actor when it's about to be destroyed.
+ void RejectPendingPromisesForActor(ActorIdType aActorId);
+
// If sending a sync message returns an error, this function gives a more
// descriptive error message.
SyncSendError LastSendError() const {
AssertWorkerThread();
return mLastSendError;
}
// Currently only for debugging purposes, doesn't aquire mMonitor.
--- a/ipc/ipdl/ipdl/lower.py
+++ b/ipc/ipdl/ipdl/lower.py
@@ -509,16 +509,18 @@ class _DestroyReason:
class _PromiseRejectReason:
@staticmethod
def Type():
return Type('PromiseRejectReason')
SendError = ExprVar('PromiseRejectReason::SendError')
ChannelClosed = ExprVar('PromiseRejectReason::ChannelClosed')
HandlerRejected = ExprVar('PromiseRejectReason::HandlerRejected')
+ ActorDestroyed = ExprVar('PromiseRejectReason::ActorDestroyed')
+
##-----------------------------------------------------------------------------
## Intermediate representation (IR) nodes used during lowering
class _ConvertToCxxType(TypeVisitor):
def __init__(self, side, fq):
self.side = side
self.fq = fq
@@ -3045,16 +3047,28 @@ class _GenerateProtocolActorCode(ipdl.as
StmtExpr(ExprCall(p.managedMethod(managed, self.side),
args=[ kidsvar ])),
foreachdestroy,
])
destroysubtree.addstmt(block)
if len(ptype.manages):
destroysubtree.addstmt(Whitespace.NL)
+
+ # Reject pending promises for actor before calling ActorDestroy().
+ rejectPendingPromiseMethod = ExprSelect(self.protocol.callGetChannel(),
+ '->',
+ 'RejectPendingPromisesForActor')
+ destroysubtree.addstmts([ Whitespace('// Reject owning pending promises.\n',
+ indent=1),
+ StmtExpr(ExprCall(rejectPendingPromiseMethod,
+ args=[ ExprVar('this') ])),
+ Whitespace.NL
+ ])
+
destroysubtree.addstmts([ Whitespace('// Finally, destroy "us".\n',
indent=1),
StmtExpr(ExprCall(_destroyMethod(),
args=[ whyvar ]))
])
self.cls.addstmts([ destroysubtree, Whitespace.NL ])
@@ -4248,17 +4262,25 @@ class _GenerateProtocolActorCode(ipdl.as
+ [ self.checkedWrite(None, resolve, self.replyvar,
sentinelKey=resolve.name) ]
+ [ self.checkedWrite(r.ipdltype, r.var(), self.replyvar,
sentinelKey=r.name)
for r in md.returns ])
promisethen.addstmts(sendmsg)
promiserej = ExprLambda([ExprVar.THIS, routingId, seqno],
[Decl(_PromiseRejectReason.Type(), reason.name)])
- promiserej.addstmts([ StmtExpr(ExprCall(ExprVar('MOZ_ASSERT'),
+
+ # If-statement for silently cancelling the promise due to ActorDestroyed.
+ returnifactordestroyed = StmtIf(ExprBinary(reason, '==',
+ _PromiseRejectReason.ActorDestroyed))
+ returnifactordestroyed.addifstmts([_printWarningMessage("Reject due to ActorDestroyed"),
+ StmtReturn()])
+
+ promiserej.addstmts([ returnifactordestroyed,
+ StmtExpr(ExprCall(ExprVar('MOZ_ASSERT'),
args=[ ExprBinary(reason, '==',
_PromiseRejectReason.HandlerRejected) ])),
StmtExpr(ExprAssn(reason, _PromiseRejectReason.HandlerRejected)),
StmtDecl(Decl(Type.BOOL, resolve.name),
init=ExprLiteral.FALSE),
StmtDecl(Decl(Type('IPC::Message', ptr=1), self.replyvar.name),
init=ExprCall(ExprVar(md.pqReplyCtorFunc()),
args=[ routingId ])),
@@ -4510,17 +4532,18 @@ class _GenerateProtocolActorCode(ipdl.as
args=[ _PromiseRejectReason.SendError,
ExprVar('__func__') ])) ])
sendargs = [ msgexpr ]
stmts = [ Whitespace.NL,
self.logMessage(md, msgexpr, 'Sending ', actor),
self.profilerLabel(md) ] + self.transition(md, actor)
if md.returns:
- sendargs.append(ExprCall(ExprSelect(retpromise, '.', 'get')));
+ sendargs.append(ExprCall(ExprSelect(retpromise, '.', 'get')))
+ sendargs.append(ExprVar('this'))
stmts.extend(promisedecl)
retvar = retpromise
stmts.extend([ Whitespace.NL,
StmtDecl(Decl(Type.BOOL, sendok.name),
init=ExprCall(
ExprSelect(self.protocol.callGetChannel(actor),
'->', 'Send'),