--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -363,17 +363,17 @@ DataChannelConnection::Init(unsigned sho
if (branch) {
if (!NS_FAILED(branch->GetBoolPref(
"media.peerconnection.sctp.force_ppid_fragmentation", &mPpidFragmentation))) {
// Ensure that forced on/off PPID fragmentation does not get overridden when Firefox has
// been detected.
mMaxMessageSizeSet = true;
}
-
+
int32_t temp;
if (!NS_FAILED(branch->GetIntPref(
"media.peerconnection.sctp.force_maximum_message_size", &temp))) {
if (temp >= 0) {
mMaxMessageSize = (uint64_t)temp;
}
}
}
@@ -398,25 +398,25 @@ DataChannelConnection::Init(unsigned sho
// Set logging to SCTP:LogLevel::Debug to get SCTP debugs
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
}
// Do not send ABORTs in response to INITs (1).
// Do not send ABORTs for received Out of the Blue packets (2).
usrsctp_sysctl_set_sctp_blackhole(2);
-
+
// Disable the Explicit Congestion Notification extension (currently not supported by the
// Firefox code)
usrsctp_sysctl_set_sctp_ecn_enable(0);
-
+
// Enable interleaving messages for different streams (incoming)
// See: https://tools.ietf.org/html/rfc6458#section-8.1.20
usrsctp_sysctl_set_sctp_default_frag_interleave(2);
-
+
sctp_initialized = true;
RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
shutdown->Init();
}
}
// XXX FIX! make this a global we get once
@@ -472,19 +472,19 @@ DataChannelConnection::Init(unsigned sho
{
const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
(const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("*** failed enable explicit EOR mode %d", errno));
goto error_cleanup;
}
}
-
+
// Enable ndata
- // TODO: Enable this once ndata has been deployed
+ // TODO: Bug 1381145, enable this once ndata has been deployed
#if 0
av.assoc_id = SCTP_FUTURE_ASSOC;
av.assoc_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
LOG(("*** failed enable ndata errno %d", errno));
goto error_cleanup;
}
@@ -555,17 +555,19 @@ DataChannelConnection::SetEvenOdd()
}
bool
DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
- NS_ENSURE_TRUE(aFlow, false);
+ if (NS_WARN_IF(!aFlow)) {
+ return false;
+ }
mTransportFlow = aFlow;
mLocalPort = localport;
mRemotePort = remoteport;
mState = CONNECTING;
RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::SetSignals),
@@ -939,28 +941,28 @@ DataChannelConnection::FindFreeStream()
uint32_t
DataChannelConnection::UpdateCurrentStreamIndex()
{
if (mCurrentStream == mStreams.Length() - 1) {
mCurrentStream = 0;
} else {
++mCurrentStream;
}
-
+
return mCurrentStream;
}
uint32_t
DataChannelConnection::GetCurrentStreamIndex()
{
// Fix current stream index (in case #streams decreased)
if (mCurrentStream >= mStreams.Length()) {
mCurrentStream = 0;
}
-
+
return mCurrentStream;
}
bool
DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
{
struct sctp_status status;
struct sctp_add_streams sas;
@@ -1011,28 +1013,28 @@ DataChannelConnection::SendControlMessag
// General flags
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
// Set stream identifier, protocol identifier and flags
info.sendv_sndinfo.snd_sid = stream;
info.sendv_sndinfo.snd_flags = SCTP_EOR;
info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
-
+
// Create message instance and send
// Note: Main-thread IO, but doesn't block
#if (UINT32_MAX > SIZE_MAX)
if (len > SIZE_MAX) {
return EMSGSIZE;
}
#endif
OutgoingMsg msg(info, data, (size_t)len);
bool buffered;
int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
-
+
// Set pending type and stream index (if buffered)
if (!error && buffered && !mPendingType) {
mPendingType = PENDING_DCEP;
}
return error;
}
// Returns a POSIX error code.
@@ -1122,77 +1124,77 @@ DataChannelConnection::SendDeferredMessa
// Send pending control messages
// Note: If ndata is not active, check if DCEP messages are currently outstanding. These need to
// be sent first before other streams can be used for sending.
if (!mBufferedControl.IsEmpty() && (mSendInterleaved || mPendingType == PENDING_DCEP)) {
if (SendBufferedMessages(mBufferedControl)) {
return true;
}
-
+
// Note: There may or may not be pending data messages
mPendingType = PENDING_DATA;
}
-
+
bool blocked = false;
uint32_t i = GetCurrentStreamIndex();
uint32_t end = i;
do {
channel = mStreams[i];
if (!channel || channel->mBufferedData.IsEmpty()) {
i = UpdateCurrentStreamIndex();
continue;
}
// Clear if closing/closed
if (channel->mState == CLOSED || channel->mState == CLOSING) {
channel->mBufferedData.Clear();
i = UpdateCurrentStreamIndex();
continue;
}
-
+
size_t bufferedAmount = channel->GetBufferedAmountLocked();
size_t threshold = channel->mBufferedThreshold;
bool wasOverThreshold = bufferedAmount >= threshold;
-
+
// Send buffered data messages
// Warning: This will fail in case ndata is inactive and a previously deallocated data channel
// has not been closed properly. If you ever see that no messages can be sent on any
// channel, this is likely the cause (an explicit EOR message partially sent whose
// remaining chunks are still being waited for).
blocked = SendBufferedMessages(channel->mBufferedData);
bufferedAmount = channel->GetBufferedAmountLocked();
-
+
// can never fire with default threshold of 0
if (wasOverThreshold && bufferedAmount < threshold) {
LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
this, channel)));
}
-
+
if (bufferedAmount == 0) {
// buffered-to-not-buffered transition; tell the DOM code in case this makes it
// available for GC
LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
this, channel)));
}
-
+
// Update current stream index
// Note: If ndata is not active, the outstanding data messages on this stream need to be sent
// first before other streams can be used for sending.
if (mSendInterleaved || !blocked) {
i = UpdateCurrentStreamIndex();
}
} while (!blocked && i != end);
-
+
if (!blocked) {
mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
}
return blocked;
}
// Called with mLock locked!
@@ -1360,17 +1362,19 @@ void
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
uint32_t length, uint16_t stream)
{
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
- NS_ENSURE_TRUE_VOID(channel);
+ if (NS_WARN_IF(!channel)) {
+ return;
+ }
LOG(("OpenAck received for stream %u, waiting=%d", stream,
(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
}
// Caller must ensure that length <= SIZE_MAX
@@ -1379,255 +1383,260 @@ DataChannelConnection::HandleUnknownMess
{
/* XXX: Send an error message? */
LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid, length, stream));
// XXX Log to JS error console if possible
}
uint8_t
DataChannelConnection::BufferMessage(nsACString& recvBuffer, const void *data,
- uint32_t length, uint32_t ppid, int32_t flags)
+ uint32_t length, uint32_t ppid, int flags)
{
const char *buffer = (const char *) data;
uint8_t bufferFlags = 0;
-
+
if ((flags & MSG_EOR) &&
ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
}
-
+
// Return directly if nothing has been buffered
if ((bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE) && recvBuffer.IsEmpty()) {
return bufferFlags;
}
// Ensure it doesn't blow up our buffer
// TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the new buffer is capable
// of holding.
if (((uint64_t) recvBuffer.Length()) + ((uint64_t) length) > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
return bufferFlags;
}
// Copy & add to receive buffer
- {
- nsAutoCString recvData(buffer, length); // copies (<64) or allocates
- recvBuffer += recvData;
- bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
- }
+ recvBuffer.Append(buffer, length);
+ bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
return bufferFlags;
}
void
-DataChannelConnection::HandleDataMessage(const void *data, size_t dLength, uint32_t ppid,
- uint16_t stream, int32_t flags)
+DataChannelConnection::HandleDataMessage(const void *data, size_t length, uint32_t ppid,
+ uint16_t stream, int flags)
{
DataChannel *channel;
const char *buffer = (const char *) data;
-
+
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
-
+
// Note: Until we support SIZE_MAX sized messages, we need this check
#if (SIZE_MAX > UINT32_MAX)
- if (dLength > UINT32_MAX) {
+ if (length > UINT32_MAX) {
LOG(("DataChannel: Cannot handle message of size %" PRIuSIZE " (max=%" PRIu32 ")",
- dLength, UINT32_MAX));
+ length, UINT32_MAX));
CloseInt(channel);
return;
}
#endif
- uint32_t length = (uint32_t)dLength;
+ uint32_t data_length = (uint32_t)length;
// XXX A closed channel may trip this... check
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
if (!channel) {
// In the updated 0-RTT open case, the sender can send data immediately
// after Open, and doesn't set the in-order bit (since we don't have a
// response or ack). Also, with external negotiation, data can come in
// before we're told about the external negotiation. We need to buffer
// data until either a) Open comes in, if the ordering get messed up,
// or b) the app tells us this channel was externally negotiated. When
// these occur, we deliver the data.
// Since this is rare and non-performance, keep a single list of queued
// data messages to deliver once the channel opens.
- LOG(("Queuing data for stream %u, length %u", stream, length));
+ LOG(("Queuing data for stream %u, length %u", stream, data_length));
// Copies data
- mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, length));
+ mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, data_length));
+ return;
+ }
+
+ // Ignore incoming data in case the channel is closed
+ if (channel->mState == CLOSED) {
+ return;
+ }
+
+ bool is_binary = true;
+ uint8_t bufferFlags;
+ int32_t type;
+ const char* info = "";
+
+ if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
+ ppid == DATA_CHANNEL_PPID_DOMSTRING) {
+ is_binary = false;
+ }
+ if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
+ NS_WARNING("DataChannel message aborted by fragment type change!");
+ // TODO: Maybe closing would be better as this is a hard to detect protocol violation?
+ channel->mRecvBuffer.Truncate(0);
+ }
+ channel->mIsRecvBinary = is_binary;
+
+ // Remaining chunks of previously truncated message (due to the buffer being full)?
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
+ LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing",
+ data_length));
+ // Only unblock if unordered
+ if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+ }
+ }
+
+ // Buffer message until complete
+ bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
+ LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
+ channel->mRecvBuffer.Truncate(0);
+ channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+ CloseInt(channel);
return;
}
-
- // XXX should this be a simple if, no warnings/debugbreaks?
- NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
-
- {
- bool is_binary = true;
- uint8_t bufferFlags;
- int32_t type;
- const char* info = "";
-
- if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
- ppid == DATA_CHANNEL_PPID_DOMSTRING) {
- is_binary = false;
- }
- if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
- NS_WARNING("DataChannel message aborted by fragment type change!");
- // TODO: Maybe closing would be better as this is a hard to detect protocol violation?
- channel->mRecvBuffer.Truncate(0);
- }
- channel->mIsRecvBinary = is_binary;
-
- // Remaining chunks of previously truncated message (due to the buffer being full)?
- if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
- LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing", length));
- // Only unblock if unordered
- if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+ if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
+ LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
+ is_binary ? "binary" : "string", data_length, channel->mRecvBuffer.Length(),
+ channel->mStream));
+ return; // Not ready to notify application
+ }
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ data_length = channel->mRecvBuffer.Length();
+ }
+
+ // Complain about large messages (only complain - we can handle it)
+ if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
+ LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
+ data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
+ }
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ LOG(("DataChannel: Received string message of length %u on channel %u",
+ data_length, channel->mStream));
+ type = DataChannelOnMessageAvailable::ON_DATA_STRING;
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ info = " (string fragmented)";
}
- }
-
- // Buffer message until complete
- bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, length, ppid, flags);
- if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
- LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
- channel->mRecvBuffer.Truncate(0);
- channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
- CloseInt(channel);
+ // else send using recvData normally
+
+ // WebSockets checks IsUTF8() here; we can try to deliver it
+ break;
+
+ case DATA_CHANNEL_PPID_BINARY:
+ LOG(("DataChannel: Received binary message of length %u on channel id %u",
+ data_length, channel->mStream));
+ type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ info = " (binary fragmented)";
+ }
+
+ // else send using recvData normally
+ break;
+
+ default:
+ NS_ERROR("Unknown data PPID");
return;
- }
- if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
- LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
- is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
- channel->mStream));
- return; // Not ready to notify application
- }
- if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
- length = channel->mRecvBuffer.Length();
- }
-
- // Complain about large messages (only complain - we can handle it)
- if (length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
- LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
- length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
- }
-
- switch (ppid) {
- case DATA_CHANNEL_PPID_DOMSTRING:
- LOG(("DataChannel: Received string message of length %u on channel %u",
- length, channel->mStream));
- type = DataChannelOnMessageAvailable::ON_DATA_STRING;
- if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
- info = " (string fragmented)";
- }
- // else send using recvData normally
-
- // WebSockets checks IsUTF8() here; we can try to deliver it
- break;
-
- case DATA_CHANNEL_PPID_BINARY:
- LOG(("DataChannel: Received binary message of length %u on channel id %u",
- length, channel->mStream));
- type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
- if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
- info = " (binary fragmented)";
- }
-
- // else send using recvData normally
- break;
-
- default:
- NS_ERROR("Unknown data PPID");
- return;
- }
-
- // Notify onmessage
- LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
- (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
- info, channel));
- if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- type, this, channel, channel->mRecvBuffer));
- channel->mRecvBuffer.Truncate(0);
- } else {
- nsAutoCString recvData(buffer, length); // copies (<64) or allocates
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- type, this, channel, recvData));
- }
+ }
+
+ // Notify onmessage
+ LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
+ (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
+ info, channel));
+ if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ type, this, channel, channel->mRecvBuffer));
+ channel->mRecvBuffer.Truncate(0);
+ } else {
+ nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ type, this, channel, recvData));
}
}
void
-DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t dLength, uint32_t ppid,
- uint16_t stream, int32_t flags)
+DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags)
{
const struct rtcweb_datachannel_open_request *req;
const struct rtcweb_datachannel_ack *ack;
-
+
// Note: Until we support SIZE_MAX sized messages, we need this check
#if (SIZE_MAX > UINT32_MAX)
- if (dLength > UINT32_MAX) {
- LOG(("DataChannel: Cannot handle message of size %" PRIuSIZE " (max=%u)", dLength, UINT32_MAX));
+ if (length > UINT32_MAX) {
+ LOG(("DataChannel: Cannot handle message of size %" PRIuSIZE " (max=%u)", length, UINT32_MAX));
Stop();
return;
}
#endif
- uint32_t length = (uint32_t)dLength;
-
+ uint32_t data_length = (uint32_t)length;
+
mLock.AssertCurrentThreadOwns();
// Buffer message until complete
- const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, length, ppid, flags);
+ const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
LOG(("DataChannel: Buffered message would become too large to handle, closing connection"));
mRecvBuffer.Truncate(0);
Stop();
return;
}
if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
- LOG(("Buffered partial DCEP message of length %u", length));
+ LOG(("Buffered partial DCEP message of length %u", data_length));
return;
}
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
- length = mRecvBuffer.Length();
+ data_length = mRecvBuffer.Length();
}
req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
- LOG(("Handling DCEP message of length %u", length));
-
- NS_ENSURE_TRUE_VOID(((size_t)length) >= sizeof(*ack)); // smallest message
+ LOG(("Handling DCEP message of length %u", data_length));
+
+ // Ensure minimum message size (ack is the smallest DCEP message)
+ if ((size_t)data_length < sizeof(*ack)) {
+ LOG(("Ignored invalid DCEP message (too short)"));
+ return;
+ }
+
switch (req->msg_type) {
case DATA_CHANNEL_OPEN_REQUEST:
// structure includes a possibly-unused char label[1] (in a packed structure)
- NS_ENSURE_TRUE_VOID(((size_t)length) >= sizeof(*req) - 1);
-
- HandleOpenRequestMessage(req, length, stream);
+ if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
+ return;
+ }
+
+ HandleOpenRequestMessage(req, data_length, stream);
break;
case DATA_CHANNEL_ACK:
// >= sizeof(*ack) checked above
-
+
ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
- HandleOpenAckMessage(ack, length, stream);
+ HandleOpenAckMessage(ack, data_length, stream);
break;
default:
- HandleUnknownMessage(ppid, length, stream);
+ HandleUnknownMessage(ppid, data_length, stream);
break;
}
-
+
// Reset buffer
mRecvBuffer.Truncate(0);
}
// Called with mLock locked!
void
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid,
- uint16_t stream, int32_t flags)
+ uint16_t stream, int flags)
{
mLock.AssertCurrentThreadOwns();
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
HandleDCEPMessage(buffer, length, ppid, stream, flags);
break;
case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
@@ -1649,17 +1658,17 @@ DataChannelConnection::HandleAssociation
uint32_t i, n;
switch (sac->sac_state) {
case SCTP_COMM_UP:
LOG(("Association change: SCTP_COMM_UP"));
if (mState == CONNECTING) {
mSocket = mMasterSocket;
mState = OPEN;
-
+
// Check for older Firefox by looking at the amount of incoming streams
LOG(("Negotiated number of incoming streams: %" PRIu16, sac->sac_inbound_streams));
if (!mMaxMessageSizeSet
&& sac->sac_inbound_streams == WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
LOG(("Older Firefox detected, using PPID-based fragmentation"));
mPpidFragmentation = true;
}
@@ -1696,17 +1705,20 @@ DataChannelConnection::HandleAssociation
break;
default:
LOG(("Association change: UNKNOWN"));
break;
}
LOG(("Association change: streams (in/out) = (%u/%u)",
sac->sac_inbound_streams, sac->sac_outbound_streams));
- NS_ENSURE_TRUE_VOID(sac);
+ if (NS_WARN_IF(!sac)) {
+ return;
+ }
+
n = sac->sac_length - sizeof(*sac);
if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
if (n > 0) {
for (i = 0; i < n; ++i) {
switch (sac->sac_info[i]) {
case SCTP_ASSOC_SUPPORTS_PR:
LOG(("Supports: PR"));
break;
@@ -1830,39 +1842,37 @@ void
DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
{
LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
}
void
DataChannelConnection::HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde)
{
- // Note: Be aware that stream and sn being u32 instead of u16 is a bug in the SCTP API
- // This may change in the future.
-
+ // Note: Be aware that stream and sequence number being u32 instead of u16 is
+ // a bug in the SCTP API. This may change in the future.
+
LOG(("Partial delivery event: "));
switch (spde->pdapi_indication) {
case SCTP_PARTIAL_DELIVERY_ABORTED:
LOG(("delivery aborted "));
break;
default:
LOG(("??? "));
break;
}
- LOG(("(flags = %x) ", spde->pdapi_flags));
- LOG(("stream = %" PRIu32 " ", spde->pdapi_stream));
- LOG(("sn = %" PRIu32 " ", spde->pdapi_seq));
- LOG(("\n"));
-
+ LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags, spde->pdapi_stream,
+ spde->pdapi_seq));
+
// Validate stream ID
if (spde->pdapi_stream >= UINT16_MAX) {
LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n", spde->pdapi_stream));
return;
}
-
+
// Find channel and reset buffer
DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
if (channel) {
LOG(("Abort partially delivered message of %u bytes\n", channel->mRecvBuffer.Length()));
channel->mRecvBuffer.Truncate(0);
}
}
@@ -2088,17 +2098,17 @@ DataChannelConnection::HandleStreamChang
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
// maybe fire onError (bug 843625)
} else {
stream = FindFreeStream();
if (stream != INVALID_STREAM) {
channel->mStream = stream;
mStreams[stream] = channel;
-
+
// Send open request
int error = SendOpenRequestMessage(
channel->mLabel, channel->mProtocol, channel->mStream,
!!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
channel->mPrValue);
if (error) {
LOG(("SendOpenRequest failed, error = %d", error));
// Close the channel, inform the user
@@ -2176,17 +2186,17 @@ DataChannelConnection::HandleNotificatio
default:
LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
break;
}
}
int
DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
- struct sctp_rcvinfo rcv, int32_t flags)
+ struct sctp_rcvinfo rcv, int flags)
{
ASSERT_WEBRTC(!NS_IsMainThread());
if (!data) {
usrsctp_close(sock); // SCTP has finished shutting down
} else {
MutexAutoLock lock(mLock);
if (flags & MSG_NOTIFICATION) {
@@ -2242,17 +2252,17 @@ DataChannelConnection::Open(const nsACSt
if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
LOG(("ERROR: external negotiation of already-open channel %u", aStream));
// XXX How do we indicate this up to the application? Probably the
// caller's job, but we may need to return an error code.
return nullptr;
}
flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
-
+
RefPtr<DataChannel> channel(new DataChannel(this,
aStream,
DataChannel::CONNECTING,
label, protocol,
prPolicy, prValue,
flags,
aListener, aContext));
if (aExternalNegotiated) {
@@ -2429,34 +2439,34 @@ request_error_cleanup:
// Requires mLock to be locked!
// Returns a POSIX error code directly instead of setting errno.
int
DataChannelConnection::SendMsgInternal(OutgoingMsg &msg)
{
auto &info = msg.GetInfo().sendv_sndinfo;
int error;
-
+
// EOR set?
bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
-
+
// Send until buffer is empty
size_t left = msg.GetLeft();
do {
size_t length;
-
+
// Carefully chunk the buffer
if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
// Unset EOR flag
info.snd_flags &= ~SCTP_EOR;
} else {
length = left;
-
+
// Reset EOR flag
if (eor_set) {
info.snd_flags |= SCTP_EOR;
}
}
// Send (or try at least)
// SCTP will return EMSGSIZE if the message is bigger than the buffer
@@ -2485,17 +2495,17 @@ DataChannelConnection::SendMsgInternal(O
if ((size_t)written < length) {
msg.Advance((size_t)written);
error = EAGAIN;
goto out;
}
// Update buffer position
msg.Advance((size_t)written);
-
+
// Get amount of bytes left in the buffer
left = msg.GetLeft();
} while (left > 0);
// Done
error = 0;
out:
@@ -2510,35 +2520,35 @@ out:
// Requires mLock to be locked!
// Returns a POSIX error code directly instead of setting errno.
// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
int
DataChannelConnection::SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer,
OutgoingMsg &msg, bool &buffered)
{
NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
-
+
int error = 0;
bool need_buffering = false;
-
+
// Note: Main-thread IO, but doesn't block!
// XXX FIX! to deal with heavy overruns of JS trying to pass data in
// (more than the buffersize) queue data onto another thread to do the
// actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
-
+
// Avoid a race between buffer-full-failure (where we have to add the
// packet to the buffered-data queue) and the buffer-now-only-half-full
// callback, which happens on a different thread. Otherwise we might
// fail here, then before we add it to the queue get the half-full
// callback, find nothing to do, then on this thread add it to the
// queue - which would sit there. Also, if we later send more data, it
// would arrive ahead of the buffered message, but if the buffer ever
// got to 1/2 full, the message would get sent - but at a semi-random
// time, after other data it was supposed to be in front of.
-
+
// Must lock before empty check for similar reasons!
mLock.AssertCurrentThreadOwns();
if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
error = SendMsgInternal(msg);
switch (error) {
case 0:
break;
case EAGAIN:
@@ -2549,139 +2559,143 @@ DataChannelConnection::SendMsgInternalOr
break;
default:
LOG(("error %d on sending", error));
break;
}
} else {
need_buffering = true;
}
-
+
if (need_buffering) {
// queue data for resend! And queue any further data for the stream until it is...
auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
LOG(("Queued %" PRIuSIZE " buffers (left=%" PRIuSIZE ", total=%" PRIuSIZE ")",
buffer.Length(), msg.GetLeft(), msg.GetLength()));
buffered = true;
return 0;
}
-
+
buffered = false;
return error;
}
// Caller must ensure that length <= UINT32_MAX
// Returns a POSIX error code.
int
DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data,
size_t len, uint32_t ppid)
{
- NS_ENSURE_TRUE(channel.mState == OPEN || channel.mState == CONNECTING, 0);
-
+ if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
struct sctp_sendv_spa info = {0};
// General flags
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
// Set stream identifier, protocol identifier and flags
info.sendv_sndinfo.snd_sid = channel.mStream;
info.sendv_sndinfo.snd_flags = SCTP_EOR;
info.sendv_sndinfo.snd_ppid = htonl(ppid);
-
+
// Unordered?
// To avoid problems where an in-order OPEN is lost and an
// out-of-order data message "beats" it, require data to be in-order
// until we get an ACK.
if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
!(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
}
// Partial reliability policy
if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
info.sendv_prinfo.pr_policy = channel.mPrPolicy;
info.sendv_prinfo.pr_value = channel.mPrValue;
info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
}
-
+
// Create message instance and send
OutgoingMsg msg(info, data, len);
MutexAutoLock lock(mLock);
bool buffered;
int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
-
+
// Set pending type and stream index (if buffered)
if (!error && buffered && !mPendingType) {
mPendingType = PENDING_DATA;
mCurrentStream = channel.mStream;
}
return error;
}
// Caller must ensure that length <= UINT32_MAX
// Returns a POSIX error code.
int
DataChannelConnection::SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len,
uint32_t ppidPartial, uint32_t ppidFinal)
{
// We *really* don't want to do this from main thread! - and
// SendDataMsgInternalOrBuffer avoids blocking.
-
+
if (mPpidFragmentation) {
- // TODO: This block shall be removed after a transitional period.
-
+ // TODO: Bug 1381136, remove this block and all other code that uses PPIDs for fragmentation
+ // and reassembly once older Firefoxes without EOR are no longer supported as target
+ // clients.
+
// Use the deprecated PPID-level fragmentation if enabled. Should be enabled
// in case we can be certain that the other peer is an older Firefox browser
// that does support PPID-level fragmentation/reassembly.
-
+
// PPID-level fragmentation can only be applied on reliable data channels.
if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
!(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
LOG(("Sending data message (total=%" PRIuSIZE ") using deprecated PPID-based chunks", len));
-
+
size_t left = len;
while (left > 0) {
// Note: For correctness, chunkLen should also consider mMaxMessageSize as minimum but as
// this block is going to be removed soon, I see no need for it.
size_t chunkLen = std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
left -= chunkLen;
uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
-
+
// Send the chunk
// Note that these might end up being deferred and queued.
LOG(("Send chunk (len=%" PRIuSIZE ", left=%" PRIuSIZE ", total=%" PRIuSIZE ", ppid %u",
chunkLen, left, len, ppid));
int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
if (error) {
LOG(("*** send chunk fail %d", error));
return error;
}
-
+
// Update data position
data += chunkLen;
}
-
+
// Sending chunks complete
LOG(("Sent %" PRIuSIZE " chunks using deprecated PPID-based fragmentation",
(size_t)(len+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT));
return 0;
}
-
+
// Cannot do PPID-based fragmentaton on unreliable channels
NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
"Sending too-large data on unreliable channel!");
} else {
if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
LOG(("Message rejected, too large (%" PRIuSIZE " > %" PRIu64 ")", len, mMaxMessageSize));
return EMSGSIZE;
}
}
-
+
// This will use EOR-based fragmentation if the message is too large (> 64 KiB)
return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
}
class ReadBlobRunnable : public Runnable {
public:
ReadBlobRunnable(DataChannelConnection* aConnection,
uint16_t aStream,
@@ -2711,17 +2725,20 @@ private:
RefPtr<nsIInputStream> mBlob;
};
// Returns a POSIX error code.
int
DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
{
DataChannel *channel = mStreams[stream];
- NS_ENSURE_TRUE(channel, 0);
+ if (NS_WARN_IF(!channel)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
// Spawn a thread to send the data
if (!mInternalIOThread) {
nsresult rv = NS_NewNamedThread("DataChannel IO",
getter_AddRefs(mInternalIOThread));
if (NS_FAILED(rv)) {
return EINVAL; // TODO: Find a better error code
}
}
@@ -2830,24 +2847,26 @@ DataChannelConnection::SendDataMsgCommon
const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
uint32_t len = aMsg.Length();
#if (UINT32_MAX > SIZE_MAX)
if (len > SIZE_MAX) {
return EMSGSIZE;
}
#endif
- DataChannel *channelp;
+ DataChannel *channelPtr;
LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
// XXX if we want more efficiency, translate flags once at open time
- channelp = mStreams[stream];
- NS_ENSURE_TRUE(channelp, 0);
-
- auto &channel = *channelp;
+ channelPtr = mStreams[stream];
+ if (NS_WARN_IF(!channelPtr)) {
+ return EINVAL; // TODO: Find a better error code
+ }
+
+ auto &channel = *channelPtr;
if (isBinary) {
return SendDataMsg(channel, data, len,
DATA_CHANNEL_PPID_BINARY_PARTIAL, DATA_CHANNEL_PPID_BINARY);
} else {
return SendDataMsg(channel, data, len,
DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, DATA_CHANNEL_PPID_DOMSTRING);
}
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -84,17 +84,17 @@ public:
~BufferedOutgoingMsg();
};
// for queuing incoming data messages before the Open or
// external negotiation is indicated to us
class QueuedDataMessage
{
public:
- QueuedDataMessage(uint16_t stream, uint32_t ppid, int32_t flags,
+ QueuedDataMessage(uint16_t stream, uint32_t ppid, int flags,
const void *data, uint32_t length)
: mStream(stream)
, mPpid(ppid)
, mFlags(flags)
, mLength(length)
{
mData = static_cast<uint8_t *>(moz_xmalloc((size_t)length)); // infallible
memcpy(mData, data, (size_t)length);
@@ -102,17 +102,17 @@ public:
~QueuedDataMessage()
{
free(mData);
}
uint16_t mStream;
uint32_t mPpid;
- int32_t mFlags;
+ int mFlags;
uint32_t mLength;
uint8_t *mData;
};
// One per PeerConnection
class DataChannelConnection final
: public net::NeckoTargetHolder
#ifdef SCTP_DTLS_SUPPORTED
@@ -188,30 +188,30 @@ public:
void CloseInt(DataChannel *aChannel);
void CloseAll();
// Returns a POSIX error code.
int SendMsg(uint16_t stream, const nsACString &aMsg)
{
return SendDataMsgCommon(stream, aMsg, false);
}
-
+
// Returns a POSIX error code.
int SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
{
return SendDataMsgCommon(stream, aMsg, true);
}
-
+
// Returns a POSIX error code.
int SendBlob(uint16_t stream, nsIInputStream *aBlob);
// Called on data reception from the SCTP library
// must(?) be public so my c->c++ trampoline can call it
int ReceiveCallback(struct socket* sock, void *data, size_t datalen,
- struct sctp_rcvinfo rcv, int32_t flags);
+ struct sctp_rcvinfo rcv, int flags);
// Find out state
enum {
CONNECTING = 0U,
OPEN = 1U,
CLOSING = 2U,
CLOSED = 3U
};
@@ -269,22 +269,22 @@ private:
void SendOutgoingStreamReset();
void ResetOutgoingStream(uint16_t stream);
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
uint32_t length, uint16_t stream);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
uint32_t length, uint16_t stream);
void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream);
uint8_t BufferMessage(nsACString& recvBuffer, const void *data, uint32_t length, uint32_t ppid,
- int32_t flags);
- void HandleDataMessage(const void *buffer, size_t dLength, uint32_t ppid, uint16_t stream,
- int32_t flags);
- void HandleDCEPMessage(const void *buffer, size_t dLength, uint32_t ppid, uint16_t stream,
- int32_t flags);
- void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream, int32_t flags);
+ int flags);
+ void HandleDataMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+ int flags);
+ void HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+ int flags);
+ void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream, int flags);
void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);
void HandleShutdownEvent(const struct sctp_shutdown_event *sse);
void HandleAdaptationIndication(const struct sctp_adaptation_event *sai);
void HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde);
void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe);
void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst);
@@ -421,23 +421,23 @@ public:
uint32_t GetBufferedAmount()
{
if (!mConnection) {
return 0;
}
MutexAutoLock lock(mConnection->mLock);
size_t buffered = GetBufferedAmountLocked();
-
+
#if (SIZE_MAX > UINT32_MAX)
if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
buffered = UINT32_MAX;
}
#endif
-
+
return buffered;
}
// Trigger amount for generating BufferedAmountLow events
uint32_t GetBufferedAmountLowThreshold();
void SetBufferedAmountLowThreshold(uint32_t aThreshold);
@@ -508,17 +508,17 @@ public:
BUFFER_LOW_THRESHOLD,
NO_LONGER_BUFFERED,
}; /* types */
DataChannelOnMessageAvailable(
int32_t aType,
DataChannelConnection* aConnection,
DataChannel* aChannel,
- nsCString& aData) // XXX this causes inefficiency)
+ nsCString& aData) // XXX this causes inefficiency
: Runnable("DataChannelOnMessageAvailable")
, mType(aType)
, mChannel(aChannel)
, mConnection(aConnection)
, mData(aData)
{
}