--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -30,16 +30,18 @@
#pragma warning(pop)
#endif
#include "DataChannelLog.h"
#include "nsServiceManagerUtils.h"
#include "nsIObserverService.h"
#include "nsIObserver.h"
+#include "nsIPrefBranch.h"
+#include "nsIPrefService.h"
#include "mozilla/Services.h"
#include "mozilla/Sprintf.h"
#include "nsProxyRelease.h"
#include "nsThread.h"
#include "nsThreadUtils.h"
#include "nsAutoPtr.h"
#include "nsNetUtil.h"
#include "nsNetCID.h"
@@ -122,29 +124,48 @@ public:
(void) rv;
}
return NS_OK;
}
};
NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
-BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
- size_t length) : mLength(length)
+OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
+ size_t length)
+ : mLength(length)
+ , mData(data)
+{
+ mInfo = &info;
+ mPos = 0;
+}
+
+void OutgoingMsg::Advance(size_t offset)
{
- mSpa = new sctp_sendv_spa;
- *mSpa = spa;
- auto *tmp = new char[length]; // infallible malloc!
- memcpy(tmp, data, length);
+ mPos += offset;
+ if (mPos > mLength) {
+ mPos = mLength;
+ }
+}
+
+BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg)
+{
+ size_t length = msg.GetLeft();
+ auto *tmp = new uint8_t[length]; // infallible malloc!
+ memcpy(tmp, msg.GetData(), length);
+ mLength = length;
mData = tmp;
+ mInfo = new sctp_sendv_spa;
+ *mInfo = msg.GetInfo();
+ mPos = 0;
}
-BufferedMsg::~BufferedMsg()
+BufferedOutgoingMsg::~BufferedOutgoingMsg()
{
- delete mSpa;
+ delete mInfo;
delete mData;
}
static int
receive_cb(struct socket* sock, union sctp_sockstore addr,
void *data, size_t datalen,
struct sctp_rcvinfo rcv, int flags, void *ulp_info)
{
@@ -175,17 +196,16 @@ GetConnectionFromSocket(struct socket* s
}
// called when the buffer empties to the threshold value
static int
threshold_event(struct socket* sock, uint32_t sb_free)
{
DataChannelConnection *connection = GetConnectionFromSocket(sock);
if (connection) {
- LOG(("SendDeferred()"));
connection->SendDeferredMessages();
} else {
LOG(("Can't find connection for socket %p", sock));
}
return 0;
}
static void
@@ -207,22 +227,24 @@ debug_printf(const char *format, ...)
}
}
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
nsIEventTarget *aTarget)
: NeckoTargetHolder(aTarget)
, mLock("netwerk::sctp::DataChannelConnection")
{
+ mCurrentStream = 0;
mState = CLOSED;
mSocket = nullptr;
mMasterSocket = nullptr;
mListener = listener;
mLocalPort = 0;
mRemotePort = 0;
+ mPendingType = PENDING_NONE;
LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
mInternalIOThread = nullptr;
}
DataChannelConnection::~DataChannelConnection()
{
LOG(("Deleting DataChannelConnection %p", (void *) this));
// This may die on the MainThread, or on the STS thread
@@ -271,20 +293,18 @@ DataChannelConnection::Destroy()
// we can deregister this DataChannelConnection without leaking.
ClearResets();
MOZ_ASSERT(mSTS);
ASSERT_WEBRTC(NS_IsMainThread());
// Must do this in Destroy() since we may then delete this object.
// Do this before dispatching to create a consistent ordering of calls to
// the SCTP stack.
- if (mUsingDtls) {
- usrsctp_deregister_address(static_cast<void *>(this));
- LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
- }
+ usrsctp_deregister_address(static_cast<void *>(this));
+ LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
// Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
// the usrsctp_close() calls can move back here (and just proxy the
// disconnect_all())
RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::DestroyOnSTS,
mSocket, mMasterSocket),
NS_DISPATCH_NORMAL);
@@ -306,80 +326,85 @@ void DataChannelConnection::DestroyOnSTS
usrsctp_close(aSocket);
if (aMasterSocket)
usrsctp_close(aMasterSocket);
disconnect_all();
}
bool
-DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
+DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
+ uint64_t aMaxMessageSize)
{
struct sctp_initmsg initmsg;
- struct sctp_udpencaps encaps;
struct sctp_assoc_value av;
struct sctp_event event;
socklen_t len;
uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR,
SCTP_SHUTDOWN_EVENT,
SCTP_ADAPTATION_INDICATION,
+ SCTP_PARTIAL_DELIVERY_EVENT,
SCTP_SEND_FAILED_EVENT,
SCTP_STREAM_RESET_EVENT,
SCTP_STREAM_CHANGE_EVENT};
{
ASSERT_WEBRTC(NS_IsMainThread());
-
// MutexAutoLock lock(mLock); Not needed since we're on mainthread always
+
+ mSendInterleaved = false;
+ mPpidFragmentation = false;
+ SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
+
if (!sctp_initialized) {
- if (aUsingDtls) {
- LOG(("sctp_init(DTLS)"));
+ LOG(("sctp_init"));
#ifdef MOZ_PEERCONNECTION
- usrsctp_init(0,
- DataChannelConnection::SctpDtlsOutput,
- debug_printf
- );
+ usrsctp_init(0,
+ DataChannelConnection::SctpDtlsOutput,
+ debug_printf
+ );
#else
- NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
+ MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
#endif
- } else {
- LOG(("sctp_init(%u)", aPort));
- usrsctp_init(aPort,
- nullptr,
- debug_printf
- );
- }
// Set logging to SCTP:LogLevel::Debug to get SCTP debugs
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
}
+ // Do not send ABORTs in response to INITs (1).
+ // Do not send ABORTs for received Out of the Blue packets (2).
usrsctp_sysctl_set_sctp_blackhole(2);
- // ECN is currently not supported by the Firefox code
+
+ // Disable the Explicit Congestion Notification extension (currently not supported by the
+ // Firefox code)
usrsctp_sysctl_set_sctp_ecn_enable(0);
+
+ // Enable interleaving messages for different streams (incoming)
+ // See: https://tools.ietf.org/html/rfc6458#section-8.1.20
+ usrsctp_sysctl_set_sctp_default_frag_interleave(2);
+
sctp_initialized = true;
RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
shutdown->Init();
}
}
// XXX FIX! make this a global we get once
// Find the STS thread
nsresult rv;
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
MOZ_ASSERT(NS_SUCCEEDED(rv));
// Open sctp with a callback
if ((mMasterSocket = usrsctp_socket(
- aUsingDtls ? AF_CONN : AF_INET,
- SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
+ AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
return false;
}
// Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
// in associations for normal IO
if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
LOG(("Couldn't set non_blocking on SCTP socket"));
@@ -399,40 +424,49 @@ DataChannelConnection::Init(unsigned sho
// unsafe to allow it to continue if this fails
goto error_cleanup;
}
// XXX Consider disabling this when we add proper SDP negotiation.
// We may want to leave enabled for supporting 'cloning' of SDP offers, which
// implies re-use of the same pseudo-port number, or forcing a renegotiation.
{
- uint32_t on = 1;
+ const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
- (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+ (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
}
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
- (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+ (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
}
}
- if (!aUsingDtls) {
- memset(&encaps, 0, sizeof(encaps));
- encaps.sue_address.ss_family = AF_INET;
- encaps.sue_port = htons(aPort);
- if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
- (const void*)&encaps,
- (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
- LOG(("*** failed encaps errno %d", errno));
+ // Set explicit EOR
+ {
+ const int option_value = 1;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
+ (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
+ LOG(("*** failed enable explicit EOR mode %d", errno));
goto error_cleanup;
}
- LOG(("SCTP encapsulation local port %d", aPort));
}
+ // Enable ndata
+ // TODO: Bug 1381145, enable this once ndata has been deployed
+#if 0
+ av.assoc_id = SCTP_FUTURE_ASSOC;
+ av.assoc_value = 1;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
+ (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
+ LOG(("*** failed enable ndata errno %d", errno));
+ goto error_cleanup;
+ }
+#endif
+
av.assoc_id = SCTP_ALL_ASSOC;
av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
LOG(("*** failed enable stream reset errno %d", errno));
goto error_cleanup;
}
@@ -465,32 +499,82 @@ DataChannelConnection::Init(unsigned sho
initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
(socklen_t)sizeof(initmsg)) < 0) {
LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
goto error_cleanup;
}
mSocket = nullptr;
- if (aUsingDtls) {
- mUsingDtls = true;
- usrsctp_register_address(static_cast<void *>(this));
- LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
- } else {
- mUsingDtls = false;
- }
+ usrsctp_register_address(static_cast<void *>(this));
+ LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
return true;
error_cleanup:
usrsctp_close(mMasterSocket);
mMasterSocket = nullptr;
- mUsingDtls = false;
return false;
}
+void
+DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize)
+{
+ MutexAutoLock lock(mLock); // TODO: Needed?
+
+ mMaxMessageSizeSet = aMaxMessageSizeSet;
+ mMaxMessageSize = aMaxMessageSize;
+
+ bool ppidFragmentationEnforced = false;
+ nsresult rv;
+ nsCOMPtr<nsIPrefService> prefs = do_GetService("@mozilla.org/preferences-service;1", &rv);
+ if (!NS_WARN_IF(NS_FAILED(rv))) {
+ nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
+
+ if (branch) {
+ if (!NS_FAILED(branch->GetBoolPref(
+ "media.peerconnection.sctp.force_ppid_fragmentation", &mPpidFragmentation))) {
+ // Ensure that forced on/off PPID fragmentation does not get overridden when Firefox has
+ // been detected.
+ mMaxMessageSizeSet = true;
+ ppidFragmentationEnforced = true;
+ }
+
+ int32_t temp;
+ if (!NS_FAILED(branch->GetIntPref(
+ "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
+ if (temp >= 0) {
+ mMaxMessageSize = (uint64_t)temp;
+ }
+ }
+ }
+ }
+
+ // Fix remote MMS. This code exists, so future implementations of RTCSctpTransport.maxMessageSize
+ // can simply provide that value from GetMaxMessageSize.
+
+ // TODO: Bug 1382779, once resolved, can be increased to min(Uint8ArrayMaxSize, UINT32_MAX)
+ // TODO: Bug 1381146, once resolved, can be increased to whatever we support then (hopefully
+ // SIZE_MAX)
+ if (mMaxMessageSize == 0 || mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
+ mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
+ }
+
+ LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
+ mPpidFragmentation ? "yes" : "no", ppidFragmentationEnforced ? "yes" : "no"));
+ LOG(("Maximum message size (outgoing data): %" PRIu64 " (set=%s, enforced=%s)",
+ mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
+ aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
+}
+
+uint64_t
+DataChannelConnection::GetMaxMessageSize()
+{
+ return mMaxMessageSize;
+}
+
#ifdef MOZ_PEERCONNECTION
void
DataChannelConnection::SetEvenOdd()
{
ASSERT_WEBRTC(IsSTSThread());
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
mTransportFlow->GetLayer(TransportLayerDtls::ID()));
@@ -499,17 +583,19 @@ DataChannelConnection::SetEvenOdd()
}
bool
DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
- NS_ENSURE_TRUE(aFlow, false);
+ if (NS_WARN_IF(!aFlow)) {
+ return false;
+ }
mTransportFlow = aFlow;
mLocalPort = localport;
mRemotePort = remoteport;
mState = CONNECTING;
RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::SetSignals),
@@ -874,16 +960,39 @@ DataChannelConnection::FindFreeStream()
}
}
if (i >= limit) {
return INVALID_STREAM;
}
return i;
}
+uint32_t
+DataChannelConnection::UpdateCurrentStreamIndex()
+{
+ if (mCurrentStream == mStreams.Length() - 1) {
+ mCurrentStream = 0;
+ } else {
+ ++mCurrentStream;
+ }
+
+ return mCurrentStream;
+}
+
+uint32_t
+DataChannelConnection::GetCurrentStreamIndex()
+{
+ // Fix current stream index (in case #streams decreased)
+ if (mCurrentStream >= mStreams.Length()) {
+ mCurrentStream = 0;
+ }
+
+ return mCurrentStream;
+}
+
bool
DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
{
struct sctp_status status;
struct sctp_add_streams sas;
uint32_t outStreamsNeeded;
socklen_t len;
@@ -918,46 +1027,62 @@ DataChannelConnection::RequestMoreStream
return false;
}
LOG(("Requested %u more streams", outStreamsNeeded));
// We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
// values are larger than mStreams.Length()
return true;
}
-int32_t
-DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream)
{
- struct sctp_sndinfo sndinfo;
-
+ struct sctp_sendv_spa info = {0};
+
+ // General flags
+ info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+
+ // Set stream identifier, protocol identifier and flags
+ info.sendv_sndinfo.snd_sid = stream;
+ info.sendv_sndinfo.snd_flags = SCTP_EOR;
+ info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
+
+ // Create message instance and send
// Note: Main-thread IO, but doesn't block
- memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
- sndinfo.snd_sid = stream;
- sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
- if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
- &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
- SCTP_SENDV_SNDINFO, 0) < 0) {
- //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
- return (0);
+#if (UINT32_MAX > SIZE_MAX)
+ if (len > SIZE_MAX) {
+ return EMSGSIZE;
}
- return (1);
+#endif
+ OutgoingMsg msg(info, data, (size_t)len);
+ bool buffered;
+ int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
+
+ // Set pending type (if buffered)
+ if (!error && buffered && !mPendingType) {
+ mPendingType = PENDING_DCEP;
+ }
+ return error;
}
-int32_t
+// Returns a POSIX error code.
+int
DataChannelConnection::SendOpenAckMessage(uint16_t stream)
{
struct rtcweb_datachannel_ack ack;
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
- return SendControlMessage(&ack, sizeof(ack), stream);
+ return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
}
-int32_t
+// Returns a POSIX error code.
+int
DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
const nsACString& protocol,
uint16_t stream, bool unordered,
uint16_t prPolicy, uint32_t prValue)
{
const int label_len = label.Length(); // not including nul
const int proto_len = protocol.Length(); // not including nul
// careful - request struct include one char for the label
@@ -974,203 +1099,186 @@ DataChannelConnection::SendOpenRequestMe
break;
case SCTP_PR_SCTP_TTL:
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
break;
case SCTP_PR_SCTP_RTX:
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
break;
default:
- // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
free(req);
- return (0);
+ return EINVAL;
}
if (unordered) {
// Per the current types, all differ by 0x80 between ordered and unordered
req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
}
req->reliability_param = htonl(prValue);
req->priority = htons(0); /* XXX: add support */
req->label_length = htons(label_len);
req->protocol_length = htons(proto_len);
memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
- int32_t result = SendControlMessage(req, req_size, stream);
+ // TODO: req_size is an int... that looks hairy
+ int error = SendControlMessage((const uint8_t *)req, req_size, stream);
free(req);
- return result;
+ return error;
}
// XXX This should use a separate thread (outbound queue) which should
// select() to know when to *try* to send data to the socket again.
// Alternatively, it can use a timeout, but that's guaranteed to be wrong
// (just not sure in what direction). We could re-implement NSPR's
// PR_POLL_WRITE/etc handling... with a lot of work.
// Better yet, use the SCTP stack's notifications on buffer state to avoid
// filling the SCTP's buffers.
-// returns if we're still blocked or not
+// returns if we're still blocked (true)
bool
DataChannelConnection::SendDeferredMessages()
{
- uint32_t i;
RefPtr<DataChannel> channel; // we may null out the refs to this
- bool still_blocked = false;
// This may block while something is modifying channels, but should not block for IO
MutexAutoLock lock(mLock);
- // XXX For total fairness, on a still_blocked we'd start next time at the
- // same index. Sorry, not going to bother for now.
- for (i = 0; i < mStreams.Length(); ++i) {
+ LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
+ if (!mPendingType) {
+ return false;
+ }
+
+ // Send pending control messages
+ // Note: If ndata is not active, check if DCEP messages are currently outstanding. These need to
+ // be sent first before other streams can be used for sending.
+ if (!mBufferedControl.IsEmpty() && (mSendInterleaved || mPendingType == PENDING_DCEP)) {
+ if (SendBufferedMessages(mBufferedControl)) {
+ return true;
+ }
+
+ // Note: There may or may not be pending data messages
+ mPendingType = PENDING_DATA;
+ }
+
+ bool blocked = false;
+ uint32_t i = GetCurrentStreamIndex();
+ uint32_t end = i;
+ do {
channel = mStreams[i];
- if (!channel)
+ if (!channel || channel->mBufferedData.IsEmpty()) {
+ i = UpdateCurrentStreamIndex();
continue;
-
- // Only one of these should be set....
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
- if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
- channel->mStream,
- channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
- channel->mPrPolicy, channel->mPrValue)) {
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
-
- channel->mState = OPEN;
- channel->mReady = true;
- LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
- channel)));
- } else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- still_blocked = true;
- } else {
- // Close the channel, inform the user
- mStreams[channel->mStream] = nullptr;
- channel->mState = CLOSED;
- // Don't need to reset; we didn't open it
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
- channel)));
- }
- }
+ }
+
+ // Clear if closing/closed
+ if (channel->mState == CLOSED || channel->mState == CLOSING) {
+ channel->mBufferedData.Clear();
+ i = UpdateCurrentStreamIndex();
+ continue;
}
- if (still_blocked)
- break;
-
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
- if (SendOpenAckMessage(channel->mStream)) {
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
- } else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- still_blocked = true;
- } else {
- // Close the channel, inform the user
- CloseInt(channel);
- // XXX send error via DataChannelOnMessageAvailable (bug 843625)
- }
- }
+
+ size_t bufferedAmount = channel->GetBufferedAmountLocked();
+ size_t threshold = channel->mBufferedThreshold;
+ bool wasOverThreshold = bufferedAmount >= threshold;
+
+ // Send buffered data messages
+ // Warning: This will fail in case ndata is inactive and a previously deallocated data channel
+ // has not been closed properly. If you ever see that no messages can be sent on any
+ // channel, this is likely the cause (an explicit EOR message partially sent whose
+ // remaining chunks are still being waited for).
+ blocked = SendBufferedMessages(channel->mBufferedData);
+ bufferedAmount = channel->GetBufferedAmountLocked();
+
+ // can never fire with default threshold of 0
+ if (wasOverThreshold && bufferedAmount < threshold) {
+ LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
+ channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+ Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
+ this, channel)));
}
- if (still_blocked)
- break;
-
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
- bool failed_send = false;
- int32_t result;
-
- if (channel->mState == CLOSED || channel->mState == CLOSING) {
- channel->mBufferedData.Clear();
- }
-
- uint32_t buffered_amount = channel->GetBufferedAmountLocked();
- uint32_t threshold = channel->GetBufferedAmountLowThreshold();
- bool was_over_threshold = buffered_amount >= threshold;
-
- while (!channel->mBufferedData.IsEmpty() &&
- !failed_send) {
- struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
- const char *data = channel->mBufferedData[0]->mData;
- size_t len = channel->mBufferedData[0]->mLength;
-
- // SCTP will return EMSGSIZE if the message is bigger than the buffer
- // size (or EAGAIN if there isn't space)
- if ((result = usrsctp_sendv(mSocket, data, len,
- nullptr, 0,
- (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
- SCTP_SENDV_SPA,
- 0)) < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- // leave queued for resend
- failed_send = true;
- LOG(("queue full again when resending %zu bytes (%d)", len, result));
- } else {
- LOG(("error %d re-sending string", errno));
- failed_send = true;
- }
- } else {
- LOG(("Resent buffer of %zu bytes (%d)", len, result));
- // In theory this could underflow if >4GB was buffered and re
- // truncated in GetBufferedAmount(), but this won't cause any problems.
- buffered_amount -= channel->mBufferedData[0]->mLength;
- channel->mBufferedData.RemoveElementAt(0);
- // can never fire with default threshold of 0
- if (was_over_threshold && buffered_amount < threshold) {
- LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
- channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
- this, channel)));
- was_over_threshold = false;
- }
- if (buffered_amount == 0) {
- // buffered-to-not-buffered transition; tell the DOM code in case this makes it
- // available for GC
- LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
- channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
- this, channel)));
- }
- }
- }
- if (channel->mBufferedData.IsEmpty())
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
- else
- still_blocked = true;
+
+ if (bufferedAmount == 0) {
+ // buffered-to-not-buffered transition; tell the DOM code in case this makes it
+ // available for GC
+ LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
+ channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+ Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
+ this, channel)));
+ }
+
+ // Update current stream index
+ // Note: If ndata is not active, the outstanding data messages on this stream need to be sent
+ // first before other streams can be used for sending.
+ if (mSendInterleaved || !blocked) {
+ i = UpdateCurrentStreamIndex();
}
- if (still_blocked)
- break;
+ } while (!blocked && i != end);
+
+ if (!blocked) {
+ mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
}
-
- return still_blocked;
+ return blocked;
}
+
+// Called with mLock locked!
+// buffer MUST have at least one item!
+// returns if we're still blocked (true)
+bool
+DataChannelConnection::SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer)
+{
+ do {
+ // Re-send message
+ int error = SendMsgInternal(*buffer[0]);
+ switch (error) {
+ case 0:
+ buffer.RemoveElementAt(0);
+ break;
+ case EAGAIN:
+#if (EAGAIN != EWOULDBLOCK)
+ case EWOULDBLOCK:
+#endif
+ return true;
+ default:
+ buffer.RemoveElementAt(0);
+ LOG(("error on sending: %d", error));
+ break;
+ }
+ } while (!buffer.IsEmpty());
+
+ return false;
+}
+
+// Caller must ensure that length <= SIZE_MAX
void
DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
- size_t length,
- uint16_t stream)
+ uint32_t length, uint16_t stream)
{
RefPtr<DataChannel> channel;
uint32_t prValue;
uint16_t prPolicy;
uint32_t flags;
mLock.AssertCurrentThreadOwns();
- if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
- LOG(("%s: Inconsistent length: %zu, should be %zu", __FUNCTION__, length,
- (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
- if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
+ const size_t requiredLength =
+ (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length);
+ if (((size_t)length) != requiredLength) {
+ LOG(("%s: Inconsistent length: %u, should be %zu",
+ __FUNCTION__, length, requiredLength));
+ if (((size_t)length) < requiredLength)
return;
}
- LOG(("%s: length %zu, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
+ LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
switch (req->channel_type) {
case DATA_CHANNEL_RELIABLE:
case DATA_CHANNEL_RELIABLE_UNORDERED:
prPolicy = SCTP_PR_SCTP_NONE;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
@@ -1232,21 +1340,23 @@ DataChannelConnection::HandleOpenRequest
LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
this, channel)));
LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
- if (!SendOpenAckMessage(stream)) {
- // XXX Only on EAGAIN!? And if not, then close the channel??
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
+ int error = SendOpenAckMessage(stream);
+ if (error) {
+ LOG(("SendOpenRequest failed, error = %d", error));
+ // Close the channel, inform the user
+ CloseInt(channel);
+ // XXX send error via DataChannelOnMessageAvailable (bug 843625)
+ return;
}
// Now process any queued data messages for the channel (which will
// themselves likely get queued until we leave WAITING_TO_OPEN, plus any
// more that come in before that happens)
DeliverQueuedData(stream);
}
@@ -1257,214 +1367,343 @@ DataChannelConnection::DeliverQueuedData
{
mLock.AssertCurrentThreadOwns();
uint32_t i = 0;
while (i < mQueuedData.Length()) {
// Careful! we may modify the array length from within the loop!
if (mQueuedData[i]->mStream == stream) {
LOG(("Delivering queued data for stream %u, length %u",
- stream, (unsigned int) mQueuedData[i]->mLength));
+ stream, mQueuedData[i]->mLength));
// Deliver the queued data
- HandleDataMessage(mQueuedData[i]->mPpid,
- mQueuedData[i]->mData, mQueuedData[i]->mLength,
- mQueuedData[i]->mStream);
+ HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
+ mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
+ mQueuedData[i]->mFlags);
mQueuedData.RemoveElementAt(i);
continue; // don't bump index since we removed the element
}
i++;
}
}
+// Caller must ensure that length <= SIZE_MAX
void
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
- size_t length, uint16_t stream)
+ uint32_t length, uint16_t stream)
{
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
- NS_ENSURE_TRUE_VOID(channel);
+ if (NS_WARN_IF(!channel)) {
+ return;
+ }
LOG(("OpenAck received for stream %u, waiting=%d", stream,
(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
}
+// Caller must ensure that length <= SIZE_MAX
void
-DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
+DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
{
/* XXX: Send an error message? */
- LOG(("unknown DataChannel message received: %u, len %zu on stream %d", ppid, length, stream));
+ LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid, length, stream));
// XXX Log to JS error console if possible
}
+uint8_t
+DataChannelConnection::BufferMessage(nsACString& recvBuffer, const void *data,
+ uint32_t length, uint32_t ppid, int flags)
+{
+ const char *buffer = (const char *) data;
+ uint8_t bufferFlags = 0;
+
+ if ((flags & MSG_EOR) &&
+ ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
+ ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
+ bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
+
+ // Return directly if nothing has been buffered
+ if (recvBuffer.IsEmpty()) {
+ return bufferFlags;
+ }
+ }
+
+ // Ensure it doesn't blow up our buffer
+ // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the new buffer is capable
+ // of holding.
+ if (((uint64_t) recvBuffer.Length()) + ((uint64_t) length) > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
+ bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
+ return bufferFlags;
+ }
+
+ // Copy & add to receive buffer
+ recvBuffer.Append(buffer, length);
+ bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
+ return bufferFlags;
+}
+
void
-DataChannelConnection::HandleDataMessage(uint32_t ppid,
- const void *data, size_t length,
- uint16_t stream)
+DataChannelConnection::HandleDataMessage(const void *data, size_t length, uint32_t ppid,
+ uint16_t stream, int flags)
{
DataChannel *channel;
const char *buffer = (const char *) data;
mLock.AssertCurrentThreadOwns();
-
channel = FindChannelByStream(stream);
+ // Note: Until we support SIZE_MAX sized messages, we need this check
+#if (SIZE_MAX > UINT32_MAX)
+ if (length > UINT32_MAX) {
+ LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
+ length, UINT32_MAX));
+ CloseInt(channel);
+ return;
+ }
+#endif
+ uint32_t data_length = (uint32_t)length;
+
// XXX A closed channel may trip this... check
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
if (!channel) {
// In the updated 0-RTT open case, the sender can send data immediately
// after Open, and doesn't set the in-order bit (since we don't have a
// response or ack). Also, with external negotiation, data can come in
// before we're told about the external negotiation. We need to buffer
// data until either a) Open comes in, if the ordering get messed up,
// or b) the app tells us this channel was externally negotiated. When
// these occur, we deliver the data.
// Since this is rare and non-performance, keep a single list of queued
// data messages to deliver once the channel opens.
- LOG(("Queuing data for stream %u, length %zu", stream, length));
+ LOG(("Queuing data for stream %u, length %u", stream, data_length));
// Copies data
- mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
+ mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, data_length));
+ return;
+ }
+
+ // Ignore incoming data in case the channel is closed
+ if (channel->mState == CLOSED) {
+ return;
+ }
+
+ bool is_binary = true;
+ uint8_t bufferFlags;
+ int32_t type;
+ const char* info = "";
+
+ if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
+ ppid == DATA_CHANNEL_PPID_DOMSTRING) {
+ is_binary = false;
+ }
+ if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
+ NS_WARNING("DataChannel message aborted by fragment type change!");
+ // TODO: Maybe closing would be better as this is a hard to detect protocol violation?
+ channel->mRecvBuffer.Truncate(0);
+ }
+ channel->mIsRecvBinary = is_binary;
+
+ // Remaining chunks of previously truncated message (due to the buffer being full)?
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
+ LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing",
+ data_length));
+ // Only unblock if unordered
+ if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+ }
+ }
+
+ // Buffer message until complete
+ bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
+ LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
+ channel->mRecvBuffer.Truncate(0);
+ channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+ CloseInt(channel);
return;
}
-
- // XXX should this be a simple if, no warnings/debugbreaks?
- NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
-
- {
- nsAutoCString recvData(buffer, length); // copies (<64) or allocates
- bool is_binary = true;
-
- if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
- ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
- is_binary = false;
- }
- if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
- NS_WARNING("DataChannel message aborted by fragment type change!");
- channel->mRecvBuffer.Truncate(0);
- }
- channel->mIsRecvBinary = is_binary;
-
- switch (ppid) {
- case DATA_CHANNEL_PPID_DOMSTRING:
- case DATA_CHANNEL_PPID_BINARY:
- channel->mRecvBuffer += recvData;
- LOG(("DataChannel: Partial %s message of length %zu (total %u) on channel id %u",
- is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
- channel->mStream));
- return; // Not ready to notify application
-
- case DATA_CHANNEL_PPID_DOMSTRING_LAST:
- LOG(("DataChannel: String message received of length %zu on channel %u",
- length, channel->mStream));
- if (!channel->mRecvBuffer.IsEmpty()) {
- channel->mRecvBuffer += recvData;
- LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, channel->mRecvBuffer, -1));
- channel->mRecvBuffer.Truncate(0);
- return;
- }
- // else send using recvData normally
- length = -1; // Flag for DOMString
-
- // WebSockets checks IsUTF8() here; we can try to deliver it
- break;
-
- case DATA_CHANNEL_PPID_BINARY_LAST:
- LOG(("DataChannel: Received binary message of length %zu on channel id %u",
- length, channel->mStream));
- if (!channel->mRecvBuffer.IsEmpty()) {
- channel->mRecvBuffer += recvData;
- LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, channel->mRecvBuffer,
- channel->mRecvBuffer.Length()));
- channel->mRecvBuffer.Truncate(0);
- return;
- }
- // else send using recvData normally
- break;
-
- default:
- NS_ERROR("Unknown data PPID");
+ if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
+ LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
+ is_binary ? "binary" : "string", data_length, channel->mRecvBuffer.Length(),
+ channel->mStream));
+ return; // Not ready to notify application
+ }
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ data_length = channel->mRecvBuffer.Length();
+ }
+
+ // Complain about large messages (only complain - we can handle it)
+ if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
+ LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
+ data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
+ }
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ LOG(("DataChannel: Received string message of length %u on channel %u",
+ data_length, channel->mStream));
+ type = DataChannelOnMessageAvailable::ON_DATA_STRING;
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ info = " (string fragmented)";
+ }
+ // else send using recvData normally
+
+ // WebSockets checks IsUTF8() here; we can try to deliver it
+ break;
+
+ case DATA_CHANNEL_PPID_BINARY:
+ LOG(("DataChannel: Received binary message of length %u on channel id %u",
+ data_length, channel->mStream));
+ type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ info = " (binary fragmented)";
+ }
+
+ // else send using recvData normally
+ break;
+
+ default:
+ NS_ERROR("Unknown data PPID");
+ return;
+ }
+
+ // Notify onmessage
+ LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
+ (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
+ info, channel));
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ type, this, channel, channel->mRecvBuffer));
+ channel->mRecvBuffer.Truncate(0);
+ } else {
+ nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ type, this, channel, recvData));
+ }
+}
+
+void
+DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags)
+{
+ const struct rtcweb_datachannel_open_request *req;
+ const struct rtcweb_datachannel_ack *ack;
+
+ // Note: Until we support SIZE_MAX sized messages, we need this check
+#if (SIZE_MAX > UINT32_MAX)
+ if (length > UINT32_MAX) {
+ LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length, UINT32_MAX));
+ Stop();
+ return;
+ }
+#endif
+ uint32_t data_length = (uint32_t)length;
+
+ mLock.AssertCurrentThreadOwns();
+
+ // Buffer message until complete
+ const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
+ LOG(("DataChannel: Buffered message would become too large to handle, closing connection"));
+ mRecvBuffer.Truncate(0);
+ Stop();
+ return;
+ }
+ if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
+ LOG(("Buffered partial DCEP message of length %u", data_length));
+ return;
+ }
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
+ data_length = mRecvBuffer.Length();
+ }
+
+ req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
+ LOG(("Handling DCEP message of length %u", data_length));
+
+ // Ensure minimum message size (ack is the smallest DCEP message)
+ if ((size_t)data_length < sizeof(*ack)) {
+ LOG(("Ignored invalid DCEP message (too short)"));
+ return;
+ }
+
+ switch (req->msg_type) {
+ case DATA_CHANNEL_OPEN_REQUEST:
+ // structure includes a possibly-unused char label[1] (in a packed structure)
+ if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
return;
- }
- /* Notify onmessage */
- LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, recvData, length));
+ }
+
+ HandleOpenRequestMessage(req, data_length, stream);
+ break;
+ case DATA_CHANNEL_ACK:
+ // >= sizeof(*ack) checked above
+
+ ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+ HandleOpenAckMessage(ack, data_length, stream);
+ break;
+ default:
+ HandleUnknownMessage(ppid, data_length, stream);
+ break;
}
+
+ // Reset buffer
+ mRecvBuffer.Truncate(0);
}
// Called with mLock locked!
void
-DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
+DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags)
{
- const struct rtcweb_datachannel_open_request *req;
- const struct rtcweb_datachannel_ack *ack;
-
mLock.AssertCurrentThreadOwns();
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
- req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
-
- NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
- switch (req->msg_type) {
- case DATA_CHANNEL_OPEN_REQUEST:
- // structure includes a possibly-unused char label[1] (in a packed structure)
- NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
-
- HandleOpenRequestMessage(req, length, stream);
- break;
- case DATA_CHANNEL_ACK:
- // >= sizeof(*ack) checked above
-
- ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
- HandleOpenAckMessage(ack, length, stream);
- break;
- default:
- HandleUnknownMessage(ppid, length, stream);
- break;
- }
+ HandleDCEPMessage(buffer, length, ppid, stream, flags);
break;
+ case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
case DATA_CHANNEL_PPID_DOMSTRING:
- case DATA_CHANNEL_PPID_DOMSTRING_LAST:
+ case DATA_CHANNEL_PPID_BINARY_PARTIAL:
case DATA_CHANNEL_PPID_BINARY:
- case DATA_CHANNEL_PPID_BINARY_LAST:
- HandleDataMessage(ppid, buffer, length, stream);
+ HandleDataMessage(buffer, length, ppid, stream, flags);
break;
default:
- LOG(("Message of length %zu, PPID %u on stream %u received.",
- length, ppid, stream));
+ LOG(("Message of length %zu PPID %u on stream %u received (%s).",
+ length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
break;
}
}
void
DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
{
uint32_t i, n;
switch (sac->sac_state) {
case SCTP_COMM_UP:
LOG(("Association change: SCTP_COMM_UP"));
if (mState == CONNECTING) {
mSocket = mMasterSocket;
mState = OPEN;
+ // Check for older Firefox by looking at the amount of incoming streams
+ LOG(("Negotiated number of incoming streams: %" PRIu16, sac->sac_inbound_streams));
+ if (!mMaxMessageSizeSet
+ && sac->sac_inbound_streams == WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
+ LOG(("Older Firefox detected, using PPID-based fragmentation"));
+ mPpidFragmentation = true;
+ }
+
SetEvenOdd();
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this)));
LOG(("DTLS connect() succeeded! Entering connected mode"));
// Open any streams pending...
@@ -1474,63 +1713,71 @@ DataChannelConnection::HandleAssociation
LOG(("DataConnection Already OPEN"));
} else {
LOG(("Unexpected state: %d", mState));
}
break;
case SCTP_COMM_LOST:
LOG(("Association change: SCTP_COMM_LOST"));
// This association is toast, so also close all the channels -- from mainthread!
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DISCONNECTED,
- this)));
+ Stop();
break;
case SCTP_RESTART:
LOG(("Association change: SCTP_RESTART"));
break;
case SCTP_SHUTDOWN_COMP:
LOG(("Association change: SCTP_SHUTDOWN_COMP"));
- Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DISCONNECTED,
- this)));
+ Stop();
break;
case SCTP_CANT_STR_ASSOC:
LOG(("Association change: SCTP_CANT_STR_ASSOC"));
break;
default:
LOG(("Association change: UNKNOWN"));
break;
}
LOG(("Association change: streams (in/out) = (%u/%u)",
sac->sac_inbound_streams, sac->sac_outbound_streams));
- NS_ENSURE_TRUE_VOID(sac);
+ if (NS_WARN_IF(!sac)) {
+ return;
+ }
+
n = sac->sac_length - sizeof(*sac);
- if (((sac->sac_state == SCTP_COMM_UP) ||
- (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
- for (i = 0; i < n; ++i) {
- switch (sac->sac_info[i]) {
- case SCTP_ASSOC_SUPPORTS_PR:
- LOG(("Supports: PR"));
- break;
- case SCTP_ASSOC_SUPPORTS_AUTH:
- LOG(("Supports: AUTH"));
- break;
- case SCTP_ASSOC_SUPPORTS_ASCONF:
- LOG(("Supports: ASCONF"));
- break;
- case SCTP_ASSOC_SUPPORTS_MULTIBUF:
- LOG(("Supports: MULTIBUF"));
- break;
- case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
- LOG(("Supports: RE-CONFIG"));
- break;
- default:
- LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
- break;
+ if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
+ if (n > 0) {
+ for (i = 0; i < n; ++i) {
+ switch (sac->sac_info[i]) {
+ case SCTP_ASSOC_SUPPORTS_PR:
+ LOG(("Supports: PR"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_AUTH:
+ LOG(("Supports: AUTH"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_ASCONF:
+ LOG(("Supports: ASCONF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_MULTIBUF:
+ LOG(("Supports: MULTIBUF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
+ LOG(("Supports: RE-CONFIG"));
+ break;
+#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
+ case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
+ LOG(("Supports: NDATA"));
+ // TODO: This should probably be set earlier above in 'case SCTP_COMM_UP' but we also
+ // need this for 'SCTP_RESTART'.
+ mSendInterleaved = true;
+ break;
+#endif
+ default:
+ LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
+ break;
+ }
}
}
} else if (((sac->sac_state == SCTP_COMM_LOST) ||
(sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
LOG(("Association: ABORT ="));
for (i = 0; i < n; ++i) {
LOG((" 0x%02x", sac->sac_info[i]));
}
@@ -1620,16 +1867,48 @@ DataChannelConnection::HandleShutdownEve
void
DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
{
LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
}
void
+DataChannelConnection::HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde)
+{
+ // Note: Be aware that stream and sequence number being u32 instead of u16 is
+ // a bug in the SCTP API. This may change in the future.
+
+ LOG(("Partial delivery event: "));
+ switch (spde->pdapi_indication) {
+ case SCTP_PARTIAL_DELIVERY_ABORTED:
+ LOG(("delivery aborted "));
+ break;
+ default:
+ LOG(("??? "));
+ break;
+ }
+ LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags, spde->pdapi_stream,
+ spde->pdapi_seq));
+
+ // Validate stream ID
+ if (spde->pdapi_stream >= UINT16_MAX) {
+ LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n", spde->pdapi_stream));
+ return;
+ }
+
+ // Find channel and reset buffer
+ DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
+ if (channel) {
+ LOG(("Abort partially delivered message of %u bytes\n", channel->mRecvBuffer.Length()));
+ channel->mRecvBuffer.Truncate(0);
+ }
+}
+
+void
DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
{
size_t i, n;
if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
LOG(("Unsent "));
}
if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
@@ -1814,16 +2093,17 @@ DataChannelConnection::HandleStreamChang
size_t num_needed = mPending.GetSize();
LOG(("%zu of %d new streams already needed", num_needed,
new_len - old_len));
num_needed -= (new_len - old_len); // number we added
if (num_needed > 0) {
if (num_needed < 16)
num_needed = 16;
LOG(("Not enough new streams, asking for %zu more", num_needed));
+ // TODO: parameter is an int32_t but we pass size_t
RequestMoreStreams(num_needed);
} else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
LOG(("Requesting %d output streams to match partner",
strchg->strchange_instrms - strchg->strchange_outstrms));
RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
}
ProcessQueuedOpens();
@@ -1845,19 +2125,39 @@ DataChannelConnection::HandleStreamChang
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
// maybe fire onError (bug 843625)
} else {
stream = FindFreeStream();
if (stream != INVALID_STREAM) {
channel->mStream = stream;
mStreams[stream] = channel;
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
+
+ // Send open request
+ int error = SendOpenRequestMessage(
+ channel->mLabel, channel->mProtocol, channel->mStream,
+ !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
+ channel->mPrValue);
+ if (error) {
+ LOG(("SendOpenRequest failed, error = %d", error));
+ // Close the channel, inform the user
+ mStreams[channel->mStream] = nullptr;
+ channel->mState = CLOSED;
+ // Don't need to reset; we didn't open it
+ Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel)));
+ } else {
+ channel->mState = OPEN;
+ channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
+ LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+ Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+ channel)));
+ }
} else {
/* We will not find more ... */
break;
}
}
}
}
}
@@ -1881,28 +2181,28 @@ DataChannelConnection::HandleNotificatio
HandleRemoteErrorEvent(&(notif->sn_remote_error));
break;
case SCTP_SHUTDOWN_EVENT:
HandleShutdownEvent(&(notif->sn_shutdown_event));
break;
case SCTP_ADAPTATION_INDICATION:
HandleAdaptationIndication(&(notif->sn_adaptation_event));
break;
- case SCTP_PARTIAL_DELIVERY_EVENT:
- LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
- break;
case SCTP_AUTHENTICATION_EVENT:
LOG(("SCTP_AUTHENTICATION_EVENT"));
break;
case SCTP_SENDER_DRY_EVENT:
//LOG(("SCTP_SENDER_DRY_EVENT"));
break;
case SCTP_NOTIFICATIONS_STOPPED_EVENT:
LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
break;
+ case SCTP_PARTIAL_DELIVERY_EVENT:
+ HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
+ break;
case SCTP_SEND_FAILED_EVENT:
HandleSendFailedEvent(&(notif->sn_send_failed_event));
break;
case SCTP_STREAM_RESET_EVENT:
HandleStreamResetEvent(&(notif->sn_strreset_event));
break;
case SCTP_ASSOC_RESET_EVENT:
LOG(("SCTP_ASSOC_RESET_EVENT"));
@@ -1913,28 +2213,28 @@ DataChannelConnection::HandleNotificatio
default:
LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
break;
}
}
int
DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
- struct sctp_rcvinfo rcv, int32_t flags)
+ struct sctp_rcvinfo rcv, int flags)
{
ASSERT_WEBRTC(!NS_IsMainThread());
if (!data) {
usrsctp_close(sock); // SCTP has finished shutting down
} else {
MutexAutoLock lock(mLock);
if (flags & MSG_NOTIFICATION) {
HandleNotification(static_cast<union sctp_notification *>(data), datalen);
} else {
- HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
+ HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
}
}
// sctp allocates 'data' with malloc(), and expects the receiver to free
// it (presumably with free).
// XXX future optimization: try to deliver messages without an internal
// alloc/copy, and if so delay the free until later.
free(data);
// usrsctp defines the callback as returning an int, but doesn't use it
@@ -1979,16 +2279,17 @@ DataChannelConnection::Open(const nsACSt
if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
LOG(("ERROR: external negotiation of already-open channel %u", aStream));
// XXX How do we indicate this up to the application? Probably the
// caller's job, but we may need to return an error code.
return nullptr;
}
flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
+
RefPtr<DataChannel> channel(new DataChannel(this,
aStream,
DataChannel::CONNECTING,
label, protocol,
prPolicy, prValue,
flags,
aListener, aContext));
if (aExternalNegotiated) {
@@ -2098,37 +2399,32 @@ DataChannelConnection::OpenFinish(alread
MOZ_ASSERT(stream != INVALID_STREAM);
// just allocated (& OPEN), or externally negotiated
mStreams[stream] = channel; // holds a reference
channel->mStream = stream;
#ifdef TEST_QUEUED_DATA
// It's painful to write a test for this...
channel->mState = OPEN;
- channel->mReady = true;
- SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+ channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
+ SendDataMsgInternalOrBuffer(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING);
#endif
if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
// Don't send unordered until this gets cleared
channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
}
if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
- if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
- stream,
- !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
- channel->mPrPolicy, channel->mPrValue)) {
- LOG(("SendOpenRequest failed, errno = %d", errno));
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
- return channel.forget();
- }
+ int error = SendOpenRequestMessage(
+ channel->mLabel, channel->mProtocol, stream,
+ !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
+ channel->mPrValue);
+ if (error) {
+ LOG(("SendOpenRequest failed, error = %d", error));
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
NS_ERROR("Failed to send open request");
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
}
// If we haven't returned the channel yet, it will get destroyed when we exit
@@ -2138,17 +2434,17 @@ DataChannelConnection::OpenFinish(alread
// we'll be destroying the channel
channel->mState = CLOSED;
return nullptr;
/* NOTREACHED */
}
}
// Either externally negotiated or we sent Open
channel->mState = OPEN;
- channel->mReady = true;
+ channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
// FIX? Move into DOMDataChannel? I don't think we can send it yet here
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel)));
return channel.forget();
@@ -2163,145 +2459,272 @@ request_error_cleanup:
return channel.forget();
}
// we'll be destroying the channel, but it never really got set up
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
// Dispatch it to ourselves
return nullptr;
}
-int32_t
-DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
- size_t length, uint32_t ppid)
+// Requires mLock to be locked!
+// Returns a POSIX error code directly instead of setting errno.
+int
+DataChannelConnection::SendMsgInternal(OutgoingMsg &msg)
{
- uint16_t flags;
- struct sctp_sendv_spa spa;
- int32_t result;
-
- NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
- NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
-
- // To avoid problems where an in-order OPEN is lost and an
- // out-of-order data message "beats" it, require data to be in-order
- // until we get an ACK.
- if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
- !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
- flags = SCTP_UNORDERED;
- } else {
- flags = 0;
+ auto &info = msg.GetInfo().sendv_sndinfo;
+ int error;
+
+ // EOR set?
+ bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
+
+ // Send until buffer is empty
+ size_t left = msg.GetLeft();
+ do {
+ size_t length;
+
+ // Carefully chunk the buffer
+ if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
+ length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
+
+ // Unset EOR flag
+ info.snd_flags &= ~SCTP_EOR;
+ } else {
+ length = left;
+
+ // Set EOR flag
+ if (eor_set) {
+ info.snd_flags |= SCTP_EOR;
+ }
+ }
+
+ // Send (or try at least)
+ // SCTP will return EMSGSIZE if the message is bigger than the buffer
+ // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
+ // by carefully crafting small enough message chunks.
+ ssize_t written = usrsctp_sendv(
+ mSocket, msg.GetData(), length, nullptr, 0,
+ (void *)&msg.GetInfo(), (socklen_t)sizeof(struct sctp_sendv_spa),
+ SCTP_SENDV_SPA, 0);
+ if (written < 0) {
+ error = errno;
+ goto out;
+ }
+ LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)",
+ (size_t)written, length, left - (size_t)written));
+
+ // TODO: Remove once resolved (https://github.com/sctplab/usrsctp/issues/132)
+ if (written == 0) {
+ LOG(("@tuexen: usrsctp_sendv returned 0"));
+ error = EAGAIN;
+ goto out;
+ }
+
+ // If not all bytes have been written, this obviously means that usrsctp's buffer is full
+ // and we need to try again later.
+ if ((size_t)written < length) {
+ msg.Advance((size_t)written);
+ error = EAGAIN;
+ goto out;
+ }
+
+ // Update buffer position
+ msg.Advance((size_t)written);
+
+ // Get amount of bytes left in the buffer
+ left = msg.GetLeft();
+ } while (left > 0);
+
+ // Done
+ error = 0;
+
+out:
+ // Reset EOR flag
+ if (eor_set) {
+ info.snd_flags |= SCTP_EOR;
}
- spa.sendv_sndinfo.snd_ppid = htonl(ppid);
- spa.sendv_sndinfo.snd_sid = channel->mStream;
- spa.sendv_sndinfo.snd_flags = flags;
- spa.sendv_sndinfo.snd_context = 0;
- spa.sendv_sndinfo.snd_assoc_id = 0;
- spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
-
- if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
- spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
- spa.sendv_prinfo.pr_value = channel->mPrValue;
- spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
- }
+ return error;
+}
+
+// Requires mLock to be locked!
+// Returns a POSIX error code directly instead of setting errno.
+// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
+int
+DataChannelConnection::SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer,
+ OutgoingMsg &msg, bool &buffered)
+{
+ NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
+
+ int error = 0;
+ bool need_buffering = false;
// Note: Main-thread IO, but doesn't block!
// XXX FIX! to deal with heavy overruns of JS trying to pass data in
// (more than the buffersize) queue data onto another thread to do the
// actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
- // SCTP will return EMSGSIZE if the message is bigger than the buffer
- // size (or EAGAIN if there isn't space)
-
// Avoid a race between buffer-full-failure (where we have to add the
// packet to the buffered-data queue) and the buffer-now-only-half-full
// callback, which happens on a different thread. Otherwise we might
// fail here, then before we add it to the queue get the half-full
// callback, find nothing to do, then on this thread add it to the
// queue - which would sit there. Also, if we later send more data, it
// would arrive ahead of the buffered message, but if the buffer ever
// got to 1/2 full, the message would get sent - but at a semi-random
// time, after other data it was supposed to be in front of.
// Must lock before empty check for similar reasons!
- MutexAutoLock lock(mLock);
- if (channel->mBufferedData.IsEmpty()) {
- result = usrsctp_sendv(mSocket, data, length,
- nullptr, 0,
- (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
- SCTP_SENDV_SPA, 0);
- LOG(("Sent buffer (len=%zu), result=%d", length, result));
+ mLock.AssertCurrentThreadOwns();
+ if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
+ error = SendMsgInternal(msg);
+ switch (error) {
+ case 0:
+ break;
+ case EAGAIN:
+#if (EAGAIN != EWOULDBLOCK)
+ case EWOULDBLOCK:
+#endif
+ need_buffering = true;
+ break;
+ default:
+ LOG(("error %d on sending", error));
+ break;
+ }
} else {
- // Fake EAGAIN if we're already buffering data
- result = -1;
- errno = EAGAIN;
+ need_buffering = true;
+ }
+
+ if (need_buffering) {
+ // queue data for resend! And queue any further data for the stream until it is...
+ auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
+ buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
+ LOG(("Queued %zu buffers (left=%zu, total=%zu)",
+ buffer.Length(), msg.GetLeft(), msg.GetLength()));
+ buffered = true;
+ return 0;
+ }
+
+ buffered = false;
+ return error;
+}
+
+// Caller must ensure that length <= UINT32_MAX
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data,
+ size_t len, uint32_t ppid)
+{
+ if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
+ struct sctp_sendv_spa info = {0};
+
+ // General flags
+ info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+
+ // Set stream identifier, protocol identifier and flags
+ info.sendv_sndinfo.snd_sid = channel.mStream;
+ info.sendv_sndinfo.snd_flags = SCTP_EOR;
+ info.sendv_sndinfo.snd_ppid = htonl(ppid);
+
+ // Unordered?
+ // To avoid problems where an in-order OPEN is lost and an
+ // out-of-order data message "beats" it, require data to be in-order
+ // until we get an ACK.
+ if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
+ !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
+ info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
+ }
+
+ // Partial reliability policy
+ if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
+ info.sendv_prinfo.pr_policy = channel.mPrPolicy;
+ info.sendv_prinfo.pr_value = channel.mPrValue;
+ info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
}
- if (result < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
- // queue data for resend! And queue any further data for the stream until it is...
- auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
- channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
- LOG(("Queued %zu buffers (len=%zu)",
- channel->mBufferedData.Length(), length));
+
+ // Create message instance and send
+ OutgoingMsg msg(info, data, len);
+ MutexAutoLock lock(mLock);
+ bool buffered;
+ int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
+
+ // Set pending type and stream index (if buffered)
+ if (!error && buffered && !mPendingType) {
+ mPendingType = PENDING_DATA;
+ mCurrentStream = channel.mStream;
+ }
+ return error;
+}
+
+// Caller must ensure that length <= UINT32_MAX
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len,
+ uint32_t ppidPartial, uint32_t ppidFinal)
+{
+ // We *really* don't want to do this from main thread! - and
+ // SendDataMsgInternalOrBuffer avoids blocking.
+
+ if (mPpidFragmentation) {
+ // TODO: Bug 1381136, remove this block and all other code that uses PPIDs for fragmentation
+ // and reassembly once older Firefoxes without EOR are no longer supported as target
+ // clients.
+
+ // Use the deprecated PPID-level fragmentation if enabled. Should be enabled
+ // in case we can be certain that the other peer is an older Firefox browser
+ // that does support PPID-level fragmentation/reassembly.
+
+ // PPID-level fragmentation can only be applied on reliable data channels.
+ if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
+ channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
+ !(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
+ LOG(("Sending data message (total=%zu) using deprecated PPID-based chunks", len));
+
+ size_t left = len;
+ while (left > 0) {
+ // Note: For correctness, chunkLen should also consider mMaxMessageSize as minimum but as
+ // this block is going to be removed soon, I see no need for it.
+ size_t chunkLen = std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
+ left -= chunkLen;
+ uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
+
+ // Send the chunk
+ // Note that these might end up being deferred and queued.
+ LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid %u",
+ chunkLen, left, len, ppid));
+ int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
+ if (error) {
+ LOG(("*** send chunk fail %d", error));
+ return error;
+ }
+
+ // Update data position
+ data += chunkLen;
+ }
+
+ // Sending chunks complete
+ LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
+ (size_t)(len+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT));
return 0;
}
- LOG(("error %d sending string", errno));
+
+ // Cannot do PPID-based fragmentaton on unreliable channels
+ NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+ "Sending too-large data on unreliable channel!");
+ } else {
+ if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
+ LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len, mMaxMessageSize));
+ return EMSGSIZE;
+ }
}
- return result;
-}
-
-// Handles fragmenting binary messages
-int32_t
-DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
- size_t len,
- uint32_t ppid_partial, uint32_t ppid_final)
-{
- // Since there's a limit on network buffer size and no limits on message
- // size, and we don't want to use EOR mode (multiple writes for a
- // message, but all other streams are blocked until you finish sending
- // this message), we need to add application-level fragmentation of large
- // messages. On a reliable channel, these can be simply rebuilt into a
- // large message. On an unreliable channel, we can't and don't know how
- // long to wait, and there are no retransmissions, and no easy way to
- // tell the user "this part is missing", so on unreliable channels we
- // need to return an error if sending more bytes than the network buffers
- // can hold, and perhaps a lower number.
-
- // We *really* don't want to do this from main thread! - and SendMsgInternal
- // avoids blocking.
- // This MUST be reliable and in-order for the reassembly to work
- if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
- channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
- !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
- int32_t sent=0;
- uint32_t origlen = len;
- LOG(("Sending binary message length %zu in chunks", len));
- // XXX check flags for out-of-order, or force in-order for large binary messages
- while (len > 0) {
- size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
- uint32_t ppid;
- len -= sendlen;
- ppid = len > 0 ? ppid_partial : ppid_final;
- LOG(("Send chunk of %zu bytes, ppid %u", sendlen, ppid));
- // Note that these might end up being deferred and queued.
- sent += SendMsgInternal(channel, data, sendlen, ppid);
- data += sendlen;
- }
- LOG(("Sent %d buffers for %u bytes, %d sent immediately, %zu buffers queued",
- (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
- origlen, sent,
- channel->mBufferedData.Length()));
- return sent;
- }
- NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
- "Sending too-large data on unreliable channel!");
-
- // This will fail if the message is too large (default 256K)
- return SendMsgInternal(channel, data, len, ppid_final);
+
+ // This will use EOR-based fragmentation if the message is too large (> 64 KiB)
+ return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
}
class ReadBlobRunnable : public Runnable {
public:
ReadBlobRunnable(DataChannelConnection* aConnection,
uint16_t aStream,
nsIInputStream* aBlob)
: Runnable("ReadBlobRunnable")
@@ -2324,45 +2747,48 @@ private:
// can send the IOThread to MainThread to die in a runnable, avoiding
// unsafe event loop recursion. Evil.
RefPtr<DataChannelConnection> mConnection;
uint16_t mStream;
// Use RefCount for preventing the object is deleted when SendBlob returns.
RefPtr<nsIInputStream> mBlob;
};
-int32_t
+// Returns a POSIX error code.
+int
DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
{
DataChannel *channel = mStreams[stream];
- NS_ENSURE_TRUE(channel, 0);
+ if (NS_WARN_IF(!channel)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
// Spawn a thread to send the data
if (!mInternalIOThread) {
nsresult rv = NS_NewNamedThread("DataChannel IO",
getter_AddRefs(mInternalIOThread));
if (NS_FAILED(rv)) {
- return -1;
+ return EINVAL; // TODO: Find a better error code
}
}
mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
return 0;
}
class DataChannelBlobSendRunnable : public Runnable
{
public:
DataChannelBlobSendRunnable(
already_AddRefed<DataChannelConnection>& aConnection,
uint16_t aStream)
: Runnable("DataChannelBlobSendRunnable")
, mConnection(aConnection)
, mStream(aStream)
- {
- }
+ {}
~DataChannelBlobSendRunnable() override
{
if (!NS_IsMainThread() && mConnection) {
MOZ_ASSERT(false);
// explicitly leak the connection if destroyed off mainthread
Unused << mConnection.forget().take();
}
@@ -2431,39 +2857,60 @@ DataChannelConnection::GetStreamIds(std:
ASSERT_WEBRTC(NS_IsMainThread());
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
aStreamList->push_back(mStreams[i]->mStream);
}
}
}
-int32_t
-DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
- bool isBinary)
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsgCommon(uint16_t stream, const nsACString &aMsg,
+ bool isBinary)
{
ASSERT_WEBRTC(NS_IsMainThread());
// We really could allow this from other threads, so long as we deal with
// asynchronosity issues with channels closing, in particular access to
// mStreams, and issues with the association closing (access to mSocket).
- const char *data = aMsg.BeginReading();
+ const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
uint32_t len = aMsg.Length();
- DataChannel *channel;
+#if (UINT32_MAX > SIZE_MAX)
+ if (len > SIZE_MAX) {
+ return EMSGSIZE;
+ }
+#endif
+ DataChannel *channelPtr;
LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
// XXX if we want more efficiency, translate flags once at open time
- channel = mStreams[stream];
- NS_ENSURE_TRUE(channel, 0);
-
- if (isBinary)
- return SendBinary(channel, data, len,
- DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
- return SendBinary(channel, data, len,
- DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+ channelPtr = mStreams[stream];
+ if (NS_WARN_IF(!channelPtr)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
+ auto &channel = *channelPtr;
+
+ if (isBinary) {
+ return SendDataMsg(channel, data, len,
+ DATA_CHANNEL_PPID_BINARY_PARTIAL, DATA_CHANNEL_PPID_BINARY);
+ } else {
+ return SendDataMsg(channel, data, len,
+ DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, DATA_CHANNEL_PPID_DOMSTRING);
+ }
+}
+
+void
+DataChannelConnection::Stop()
+{
+ // Note: This will call 'CloseAll' from the main thread
+ Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DISCONNECTED,
+ this)));
}
void
DataChannelConnection::Close(DataChannel *aChannel)
{
MutexAutoLock lock(mLock);
CloseInt(aChannel);
}
@@ -2592,25 +3039,70 @@ DataChannel::ReleaseConnection()
void
DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
{
MutexAutoLock mLock(mListenerLock);
mContext = aContext;
mListener = aListener;
}
+void
+DataChannel::SendErrnoToErrorResult(int error, ErrorResult& aRv)
+{
+ switch (error) {
+ case 0:
+ break;
+ case EMSGSIZE:
+ aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
+ break;
+ default:
+ aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
+ break;
+ }
+}
+
+void
+DataChannel::SendMsg(const nsACString &aMsg, ErrorResult& aRv)
+{
+ if (!EnsureValidStream(aRv)) {
+ return;
+ }
+
+ SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
+}
+
+void
+DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv)
+{
+ if (!EnsureValidStream(aRv)) {
+ return;
+ }
+
+ SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
+}
+
+void
+DataChannel::SendBinaryStream(nsIInputStream *aBlob,ErrorResult& aRv)
+{
+ if (!EnsureValidStream(aRv)) {
+ return;
+ }
+
+ SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
+}
+
// May be called from another (i.e. Main) thread!
void
DataChannel::AppReady()
{
ENSURE_DATACONNECTION;
MutexAutoLock lock(mConnection->mLock);
- mReady = true;
+ mFlags |= DATA_CHANNEL_FLAGS_READY;
if (mState == WAITING_TO_OPEN) {
mState = OPEN;
mMainThreadEventTarget->Dispatch(
do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
this)));
for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
@@ -2621,34 +3113,31 @@ DataChannel::AppReady()
NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
}
mQueuedMessages.Clear();
mQueuedMessages.Compact();
// We never use it again... We could even allocate the array in the odd
// cases we need it.
}
-uint32_t
+size_t
DataChannel::GetBufferedAmountLocked() const
{
size_t buffered = 0;
- for (auto& buffer : mBufferedData) {
- buffered += buffer->mLength;
+ for (auto &msg : mBufferedData) {
+ buffered += msg->GetLeft();
}
// XXX Note: per Michael Tuexen, there's no way to currently get the buffered
// amount from the SCTP stack for a single stream. It is on their to-do
// list, and once we import a stack with support for that, we'll need to
// add it to what we buffer. Also we'll need to ask for notification of a per-
// stream buffer-low event and merge that into the handling of buffer-low
// (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
- if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
- buffered = UINT32_MAX;
- }
return buffered;
}
uint32_t
DataChannel::GetBufferedAmountLowThreshold()
{
return mBufferedThreshold;
}
@@ -2659,18 +3148,30 @@ DataChannel::SetBufferedAmountLowThresho
{
mBufferedThreshold = aThreshold;
}
// Called with mLock locked!
void
DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
{
- if (!mReady &&
+ if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
(mState == CONNECTING || mState == WAITING_TO_OPEN)) {
mQueuedMessages.AppendElement(aMessage);
} else {
nsCOMPtr<nsIRunnable> runnable = aMessage;
mMainThreadEventTarget->Dispatch(runnable.forget());
}
}
+bool
+DataChannel::EnsureValidStream(ErrorResult& aRv)
+{
+ MOZ_ASSERT(mConnection);
+ if (mConnection && mStream != INVALID_STREAM) {
+ return true;
+ } else {
+ aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
+ return false;
+ }
+}
+
} // namespace mozilla
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -47,86 +47,118 @@ extern "C" {
}
namespace mozilla {
class DataChannelConnection;
class DataChannel;
class DataChannelOnMessageAvailable;
-// For queuing outgoing messages
-class BufferedMsg
+// For sending outgoing messages.
+// This class only holds a reference to the data and the info structure but does
+// not copy it.
+class OutgoingMsg
{
public:
- BufferedMsg(struct sctp_sendv_spa &spa,const char *data,
+ OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
size_t length);
- ~BufferedMsg();
+ ~OutgoingMsg() {};
+ void Advance(size_t offset);
+ struct sctp_sendv_spa &GetInfo() { return *mInfo; };
+ size_t GetLength() { return mLength; };
+ size_t GetLeft() { return mLength - mPos; };
+ const uint8_t *GetData() { return (const uint8_t *)(mData + mPos); };
- struct sctp_sendv_spa *mSpa;
- const char *mData;
+protected:
+ OutgoingMsg() {}; // Use this for inheritance only
size_t mLength;
+ const uint8_t *mData;
+ struct sctp_sendv_spa *mInfo;
+ size_t mPos;
+};
+
+// For queuing outgoing messages
+// This class copies data of an outgoing message.
+class BufferedOutgoingMsg : public OutgoingMsg
+{
+public:
+ explicit BufferedOutgoingMsg(OutgoingMsg &message);
+ ~BufferedOutgoingMsg();
};
// for queuing incoming data messages before the Open or
// external negotiation is indicated to us
class QueuedDataMessage
{
public:
- QueuedDataMessage(uint16_t stream, uint32_t ppid,
- const void *data, size_t length)
+ QueuedDataMessage(uint16_t stream, uint32_t ppid, int flags,
+ const void *data, uint32_t length)
: mStream(stream)
, mPpid(ppid)
+ , mFlags(flags)
, mLength(length)
{
- mData = static_cast<char *>(moz_xmalloc(length)); // infallible
- memcpy(mData, data, length);
+ mData = static_cast<uint8_t *>(moz_xmalloc((size_t)length)); // infallible
+ memcpy(mData, data, (size_t)length);
}
~QueuedDataMessage()
{
free(mData);
}
uint16_t mStream;
uint32_t mPpid;
- size_t mLength;
- char *mData;
+ int mFlags;
+ uint32_t mLength;
+ uint8_t *mData;
};
// One per PeerConnection
class DataChannelConnection final
: public net::NeckoTargetHolder
#ifdef SCTP_DTLS_SUPPORTED
, public sigslot::has_slots<>
#endif
{
virtual ~DataChannelConnection();
public:
+ enum {
+ PENDING_NONE = 0U, // No outgoing messages are pending
+ PENDING_DCEP = 1U, // Outgoing DCEP messages are pending
+ PENDING_DATA = 2U, // Outgoing data channel messages are pending
+ };
+
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection)
class DataConnectionListener : public SupportsWeakPtr<DataConnectionListener>
{
public:
MOZ_DECLARE_WEAKREFERENCE_TYPENAME(DataChannelConnection::DataConnectionListener)
virtual ~DataConnectionListener() {}
// Called when a new DataChannel has been opened by the other side.
virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;
};
explicit DataChannelConnection(DataConnectionListener *listener,
nsIEventTarget *aTarget);
- bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
+ bool Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
+ uint64_t aMaxMessageSize);
+
void Destroy(); // So we can spawn refs tied to runnables in shutdown
// Finish Destroy on STS to avoid SCTP race condition with ABORT from far end
void DestroyOnSTS(struct socket *aMasterSocket,
struct socket *aSocket);
+ void SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize);
+ uint64_t GetMaxMessageSize();
+
#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
// These block; they require something to decide on listener/connector
// (though you can do simultaneous Connect()). Do not call these from
// the main thread!
bool Listen(unsigned short port);
bool Connect(const char *addr, unsigned short port);
#endif
@@ -149,35 +181,41 @@ public:
const nsACString& protocol,
Type type, bool inOrder,
uint32_t prValue,
DataChannelListener *aListener,
nsISupports *aContext,
bool aExternalNegotiated,
uint16_t aStream);
+ void Stop();
void Close(DataChannel *aChannel);
// CloseInt() must be called with mLock held
void CloseInt(DataChannel *aChannel);
void CloseAll();
- int32_t SendMsg(uint16_t stream, const nsACString &aMsg)
+ // Returns a POSIX error code.
+ int SendMsg(uint16_t stream, const nsACString &aMsg)
{
- return SendMsgCommon(stream, aMsg, false);
+ return SendDataMsgCommon(stream, aMsg, false);
}
- int32_t SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
+
+ // Returns a POSIX error code.
+ int SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
{
- return SendMsgCommon(stream, aMsg, true);
+ return SendDataMsgCommon(stream, aMsg, true);
}
- int32_t SendBlob(uint16_t stream, nsIInputStream *aBlob);
+
+ // Returns a POSIX error code.
+ int SendBlob(uint16_t stream, nsIInputStream *aBlob);
// Called on data reception from the SCTP library
// must(?) be public so my c->c++ trampoline can call it
int ReceiveCallback(struct socket* sock, void *data, size_t datalen,
- struct sctp_rcvinfo rcv, int32_t flags);
+ struct sctp_rcvinfo rcv, int flags);
// Find out state
enum {
CONNECTING = 0U,
OPEN = 1U,
CLOSING = 2U,
CLOSED = 3U
};
@@ -205,48 +243,58 @@ private:
static void DTLSConnectThread(void *data);
int SendPacket(unsigned char data[], size_t len, bool release);
void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
#endif
DataChannel* FindChannelByStream(uint16_t stream);
uint16_t FindFreeStream();
bool RequestMoreStreams(int32_t aNeeded = 16);
- int32_t SendControlMessage(void *msg, uint32_t len, uint16_t stream);
- int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
- uint16_t stream,
- bool unordered, uint16_t prPolicy, uint32_t prValue);
- int32_t SendOpenAckMessage(uint16_t stream);
- int32_t SendMsgInternal(DataChannel *channel, const char *data,
- size_t length, uint32_t ppid);
- int32_t SendBinary(DataChannel *channel, const char *data,
- size_t len, uint32_t ppid_partial, uint32_t ppid_final);
- int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
+ uint32_t UpdateCurrentStreamIndex();
+ uint32_t GetCurrentStreamIndex();
+ int SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream);
+ int SendOpenAckMessage(uint16_t stream);
+ int SendOpenRequestMessage(const nsACString& label, const nsACString& protocol, uint16_t stream,
+ bool unordered, uint16_t prPolicy, uint32_t prValue);
+ bool SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer);
+ int SendMsgInternal(OutgoingMsg &msg);
+ int SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer, OutgoingMsg &msg,
+ bool &buffered);
+ int SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data, size_t len,
+ uint32_t ppid);
+ int SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len, uint32_t ppidPartial,
+ uint32_t ppidFinal);
+ int SendDataMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
void DeliverQueuedData(uint16_t stream);
already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel>&& aChannel);
void ProcessQueuedOpens();
void ClearResets();
void SendOutgoingStreamReset();
void ResetOutgoingStream(uint16_t stream);
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
- size_t length,
- uint16_t stream);
+ uint32_t length, uint16_t stream);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
- size_t length, uint16_t stream);
- void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream);
- void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t stream);
- void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream);
+ uint32_t length, uint16_t stream);
+ void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream);
+ uint8_t BufferMessage(nsACString& recvBuffer, const void *data, uint32_t length, uint32_t ppid,
+ int flags);
+ void HandleDataMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+ int flags);
+ void HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+ int flags);
+ void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream, int flags);
void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);
void HandleShutdownEvent(const struct sctp_shutdown_event *sse);
void HandleAdaptationIndication(const struct sctp_adaptation_event *sai);
+ void HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde);
void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe);
void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst);
void HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg);
void HandleNotification(const union sctp_notification *notif, size_t n);
#ifdef SCTP_DTLS_SUPPORTED
bool IsSTSThread() {
bool on = false;
@@ -255,49 +303,55 @@ private:
}
return on;
}
#endif
// Exists solely for proxying release of the TransportFlow to the STS thread
static void ReleaseTransportFlow(const RefPtr<TransportFlow>& aFlow) {}
+ bool mSendInterleaved;
+ bool mPpidFragmentation;
+ bool mMaxMessageSizeSet;
+ uint64_t mMaxMessageSize;
+
// Data:
// NOTE: while this array will auto-expand, increases in the number of
// channels available from the stack must be negotiated!
bool mAllocateEven;
AutoTArray<RefPtr<DataChannel>,16> mStreams;
+ uint32_t mCurrentStream;
nsDeque mPending; // Holds addref'ed DataChannel's -- careful!
// holds data that's come in before a channel is open
nsTArray<nsAutoPtr<QueuedDataMessage>> mQueuedData;
+ // holds outgoing control messages
+ nsTArray<nsAutoPtr<BufferedOutgoingMsg>> mBufferedControl; // GUARDED_BY(mConnection->mLock)
// Streams pending reset
AutoTArray<uint16_t,4> mStreamsResetting;
struct socket *mMasterSocket; // accessed from STS thread
struct socket *mSocket; // cloned from mMasterSocket on successful Connect on STS thread
uint16_t mState; // Protected with mLock
#ifdef SCTP_DTLS_SUPPORTED
RefPtr<TransportFlow> mTransportFlow;
nsCOMPtr<nsIEventTarget> mSTS;
#endif
uint16_t mLocalPort; // Accessed from connect thread
uint16_t mRemotePort;
- bool mUsingDtls;
nsCOMPtr<nsIThread> mInternalIOThread;
+ uint8_t mPendingType;
+ nsCString mRecvBuffer;
};
#define ENSURE_DATACONNECTION \
do { MOZ_ASSERT(mConnection); if (!mConnection) { return; } } while (0)
-#define ENSURE_DATACONNECTION_RET(x) \
- do { MOZ_ASSERT(mConnection); if (!mConnection) { return (x); } } while (0)
-
class DataChannel {
public:
enum {
CONNECTING = 0U,
OPEN = 1U,
CLOSING = 2U,
CLOSED = 3U,
WAITING_TO_OPEN = 4U
@@ -314,17 +368,16 @@ public:
nsISupports *aContext)
: mListenerLock("netwerk::sctp::DataChannel")
, mListener(aListener)
, mContext(aContext)
, mConnection(connection)
, mLabel(label)
, mProtocol(protocol)
, mState(state)
- , mReady(false)
, mStream(stream)
, mPrPolicy(policy)
, mPrValue(value)
, mFlags(flags)
, mIsRecvBinary(false)
, mBufferedThreshold(0) // default from spec
, mMainThreadEventTarget(connection->GetNeckoTarget())
{
@@ -347,62 +400,49 @@ public:
// Close this DataChannel. Can be called multiple times. MUST be called
// before destroying the DataChannel (state must be CLOSED or CLOSING).
void Close();
// Set the listener (especially for channels created from the other side)
void SetListener(DataChannelListener *aListener, nsISupports *aContext);
- // Send a string
- bool SendMsg(const nsACString &aMsg)
- {
- ENSURE_DATACONNECTION_RET(false);
+ // Helper for send methods that converts POSIX error codes to an ErrorResult.
+ static void SendErrnoToErrorResult(int error, ErrorResult& aRv);
- if (mStream != INVALID_STREAM)
- return (mConnection->SendMsg(mStream, aMsg) >= 0);
- else
- return false;
- }
+ // Send a string
+ void SendMsg(const nsACString &aMsg, ErrorResult& aRv);
// Send a binary message (TypedArray)
- bool SendBinaryMsg(const nsACString &aMsg)
- {
- ENSURE_DATACONNECTION_RET(false);
-
- if (mStream != INVALID_STREAM)
- return (mConnection->SendBinaryMsg(mStream, aMsg) >= 0);
- else
- return false;
- }
+ void SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv);
// Send a binary blob
- bool SendBinaryStream(nsIInputStream *aBlob, uint32_t msgLen)
- {
- ENSURE_DATACONNECTION_RET(false);
-
- if (mStream != INVALID_STREAM)
- return (mConnection->SendBlob(mStream, aBlob) == 0);
- else
- return false;
- }
+ void SendBinaryStream(nsIInputStream *aBlob, ErrorResult& aRv);
uint16_t GetType() { return mPrPolicy; }
bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED); }
// Amount of data buffered to send
uint32_t GetBufferedAmount()
{
if (!mConnection) {
return 0;
}
MutexAutoLock lock(mConnection->mLock);
- return GetBufferedAmountLocked();
+ size_t buffered = GetBufferedAmountLocked();
+
+#if (SIZE_MAX > UINT32_MAX)
+ if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
+ buffered = UINT32_MAX;
+ }
+#endif
+
+ return buffered;
}
// Trigger amount for generating BufferedAmountLow events
uint32_t GetBufferedAmountLowThreshold();
void SetBufferedAmountLowThreshold(uint32_t aThreshold);
// Find out state
@@ -430,75 +470,74 @@ protected:
DataChannelListener *mListener;
nsCOMPtr<nsISupports> mContext;
private:
friend class DataChannelOnMessageAvailable;
friend class DataChannelConnection;
nsresult AddDataToBinaryMsg(const char *data, uint32_t size);
- uint32_t GetBufferedAmountLocked() const;
+ size_t GetBufferedAmountLocked() const;
+ bool EnsureValidStream(ErrorResult& aRv);
RefPtr<DataChannelConnection> mConnection;
nsCString mLabel;
nsCString mProtocol;
uint16_t mState;
- bool mReady;
uint16_t mStream;
uint16_t mPrPolicy;
uint32_t mPrValue;
uint32_t mFlags;
uint32_t mId;
bool mIsRecvBinary;
size_t mBufferedThreshold;
nsCString mRecvBuffer;
- nsTArray<nsAutoPtr<BufferedMsg>> mBufferedData; // GUARDED_BY(mConnection->mLock)
+ nsTArray<nsAutoPtr<BufferedOutgoingMsg>> mBufferedData; // GUARDED_BY(mConnection->mLock)
nsTArray<nsCOMPtr<nsIRunnable>> mQueuedMessages;
nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
};
// used to dispatch notifications of incoming data to the main thread
// Patterned on CallOnMessageAvailable in WebSockets
// Also used to proxy other items to MainThread
class DataChannelOnMessageAvailable : public Runnable
{
public:
enum {
ON_CONNECTION,
ON_DISCONNECTED,
ON_CHANNEL_CREATED,
ON_CHANNEL_OPEN,
ON_CHANNEL_CLOSED,
- ON_DATA,
+ ON_DATA_STRING,
+ ON_DATA_BINARY,
BUFFER_LOW_THRESHOLD,
NO_LONGER_BUFFERED,
}; /* types */
DataChannelOnMessageAvailable(
int32_t aType,
DataChannelConnection* aConnection,
DataChannel* aChannel,
- nsCString& aData, // XXX this causes inefficiency
- int32_t aLen)
+ nsCString& aData) // XXX this causes inefficiency
: Runnable("DataChannelOnMessageAvailable")
, mType(aType)
, mChannel(aChannel)
, mConnection(aConnection)
, mData(aData)
- , mLen(aLen)
{
}
DataChannelOnMessageAvailable(int32_t aType, DataChannel* aChannel)
: Runnable("DataChannelOnMessageAvailable")
, mType(aType)
, mChannel(aChannel)
{
}
- // XXX is it safe to leave mData/mLen uninitialized? This should only be
+ // XXX is it safe to leave mData uninitialized? This should only be
// used for notifications that don't use them, but I'd like more
// bulletproof compile-time checking.
DataChannelOnMessageAvailable(int32_t aType,
DataChannelConnection* aConnection,
DataChannel* aChannel)
: Runnable("DataChannelOnMessageAvailable")
, mType(aType)
@@ -520,35 +559,35 @@ public:
{
MOZ_ASSERT(NS_IsMainThread());
// Note: calling the listeners can indirectly cause the listeners to be
// made available for GC (by removing event listeners), especially for
// OnChannelClosed(). We hold a ref to the Channel and the listener
// while calling this.
switch (mType) {
- case ON_DATA:
+ case ON_DATA_STRING:
+ case ON_DATA_BINARY:
case ON_CHANNEL_OPEN:
case ON_CHANNEL_CLOSED:
case BUFFER_LOW_THRESHOLD:
case NO_LONGER_BUFFERED:
{
MutexAutoLock lock(mChannel->mListenerLock);
if (!mChannel->mListener) {
DATACHANNEL_LOG(("DataChannelOnMessageAvailable (%d) with null Listener!",mType));
return NS_OK;
}
switch (mType) {
- case ON_DATA:
- if (mLen < 0) {
- mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
- } else {
- mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
- }
+ case ON_DATA_STRING:
+ mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
+ break;
+ case ON_DATA_BINARY:
+ mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
break;
case ON_CHANNEL_OPEN:
mChannel->mListener->OnChannelConnected(mChannel->mContext);
break;
case ON_CHANNEL_CLOSED:
mChannel->mListener->OnChannelClosed(mChannel->mContext);
break;
case BUFFER_LOW_THRESHOLD:
@@ -582,19 +621,18 @@ public:
break;
}
return NS_OK;
}
private:
~DataChannelOnMessageAvailable() {}
- int32_t mType;
+ int32_t mType;
// XXX should use union
RefPtr<DataChannel> mChannel;
RefPtr<DataChannelConnection> mConnection;
- nsCString mData;
- int32_t mLen;
+ nsCString mData;
};
}
#endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_