--- a/media/mtransport/test/sctp_unittest.cpp
+++ b/media/mtransport/test/sctp_unittest.cpp
@@ -137,17 +137,17 @@ class TransportTestPeer : public sigslot
NS_DISPATCH_SYNC);
}
void ConnectSocket_s(TransportTestPeer *peer) {
loopback_->Connect(peer->loopback_);
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
- flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
+ loopback_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
// SCTP here!
ASSERT_TRUE(sctp_);
std::cerr << "Calling usrsctp_bind()" << std::endl;
int r = usrsctp_bind(sctp_, reinterpret_cast<struct sockaddr *>(
&local_addr_), sizeof(local_addr_));
ASSERT_GE(0, r);
@@ -194,40 +194,41 @@ class TransportTestPeer : public sigslot
++sent_;
}
int sent() const { return sent_; }
int received() const { return received_; }
bool connected() const { return connected_; }
static TransportResult SendPacket_s(const unsigned char* data, size_t len,
- const RefPtr<TransportFlow>& flow) {
- TransportResult res = flow->SendPacket(data, len);
+ const RefPtr<TransportFlow>& flow,
+ TransportLayer* layer) {
+ TransportResult res = layer->SendPacket(data, len);
delete data; // we always allocate
return res;
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
unsigned char *buffer = new unsigned char[len];
memcpy(buffer, data, len);
// Uses DISPATCH_NORMAL to avoid possible deadlocks when we're called
// from MainThread especially during shutdown (same as DataChannels).
// RUN_ON_THREAD short-circuits if already on the STS thread, which is
// normal for most transfers outside of connect() and close(). Passes
// a refptr to flow_ to avoid any async deletion issues (since we can't
// make 'this' into a refptr as it isn't refcounted)
RUN_ON_THREAD(test_utils_->sts_target(), WrapRunnableNM(
- &TransportTestPeer::SendPacket_s, buffer, len, flow_),
+ &TransportTestPeer::SendPacket_s, buffer, len, flow_, loopback_),
NS_DISPATCH_NORMAL);
return 0;
}
- void PacketReceived(TransportFlow * flow, const unsigned char* data,
+ void PacketReceived(TransportLayer * layer, const unsigned char* data,
size_t len) {
std::cerr << "Received " << len << " bytes" << std::endl;
// Pass the data to SCTP
usrsctp_conninput(static_cast<void *>(this), data, len, 0);
}
--- a/media/mtransport/test/transport_unittests.cpp
+++ b/media/mtransport/test/transport_unittests.cpp
@@ -568,17 +568,17 @@ class TransportTestPeer : public sigslot
// is that we are testing a feature which TransaportLayerDtls doesn't
// expose.
SECStatus rv = SSL_OptionSet(dtls_->internal_fd(),
SSL_REUSE_SERVER_ECDHE_KEY, PR_TRUE);
ASSERT_EQ(SECSuccess, rv);
}
}
- flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
+ dtls_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
}
void TweakCiphers(PRFileDesc* fd) {
for (unsigned short& enabled_cipersuite : enabled_cipersuites_) {
SSL_CipherPrefSet(fd, enabled_cipersuite, PR_TRUE);
}
for (unsigned short& disabled_cipersuite : disabled_cipersuites_) {
SSL_CipherPrefSet(fd, disabled_cipersuite, PR_FALSE);
@@ -614,31 +614,29 @@ class TransportTestPeer : public sigslot
// Listen for candidates
stream->SignalCandidate.
connect(this, &TransportTestPeer::GotCandidate);
// Create the transport layer
ice_ = new TransportLayerIce();
ice_->SetParameters(stream, 1);
- // Assemble the stack
- nsAutoPtr<std::queue<mozilla::TransportLayer *> > layers(
- new std::queue<mozilla::TransportLayer *>);
- layers->push(ice_);
- layers->push(dtls_);
+ test_utils_->sts_target()->Dispatch(
+ WrapRunnable(flow_, &TransportFlow::PushLayer, ice_),
+ NS_DISPATCH_SYNC);
test_utils_->sts_target()->Dispatch(
- WrapRunnableRet(&res, flow_, &TransportFlow::PushLayers, layers),
+ WrapRunnableRet(&res, flow_, &TransportFlow::PushLayer, dtls_),
NS_DISPATCH_SYNC);
ASSERT_EQ((nsresult)NS_OK, res);
// Listen for media events
- flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
- flow_->SignalStateChange.connect(this, &TransportTestPeer::StateChanged);
+ dtls_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
+ dtls_->SignalStateChange.connect(this, &TransportTestPeer::StateChanged);
// Start gathering
test_utils_->sts_target()->Dispatch(
WrapRunnableRet(&res,
ice_ctx_->ctx(),
&NrIceCtx::StartGathering,
false,
false),
@@ -701,30 +699,30 @@ class TransportTestPeer : public sigslot
offerer_),
NS_DISPATCH_SYNC);
ASSERT_TRUE(NS_SUCCEEDED(res));
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult ret;
test_utils_->sts_target()->Dispatch(
- WrapRunnableRet(&ret, flow_, &TransportFlow::SendPacket, data, len),
+ WrapRunnableRet(&ret, dtls_, &TransportLayer::SendPacket, data, len),
NS_DISPATCH_SYNC);
return ret;
}
- void StateChanged(TransportFlow *flow, TransportLayer::State state) {
+ void StateChanged(TransportLayer *layer, TransportLayer::State state) {
if (state == TransportLayer::TS_OPEN) {
std::cerr << "Now connected" << std::endl;
}
}
- void PacketReceived(TransportFlow * flow, const unsigned char* data,
+ void PacketReceived(TransportLayer* layer, const unsigned char* data,
size_t len) {
std::cerr << "Received " << len << " bytes" << std::endl;
++received_packets_;
received_bytes_ += len;
}
void SetLoss(uint32_t loss) {
lossy_->SetLoss(loss);
@@ -753,17 +751,17 @@ class TransportTestPeer : public sigslot
void SetReuseECDHEKey() {
reuse_dhe_key_ = true;
}
TransportLayer::State state() {
TransportLayer::State tstate;
RUN_ON_THREAD(test_utils_->sts_target(),
- WrapRunnableRet(&tstate, flow_, &TransportFlow::state));
+ WrapRunnableRet(&tstate, dtls_, &TransportLayer::state));
return tstate;
}
bool connected() {
return state() == TransportLayer::TS_OPEN;
}
@@ -1297,50 +1295,17 @@ TEST(PushTests, LayerFail) {
bool destroyed1, destroyed2;
rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = flow->PushLayer(new TransportLayerDummy(false, &destroyed2));
ASSERT_TRUE(NS_FAILED(rv));
- ASSERT_EQ(TransportLayer::TS_ERROR, flow->state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed1);
}
-TEST(PushTests, LayersFail) {
- RefPtr<TransportFlow> flow = new TransportFlow();
- nsresult rv;
- bool destroyed1, destroyed2, destroyed3;
-
- rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
- ASSERT_TRUE(NS_SUCCEEDED(rv));
-
- nsAutoPtr<std::queue<TransportLayer *> > layers(
- new std::queue<TransportLayer *>());
-
- layers->push(new TransportLayerDummy(true, &destroyed2));
- layers->push(new TransportLayerDummy(false, &destroyed3));
-
- rv = flow->PushLayers(layers);
- ASSERT_TRUE(NS_FAILED(rv));
-
- ASSERT_EQ(TransportLayer::TS_ERROR, flow->state());
- ASSERT_EQ(true, destroyed1);
- ASSERT_EQ(true, destroyed2);
- ASSERT_EQ(true, destroyed3);
-
- layers = new std::queue<TransportLayer *>();
- layers->push(new TransportLayerDummy(true, &destroyed2));
- layers->push(new TransportLayerDummy(true, &destroyed3));
- rv = flow->PushLayers(layers);
-
- ASSERT_TRUE(NS_FAILED(rv));
- ASSERT_EQ(true, destroyed2);
- ASSERT_EQ(true, destroyed3);
-}
-
} // end namespace
--- a/media/mtransport/transportflow.cpp
+++ b/media/mtransport/transportflow.cpp
@@ -16,23 +16,16 @@ namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
NS_IMPL_ISUPPORTS0(TransportFlow)
// There are some hacks here to allow destruction off of
// the main thread.
TransportFlow::~TransportFlow() {
- // Make sure that if we are off the right thread, we have
- // no more attached signals.
- if (!CheckThreadInt()) {
- MOZ_ASSERT(SignalStateChange.is_empty());
- MOZ_ASSERT(SignalPacketReceived.is_empty());
- }
-
// Push the destruction onto the STS thread. Note that there
// is still some possibility that someone is accessing this
// object simultaneously, but as long as smart pointer discipline
// is maintained, it shouldn't be possible to access and
// destroy it simultaneously. The conversion to an nsAutoPtr
// ensures automatic destruction of the queue at exit of
// DestroyFinal.
if (target_) {
@@ -42,216 +35,71 @@ TransportFlow::~TransportFlow() {
NS_DISPATCH_NORMAL);
}
}
void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
ClearLayers(layers.get());
}
-void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
- while (!layers->empty()) {
- delete layers->front();
- layers->pop();
- }
-}
-
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop_front();
}
}
-nsresult TransportFlow::PushLayer(TransportLayer *layer) {
+nsresult TransportFlow::PushLayer(TransportLayer* layer) {
CheckThread();
- UniquePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
-
- // Don't allow pushes once we are in error state.
- if (state_ == TransportLayer::TS_ERROR) {
- MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow");
+ if (!layers_) {
return NS_ERROR_FAILURE;
}
nsresult rv = layer->Init();
if (!NS_SUCCEEDED(rv)) {
- // Destroy the rest of the flow, because it's no longer in an acceptable
- // state.
- ClearLayers(layers_.get());
-
- // Set ourselves to have failed.
- MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating");
- StateChangeInt(TransportLayer::TS_ERROR);
-
- return rv;
- }
- EnsureSameThread(layer);
-
- TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
-
- // Re-target my signals to the new layer
- if (old_layer) {
- old_layer->SignalStateChange.disconnect(this);
- old_layer->SignalPacketReceived.disconnect(this);
- }
- layers_->push_front(layer_tmp.release());
- layer->Inserted(this, old_layer);
-
- layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
- layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
- StateChangeInt(layer->state());
-
- return NS_OK;
-}
-
-// This is all-or-nothing.
-nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
- CheckThread();
-
- MOZ_ASSERT(!layers->empty());
- if (layers->empty()) {
- MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers");
- return NS_ERROR_INVALID_ARG;
- }
-
- // Don't allow pushes once we are in error state.
- if (state_ == TransportLayer::TS_ERROR) {
- MOZ_MTLOG(ML_ERROR,
- id_ << ": Can't call PushLayers in error state for flow ");
- ClearLayers(layers.get());
- return NS_ERROR_FAILURE;
- }
-
- nsresult rv = NS_OK;
-
- // Disconnect all the old signals.
- disconnect_all();
-
- TransportLayer *layer;
-
- while (!layers->empty()) {
- TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
- layer = layers->front();
-
- rv = layer->Init();
- if (NS_FAILED(rv)) {
- MOZ_MTLOG(ML_ERROR,
- id_ << ": Layer initialization failed; invalidating flow ");
- break;
- }
-
- EnsureSameThread(layer);
-
- // Push the layer onto the queue.
- layers_->push_front(layer);
- layers->pop();
- layer->Inserted(this, old_layer);
- }
-
- if (NS_FAILED(rv)) {
- // Destroy any layers we could not push.
- ClearLayers(layers.get());
+ MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed");
+ delete layer;
// Now destroy the rest of the flow, because it's no longer
// in an acceptable state.
ClearLayers(layers_.get());
+ layers_.reset();
- // Set ourselves to have failed.
- StateChangeInt(TransportLayer::TS_ERROR);
-
- // Return failure.
return rv;
}
- // Finally, attach ourselves to the top layer.
- layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
- layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
- StateChangeInt(layer->state()); // Signals if the state changes.
+ TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
+ layers_->push_front(layer);
+ EnsureSameThread(layer);
+
+ layer->Inserted(this, old_layer);
return NS_OK;
}
-TransportLayer *TransportFlow::top() const {
- CheckThread();
-
- return layers_->empty() ? nullptr : layers_->front();
-}
-
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
CheckThread();
- for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
- it != layers_->end(); ++it) {
- if ((*it)->id() == id)
- return *it;
+ if (layers_) {
+ for (TransportLayer* layer : *layers_) {
+ if (layer->id() == id)
+ return layer;
+ }
}
return nullptr;
}
-TransportLayer::State TransportFlow::state() {
- CheckThread();
-
- return state_;
-}
-
-TransportResult TransportFlow::SendPacket(const unsigned char *data,
- size_t len) {
- CheckThread();
-
- if (state_ != TransportLayer::TS_OPEN) {
- return TE_ERROR;
- }
- return top() ? top()->SendPacket(data, len) : TE_ERROR;
-}
-
-bool TransportFlow::Contains(TransportLayer *layer) const {
- if (layers_) {
- for (auto& l : *layers_) {
- if (l == layer) {
- return true;
- }
- }
- }
- return false;
-}
-
void TransportFlow::EnsureSameThread(TransportLayer *layer) {
// Enforce that if any of the layers have a thread binding,
// they all have the same binding.
if (target_) {
const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
if (lthread && (lthread != target_))
MOZ_CRASH();
}
else {
target_ = layer->GetThread();
}
}
-void TransportFlow::StateChangeInt(TransportLayer::State state) {
- CheckThread();
-
- if (state == state_) {
- return;
- }
-
- state_ = state;
- SignalStateChange(this, state_);
-}
-
-void TransportFlow::StateChange(TransportLayer *layer,
- TransportLayer::State state) {
- CheckThread();
-
- StateChangeInt(state);
-}
-
-void TransportFlow::PacketReceived(TransportLayer* layer,
- const unsigned char *data,
- size_t len) {
- CheckThread();
-
- SignalPacketReceived(this, data, len);
-}
-
} // close namespace
--- a/media/mtransport/transportflow.h
+++ b/media/mtransport/transportflow.h
@@ -5,17 +5,16 @@
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#ifndef transportflow_h__
#define transportflow_h__
#include <deque>
-#include <queue>
#include <string>
#include "nscore.h"
#include "nsISupportsImpl.h"
#include "mozilla/UniquePtr.h"
#include "transportlayer.h"
#include "m_cpp_utils.h"
#include "nsAutoPtr.h"
@@ -44,63 +43,37 @@
// restriction by thread-locking the signals, but previous
// attempts have caused deadlocks.
//
// Most of these invariants are enforced by hard asserts
// (i.e., those which fire even in production builds).
namespace mozilla {
-class TransportFlow final : public nsISupports,
- public sigslot::has_slots<> {
+class TransportFlow final : public nsISupports {
public:
TransportFlow()
: id_("(anonymous)"),
- state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
explicit TransportFlow(const std::string id)
: id_(id),
- state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
const std::string& id() const { return id_; }
// Layer management. Note PushLayer() is not thread protected, so
// either:
// (a) Do it in the thread handling the I/O
// (b) Do it before you activate the I/O system
//
// The flow takes ownership of the layers after a successful
// push.
- nsresult PushLayer(TransportLayer *layer);
-
- // Convenience function to push multiple layers on. Layers
- // are pushed on in the order that they are in the queue.
- // Any failures cause the flow to become inoperable and
- // destroys all the layers including those already pushed.
- // TODO(ekr@rtfm.com): Change layers to be ref-counted.
- nsresult PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers);
-
- TransportLayer *top() const;
- TransportLayer *GetLayer(const std::string& id) const;
+ nsresult PushLayer(TransportLayer* layer);
- // Wrappers for whatever TLayer happens to be the top layer
- // at the time. This way you don't need to do top()->Foo().
- TransportLayer::State state(); // Current state
- TransportResult SendPacket(const unsigned char *data, size_t len);
-
- // State has changed. Reflects the top flow.
- sigslot::signal2<TransportFlow *, TransportLayer::State>
- SignalStateChange;
-
- // Data received on the flow
- sigslot::signal3<TransportFlow*, const unsigned char *, size_t>
- SignalPacketReceived;
-
- bool Contains(TransportLayer *layer) const;
+ TransportLayer *GetLayer(const std::string& id) const;
NS_DECL_THREADSAFE_ISUPPORTS
private:
~TransportFlow();
DISALLOW_COPY_ASSIGN(TransportFlow);
@@ -118,26 +91,20 @@ class TransportFlow final : public nsISu
if (NS_FAILED(target_->IsOnCurrentThread(&on)))
return false;
return on;
}
void EnsureSameThread(TransportLayer *layer);
- void StateChange(TransportLayer *layer, TransportLayer::State state);
- void StateChangeInt(TransportLayer::State state);
- void PacketReceived(TransportLayer* layer, const unsigned char *data,
- size_t len);
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
// Overload needed because we use deque internally and queue externally.
static void ClearLayers(std::deque<TransportLayer *>* layers);
- static void ClearLayers(std::queue<TransportLayer *>* layers);
std::string id_;
- TransportLayer::State state_;
UniquePtr<std::deque<TransportLayer *>> layers_;
nsCOMPtr<nsIEventTarget> target_;
};
} // close namespace
#endif
--- a/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp
+++ b/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp
@@ -201,23 +201,23 @@ class TransportInfo {
dtls_->SetRole(client ? TransportLayerDtls::CLIENT :
TransportLayerDtls::SERVER);
dtls_->SetVerificationAllowAll();
}
void PushLayers() {
nsresult res;
- nsAutoPtr<std::queue<TransportLayer *> > layers(
- new std::queue<TransportLayer *>);
- layers->push(loopback_);
- layers->push(dtls_);
- res = flow_->PushLayers(layers);
+ // Ignore error; if this fails, subsequent calls to PushLayer will fail also
+ (void)flow_->PushLayer(loopback_);
+ res = flow_->PushLayer(dtls_);
if (res != NS_OK) {
- FreeLayers();
+ // These have already been deleted
+ loopback_ = nullptr;
+ dtls_ = nullptr;
}
ASSERT_EQ((nsresult)NS_OK, res);
}
void Connect(TransportInfo* peer) {
MOZ_ASSERT(loopback_);
MOZ_ASSERT(peer->loopback_);
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
@@ -861,19 +861,19 @@ MediaPipeline::GetContributingSourceStat
RTCRTPContributingSourceStats stats;
info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
aArr.AppendElement(stats, fallible);
}
}
}
void
-MediaPipeline::StateChange(TransportFlow* aFlow, TransportLayer::State aState)
+MediaPipeline::StateChange(TransportLayer* aLayer, TransportLayer::State aState)
{
- TransportInfo* info = GetTransportInfo_s(aFlow);
+ TransportInfo* info = GetTransportInfo_s(aLayer);
MOZ_ASSERT(info);
if (aState == TransportLayer::TS_OPEN) {
CSFLogInfo(LOGTAG, "Flow is ready");
TransportReady_s(*info);
} else if (aState == TransportLayer::TS_CLOSED ||
aState == TransportLayer::TS_ERROR) {
TransportFailed_s(*info);
@@ -1054,27 +1054,22 @@ MediaPipeline::UpdateRtcpMuxState(Transp
mRtcp.mSendSrtp = aInfo.mSendSrtp;
mRtcp.mRecvSrtp = aInfo.mRecvSrtp;
}
}
}
}
nsresult
-MediaPipeline::SendPacket(const TransportFlow* aFlow, const void* aData, int aLen)
+MediaPipeline::SendPacket(TransportLayer* aLayer, const void* aData, int aLen)
{
ASSERT_ON_THREAD(mStsThread);
- // Note that we bypass the DTLS layer here
- TransportLayerDtls* dtls =
- static_cast<TransportLayerDtls*>(aFlow->GetLayer(TransportLayerDtls::ID()));
- MOZ_ASSERT(dtls);
-
TransportResult res =
- dtls->downward()->SendPacket(static_cast<const unsigned char*>(aData), aLen);
+ aLayer->SendPacket(static_cast<const unsigned char*>(aData), aLen);
if (res != aLen) {
// Ignore blocking indications
if (res == TE_WOULDBLOCK)
return NS_OK;
CSFLogError(LOGTAG, "Failed write on stream %s", mDescription.c_str());
return NS_BASE_STREAM_CLOSED;
@@ -1166,17 +1161,17 @@ MediaPipeline::RtpPacketReceived(Transpo
return;
}
if (mRtp.mState != StateType::MP_OPEN) {
CSFLogError(LOGTAG, "Discarding incoming packet; pipeline not open");
return;
}
- if (mRtp.mTransport->state() != TransportLayer::TS_OPEN) {
+ if (mRtp.mDtls->state() != TransportLayer::TS_OPEN) {
CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
return;
}
// This should never happen.
MOZ_ASSERT(mRtp.mRecvSrtp);
if (!aLen) {
@@ -1289,17 +1284,17 @@ MediaPipeline::RtcpPacketReceived(Transp
return;
}
if (mRtcp.mState != StateType::MP_OPEN) {
CSFLogDebug(LOGTAG, "Discarding incoming packet; pipeline not open");
return;
}
- if (mRtcp.mTransport->state() != TransportLayer::TS_OPEN) {
+ if (mRtcp.mDtls->state() != TransportLayer::TS_OPEN) {
CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
return;
}
if (!aLen) {
return;
}
@@ -1745,49 +1740,50 @@ MediaPipelineTransmit::ReplaceTrack(RefP
}
return NS_OK;
}
nsresult
MediaPipeline::ConnectTransport_s(TransportInfo& aInfo)
{
MOZ_ASSERT(aInfo.mTransport);
+ MOZ_ASSERT(aInfo.mDtls);
ASSERT_ON_THREAD(mStsThread);
// Look to see if the transport is ready
- if (aInfo.mTransport->state() == TransportLayer::TS_OPEN) {
+ if (aInfo.mDtls->state() == TransportLayer::TS_OPEN) {
nsresult res = TransportReady_s(aInfo);
if (NS_FAILED(res)) {
CSFLogError(LOGTAG,
"Error calling TransportReady(); res=%u in %s",
static_cast<uint32_t>(res),
__FUNCTION__);
return res;
}
- } else if (aInfo.mTransport->state() == TransportLayer::TS_ERROR) {
+ } else if (aInfo.mDtls->state() == TransportLayer::TS_ERROR) {
CSFLogError(
LOGTAG, "%s transport is already in error state", ToString(aInfo.mType));
TransportFailed_s(aInfo);
return NS_ERROR_FAILURE;
}
- aInfo.mTransport->SignalStateChange.connect(this, &MediaPipeline::StateChange);
+ aInfo.mDtls->SignalStateChange.connect(this, &MediaPipeline::StateChange);
return NS_OK;
}
MediaPipeline::TransportInfo*
-MediaPipeline::GetTransportInfo_s(TransportFlow* aFlow)
+MediaPipeline::GetTransportInfo_s(TransportLayer* aLayer)
{
ASSERT_ON_THREAD(mStsThread);
- if (aFlow == mRtp.mTransport) {
+ if (aLayer == mRtp.mDtls) {
return &mRtp;
}
- if (aFlow == mRtcp.mTransport) {
+ if (aLayer == mRtcp.mDtls) {
return &mRtcp;
}
return nullptr;
}
nsresult
MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData, size_t aLen)
@@ -1889,17 +1885,17 @@ MediaPipeline::PipelineTransport::SendRt
mPipeline->mPacketDumper->Dump(mPipeline->Level(),
dom::mozPacketDumpType::Srtcp,
true,
aData->data(),
out_len);
mPipeline->IncrementRtcpPacketsSent();
}
- return mPipeline->SendPacket(transport.mTransport, aData->data(), out_len);
+ return mPipeline->SendPacket(transport.mDtls->downward(), aData->data(), out_len);
}
nsresult
MediaPipeline::PipelineTransport::SendRtcpPacket(const uint8_t* aData,
size_t aLen)
{
nsAutoPtr<DataBuffer> buf(
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
@@ -210,57 +210,60 @@ protected:
virtual ~MediaPipeline();
nsresult AttachTransport_s();
friend class PipelineTransport;
struct TransportInfo
{
TransportInfo(RefPtr<TransportFlow> aFlow, RtpType aType)
: mTransport(aFlow)
+ , mDtls(mTransport ? mTransport->GetLayer("dtls") : nullptr)
, mState(StateType::MP_CONNECTING)
, mType(aType)
{
}
void Detach()
{
mTransport = nullptr;
+ mDtls = nullptr;
mSendSrtp = nullptr;
mRecvSrtp = nullptr;
}
RefPtr<TransportFlow> mTransport;
+ TransportLayer* mDtls;
StateType mState;
RefPtr<SrtpFlow> mSendSrtp;
RefPtr<SrtpFlow> mRecvSrtp;
RtpType mType;
};
// The transport is down
virtual nsresult TransportFailed_s(TransportInfo& aInfo);
// The transport is ready
virtual nsresult TransportReady_s(TransportInfo& aInfo);
void UpdateRtcpMuxState(TransportInfo& aInfo);
nsresult ConnectTransport_s(TransportInfo& aInfo);
- TransportInfo* GetTransportInfo_s(TransportFlow* aFlow);
+ TransportInfo* GetTransportInfo_s(TransportLayer* aLayer);
void IncrementRtpPacketsSent(int aBytes);
void IncrementRtcpPacketsSent();
void IncrementRtpPacketsReceived(int aBytes);
virtual void OnRtpPacketReceived() {};
void IncrementRtcpPacketsReceived();
- virtual nsresult SendPacket(const TransportFlow* aFlow,
+ virtual nsresult SendPacket(TransportLayer* aLayer,
const void* aData,
int aLen);
// Process slots on transports
- void StateChange(TransportFlow* aFlow, TransportLayer::State);
+ void StateChange(TransportLayer* aLayer, TransportLayer::State);
void RtpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen);
void RtcpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen);
void PacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
@@ -530,23 +530,20 @@ FinalizeTransportFlow_s(RefPtr<PeerConne
RefPtr<TransportFlow> aFlow, size_t aLevel,
bool aIsRtcp,
nsAutoPtr<PtrVector<TransportLayer> > aLayerList)
{
TransportLayerIce* ice =
static_cast<TransportLayerIce*>(aLayerList->values.front());
ice->SetParameters(aPCMedia->ice_media_stream(aLevel),
aIsRtcp ? 2 : 1);
- nsAutoPtr<std::queue<TransportLayer*> > layerQueue(
- new std::queue<TransportLayer*>);
for (auto& value : aLayerList->values) {
- layerQueue->push(value);
+ (void)aFlow->PushLayer(value); // TODO(bug 854518): Process errors.
}
aLayerList->values.clear();
- (void)aFlow->PushLayers(layerQueue); // TODO(bug 854518): Process errors.
}
static void
AddNewIceStreamForRestart_s(RefPtr<PeerConnectionMedia> aPCMedia,
RefPtr<TransportFlow> aFlow,
size_t aLevel,
bool aIsRtcp)
{
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -308,16 +308,17 @@ DataChannelConnection::DataChannelConnec
: NeckoTargetHolder(aTarget)
, mLock("netwerk::sctp::DataChannelConnection")
{
mCurrentStream = 0;
mState = CLOSED;
mSocket = nullptr;
mMasterSocket = nullptr;
mListener = listener;
+ mDtls = nullptr;
mLocalPort = 0;
mRemotePort = 0;
mPendingType = PENDING_NONE;
LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
mInternalIOThread = nullptr;
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
mShutdown = false;
#endif
@@ -325,17 +326,17 @@ DataChannelConnection::DataChannelConnec
DataChannelConnection::~DataChannelConnection()
{
LOG(("Deleting DataChannelConnection %p", (void *) this));
// This may die on the MainThread, or on the STS thread
ASSERT_WEBRTC(mState == CLOSED);
MOZ_ASSERT(!mMasterSocket);
MOZ_ASSERT(mPending.GetSize() == 0);
- MOZ_ASSERT(!mTransportFlow);
+ MOZ_ASSERT(!mDtls);
// Already disconnected from sigslot/mTransportFlow
// TransportFlows must be released from the STS thread
if (!IsSTSThread()) {
ASSERT_WEBRTC(NS_IsMainThread());
if (mInternalIOThread) {
// Avoid spinning the event thread from here (which if we're mainthread
@@ -410,16 +411,17 @@ void DataChannelConnection::DestroyOnSTS
mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::DestroyOnSTSFinal),
NS_DISPATCH_NORMAL);
}
void DataChannelConnection::DestroyOnSTSFinal()
{
mTransportFlow = nullptr;
+ mDtls = nullptr;
sDataChannelShutdown->CreateConnectionShutdown(this);
}
bool
DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
uint64_t aMaxMessageSize)
{
struct sctp_initmsg initmsg;
@@ -665,20 +667,18 @@ DataChannelConnection::GetMaxMessageSize
}
#ifdef MOZ_PEERCONNECTION
void
DataChannelConnection::SetEvenOdd()
{
ASSERT_WEBRTC(IsSTSThread());
- TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
- mTransportFlow->GetLayer(TransportLayerDtls::ID()));
- MOZ_ASSERT(dtls); // DTLS is mandatory
- mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
+ MOZ_ASSERT(mDtls); // DTLS is mandatory
+ mAllocateEven = (mDtls->role() == TransportLayerDtls::CLIENT);
}
bool
DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
MOZ_ASSERT(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
@@ -696,26 +696,27 @@ DataChannelConnection::ConnectViaTranspo
NS_DISPATCH_NORMAL);
return true;
}
void
DataChannelConnection::SetSignals()
{
ASSERT_WEBRTC(IsSTSThread());
- ASSERT_WEBRTC(mTransportFlow);
- LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
- mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
+ mDtls = static_cast<TransportLayerDtls*>(mTransportFlow->GetLayer("dtls"));
+ ASSERT_WEBRTC(mDtls);
+ LOG(("Setting transport signals, state: %d", mDtls->state()));
+ mDtls->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
// SignalStateChange() doesn't call you with the initial state
- mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
- CompleteConnect(mTransportFlow, mTransportFlow->state());
+ mDtls->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
+ CompleteConnect(mDtls, mDtls->state());
}
void
-DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
+DataChannelConnection::CompleteConnect(TransportLayer *layer, TransportLayer::State state)
{
LOG(("Data transport state: %d", state));
MutexAutoLock lock(mLock);
ASSERT_WEBRTC(IsSTSThread());
// We should abort connection on TS_ERROR.
// Note however that the association will also fail (perhaps with a delay) and
// notify us in that way
if (state != TransportLayer::TS_OPEN || !mMasterSocket)
@@ -812,17 +813,17 @@ DataChannelConnection::ProcessQueuedOpen
channel = OpenFinish(channel.forget()); // may reset the flag and re-push
} else {
NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
}
}
}
void
-DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
+DataChannelConnection::SctpDtlsInput(TransportLayer *layer,
const unsigned char *data, size_t len)
{
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
char *buf;
if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
SCTP_LOG(("%s", buf));
usrsctp_freedumpbuffer(buf);
@@ -833,18 +834,18 @@ DataChannelConnection::SctpDtlsInput(Tra
usrsctp_conninput(static_cast<void *>(this), data, len, 0);
}
int
DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
{
//LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
int res = 0;
- if (mTransportFlow) {
- res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
+ if (mDtls) {
+ res = mDtls->SendPacket(data, len) < 0 ? 1 : 0;
}
if (release)
delete [] data;
return res;
}
/* static */
int
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -161,17 +161,17 @@ public:
bool Listen(unsigned short port);
bool Connect(const char *addr, unsigned short port);
#endif
#ifdef SCTP_DTLS_SUPPORTED
// Connect using a TransportFlow (DTLS) channel
void SetEvenOdd();
bool ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport);
- void CompleteConnect(TransportFlow *flow, TransportLayer::State state);
+ void CompleteConnect(TransportLayer *layer, TransportLayer::State state);
void SetSignals();
#endif
typedef enum {
RELIABLE=0,
PARTIAL_RELIABLE_REXMIT = 1,
PARTIAL_RELIABLE_TIMED = 2
} Type;
@@ -237,17 +237,17 @@ protected:
WeakPtr<DataConnectionListener> mListener;
private:
friend class DataChannelConnectRunnable;
#ifdef SCTP_DTLS_SUPPORTED
static void DTLSConnectThread(void *data);
int SendPacket(unsigned char data[], size_t len, bool release);
- void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
+ void SctpDtlsInput(TransportLayer *layer, const unsigned char *data, size_t len);
static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
#endif
DataChannel* FindChannelByStream(uint16_t stream);
uint16_t FindFreeStream();
bool RequestMoreStreams(int32_t aNeeded = 16);
uint32_t UpdateCurrentStreamIndex();
uint32_t GetCurrentStreamIndex();
int SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream);
@@ -329,16 +329,17 @@ private:
AutoTArray<uint16_t,4> mStreamsResetting;
struct socket *mMasterSocket; // accessed from STS thread
struct socket *mSocket; // cloned from mMasterSocket on successful Connect on STS thread
uint16_t mState; // Protected with mLock
#ifdef SCTP_DTLS_SUPPORTED
RefPtr<TransportFlow> mTransportFlow;
+ TransportLayerDtls* mDtls;
nsCOMPtr<nsIEventTarget> mSTS;
#endif
uint16_t mLocalPort; // Accessed from connect thread
uint16_t mRemotePort;
nsCOMPtr<nsIThread> mInternalIOThread;
uint8_t mPendingType;
nsCString mRecvBuffer;