Bug 979417 - Implement EOR when receiving and explicit EOR when sending on data channels (including DCEP). r?jesup draft
authorLennart Grahl <lennart.grahl@gmail.com>
Wed, 26 Jul 2017 13:18:54 +0200
changeset 644754 2eea818fa77951110dd4d792fc620e70f9a064ba
parent 644753 c4eb424ae51815c00c532dfb550d463b29dcd63a
child 725701 270c89f9d1724dc15688320be2265542314a3cd0
push id73536
push userbmo:lennart.grahl@gmail.com
push dateFri, 11 Aug 2017 09:40:50 +0000
reviewersjesup
bugs979417
milestone57.0a1
Bug 979417 - Implement EOR when receiving and explicit EOR when sending on data channels (including DCEP). r?jesup This allows sending and receiving arbitrarily (we limit to 1 GiB atm) sized messages while not relying on the deprecated PPID fragmentation/reassembly mode. The code already supports the ndata extension but it's not activated, yet. Without the SCTP ndata extension, a large data channel message will monopolise the SCTP association. While this is a problem, it is a temporary solution until the extension is being activated. Keep in mind that every application that uses data channels currently does fragmentation/reassembly on application-level and it's unlikely that this will change until the popular implementations (libwebrtc) implement EOR as well. Moreover, until the WebRTC API specifies an API that hands over partial messages, doing application-level fragmentation/reassembly is still useful for very large messages (sadly). We fall back to PPID-based fragmentation/reassembly mode IFF a=max-message-size is not set in the SDP and the negotiated amount of SCTP inbound streams is exactly 256. Other implementations should avoid using this combination (to be precise, other implementations should send a=max-message-size). It also changes behaviour of RTCDataChannel.send which now raises TypeError in case the message is too large for the other peer to receive. This is a necessity to ensure that implementations that do not look at the EOR flag when receiving are always able to receive our messages. Even if these implementations do not set a=max-message-size, we use a safe default value (64 KiB, dictated by the spec) that every implementation should be able to receive, with or without EOR support. * Due to the use of explicit EOR, this required some major refactoring of all send-related and deferred sending functions (which is now a lot less complex). There's now only one place where `usrsctp_sendv` is being used. * All data channel messages and DCEP messages will be sent without copying them first. Only in case this fails (e.g. usrsctp's buffer is full), the message will be copied and added to a buffer queue. * Queued data channel messages will now be re-sent fairly (round-robin). * Maximum message size and the PPID-based fragmentation are configurable using about:config (media.peerconnection.sctp.force_ppid_fragmentation and media.peerconnection.sctp.force_maximum_message_size). * Enable interleaving of incoming messages for different streams (preparation for SCTP ndata, has no effect until it is enabled). * Enable interleaving of outgoing messages (disabled if SCTP ndata has not been negotiated). * Add pending messages flag to reduce performance impact from frequent calls to SendDeferredMessages. * Handle partial delivery events (for cases where a partially delivered message is being aborted). * Close a data channel/the connection in case the message is too large to be handled (this is only applied in cases where the remote peer ignores our announced local maximum message size). * Various size_t to uint32_t conversions (message length) and back should be safe now. * Remove aUsingDtls/mUsingDtls from DataChannelConnection. * Set maximum message size in SDP and in the data channel stack. * Replace implicit NS_ENSURE_*'s with explicit NS_WARN_IF's. * Add SetMaxMessageSize method for late-applying those signalling parameters when a data channel has been created before the remote SDP was available. * Limit remote maximum message size and add a GetMaxMessageSize method for a future implementation of RTCSctpTransport.maxMessageSize. MozReview-Commit-ID: FlmZrpC5zVI
dom/base/nsDOMDataChannel.cpp
dom/base/nsDOMDataChannel.h
media/webrtc/signaling/src/jsep/JsepCodecDescription.h
media/webrtc/signaling/src/jsep/JsepSessionImpl.cpp
media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
netwerk/sctp/datachannel/DataChannelProtocol.h
--- a/dom/base/nsDOMDataChannel.cpp
+++ b/dom/base/nsDOMDataChannel.cpp
@@ -265,17 +265,17 @@ nsDOMDataChannel::Close()
   return NS_OK;
 }
 
 // All of the following is copy/pasted from WebSocket.cpp.
 void
 nsDOMDataChannel::Send(const nsAString& aData, ErrorResult& aRv)
 {
   NS_ConvertUTF16toUTF8 msgString(aData);
-  Send(nullptr, msgString, msgString.Length(), false, aRv);
+  Send(nullptr, msgString, false, aRv);
 }
 
 void
 nsDOMDataChannel::Send(Blob& aData, ErrorResult& aRv)
 {
   MOZ_ASSERT(NS_IsMainThread(), "Not running on main thread");
 
   nsCOMPtr<nsIInputStream> msgStream;
@@ -289,55 +289,54 @@ nsDOMDataChannel::Send(Blob& aData, Erro
     return;
   }
 
   if (msgLength > UINT32_MAX) {
     aRv.Throw(NS_ERROR_FILE_TOO_BIG);
     return;
   }
 
-  Send(msgStream, EmptyCString(), msgLength, true, aRv);
+  Send(msgStream, EmptyCString(), true, aRv);
 }
 
 void
 nsDOMDataChannel::Send(const ArrayBuffer& aData, ErrorResult& aRv)
 {
   MOZ_ASSERT(NS_IsMainThread(), "Not running on main thread");
 
   aData.ComputeLengthAndData();
 
   static_assert(sizeof(*aData.Data()) == 1, "byte-sized data required");
 
   uint32_t len = aData.Length();
   char* data = reinterpret_cast<char*>(aData.Data());
 
   nsDependentCSubstring msgString(data, len);
-  Send(nullptr, msgString, len, true, aRv);
+  Send(nullptr, msgString, true, aRv);
 }
 
 void
 nsDOMDataChannel::Send(const ArrayBufferView& aData, ErrorResult& aRv)
 {
   MOZ_ASSERT(NS_IsMainThread(), "Not running on main thread");
 
   aData.ComputeLengthAndData();
 
   static_assert(sizeof(*aData.Data()) == 1, "byte-sized data required");
 
   uint32_t len = aData.Length();
   char* data = reinterpret_cast<char*>(aData.Data());
 
   nsDependentCSubstring msgString(data, len);
-  Send(nullptr, msgString, len, true, aRv);
+  Send(nullptr, msgString, true, aRv);
 }
 
 void
 nsDOMDataChannel::Send(nsIInputStream* aMsgStream,
                        const nsACString& aMsgString,
-                       uint32_t aMsgLength,
                        bool aIsBinary,
                        ErrorResult& aRv)
 {
   MOZ_ASSERT(NS_IsMainThread());
   uint16_t state = mozilla::DataChannel::CLOSED;
   if (!mSentClose) {
     state = mDataChannel->GetReadyState();
   }
@@ -352,29 +351,25 @@ nsDOMDataChannel::Send(nsIInputStream* a
   if (state == mozilla::DataChannel::CLOSING ||
       state == mozilla::DataChannel::CLOSED) {
     return;
   }
 
   MOZ_ASSERT(state == mozilla::DataChannel::OPEN,
              "Unknown state in nsDOMDataChannel::Send");
 
-  bool sent;
   if (aMsgStream) {
-    sent = mDataChannel->SendBinaryStream(aMsgStream, aMsgLength);
+    mDataChannel->SendBinaryStream(aMsgStream, aRv);
   } else {
     if (aIsBinary) {
-      sent = mDataChannel->SendBinaryMsg(aMsgString);
+      mDataChannel->SendBinaryMsg(aMsgString, aRv);
     } else {
-      sent = mDataChannel->SendMsg(aMsgString);
+      mDataChannel->SendMsg(aMsgString, aRv);
     }
   }
-  if (!sent) {
-    aRv.Throw(NS_ERROR_FAILURE);
-  }
 }
 
 nsresult
 nsDOMDataChannel::DoOnMessageAvailable(const nsACString& aData,
                                        bool aBinary)
 {
   MOZ_ASSERT(NS_IsMainThread());
 
--- a/dom/base/nsDOMDataChannel.h
+++ b/dom/base/nsDOMDataChannel.h
@@ -123,17 +123,17 @@ public:
   // (and possibly collected).
   void DontKeepAliveAnyMore();
 
 protected:
   ~nsDOMDataChannel();
 
 private:
   void Send(nsIInputStream* aMsgStream, const nsACString& aMsgString,
-            uint32_t aMsgLength, bool aIsBinary, mozilla::ErrorResult& aRv);
+            bool aIsBinary, mozilla::ErrorResult& aRv);
 
   void ReleaseSelf();
 
   // to keep us alive while we have listeners
   RefPtr<nsDOMDataChannel> mSelfRef;
   // Owning reference
   RefPtr<mozilla::DataChannel> mDataChannel;
   nsString  mOrigin;
--- a/media/webrtc/signaling/src/jsep/JsepCodecDescription.h
+++ b/media/webrtc/signaling/src/jsep/JsepCodecDescription.h
@@ -814,17 +814,17 @@ class JsepApplicationCodecDescription : 
   {
     JsepCodecDescription::Negotiate(pt, remoteMsection);
 
     uint32_t message_size;
     mRemoteMMSSet = remoteMsection.GetMaxMessageSize(&message_size);
     if (mRemoteMMSSet) {
       mRemoteMaxMessageSize = message_size;
     } else {
-      mRemoteMaxMessageSize = WEBRTC_DATACHANELL_MAX_MESSAGE_SIZE_DEFAULT;
+      mRemoteMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE_DEFAULT;
     }
 
     int sctp_port = remoteMsection.GetSctpPort();
     if (sctp_port) {
       mRemotePort = sctp_port;
       return true;
     }
 
--- a/media/webrtc/signaling/src/jsep/JsepSessionImpl.cpp
+++ b/media/webrtc/signaling/src/jsep/JsepSessionImpl.cpp
@@ -2362,19 +2362,17 @@ JsepSessionImpl::SetupDefaultCodecs()
       90000     // clock rate (match other video codecs)
       );
   mSupportedCodecs.values.push_back(ulpfec);
 
   mSupportedCodecs.values.push_back(new JsepApplicationCodecDescription(
       "webrtc-datachannel",
       WEBRTC_DATACHANNEL_STREAMS_DEFAULT,
       WEBRTC_DATACHANNEL_PORT_DEFAULT,
-      // TODO: Bug 979417 needs to change this to
-      // WEBRTC_DATACHANELL_MAX_MESSAGE_SIZE_DEFAULT
-      0
+      WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL
       ));
 
   // Update the redundant encodings for the RED codec with the supported
   // codecs.  Note: only uses the video codecs.
   red->UpdateRedundantEncodings(mSupportedCodecs.values);
 }
 
 void
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
@@ -1075,37 +1075,39 @@ PeerConnectionImpl::ConfigureJsepSession
 
   mJsepSession->SortCodecs(comparator);
   return NS_OK;
 }
 
 // Data channels won't work without a window, so in order for the C++ unit
 // tests to work (it doesn't have a window available) we ifdef the following
 // two implementations.
+//
+// Note: 'media.peerconnection.sctp.force_ppid_fragmentation' and
+//       'media.peerconnection.sctp.force_maximum_message_size' change behaviour triggered by
+//       these parameters.
 NS_IMETHODIMP
 PeerConnectionImpl::EnsureDataConnection(uint16_t aLocalPort,
                                          uint16_t aNumstreams,
                                          uint32_t aMaxMessageSize,
                                          bool aMMSSet)
 {
   PC_AUTO_ENTER_API_CALL(false);
 
   if (mDataConnection) {
     CSFLogDebug(logTag,"%s DataConnection already connected",__FUNCTION__);
-    // Ignore the request to connect when already connected.  This entire
-    // implementation is temporary.  Ignore aNumstreams as it's merely advisory
-    // and we increase the number of streams dynamically as needed.
+    mDataConnection->SetMaxMessageSize(aMMSSet, aMaxMessageSize);
     return NS_OK;
   }
 
   nsCOMPtr<nsIEventTarget> target = mWindow
       ? mWindow->EventTargetFor(TaskCategory::Other)
       : nullptr;
   mDataConnection = new DataChannelConnection(this, target);
-  if (!mDataConnection->Init(aLocalPort, aNumstreams, true)) {
+  if (!mDataConnection->Init(aLocalPort, aNumstreams, aMMSSet, aMaxMessageSize)) {
     CSFLogError(logTag,"%s DataConnection Init Failed",__FUNCTION__);
     return NS_ERROR_FAILURE;
   }
   CSFLogDebug(logTag,"%s DataChannelConnection %p attached to %s",
               __FUNCTION__, (void*) mDataConnection.get(), mHandle.c_str());
   return NS_OK;
 }
 
@@ -1314,17 +1316,17 @@ PeerConnectionImpl::CreateDataChannel(co
   MOZ_ASSERT(aRetval);
 
   RefPtr<DataChannel> dataChannel;
   DataChannelConnection::Type theType =
     static_cast<DataChannelConnection::Type>(aType);
 
   nsresult rv = EnsureDataConnection(WEBRTC_DATACHANNEL_PORT_DEFAULT,
                                      WEBRTC_DATACHANNEL_STREAMS_DEFAULT,
-                                     WEBRTC_DATACHANELL_MAX_MESSAGE_SIZE_DEFAULT,
+                                     WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE_DEFAULT,
                                      false);
   if (NS_FAILED(rv)) {
     return rv;
   }
   dataChannel = mDataConnection->Open(
     NS_ConvertUTF16toUTF8(aLabel), NS_ConvertUTF16toUTF8(aProtocol), theType,
     ordered,
     aType == DataChannelConnection::PARTIAL_RELIABLE_REXMIT ? aMaxNum :
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -30,16 +30,18 @@
 #pragma warning(pop)
 #endif
 
 #include "DataChannelLog.h"
 
 #include "nsServiceManagerUtils.h"
 #include "nsIObserverService.h"
 #include "nsIObserver.h"
+#include "nsIPrefBranch.h"
+#include "nsIPrefService.h"
 #include "mozilla/Services.h"
 #include "mozilla/Sprintf.h"
 #include "nsProxyRelease.h"
 #include "nsThread.h"
 #include "nsThreadUtils.h"
 #include "nsAutoPtr.h"
 #include "nsNetUtil.h"
 #include "nsNetCID.h"
@@ -122,29 +124,48 @@ public:
       (void) rv;
     }
     return NS_OK;
   }
 };
 
 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
 
-BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
-                         size_t length) : mLength(length)
+OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
+                         size_t length)
+  : mLength(length)
+  , mData(data)
+{
+  mInfo = &info;
+  mPos = 0;
+}
+
+void OutgoingMsg::Advance(size_t offset)
 {
-  mSpa = new sctp_sendv_spa;
-  *mSpa = spa;
-  auto *tmp = new char[length]; // infallible malloc!
-  memcpy(tmp, data, length);
+  mPos += offset;
+  if (mPos > mLength) {
+    mPos = mLength;
+  }
+}
+
+BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg)
+{
+  size_t length = msg.GetLeft();
+  auto *tmp = new uint8_t[length]; // infallible malloc!
+  memcpy(tmp, msg.GetData(), length);
+  mLength = length;
   mData = tmp;
+  mInfo = new sctp_sendv_spa;
+  *mInfo = msg.GetInfo();
+  mPos = 0;
 }
 
-BufferedMsg::~BufferedMsg()
+BufferedOutgoingMsg::~BufferedOutgoingMsg()
 {
-  delete mSpa;
+  delete mInfo;
   delete mData;
 }
 
 static int
 receive_cb(struct socket* sock, union sctp_sockstore addr,
            void *data, size_t datalen,
            struct sctp_rcvinfo rcv, int flags, void *ulp_info)
 {
@@ -175,17 +196,16 @@ GetConnectionFromSocket(struct socket* s
 }
 
 // called when the buffer empties to the threshold value
 static int
 threshold_event(struct socket* sock, uint32_t sb_free)
 {
   DataChannelConnection *connection = GetConnectionFromSocket(sock);
   if (connection) {
-    LOG(("SendDeferred()"));
     connection->SendDeferredMessages();
   } else {
     LOG(("Can't find connection for socket %p", sock));
   }
   return 0;
 }
 
 static void
@@ -207,22 +227,24 @@ debug_printf(const char *format, ...)
   }
 }
 
 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
                                              nsIEventTarget *aTarget)
   : NeckoTargetHolder(aTarget)
   , mLock("netwerk::sctp::DataChannelConnection")
 {
+  mCurrentStream = 0;
   mState = CLOSED;
   mSocket = nullptr;
   mMasterSocket = nullptr;
   mListener = listener;
   mLocalPort = 0;
   mRemotePort = 0;
+  mPendingType = PENDING_NONE;
   LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
   mInternalIOThread = nullptr;
 }
 
 DataChannelConnection::~DataChannelConnection()
 {
   LOG(("Deleting DataChannelConnection %p", (void *) this));
   // This may die on the MainThread, or on the STS thread
@@ -271,20 +293,18 @@ DataChannelConnection::Destroy()
   // we can deregister this DataChannelConnection without leaking.
   ClearResets();
 
   MOZ_ASSERT(mSTS);
   ASSERT_WEBRTC(NS_IsMainThread());
   // Must do this in Destroy() since we may then delete this object.
   // Do this before dispatching to create a consistent ordering of calls to
   // the SCTP stack.
-  if (mUsingDtls) {
-    usrsctp_deregister_address(static_cast<void *>(this));
-    LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
-  }
+  usrsctp_deregister_address(static_cast<void *>(this));
+  LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
 
   // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
   // the usrsctp_close() calls can move back here (and just proxy the
   // disconnect_all())
   RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
                                    &DataChannelConnection::DestroyOnSTS,
                                    mSocket, mMasterSocket),
                 NS_DISPATCH_NORMAL);
@@ -306,80 +326,85 @@ void DataChannelConnection::DestroyOnSTS
     usrsctp_close(aSocket);
   if (aMasterSocket)
     usrsctp_close(aMasterSocket);
 
   disconnect_all();
 }
 
 bool
-DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
+DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
+                            uint64_t aMaxMessageSize)
 {
   struct sctp_initmsg initmsg;
-  struct sctp_udpencaps encaps;
   struct sctp_assoc_value av;
   struct sctp_event event;
   socklen_t len;
 
   uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
                             SCTP_PEER_ADDR_CHANGE,
                             SCTP_REMOTE_ERROR,
                             SCTP_SHUTDOWN_EVENT,
                             SCTP_ADAPTATION_INDICATION,
+                            SCTP_PARTIAL_DELIVERY_EVENT,
                             SCTP_SEND_FAILED_EVENT,
                             SCTP_STREAM_RESET_EVENT,
                             SCTP_STREAM_CHANGE_EVENT};
   {
     ASSERT_WEBRTC(NS_IsMainThread());
-
     // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
+
+    mSendInterleaved = false;
+    mPpidFragmentation = false;
+    SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
+
     if (!sctp_initialized) {
-      if (aUsingDtls) {
-        LOG(("sctp_init(DTLS)"));
+      LOG(("sctp_init"));
 #ifdef MOZ_PEERCONNECTION
-        usrsctp_init(0,
-                     DataChannelConnection::SctpDtlsOutput,
-                     debug_printf
-                    );
+      usrsctp_init(0,
+                   DataChannelConnection::SctpDtlsOutput,
+                   debug_printf
+                  );
 #else
-        NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
+      MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
 #endif
-      } else {
-        LOG(("sctp_init(%u)", aPort));
-        usrsctp_init(aPort,
-                     nullptr,
-                     debug_printf
-                    );
-      }
 
       // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
       if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
         usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
       }
 
+      // Do not send ABORTs in response to INITs (1).
+      // Do not send ABORTs for received Out of the Blue packets (2).
       usrsctp_sysctl_set_sctp_blackhole(2);
-      // ECN is currently not supported by the Firefox code
+
+      // Disable the Explicit Congestion Notification extension (currently not supported by the
+      // Firefox code)
       usrsctp_sysctl_set_sctp_ecn_enable(0);
+
+      // Enable interleaving messages for different streams (incoming)
+      // See: https://tools.ietf.org/html/rfc6458#section-8.1.20
+      usrsctp_sysctl_set_sctp_default_frag_interleave(2);
+
       sctp_initialized = true;
 
       RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
       shutdown->Init();
     }
   }
 
   // XXX FIX! make this a global we get once
   // Find the STS thread
   nsresult rv;
   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
   MOZ_ASSERT(NS_SUCCEEDED(rv));
 
   // Open sctp with a callback
   if ((mMasterSocket = usrsctp_socket(
-         aUsingDtls ? AF_CONN : AF_INET,
-         SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
+         AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
          usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
     return false;
   }
 
   // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
   // in associations for normal IO
   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
     LOG(("Couldn't set non_blocking on SCTP socket"));
@@ -399,40 +424,49 @@ DataChannelConnection::Init(unsigned sho
     // unsafe to allow it to continue if this fails
     goto error_cleanup;
   }
 
   // XXX Consider disabling this when we add proper SDP negotiation.
   // We may want to leave enabled for supporting 'cloning' of SDP offers, which
   // implies re-use of the same pseudo-port number, or forcing a renegotiation.
   {
-    uint32_t on = 1;
+    const int option_value = 1;
     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
-                           (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
       LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
     }
     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
-                           (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
       LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
     }
   }
 
-  if (!aUsingDtls) {
-    memset(&encaps, 0, sizeof(encaps));
-    encaps.sue_address.ss_family = AF_INET;
-    encaps.sue_port = htons(aPort);
-    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
-                           (const void*)&encaps,
-                           (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
-      LOG(("*** failed encaps errno %d", errno));
+  // Set explicit EOR
+  {
+    const int option_value = 1;
+    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
+                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
+      LOG(("*** failed enable explicit EOR mode %d", errno));
       goto error_cleanup;
     }
-    LOG(("SCTP encapsulation local port %d", aPort));
   }
 
+  // Enable ndata
+  // TODO: Bug 1381145, enable this once ndata has been deployed
+#if 0
+  av.assoc_id = SCTP_FUTURE_ASSOC;
+  av.assoc_value = 1;
+  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
+                         (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
+    LOG(("*** failed enable ndata errno %d", errno));
+    goto error_cleanup;
+  }
+#endif
+
   av.assoc_id = SCTP_ALL_ASSOC;
   av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
                          (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
     LOG(("*** failed enable stream reset errno %d", errno));
     goto error_cleanup;
   }
 
@@ -465,32 +499,82 @@ DataChannelConnection::Init(unsigned sho
   initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
                          (socklen_t)sizeof(initmsg)) < 0) {
     LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
     goto error_cleanup;
   }
 
   mSocket = nullptr;
-  if (aUsingDtls) {
-    mUsingDtls = true;
-    usrsctp_register_address(static_cast<void *>(this));
-    LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
-  } else {
-    mUsingDtls = false;
-  }
+  usrsctp_register_address(static_cast<void *>(this));
+  LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
   return true;
 
 error_cleanup:
   usrsctp_close(mMasterSocket);
   mMasterSocket = nullptr;
-  mUsingDtls = false;
   return false;
 }
 
+void
+DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize)
+{
+  MutexAutoLock lock(mLock); // TODO: Needed?
+
+  mMaxMessageSizeSet = aMaxMessageSizeSet;
+  mMaxMessageSize = aMaxMessageSize;
+
+  bool ppidFragmentationEnforced = false;
+  nsresult rv;
+  nsCOMPtr<nsIPrefService> prefs = do_GetService("@mozilla.org/preferences-service;1", &rv);
+  if (!NS_WARN_IF(NS_FAILED(rv))) {
+    nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
+
+    if (branch) {
+      if (!NS_FAILED(branch->GetBoolPref(
+          "media.peerconnection.sctp.force_ppid_fragmentation", &mPpidFragmentation))) {
+        // Ensure that forced on/off PPID fragmentation does not get overridden when Firefox has
+        // been detected.
+        mMaxMessageSizeSet = true;
+        ppidFragmentationEnforced = true;
+      }
+
+      int32_t temp;
+      if (!NS_FAILED(branch->GetIntPref(
+          "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
+        if (temp >= 0) {
+          mMaxMessageSize = (uint64_t)temp;
+        }
+      }
+    }
+  }
+
+  // Fix remote MMS. This code exists, so future implementations of RTCSctpTransport.maxMessageSize
+  // can simply provide that value from GetMaxMessageSize.
+
+  // TODO: Bug 1382779, once resolved, can be increased to min(Uint8ArrayMaxSize, UINT32_MAX)
+  // TODO: Bug 1381146, once resolved, can be increased to whatever we support then (hopefully
+  //       SIZE_MAX)
+  if (mMaxMessageSize == 0 || mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
+    mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
+  }
+
+  LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
+       mPpidFragmentation ? "yes" : "no", ppidFragmentationEnforced ? "yes" : "no"));
+  LOG(("Maximum message size (outgoing data): %" PRIu64 " (set=%s, enforced=%s)",
+       mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
+       aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
+}
+
+uint64_t
+DataChannelConnection::GetMaxMessageSize()
+{
+  return mMaxMessageSize;
+}
+
 #ifdef MOZ_PEERCONNECTION
 void
 DataChannelConnection::SetEvenOdd()
 {
   ASSERT_WEBRTC(IsSTSThread());
 
   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
       mTransportFlow->GetLayer(TransportLayerDtls::ID()));
@@ -499,17 +583,19 @@ DataChannelConnection::SetEvenOdd()
 }
 
 bool
 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
 {
   LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
 
   NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
-  NS_ENSURE_TRUE(aFlow, false);
+  if (NS_WARN_IF(!aFlow)) {
+    return false;
+  }
 
   mTransportFlow = aFlow;
   mLocalPort = localport;
   mRemotePort = remoteport;
   mState = CONNECTING;
 
   RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
                                    &DataChannelConnection::SetSignals),
@@ -874,16 +960,39 @@ DataChannelConnection::FindFreeStream()
     }
   }
   if (i >= limit) {
     return INVALID_STREAM;
   }
   return i;
 }
 
+uint32_t
+DataChannelConnection::UpdateCurrentStreamIndex()
+{
+  if (mCurrentStream == mStreams.Length() - 1) {
+      mCurrentStream = 0;
+  } else {
+    ++mCurrentStream;
+  }
+
+  return mCurrentStream;
+}
+
+uint32_t
+DataChannelConnection::GetCurrentStreamIndex()
+{
+  // Fix current stream index (in case #streams decreased)
+  if (mCurrentStream >= mStreams.Length()) {
+    mCurrentStream = 0;
+  }
+
+  return mCurrentStream;
+}
+
 bool
 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
 {
   struct sctp_status status;
   struct sctp_add_streams sas;
   uint32_t outStreamsNeeded;
   socklen_t len;
 
@@ -918,46 +1027,62 @@ DataChannelConnection::RequestMoreStream
     return false;
   }
   LOG(("Requested %u more streams", outStreamsNeeded));
   // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
   // values are larger than mStreams.Length()
   return true;
 }
 
-int32_t
-DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream)
 {
-  struct sctp_sndinfo sndinfo;
-
+  struct sctp_sendv_spa info = {0};
+
+  // General flags
+  info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+
+  // Set stream identifier, protocol identifier and flags
+  info.sendv_sndinfo.snd_sid = stream;
+  info.sendv_sndinfo.snd_flags = SCTP_EOR;
+  info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
+
+  // Create message instance and send
   // Note: Main-thread IO, but doesn't block
-  memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
-  sndinfo.snd_sid = stream;
-  sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
-  if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
-                    &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
-                    SCTP_SENDV_SNDINFO, 0) < 0) {
-    //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
-    return (0);
+#if (UINT32_MAX > SIZE_MAX)
+  if (len > SIZE_MAX) {
+    return EMSGSIZE;
   }
-  return (1);
+#endif
+  OutgoingMsg msg(info, data, (size_t)len);
+  bool buffered;
+  int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
+
+  // Set pending type (if buffered)
+  if (!error && buffered && !mPendingType) {
+    mPendingType = PENDING_DCEP;
+  }
+  return error;
 }
 
-int32_t
+// Returns a POSIX error code.
+int
 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
 {
   struct rtcweb_datachannel_ack ack;
 
   memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
   ack.msg_type = DATA_CHANNEL_ACK;
 
-  return SendControlMessage(&ack, sizeof(ack), stream);
+  return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
 }
 
-int32_t
+// Returns a POSIX error code.
+int
 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
                                               const nsACString& protocol,
                                               uint16_t stream, bool unordered,
                                               uint16_t prPolicy, uint32_t prValue)
 {
   const int label_len = label.Length(); // not including nul
   const int proto_len = protocol.Length(); // not including nul
   // careful - request struct include one char for the label
@@ -974,203 +1099,186 @@ DataChannelConnection::SendOpenRequestMe
     break;
   case SCTP_PR_SCTP_TTL:
     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
     break;
   case SCTP_PR_SCTP_RTX:
     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
     break;
   default:
-    // FIX! need to set errno!  Or make all these SendXxxx() funcs return 0 or errno!
     free(req);
-    return (0);
+    return EINVAL;
   }
   if (unordered) {
     // Per the current types, all differ by 0x80 between ordered and unordered
     req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
   }
 
   req->reliability_param = htonl(prValue);
   req->priority = htons(0); /* XXX: add support */
   req->label_length = htons(label_len);
   req->protocol_length = htons(proto_len);
   memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
   memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
 
-  int32_t result = SendControlMessage(req, req_size, stream);
+  // TODO: req_size is an int... that looks hairy
+  int error = SendControlMessage((const uint8_t *)req, req_size, stream);
 
   free(req);
-  return result;
+  return error;
 }
 
 // XXX This should use a separate thread (outbound queue) which should
 // select() to know when to *try* to send data to the socket again.
 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
 // (just not sure in what direction).  We could re-implement NSPR's
 // PR_POLL_WRITE/etc handling... with a lot of work.
 
 // Better yet, use the SCTP stack's notifications on buffer state to avoid
 // filling the SCTP's buffers.
 
-// returns if we're still blocked or not
+// returns if we're still blocked (true)
 bool
 DataChannelConnection::SendDeferredMessages()
 {
-  uint32_t i;
   RefPtr<DataChannel> channel; // we may null out the refs to this
-  bool still_blocked = false;
 
   // This may block while something is modifying channels, but should not block for IO
   MutexAutoLock lock(mLock);
 
-  // XXX For total fairness, on a still_blocked we'd start next time at the
-  // same index.  Sorry, not going to bother for now.
-  for (i = 0; i < mStreams.Length(); ++i) {
+  LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
+  if (!mPendingType) {
+    return false;
+  }
+
+  // Send pending control messages
+  // Note: If ndata is not active, check if DCEP messages are currently outstanding. These need to
+  //       be sent first before other streams can be used for sending.
+  if (!mBufferedControl.IsEmpty() && (mSendInterleaved || mPendingType == PENDING_DCEP)) {
+    if (SendBufferedMessages(mBufferedControl)) {
+      return true;
+    }
+
+    // Note: There may or may not be pending data messages
+    mPendingType = PENDING_DATA;
+  }
+
+  bool blocked = false;
+  uint32_t i = GetCurrentStreamIndex();
+  uint32_t end = i;
+  do {
     channel = mStreams[i];
-    if (!channel)
+    if (!channel || channel->mBufferedData.IsEmpty()) {
+      i = UpdateCurrentStreamIndex();
       continue;
-
-    // Only one of these should be set....
-    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
-      if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
-                                 channel->mStream,
-                                 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
-                                 channel->mPrPolicy, channel->mPrValue)) {
-        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
-
-        channel->mState = OPEN;
-        channel->mReady = true;
-        LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
-        Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
-                   channel)));
-      } else {
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          still_blocked = true;
-        } else {
-          // Close the channel, inform the user
-          mStreams[channel->mStream] = nullptr;
-          channel->mState = CLOSED;
-          // Don't need to reset; we didn't open it
-          Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
-                     channel)));
-        }
-      }
+    }
+
+    // Clear if closing/closed
+    if (channel->mState == CLOSED || channel->mState == CLOSING) {
+      channel->mBufferedData.Clear();
+      i = UpdateCurrentStreamIndex();
+      continue;
     }
-    if (still_blocked)
-      break;
-
-    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
-      if (SendOpenAckMessage(channel->mStream)) {
-        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
-      } else {
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          still_blocked = true;
-        } else {
-          // Close the channel, inform the user
-          CloseInt(channel);
-          // XXX send error via DataChannelOnMessageAvailable (bug 843625)
-        }
-      }
+
+    size_t bufferedAmount = channel->GetBufferedAmountLocked();
+    size_t threshold = channel->mBufferedThreshold;
+    bool wasOverThreshold = bufferedAmount >= threshold;
+
+    // Send buffered data messages
+    // Warning: This will fail in case ndata is inactive and a previously deallocated data channel
+    //          has not been closed properly. If you ever see that no messages can be sent on any
+    //          channel, this is likely the cause (an explicit EOR message partially sent whose
+    //          remaining chunks are still being waited for).
+    blocked = SendBufferedMessages(channel->mBufferedData);
+    bufferedAmount = channel->GetBufferedAmountLocked();
+
+    // can never fire with default threshold of 0
+    if (wasOverThreshold && bufferedAmount < threshold) {
+      LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
+           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+                 DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
+                 this, channel)));
     }
-    if (still_blocked)
-      break;
-
-    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
-      bool failed_send = false;
-      int32_t result;
-
-      if (channel->mState == CLOSED || channel->mState == CLOSING) {
-        channel->mBufferedData.Clear();
-      }
-
-      uint32_t buffered_amount = channel->GetBufferedAmountLocked();
-      uint32_t threshold = channel->GetBufferedAmountLowThreshold();
-      bool was_over_threshold = buffered_amount >= threshold;
-
-      while (!channel->mBufferedData.IsEmpty() &&
-             !failed_send) {
-        struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
-        const char *data           = channel->mBufferedData[0]->mData;
-        size_t len                 = channel->mBufferedData[0]->mLength;
-
-        // SCTP will return EMSGSIZE if the message is bigger than the buffer
-        // size (or EAGAIN if there isn't space)
-        if ((result = usrsctp_sendv(mSocket, data, len,
-                                    nullptr, 0,
-                                    (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
-                                    SCTP_SENDV_SPA,
-                                    0)) < 0) {
-          if (errno == EAGAIN || errno == EWOULDBLOCK) {
-            // leave queued for resend
-            failed_send = true;
-            LOG(("queue full again when resending %zu bytes (%d)", len, result));
-          } else {
-            LOG(("error %d re-sending string", errno));
-            failed_send = true;
-          }
-        } else {
-          LOG(("Resent buffer of %zu bytes (%d)", len, result));
-          // In theory this could underflow if >4GB was buffered and re
-          // truncated in GetBufferedAmount(), but this won't cause any problems.
-          buffered_amount -= channel->mBufferedData[0]->mLength;
-          channel->mBufferedData.RemoveElementAt(0);
-          // can never fire with default threshold of 0
-          if (was_over_threshold && buffered_amount < threshold) {
-            LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
-                 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
-            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-                       DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
-                       this, channel)));
-            was_over_threshold = false;
-          }
-          if (buffered_amount == 0) {
-            // buffered-to-not-buffered transition; tell the DOM code in case this makes it
-            // available for GC
-            LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
-                 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
-            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-                       DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
-                       this, channel)));
-          }
-        }
-      }
-      if (channel->mBufferedData.IsEmpty())
-        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
-      else
-        still_blocked = true;
+
+    if (bufferedAmount == 0) {
+      // buffered-to-not-buffered transition; tell the DOM code in case this makes it
+      // available for GC
+      LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
+           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+                 DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
+                 this, channel)));
+    }
+
+    // Update current stream index
+    // Note: If ndata is not active, the outstanding data messages on this stream need to be sent
+    //       first before other streams can be used for sending.
+    if (mSendInterleaved || !blocked) {
+      i = UpdateCurrentStreamIndex();
     }
-    if (still_blocked)
-      break;
+  } while (!blocked && i != end);
+
+  if (!blocked) {
+    mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
   }
-
-  return still_blocked;
+  return blocked;
 }
 
+
+// Called with mLock locked!
+// buffer MUST have at least one item!
+// returns if we're still blocked (true)
+bool
+DataChannelConnection::SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer)
+{
+  do {
+    // Re-send message
+    int error = SendMsgInternal(*buffer[0]);
+    switch (error) {
+      case 0:
+        buffer.RemoveElementAt(0);
+        break;
+      case EAGAIN:
+#if (EAGAIN != EWOULDBLOCK)
+      case EWOULDBLOCK:
+#endif
+        return true;
+      default:
+        buffer.RemoveElementAt(0);
+        LOG(("error on sending: %d", error));
+        break;
+    }
+  } while (!buffer.IsEmpty());
+
+  return false;
+}
+
+// Caller must ensure that length <= SIZE_MAX
 void
 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
-                                                size_t length,
-                                                uint16_t stream)
+                                                uint32_t length, uint16_t stream)
 {
   RefPtr<DataChannel> channel;
   uint32_t prValue;
   uint16_t prPolicy;
   uint32_t flags;
 
   mLock.AssertCurrentThreadOwns();
 
-  if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
-    LOG(("%s: Inconsistent length: %zu, should be %zu", __FUNCTION__, length,
-         (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
-    if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
+  const size_t requiredLength =
+    (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length);
+  if (((size_t)length) != requiredLength) {
+    LOG(("%s: Inconsistent length: %u, should be %zu",
+         __FUNCTION__, length, requiredLength));
+    if (((size_t)length) < requiredLength)
       return;
   }
 
-  LOG(("%s: length %zu, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
+  LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
 
   switch (req->channel_type) {
     case DATA_CHANNEL_RELIABLE:
     case DATA_CHANNEL_RELIABLE_UNORDERED:
       prPolicy = SCTP_PR_SCTP_NONE;
       break;
     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
@@ -1232,21 +1340,23 @@ DataChannelConnection::HandleOpenRequest
   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
        channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
              DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
              this, channel)));
 
   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
 
-  if (!SendOpenAckMessage(stream)) {
-    // XXX Only on EAGAIN!?  And if not, then close the channel??
-    channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
-    // Note: we're locked, so there's no danger of a race with the
-    // buffer-threshold callback
+  int error = SendOpenAckMessage(stream);
+  if (error) {
+    LOG(("SendOpenRequest failed, error = %d", error));
+    // Close the channel, inform the user
+    CloseInt(channel);
+    // XXX send error via DataChannelOnMessageAvailable (bug 843625)
+    return;
   }
 
   // Now process any queued data messages for the channel (which will
   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
   // more that come in before that happens)
   DeliverQueuedData(stream);
 }
 
@@ -1257,214 +1367,343 @@ DataChannelConnection::DeliverQueuedData
 {
   mLock.AssertCurrentThreadOwns();
 
   uint32_t i = 0;
   while (i < mQueuedData.Length()) {
     // Careful! we may modify the array length from within the loop!
     if (mQueuedData[i]->mStream == stream) {
       LOG(("Delivering queued data for stream %u, length %u",
-           stream, (unsigned int) mQueuedData[i]->mLength));
+           stream, mQueuedData[i]->mLength));
       // Deliver the queued data
-      HandleDataMessage(mQueuedData[i]->mPpid,
-                        mQueuedData[i]->mData, mQueuedData[i]->mLength,
-                        mQueuedData[i]->mStream);
+      HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
+                        mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
+                        mQueuedData[i]->mFlags);
       mQueuedData.RemoveElementAt(i);
       continue; // don't bump index since we removed the element
     }
     i++;
   }
 }
 
+// Caller must ensure that length <= SIZE_MAX
 void
 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
-                                            size_t length, uint16_t stream)
+                                            uint32_t length, uint16_t stream)
 {
   DataChannel *channel;
 
   mLock.AssertCurrentThreadOwns();
 
   channel = FindChannelByStream(stream);
-  NS_ENSURE_TRUE_VOID(channel);
+  if (NS_WARN_IF(!channel)) {
+    return;
+  }
 
   LOG(("OpenAck received for stream %u, waiting=%d", stream,
        (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
 
   channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
 }
 
+// Caller must ensure that length <= SIZE_MAX
 void
-DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
+DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
 {
   /* XXX: Send an error message? */
-  LOG(("unknown DataChannel message received: %u, len %zu on stream %d", ppid, length, stream));
+  LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid, length, stream));
   // XXX Log to JS error console if possible
 }
 
+uint8_t
+DataChannelConnection::BufferMessage(nsACString& recvBuffer, const void *data,
+                                     uint32_t length, uint32_t ppid, int flags)
+{
+  const char *buffer = (const char *) data;
+  uint8_t bufferFlags = 0;
+
+  if ((flags & MSG_EOR) &&
+      ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
+      ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
+    bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
+
+    // Return directly if nothing has been buffered
+    if (recvBuffer.IsEmpty()) {
+      return bufferFlags;
+    }
+  }
+
+  // Ensure it doesn't blow up our buffer
+  // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the new buffer is capable
+  //       of holding.
+  if (((uint64_t) recvBuffer.Length()) + ((uint64_t) length) > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
+    bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
+    return bufferFlags;
+  }
+
+  // Copy & add to receive buffer
+  recvBuffer.Append(buffer, length);
+  bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
+  return bufferFlags;
+}
+
 void
-DataChannelConnection::HandleDataMessage(uint32_t ppid,
-                                         const void *data, size_t length,
-                                         uint16_t stream)
+DataChannelConnection::HandleDataMessage(const void *data, size_t length, uint32_t ppid,
+                                         uint16_t stream, int flags)
 {
   DataChannel *channel;
   const char *buffer = (const char *) data;
 
   mLock.AssertCurrentThreadOwns();
-
   channel = FindChannelByStream(stream);
 
+  // Note: Until we support SIZE_MAX sized messages, we need this check
+#if (SIZE_MAX > UINT32_MAX)
+  if (length > UINT32_MAX) {
+    LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
+         length, UINT32_MAX));
+    CloseInt(channel);
+    return;
+  }
+#endif
+  uint32_t data_length = (uint32_t)length;
+
   // XXX A closed channel may trip this... check
   // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
   // That would make this code moot.  Keep it for now for backwards compatibility.
   if (!channel) {
     // In the updated 0-RTT open case, the sender can send data immediately
     // after Open, and doesn't set the in-order bit (since we don't have a
     // response or ack).  Also, with external negotiation, data can come in
     // before we're told about the external negotiation.  We need to buffer
     // data until either a) Open comes in, if the ordering get messed up,
     // or b) the app tells us this channel was externally negotiated.  When
     // these occur, we deliver the data.
 
     // Since this is rare and non-performance, keep a single list of queued
     // data messages to deliver once the channel opens.
-    LOG(("Queuing data for stream %u, length %zu", stream, length));
+    LOG(("Queuing data for stream %u, length %u", stream, data_length));
     // Copies data
-    mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
+    mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, data_length));
+    return;
+  }
+
+  // Ignore incoming data in case the channel is closed
+  if (channel->mState == CLOSED) {
+    return;
+  }
+
+  bool is_binary = true;
+  uint8_t bufferFlags;
+  int32_t type;
+  const char* info = "";
+
+  if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
+      ppid == DATA_CHANNEL_PPID_DOMSTRING) {
+    is_binary = false;
+  }
+  if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
+    NS_WARNING("DataChannel message aborted by fragment type change!");
+    // TODO: Maybe closing would be better as this is a hard to detect protocol violation?
+    channel->mRecvBuffer.Truncate(0);
+  }
+  channel->mIsRecvBinary = is_binary;
+
+  // Remaining chunks of previously truncated message (due to the buffer being full)?
+  if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
+    LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing",
+         data_length));
+    // Only unblock if unordered
+    if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
+      channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+    }
+  }
+
+  // Buffer message until complete
+  bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
+  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
+    LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
+    channel->mRecvBuffer.Truncate(0);
+    channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
+    CloseInt(channel);
     return;
   }
-
-  // XXX should this be a simple if, no warnings/debugbreaks?
-  NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
-
-  {
-    nsAutoCString recvData(buffer, length); // copies (<64) or allocates
-    bool is_binary = true;
-
-    if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
-        ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
-      is_binary = false;
-    }
-    if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
-      NS_WARNING("DataChannel message aborted by fragment type change!");
-      channel->mRecvBuffer.Truncate(0);
-    }
-    channel->mIsRecvBinary = is_binary;
-
-    switch (ppid) {
-      case DATA_CHANNEL_PPID_DOMSTRING:
-      case DATA_CHANNEL_PPID_BINARY:
-        channel->mRecvBuffer += recvData;
-        LOG(("DataChannel: Partial %s message of length %zu (total %u) on channel id %u",
-             is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
-             channel->mStream));
-        return; // Not ready to notify application
-
-      case DATA_CHANNEL_PPID_DOMSTRING_LAST:
-        LOG(("DataChannel: String message received of length %zu on channel %u",
-             length, channel->mStream));
-        if (!channel->mRecvBuffer.IsEmpty()) {
-          channel->mRecvBuffer += recvData;
-          LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
-          channel->SendOrQueue(new DataChannelOnMessageAvailable(
-                                 DataChannelOnMessageAvailable::ON_DATA, this,
-                                 channel, channel->mRecvBuffer, -1));
-          channel->mRecvBuffer.Truncate(0);
-          return;
-        }
-        // else send using recvData normally
-        length = -1; // Flag for DOMString
-
-        // WebSockets checks IsUTF8() here; we can try to deliver it
-        break;
-
-      case DATA_CHANNEL_PPID_BINARY_LAST:
-        LOG(("DataChannel: Received binary message of length %zu on channel id %u",
-             length, channel->mStream));
-        if (!channel->mRecvBuffer.IsEmpty()) {
-          channel->mRecvBuffer += recvData;
-          LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
-          channel->SendOrQueue(new DataChannelOnMessageAvailable(
-                                 DataChannelOnMessageAvailable::ON_DATA, this,
-                                 channel, channel->mRecvBuffer,
-                                 channel->mRecvBuffer.Length()));
-          channel->mRecvBuffer.Truncate(0);
-          return;
-        }
-        // else send using recvData normally
-        break;
-
-      default:
-        NS_ERROR("Unknown data PPID");
+  if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
+    LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
+         is_binary ? "binary" : "string", data_length, channel->mRecvBuffer.Length(),
+         channel->mStream));
+    return; // Not ready to notify application
+  }
+  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+    data_length = channel->mRecvBuffer.Length();
+  }
+
+  // Complain about large messages (only complain - we can handle it)
+  if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
+    LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
+         data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
+  }
+
+  switch (ppid) {
+    case DATA_CHANNEL_PPID_DOMSTRING:
+      LOG(("DataChannel: Received string message of length %u on channel %u",
+           data_length, channel->mStream));
+      type = DataChannelOnMessageAvailable::ON_DATA_STRING;
+      if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+        info = " (string fragmented)";
+      }
+      // else send using recvData normally
+
+      // WebSockets checks IsUTF8() here; we can try to deliver it
+      break;
+
+    case DATA_CHANNEL_PPID_BINARY:
+      LOG(("DataChannel: Received binary message of length %u on channel id %u",
+           data_length, channel->mStream));
+      type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
+      if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+        info = " (binary fragmented)";
+      }
+
+      // else send using recvData normally
+      break;
+
+    default:
+      NS_ERROR("Unknown data PPID");
+      return;
+  }
+
+  // Notify onmessage
+  LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
+       (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
+       info, channel));
+  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+    channel->SendOrQueue(new DataChannelOnMessageAvailable(
+                           type, this, channel, channel->mRecvBuffer));
+    channel->mRecvBuffer.Truncate(0);
+  } else {
+    nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
+    channel->SendOrQueue(new DataChannelOnMessageAvailable(
+                           type, this, channel, recvData));
+  }
+}
+
+void
+DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid,
+                                         uint16_t stream, int flags)
+{
+  const struct rtcweb_datachannel_open_request *req;
+  const struct rtcweb_datachannel_ack *ack;
+
+  // Note: Until we support SIZE_MAX sized messages, we need this check
+#if (SIZE_MAX > UINT32_MAX)
+  if (length > UINT32_MAX) {
+    LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length, UINT32_MAX));
+    Stop();
+    return;
+  }
+#endif
+  uint32_t data_length = (uint32_t)length;
+
+  mLock.AssertCurrentThreadOwns();
+
+  // Buffer message until complete
+  const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
+  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
+    LOG(("DataChannel: Buffered message would become too large to handle, closing connection"));
+    mRecvBuffer.Truncate(0);
+    Stop();
+    return;
+  }
+  if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
+    LOG(("Buffered partial DCEP message of length %u", data_length));
+    return;
+  }
+  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
+    buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
+    data_length = mRecvBuffer.Length();
+  }
+
+  req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
+  LOG(("Handling DCEP message of length %u", data_length));
+
+  // Ensure minimum message size (ack is the smallest DCEP message)
+  if ((size_t)data_length < sizeof(*ack)) {
+    LOG(("Ignored invalid DCEP message (too short)"));
+    return;
+  }
+
+  switch (req->msg_type) {
+    case DATA_CHANNEL_OPEN_REQUEST:
+      // structure includes a possibly-unused char label[1] (in a packed structure)
+      if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
         return;
-    }
-    /* Notify onmessage */
-    LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
-    channel->SendOrQueue(new DataChannelOnMessageAvailable(
-                           DataChannelOnMessageAvailable::ON_DATA, this,
-                           channel, recvData, length));
+      }
+
+      HandleOpenRequestMessage(req, data_length, stream);
+      break;
+    case DATA_CHANNEL_ACK:
+      // >= sizeof(*ack) checked above
+
+      ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+      HandleOpenAckMessage(ack, data_length, stream);
+      break;
+    default:
+      HandleUnknownMessage(ppid, data_length, stream);
+      break;
   }
+
+  // Reset buffer
+  mRecvBuffer.Truncate(0);
 }
 
 // Called with mLock locked!
 void
-DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
+DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid,
+                                     uint16_t stream, int flags)
 {
-  const struct rtcweb_datachannel_open_request *req;
-  const struct rtcweb_datachannel_ack *ack;
-
   mLock.AssertCurrentThreadOwns();
 
   switch (ppid) {
     case DATA_CHANNEL_PPID_CONTROL:
-      req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
-
-      NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
-      switch (req->msg_type) {
-        case DATA_CHANNEL_OPEN_REQUEST:
-          // structure includes a possibly-unused char label[1] (in a packed structure)
-          NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
-
-          HandleOpenRequestMessage(req, length, stream);
-          break;
-        case DATA_CHANNEL_ACK:
-          // >= sizeof(*ack) checked above
-
-          ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
-          HandleOpenAckMessage(ack, length, stream);
-          break;
-        default:
-          HandleUnknownMessage(ppid, length, stream);
-          break;
-      }
+      HandleDCEPMessage(buffer, length, ppid, stream, flags);
       break;
+    case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
     case DATA_CHANNEL_PPID_DOMSTRING:
-    case DATA_CHANNEL_PPID_DOMSTRING_LAST:
+    case DATA_CHANNEL_PPID_BINARY_PARTIAL:
     case DATA_CHANNEL_PPID_BINARY:
-    case DATA_CHANNEL_PPID_BINARY_LAST:
-      HandleDataMessage(ppid, buffer, length, stream);
+      HandleDataMessage(buffer, length, ppid, stream, flags);
       break;
     default:
-      LOG(("Message of length %zu, PPID %u on stream %u received.",
-           length, ppid, stream));
+      LOG(("Message of length %zu PPID %u on stream %u received (%s).",
+           length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
       break;
   }
 }
 
 void
 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
 {
   uint32_t i, n;
 
   switch (sac->sac_state) {
   case SCTP_COMM_UP:
     LOG(("Association change: SCTP_COMM_UP"));
     if (mState == CONNECTING) {
       mSocket = mMasterSocket;
       mState = OPEN;
 
+      // Check for older Firefox by looking at the amount of incoming streams
+      LOG(("Negotiated number of incoming streams: %" PRIu16, sac->sac_inbound_streams));
+      if (!mMaxMessageSizeSet
+          && sac->sac_inbound_streams == WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
+        LOG(("Older Firefox detected, using PPID-based fragmentation"));
+        mPpidFragmentation = true;
+      }
+
       SetEvenOdd();
 
       Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                  DataChannelOnMessageAvailable::ON_CONNECTION,
                  this)));
       LOG(("DTLS connect() succeeded!  Entering connected mode"));
 
       // Open any streams pending...
@@ -1474,63 +1713,71 @@ DataChannelConnection::HandleAssociation
       LOG(("DataConnection Already OPEN"));
     } else {
       LOG(("Unexpected state: %d", mState));
     }
     break;
   case SCTP_COMM_LOST:
     LOG(("Association change: SCTP_COMM_LOST"));
     // This association is toast, so also close all the channels -- from mainthread!
-    Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-               DataChannelOnMessageAvailable::ON_DISCONNECTED,
-               this)));
+    Stop();
     break;
   case SCTP_RESTART:
     LOG(("Association change: SCTP_RESTART"));
     break;
   case SCTP_SHUTDOWN_COMP:
     LOG(("Association change: SCTP_SHUTDOWN_COMP"));
-    Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-               DataChannelOnMessageAvailable::ON_DISCONNECTED,
-               this)));
+    Stop();
     break;
   case SCTP_CANT_STR_ASSOC:
     LOG(("Association change: SCTP_CANT_STR_ASSOC"));
     break;
   default:
     LOG(("Association change: UNKNOWN"));
     break;
   }
   LOG(("Association change: streams (in/out) = (%u/%u)",
        sac->sac_inbound_streams, sac->sac_outbound_streams));
 
-  NS_ENSURE_TRUE_VOID(sac);
+  if (NS_WARN_IF(!sac)) {
+    return;
+  }
+
   n = sac->sac_length - sizeof(*sac);
-  if (((sac->sac_state == SCTP_COMM_UP) ||
-        (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
-    for (i = 0; i < n; ++i) {
-      switch (sac->sac_info[i]) {
-      case SCTP_ASSOC_SUPPORTS_PR:
-        LOG(("Supports: PR"));
-        break;
-      case SCTP_ASSOC_SUPPORTS_AUTH:
-        LOG(("Supports: AUTH"));
-        break;
-      case SCTP_ASSOC_SUPPORTS_ASCONF:
-        LOG(("Supports: ASCONF"));
-        break;
-      case SCTP_ASSOC_SUPPORTS_MULTIBUF:
-        LOG(("Supports: MULTIBUF"));
-        break;
-      case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
-        LOG(("Supports: RE-CONFIG"));
-        break;
-      default:
-        LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
-        break;
+  if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
+    if (n > 0) {
+      for (i = 0; i < n; ++i) {
+        switch (sac->sac_info[i]) {
+        case SCTP_ASSOC_SUPPORTS_PR:
+          LOG(("Supports: PR"));
+          break;
+        case SCTP_ASSOC_SUPPORTS_AUTH:
+          LOG(("Supports: AUTH"));
+          break;
+        case SCTP_ASSOC_SUPPORTS_ASCONF:
+          LOG(("Supports: ASCONF"));
+          break;
+        case SCTP_ASSOC_SUPPORTS_MULTIBUF:
+          LOG(("Supports: MULTIBUF"));
+          break;
+        case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
+          LOG(("Supports: RE-CONFIG"));
+          break;
+#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
+        case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
+          LOG(("Supports: NDATA"));
+          // TODO: This should probably be set earlier above in 'case SCTP_COMM_UP' but we also
+          //       need this for 'SCTP_RESTART'.
+          mSendInterleaved = true;
+          break;
+#endif
+        default:
+          LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
+          break;
+        }
       }
     }
   } else if (((sac->sac_state == SCTP_COMM_LOST) ||
               (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
     LOG(("Association: ABORT ="));
     for (i = 0; i < n; ++i) {
       LOG((" 0x%02x", sac->sac_info[i]));
     }
@@ -1620,16 +1867,48 @@ DataChannelConnection::HandleShutdownEve
 
 void
 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
 {
   LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
 }
 
 void
+DataChannelConnection::HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde)
+{
+  // Note: Be aware that stream and sequence number being u32 instead of u16 is
+  //       a bug in the SCTP API. This may change in the future.
+
+  LOG(("Partial delivery event: "));
+  switch (spde->pdapi_indication) {
+    case SCTP_PARTIAL_DELIVERY_ABORTED:
+      LOG(("delivery aborted "));
+      break;
+    default:
+      LOG(("??? "));
+      break;
+  }
+  LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags, spde->pdapi_stream,
+       spde->pdapi_seq));
+
+  // Validate stream ID
+  if (spde->pdapi_stream >= UINT16_MAX) {
+    LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n", spde->pdapi_stream));
+    return;
+  }
+
+  // Find channel and reset buffer
+  DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
+  if (channel) {
+    LOG(("Abort partially delivered message of %u bytes\n", channel->mRecvBuffer.Length()));
+    channel->mRecvBuffer.Truncate(0);
+  }
+}
+
+void
 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
 {
   size_t i, n;
 
   if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
     LOG(("Unsent "));
   }
    if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
@@ -1814,16 +2093,17 @@ DataChannelConnection::HandleStreamChang
     size_t num_needed = mPending.GetSize();
     LOG(("%zu of %d new streams already needed", num_needed,
          new_len - old_len));
     num_needed -= (new_len - old_len); // number we added
     if (num_needed > 0) {
       if (num_needed < 16)
         num_needed = 16;
       LOG(("Not enough new streams, asking for %zu more", num_needed));
+      // TODO: parameter is an int32_t but we pass size_t
       RequestMoreStreams(num_needed);
     } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
       LOG(("Requesting %d output streams to match partner",
            strchg->strchange_instrms - strchg->strchange_outstrms));
       RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
     }
 
     ProcessQueuedOpens();
@@ -1845,19 +2125,39 @@ DataChannelConnection::HandleStreamChang
                    DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                    channel)));
         // maybe fire onError (bug 843625)
       } else {
         stream = FindFreeStream();
         if (stream != INVALID_STREAM) {
           channel->mStream = stream;
           mStreams[stream] = channel;
-          channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
-          // Note: we're locked, so there's no danger of a race with the
-          // buffer-threshold callback
+
+          // Send open request
+          int error = SendOpenRequestMessage(
+              channel->mLabel, channel->mProtocol, channel->mStream,
+              !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
+              channel->mPrValue);
+          if (error) {
+            LOG(("SendOpenRequest failed, error = %d", error));
+            // Close the channel, inform the user
+            mStreams[channel->mStream] = nullptr;
+            channel->mState = CLOSED;
+            // Don't need to reset; we didn't open it
+            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+                       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+                       channel)));
+          } else {
+            channel->mState = OPEN;
+            channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
+            LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+                       DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+                       channel)));
+          }
         } else {
           /* We will not find more ... */
           break;
         }
       }
     }
   }
 }
@@ -1881,28 +2181,28 @@ DataChannelConnection::HandleNotificatio
     HandleRemoteErrorEvent(&(notif->sn_remote_error));
     break;
   case SCTP_SHUTDOWN_EVENT:
     HandleShutdownEvent(&(notif->sn_shutdown_event));
     break;
   case SCTP_ADAPTATION_INDICATION:
     HandleAdaptationIndication(&(notif->sn_adaptation_event));
     break;
-  case SCTP_PARTIAL_DELIVERY_EVENT:
-    LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
-    break;
   case SCTP_AUTHENTICATION_EVENT:
     LOG(("SCTP_AUTHENTICATION_EVENT"));
     break;
   case SCTP_SENDER_DRY_EVENT:
     //LOG(("SCTP_SENDER_DRY_EVENT"));
     break;
   case SCTP_NOTIFICATIONS_STOPPED_EVENT:
     LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
     break;
+  case SCTP_PARTIAL_DELIVERY_EVENT:
+    HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
+    break;
   case SCTP_SEND_FAILED_EVENT:
     HandleSendFailedEvent(&(notif->sn_send_failed_event));
     break;
   case SCTP_STREAM_RESET_EVENT:
     HandleStreamResetEvent(&(notif->sn_strreset_event));
     break;
   case SCTP_ASSOC_RESET_EVENT:
     LOG(("SCTP_ASSOC_RESET_EVENT"));
@@ -1913,28 +2213,28 @@ DataChannelConnection::HandleNotificatio
   default:
     LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
     break;
    }
  }
 
 int
 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
-                                       struct sctp_rcvinfo rcv, int32_t flags)
+                                       struct sctp_rcvinfo rcv, int flags)
 {
   ASSERT_WEBRTC(!NS_IsMainThread());
 
   if (!data) {
     usrsctp_close(sock); // SCTP has finished shutting down
   } else {
     MutexAutoLock lock(mLock);
     if (flags & MSG_NOTIFICATION) {
       HandleNotification(static_cast<union sctp_notification *>(data), datalen);
     } else {
-      HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
+      HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
     }
   }
   // sctp allocates 'data' with malloc(), and expects the receiver to free
   // it (presumably with free).
   // XXX future optimization: try to deliver messages without an internal
   // alloc/copy, and if so delay the free until later.
   free(data);
   // usrsctp defines the callback as returning an int, but doesn't use it
@@ -1979,16 +2279,17 @@ DataChannelConnection::Open(const nsACSt
   if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
     LOG(("ERROR: external negotiation of already-open channel %u", aStream));
     // XXX How do we indicate this up to the application?  Probably the
     // caller's job, but we may need to return an error code.
     return nullptr;
   }
 
   flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
+
   RefPtr<DataChannel> channel(new DataChannel(this,
                                                 aStream,
                                                 DataChannel::CONNECTING,
                                                 label, protocol,
                                                 prPolicy, prValue,
                                                 flags,
                                                 aListener, aContext));
   if (aExternalNegotiated) {
@@ -2098,37 +2399,32 @@ DataChannelConnection::OpenFinish(alread
   MOZ_ASSERT(stream != INVALID_STREAM);
   // just allocated (& OPEN), or externally negotiated
   mStreams[stream] = channel; // holds a reference
   channel->mStream = stream;
 
 #ifdef TEST_QUEUED_DATA
   // It's painful to write a test for this...
   channel->mState = OPEN;
-  channel->mReady = true;
-  SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+  channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
+  SendDataMsgInternalOrBuffer(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING);
 #endif
 
   if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
     // Don't send unordered until this gets cleared
     channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
   }
 
   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
-    if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
-                                stream,
-                                !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
-                                channel->mPrPolicy, channel->mPrValue)) {
-      LOG(("SendOpenRequest failed, errno = %d", errno));
-      if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
-        // Note: we're locked, so there's no danger of a race with the
-        // buffer-threshold callback
-        return channel.forget();
-      }
+    int error = SendOpenRequestMessage(
+        channel->mLabel, channel->mProtocol, stream,
+        !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
+        channel->mPrValue);
+    if (error) {
+      LOG(("SendOpenRequest failed, error = %d", error));
       if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
         // We already returned the channel to the app.
         NS_ERROR("Failed to send open request");
         Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                    DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                    channel)));
       }
       // If we haven't returned the channel yet, it will get destroyed when we exit
@@ -2138,17 +2434,17 @@ DataChannelConnection::OpenFinish(alread
       // we'll be destroying the channel
       channel->mState = CLOSED;
       return nullptr;
       /* NOTREACHED */
     }
   }
   // Either externally negotiated or we sent Open
   channel->mState = OPEN;
-  channel->mReady = true;
+  channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
   // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
   LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
              DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
              channel)));
 
   return channel.forget();
 
@@ -2163,145 +2459,272 @@ request_error_cleanup:
     return channel.forget();
   }
   // we'll be destroying the channel, but it never really got set up
   // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
   // Dispatch it to ourselves
   return nullptr;
 }
 
-int32_t
-DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
-                                       size_t length, uint32_t ppid)
+// Requires mLock to be locked!
+// Returns a POSIX error code directly instead of setting errno.
+int
+DataChannelConnection::SendMsgInternal(OutgoingMsg &msg)
 {
-  uint16_t flags;
-  struct sctp_sendv_spa spa;
-  int32_t result;
-
-  NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
-  NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
-
-  // To avoid problems where an in-order OPEN is lost and an
-  // out-of-order data message "beats" it, require data to be in-order
-  // until we get an ACK.
-  if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
-      !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
-    flags = SCTP_UNORDERED;
-  } else {
-    flags = 0;
+  auto &info = msg.GetInfo().sendv_sndinfo;
+  int error;
+
+  // EOR set?
+  bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
+
+  // Send until buffer is empty
+  size_t left = msg.GetLeft();
+  do {
+    size_t length;
+
+    // Carefully chunk the buffer
+    if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
+      length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
+
+      // Unset EOR flag
+      info.snd_flags &= ~SCTP_EOR;
+    } else {
+      length = left;
+
+      // Set EOR flag
+      if (eor_set) {
+        info.snd_flags |= SCTP_EOR;
+      }
+    }
+
+    // Send (or try at least)
+    // SCTP will return EMSGSIZE if the message is bigger than the buffer
+    // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
+    // by carefully crafting small enough message chunks.
+    ssize_t written = usrsctp_sendv(
+        mSocket, msg.GetData(), length, nullptr, 0,
+        (void *)&msg.GetInfo(), (socklen_t)sizeof(struct sctp_sendv_spa),
+        SCTP_SENDV_SPA, 0);
+    if (written < 0) {
+      error = errno;
+      goto out;
+    }
+    LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)",
+         (size_t)written, length, left - (size_t)written));
+
+    // TODO: Remove once resolved (https://github.com/sctplab/usrsctp/issues/132)
+    if (written == 0) {
+      LOG(("@tuexen: usrsctp_sendv returned 0"));
+      error = EAGAIN;
+      goto out;
+    }
+
+    // If not all bytes have been written, this obviously means that usrsctp's buffer is full
+    // and we need to try again later.
+    if ((size_t)written < length) {
+      msg.Advance((size_t)written);
+      error = EAGAIN;
+      goto out;
+    }
+
+    // Update buffer position
+    msg.Advance((size_t)written);
+
+    // Get amount of bytes left in the buffer
+    left = msg.GetLeft();
+  } while (left > 0);
+
+  // Done
+  error = 0;
+
+out:
+  // Reset EOR flag
+  if (eor_set) {
+    info.snd_flags |= SCTP_EOR;
   }
 
-  spa.sendv_sndinfo.snd_ppid = htonl(ppid);
-  spa.sendv_sndinfo.snd_sid = channel->mStream;
-  spa.sendv_sndinfo.snd_flags = flags;
-  spa.sendv_sndinfo.snd_context = 0;
-  spa.sendv_sndinfo.snd_assoc_id = 0;
-  spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
-
-  if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
-    spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
-    spa.sendv_prinfo.pr_value = channel->mPrValue;
-    spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
-  }
+  return error;
+}
+
+// Requires mLock to be locked!
+// Returns a POSIX error code directly instead of setting errno.
+// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
+int
+DataChannelConnection::SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer,
+                                               OutgoingMsg &msg, bool &buffered)
+{
+  NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
+
+  int error = 0;
+  bool need_buffering = false;
 
   // Note: Main-thread IO, but doesn't block!
   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
   // (more than the buffersize) queue data onto another thread to do the
   // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
 
-  // SCTP will return EMSGSIZE if the message is bigger than the buffer
-  // size (or EAGAIN if there isn't space)
-
   // Avoid a race between buffer-full-failure (where we have to add the
   // packet to the buffered-data queue) and the buffer-now-only-half-full
   // callback, which happens on a different thread.  Otherwise we might
   // fail here, then before we add it to the queue get the half-full
   // callback, find nothing to do, then on this thread add it to the
   // queue - which would sit there.  Also, if we later send more data, it
   // would arrive ahead of the buffered message, but if the buffer ever
   // got to 1/2 full, the message would get sent - but at a semi-random
   // time, after other data it was supposed to be in front of.
 
   // Must lock before empty check for similar reasons!
-  MutexAutoLock lock(mLock);
-  if (channel->mBufferedData.IsEmpty()) {
-    result = usrsctp_sendv(mSocket, data, length,
-                           nullptr, 0,
-                           (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
-                           SCTP_SENDV_SPA, 0);
-    LOG(("Sent buffer (len=%zu), result=%d", length, result));
+  mLock.AssertCurrentThreadOwns();
+  if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
+    error = SendMsgInternal(msg);
+    switch (error) {
+      case 0:
+        break;
+      case EAGAIN:
+#if (EAGAIN != EWOULDBLOCK)
+      case EWOULDBLOCK:
+#endif
+        need_buffering = true;
+        break;
+      default:
+        LOG(("error %d on sending", error));
+        break;
+    }
   } else {
-    // Fake EAGAIN if we're already buffering data
-    result = -1;
-    errno = EAGAIN;
+    need_buffering = true;
+  }
+
+  if (need_buffering) {
+    // queue data for resend!  And queue any further data for the stream until it is...
+    auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
+    buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
+    LOG(("Queued %zu buffers (left=%zu, total=%zu)",
+         buffer.Length(), msg.GetLeft(), msg.GetLength()));
+    buffered = true;
+    return 0;
+  }
+
+  buffered = false;
+  return error;
+}
+
+// Caller must ensure that length <= UINT32_MAX
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data,
+                                                   size_t len, uint32_t ppid)
+{
+  if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
+    return EINVAL; // TODO: Find a better error code
+  }
+
+  struct sctp_sendv_spa info = {0};
+
+  // General flags
+  info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+
+  // Set stream identifier, protocol identifier and flags
+  info.sendv_sndinfo.snd_sid = channel.mStream;
+  info.sendv_sndinfo.snd_flags = SCTP_EOR;
+  info.sendv_sndinfo.snd_ppid = htonl(ppid);
+
+  // Unordered?
+  // To avoid problems where an in-order OPEN is lost and an
+  // out-of-order data message "beats" it, require data to be in-order
+  // until we get an ACK.
+  if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
+      !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
+    info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
+  }
+
+  // Partial reliability policy
+  if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
+    info.sendv_prinfo.pr_policy = channel.mPrPolicy;
+    info.sendv_prinfo.pr_value = channel.mPrValue;
+    info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
   }
-  if (result < 0) {
-    if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
-      // queue data for resend!  And queue any further data for the stream until it is...
-      auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
-      channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
-      channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
-      LOG(("Queued %zu buffers (len=%zu)",
-           channel->mBufferedData.Length(), length));
+
+  // Create message instance and send
+  OutgoingMsg msg(info, data, len);
+  MutexAutoLock lock(mLock);
+  bool buffered;
+  int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
+
+  // Set pending type and stream index (if buffered)
+  if (!error && buffered && !mPendingType) {
+    mPendingType = PENDING_DATA;
+    mCurrentStream = channel.mStream;
+  }
+  return error;
+}
+
+// Caller must ensure that length <= UINT32_MAX
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len,
+                                   uint32_t ppidPartial, uint32_t ppidFinal)
+{
+  // We *really* don't want to do this from main thread! - and
+  // SendDataMsgInternalOrBuffer avoids blocking.
+
+  if (mPpidFragmentation) {
+    // TODO: Bug 1381136, remove this block and all other code that uses PPIDs for fragmentation
+    //       and reassembly once older Firefoxes without EOR are no longer supported as target
+    //       clients.
+
+    // Use the deprecated PPID-level fragmentation if enabled. Should be enabled
+    // in case we can be certain that the other peer is an older Firefox browser
+    // that does support PPID-level fragmentation/reassembly.
+
+    // PPID-level fragmentation can only be applied on reliable data channels.
+    if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
+        channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
+        !(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
+      LOG(("Sending data message (total=%zu) using deprecated PPID-based chunks", len));
+
+      size_t left = len;
+      while (left > 0) {
+        // Note: For correctness, chunkLen should also consider mMaxMessageSize as minimum but as
+        //       this block is going to be removed soon, I see no need for it.
+        size_t chunkLen = std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
+        left -= chunkLen;
+        uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
+
+        // Send the chunk
+        // Note that these might end up being deferred and queued.
+        LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid %u",
+             chunkLen, left, len, ppid));
+        int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
+        if (error) {
+          LOG(("*** send chunk fail %d", error));
+          return error;
+        }
+
+        // Update data position
+        data += chunkLen;
+      }
+
+      // Sending chunks complete
+      LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
+           (size_t)(len+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT));
       return 0;
     }
-    LOG(("error %d sending string", errno));
+
+    // Cannot do PPID-based fragmentaton on unreliable channels
+    NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+                         "Sending too-large data on unreliable channel!");
+  } else {
+    if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
+      LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len, mMaxMessageSize));
+      return EMSGSIZE;
+    }
   }
-  return result;
-}
-
-// Handles fragmenting binary messages
-int32_t
-DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
-                                  size_t len,
-                                  uint32_t ppid_partial, uint32_t ppid_final)
-{
-  // Since there's a limit on network buffer size and no limits on message
-  // size, and we don't want to use EOR mode (multiple writes for a
-  // message, but all other streams are blocked until you finish sending
-  // this message), we need to add application-level fragmentation of large
-  // messages.  On a reliable channel, these can be simply rebuilt into a
-  // large message.  On an unreliable channel, we can't and don't know how
-  // long to wait, and there are no retransmissions, and no easy way to
-  // tell the user "this part is missing", so on unreliable channels we
-  // need to return an error if sending more bytes than the network buffers
-  // can hold, and perhaps a lower number.
-
-  // We *really* don't want to do this from main thread! - and SendMsgInternal
-  // avoids blocking.
-  // This MUST be reliable and in-order for the reassembly to work
-  if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
-      channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
-      !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
-    int32_t sent=0;
-    uint32_t origlen = len;
-    LOG(("Sending binary message length %zu in chunks", len));
-    // XXX check flags for out-of-order, or force in-order for large binary messages
-    while (len > 0) {
-      size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
-      uint32_t ppid;
-      len -= sendlen;
-      ppid = len > 0 ? ppid_partial : ppid_final;
-      LOG(("Send chunk of %zu bytes, ppid %u", sendlen, ppid));
-      // Note that these might end up being deferred and queued.
-      sent += SendMsgInternal(channel, data, sendlen, ppid);
-      data += sendlen;
-    }
-    LOG(("Sent %d buffers for %u bytes, %d sent immediately, %zu buffers queued",
-         (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
-         origlen, sent,
-         channel->mBufferedData.Length()));
-    return sent;
-  }
-  NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
-                       "Sending too-large data on unreliable channel!");
-
-  // This will fail if the message is too large (default 256K)
-  return SendMsgInternal(channel, data, len, ppid_final);
+
+  // This will use EOR-based fragmentation if the message is too large (> 64 KiB)
+  return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
 }
 
 class ReadBlobRunnable : public Runnable {
 public:
   ReadBlobRunnable(DataChannelConnection* aConnection,
                    uint16_t aStream,
                    nsIInputStream* aBlob)
     : Runnable("ReadBlobRunnable")
@@ -2324,45 +2747,48 @@ private:
   // can send the IOThread to MainThread to die in a runnable, avoiding
   // unsafe event loop recursion.  Evil.
   RefPtr<DataChannelConnection> mConnection;
   uint16_t mStream;
   // Use RefCount for preventing the object is deleted when SendBlob returns.
   RefPtr<nsIInputStream> mBlob;
 };
 
-int32_t
+// Returns a POSIX error code.
+int
 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
 {
   DataChannel *channel = mStreams[stream];
-  NS_ENSURE_TRUE(channel, 0);
+  if (NS_WARN_IF(!channel)) {
+    return EINVAL; // TODO: Find a better error code
+  }
+
   // Spawn a thread to send the data
   if (!mInternalIOThread) {
     nsresult rv = NS_NewNamedThread("DataChannel IO",
                                     getter_AddRefs(mInternalIOThread));
     if (NS_FAILED(rv)) {
-      return -1;
+      return EINVAL; // TODO: Find a better error code
     }
   }
 
   mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
   return 0;
 }
 
 class DataChannelBlobSendRunnable : public Runnable
 {
 public:
   DataChannelBlobSendRunnable(
     already_AddRefed<DataChannelConnection>& aConnection,
     uint16_t aStream)
     : Runnable("DataChannelBlobSendRunnable")
     , mConnection(aConnection)
     , mStream(aStream)
-  {
-  }
+  {}
 
   ~DataChannelBlobSendRunnable() override
   {
     if (!NS_IsMainThread() && mConnection) {
       MOZ_ASSERT(false);
       // explicitly leak the connection if destroyed off mainthread
       Unused << mConnection.forget().take();
     }
@@ -2431,39 +2857,60 @@ DataChannelConnection::GetStreamIds(std:
   ASSERT_WEBRTC(NS_IsMainThread());
   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
     if (mStreams[i]) {
       aStreamList->push_back(mStreams[i]->mStream);
     }
   }
 }
 
-int32_t
-DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
-                                     bool isBinary)
+// Returns a POSIX error code.
+int
+DataChannelConnection::SendDataMsgCommon(uint16_t stream, const nsACString &aMsg,
+                                         bool isBinary)
 {
   ASSERT_WEBRTC(NS_IsMainThread());
   // We really could allow this from other threads, so long as we deal with
   // asynchronosity issues with channels closing, in particular access to
   // mStreams, and issues with the association closing (access to mSocket).
 
-  const char *data = aMsg.BeginReading();
+  const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
   uint32_t len     = aMsg.Length();
-  DataChannel *channel;
+#if (UINT32_MAX > SIZE_MAX)
+  if (len > SIZE_MAX) {
+    return EMSGSIZE;
+  }
+#endif
+  DataChannel *channelPtr;
 
   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
   // XXX if we want more efficiency, translate flags once at open time
-  channel = mStreams[stream];
-  NS_ENSURE_TRUE(channel, 0);
-
-  if (isBinary)
-    return SendBinary(channel, data, len,
-                      DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
-  return SendBinary(channel, data, len,
-                    DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+  channelPtr = mStreams[stream];
+  if (NS_WARN_IF(!channelPtr)) {
+    return EINVAL; // TODO: Find a better error code
+  }
+
+  auto &channel = *channelPtr;
+
+  if (isBinary) {
+    return SendDataMsg(channel, data, len,
+                       DATA_CHANNEL_PPID_BINARY_PARTIAL, DATA_CHANNEL_PPID_BINARY);
+  } else {
+    return SendDataMsg(channel, data, len,
+                       DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, DATA_CHANNEL_PPID_DOMSTRING);
+  }
+}
+
+void
+DataChannelConnection::Stop()
+{
+  // Note: This will call 'CloseAll' from the main thread
+  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
+             DataChannelOnMessageAvailable::ON_DISCONNECTED,
+             this)));
 }
 
 void
 DataChannelConnection::Close(DataChannel *aChannel)
 {
   MutexAutoLock lock(mLock);
   CloseInt(aChannel);
 }
@@ -2592,25 +3039,70 @@ DataChannel::ReleaseConnection()
 void
 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
 {
   MutexAutoLock mLock(mListenerLock);
   mContext = aContext;
   mListener = aListener;
 }
 
+void
+DataChannel::SendErrnoToErrorResult(int error, ErrorResult& aRv)
+{
+  switch (error) {
+    case 0:
+      break;
+    case EMSGSIZE:
+      aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
+      break;
+    default:
+      aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
+      break;
+  }
+}
+
+void
+DataChannel::SendMsg(const nsACString &aMsg, ErrorResult& aRv)
+{
+  if (!EnsureValidStream(aRv)) {
+    return;
+  }
+
+  SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
+}
+
+void
+DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv)
+{
+  if (!EnsureValidStream(aRv)) {
+    return;
+  }
+
+  SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
+}
+
+void
+DataChannel::SendBinaryStream(nsIInputStream *aBlob,ErrorResult& aRv)
+{
+  if (!EnsureValidStream(aRv)) {
+    return;
+  }
+
+  SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
+}
+
 // May be called from another (i.e. Main) thread!
 void
 DataChannel::AppReady()
 {
   ENSURE_DATACONNECTION;
 
   MutexAutoLock lock(mConnection->mLock);
 
-  mReady = true;
+  mFlags |= DATA_CHANNEL_FLAGS_READY;
   if (mState == WAITING_TO_OPEN) {
     mState = OPEN;
     mMainThreadEventTarget->Dispatch(
       do_AddRef(new DataChannelOnMessageAvailable(
                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
                   this)));
     for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
       nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
@@ -2621,34 +3113,31 @@ DataChannel::AppReady()
     NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
   }
   mQueuedMessages.Clear();
   mQueuedMessages.Compact();
   // We never use it again...  We could even allocate the array in the odd
   // cases we need it.
 }
 
-uint32_t
+size_t
 DataChannel::GetBufferedAmountLocked() const
 {
   size_t buffered = 0;
 
-  for (auto& buffer : mBufferedData) {
-    buffered += buffer->mLength;
+  for (auto &msg : mBufferedData) {
+    buffered += msg->GetLeft();
   }
   // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
   // amount from the SCTP stack for a single stream.  It is on their to-do
   // list, and once we import a stack with support for that, we'll need to
   // add it to what we buffer.  Also we'll need to ask for notification of a per-
   // stream buffer-low event and merge that into the handling of buffer-low
   // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
 
-  if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
-    buffered = UINT32_MAX;
-  }
   return buffered;
 }
 
 uint32_t
 DataChannel::GetBufferedAmountLowThreshold()
 {
   return mBufferedThreshold;
 }
@@ -2659,18 +3148,30 @@ DataChannel::SetBufferedAmountLowThresho
 {
   mBufferedThreshold = aThreshold;
 }
 
 // Called with mLock locked!
 void
 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
 {
-  if (!mReady &&
+  if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
       (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
     mQueuedMessages.AppendElement(aMessage);
   } else {
     nsCOMPtr<nsIRunnable> runnable = aMessage;
     mMainThreadEventTarget->Dispatch(runnable.forget());
   }
 }
 
+bool
+DataChannel::EnsureValidStream(ErrorResult& aRv)
+{
+  MOZ_ASSERT(mConnection);
+  if (mConnection && mStream != INVALID_STREAM) {
+    return true;
+  } else {
+    aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
+    return false;
+  }
+}
+
 } // namespace mozilla
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -47,86 +47,118 @@ extern "C" {
 }
 
 namespace mozilla {
 
 class DataChannelConnection;
 class DataChannel;
 class DataChannelOnMessageAvailable;
 
-// For queuing outgoing messages
-class BufferedMsg
+// For sending outgoing messages.
+// This class only holds a reference to the data and the info structure but does
+// not copy it.
+class OutgoingMsg
 {
 public:
-  BufferedMsg(struct sctp_sendv_spa &spa,const char *data,
+  OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
               size_t length);
-  ~BufferedMsg();
+  ~OutgoingMsg() {};
+  void Advance(size_t offset);
+  struct sctp_sendv_spa &GetInfo() { return *mInfo; };
+  size_t GetLength() { return mLength; };
+  size_t GetLeft() { return mLength - mPos; };
+  const uint8_t *GetData() { return (const uint8_t *)(mData + mPos); };
 
-  struct sctp_sendv_spa *mSpa;
-  const char *mData;
+protected:
+  OutgoingMsg() {}; // Use this for inheritance only
   size_t mLength;
+  const uint8_t *mData;
+  struct sctp_sendv_spa *mInfo;
+  size_t mPos;
+};
+
+// For queuing outgoing messages
+// This class copies data of an outgoing message.
+class BufferedOutgoingMsg : public OutgoingMsg
+{
+public:
+  explicit BufferedOutgoingMsg(OutgoingMsg &message);
+  ~BufferedOutgoingMsg();
 };
 
 // for queuing incoming data messages before the Open or
 // external negotiation is indicated to us
 class QueuedDataMessage
 {
 public:
-  QueuedDataMessage(uint16_t stream, uint32_t ppid,
-             const void *data, size_t length)
+  QueuedDataMessage(uint16_t stream, uint32_t ppid, int flags,
+                    const void *data, uint32_t length)
     : mStream(stream)
     , mPpid(ppid)
+    , mFlags(flags)
     , mLength(length)
   {
-    mData = static_cast<char *>(moz_xmalloc(length)); // infallible
-    memcpy(mData, data, length);
+    mData = static_cast<uint8_t *>(moz_xmalloc((size_t)length)); // infallible
+    memcpy(mData, data, (size_t)length);
   }
 
   ~QueuedDataMessage()
   {
     free(mData);
   }
 
   uint16_t mStream;
   uint32_t mPpid;
-  size_t   mLength;
-  char     *mData;
+  int      mFlags;
+  uint32_t mLength;
+  uint8_t  *mData;
 };
 
 // One per PeerConnection
 class DataChannelConnection final
   : public net::NeckoTargetHolder
 #ifdef SCTP_DTLS_SUPPORTED
   , public sigslot::has_slots<>
 #endif
 {
   virtual ~DataChannelConnection();
 
 public:
+  enum {
+      PENDING_NONE = 0U, // No outgoing messages are pending
+      PENDING_DCEP = 1U, // Outgoing DCEP messages are pending
+      PENDING_DATA = 2U, // Outgoing data channel messages are pending
+  };
+
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection)
 
   class DataConnectionListener : public SupportsWeakPtr<DataConnectionListener>
   {
   public:
     MOZ_DECLARE_WEAKREFERENCE_TYPENAME(DataChannelConnection::DataConnectionListener)
     virtual ~DataConnectionListener() {}
 
     // Called when a new DataChannel has been opened by the other side.
     virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;
   };
 
   explicit DataChannelConnection(DataConnectionListener *listener,
                                  nsIEventTarget *aTarget);
 
-  bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
+  bool Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
+            uint64_t aMaxMessageSize);
+
   void Destroy(); // So we can spawn refs tied to runnables in shutdown
   // Finish Destroy on STS to avoid SCTP race condition with ABORT from far end
   void DestroyOnSTS(struct socket *aMasterSocket,
                     struct socket *aSocket);
 
+  void SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize);
+  uint64_t GetMaxMessageSize();
+
 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
   // These block; they require something to decide on listener/connector
   // (though you can do simultaneous Connect()).  Do not call these from
   // the main thread!
   bool Listen(unsigned short port);
   bool Connect(const char *addr, unsigned short port);
 #endif
 
@@ -149,35 +181,41 @@ public:
                                      const nsACString& protocol,
                                      Type type, bool inOrder,
                                      uint32_t prValue,
                                      DataChannelListener *aListener,
                                      nsISupports *aContext,
                                      bool aExternalNegotiated,
                                      uint16_t aStream);
 
+  void Stop();
   void Close(DataChannel *aChannel);
   // CloseInt() must be called with mLock held
   void CloseInt(DataChannel *aChannel);
   void CloseAll();
 
-  int32_t SendMsg(uint16_t stream, const nsACString &aMsg)
+  // Returns a POSIX error code.
+  int SendMsg(uint16_t stream, const nsACString &aMsg)
     {
-      return SendMsgCommon(stream, aMsg, false);
+      return SendDataMsgCommon(stream, aMsg, false);
     }
-  int32_t SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
+
+  // Returns a POSIX error code.
+  int SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
     {
-      return SendMsgCommon(stream, aMsg, true);
+      return SendDataMsgCommon(stream, aMsg, true);
     }
-  int32_t SendBlob(uint16_t stream, nsIInputStream *aBlob);
+
+  // Returns a POSIX error code.
+  int SendBlob(uint16_t stream, nsIInputStream *aBlob);
 
   // Called on data reception from the SCTP library
   // must(?) be public so my c->c++ trampoline can call it
   int ReceiveCallback(struct socket* sock, void *data, size_t datalen,
-                      struct sctp_rcvinfo rcv, int32_t flags);
+                      struct sctp_rcvinfo rcv, int flags);
 
   // Find out state
   enum {
     CONNECTING = 0U,
     OPEN = 1U,
     CLOSING = 2U,
     CLOSED = 3U
   };
@@ -205,48 +243,58 @@ private:
   static void DTLSConnectThread(void *data);
   int SendPacket(unsigned char data[], size_t len, bool release);
   void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
   static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
 #endif
   DataChannel* FindChannelByStream(uint16_t stream);
   uint16_t FindFreeStream();
   bool RequestMoreStreams(int32_t aNeeded = 16);
-  int32_t SendControlMessage(void *msg, uint32_t len, uint16_t stream);
-  int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
-                                 uint16_t stream,
-                                 bool unordered, uint16_t prPolicy, uint32_t prValue);
-  int32_t SendOpenAckMessage(uint16_t stream);
-  int32_t SendMsgInternal(DataChannel *channel, const char *data,
-                          size_t length, uint32_t ppid);
-  int32_t SendBinary(DataChannel *channel, const char *data,
-                     size_t len, uint32_t ppid_partial, uint32_t ppid_final);
-  int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
+  uint32_t UpdateCurrentStreamIndex();
+  uint32_t GetCurrentStreamIndex();
+  int SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream);
+  int SendOpenAckMessage(uint16_t stream);
+  int SendOpenRequestMessage(const nsACString& label, const nsACString& protocol, uint16_t stream,
+                             bool unordered, uint16_t prPolicy, uint32_t prValue);
+  bool SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer);
+  int SendMsgInternal(OutgoingMsg &msg);
+  int SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer, OutgoingMsg &msg,
+                              bool &buffered);
+  int SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data, size_t len,
+                                  uint32_t ppid);
+  int SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len, uint32_t ppidPartial,
+                  uint32_t ppidFinal);
+  int SendDataMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
   already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel>&& aChannel);
 
   void ProcessQueuedOpens();
   void ClearResets();
   void SendOutgoingStreamReset();
   void ResetOutgoingStream(uint16_t stream);
   void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
-                                size_t length,
-                                uint16_t stream);
+                                uint32_t length, uint16_t stream);
   void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
-                            size_t length, uint16_t stream);
-  void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream);
-  void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t stream);
-  void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream);
+                            uint32_t length, uint16_t stream);
+  void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream);
+  uint8_t BufferMessage(nsACString& recvBuffer, const void *data, uint32_t length, uint32_t ppid,
+                        int flags);
+  void HandleDataMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+                         int flags);
+  void HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream,
+                         int flags);
+  void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream, int flags);
   void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
   void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
   void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);
   void HandleShutdownEvent(const struct sctp_shutdown_event *sse);
   void HandleAdaptationIndication(const struct sctp_adaptation_event *sai);
+  void HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde);
   void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe);
   void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst);
   void HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg);
   void HandleNotification(const union sctp_notification *notif, size_t n);
 
 #ifdef SCTP_DTLS_SUPPORTED
   bool IsSTSThread() {
     bool on = false;
@@ -255,49 +303,55 @@ private:
     }
     return on;
   }
 #endif
 
   // Exists solely for proxying release of the TransportFlow to the STS thread
   static void ReleaseTransportFlow(const RefPtr<TransportFlow>& aFlow) {}
 
+  bool mSendInterleaved;
+  bool mPpidFragmentation;
+  bool mMaxMessageSizeSet;
+  uint64_t mMaxMessageSize;
+
   // Data:
   // NOTE: while this array will auto-expand, increases in the number of
   // channels available from the stack must be negotiated!
   bool mAllocateEven;
   AutoTArray<RefPtr<DataChannel>,16> mStreams;
+  uint32_t mCurrentStream;
   nsDeque mPending; // Holds addref'ed DataChannel's -- careful!
   // holds data that's come in before a channel is open
   nsTArray<nsAutoPtr<QueuedDataMessage>> mQueuedData;
+  // holds outgoing control messages
+  nsTArray<nsAutoPtr<BufferedOutgoingMsg>> mBufferedControl; // GUARDED_BY(mConnection->mLock)
 
   // Streams pending reset
   AutoTArray<uint16_t,4> mStreamsResetting;
 
   struct socket *mMasterSocket; // accessed from STS thread
   struct socket *mSocket; // cloned from mMasterSocket on successful Connect on STS thread
   uint16_t mState; // Protected with mLock
 
 #ifdef SCTP_DTLS_SUPPORTED
   RefPtr<TransportFlow> mTransportFlow;
   nsCOMPtr<nsIEventTarget> mSTS;
 #endif
   uint16_t mLocalPort; // Accessed from connect thread
   uint16_t mRemotePort;
-  bool mUsingDtls;
 
   nsCOMPtr<nsIThread> mInternalIOThread;
+  uint8_t mPendingType;
+  nsCString mRecvBuffer;
 };
 
 #define ENSURE_DATACONNECTION \
   do { MOZ_ASSERT(mConnection); if (!mConnection) { return; } } while (0)
 
-#define ENSURE_DATACONNECTION_RET(x) \
-  do { MOZ_ASSERT(mConnection); if (!mConnection) { return (x); } } while (0)
-
 class DataChannel {
 public:
   enum {
     CONNECTING = 0U,
     OPEN = 1U,
     CLOSING = 2U,
     CLOSED = 3U,
     WAITING_TO_OPEN = 4U
@@ -314,17 +368,16 @@ public:
               nsISupports *aContext)
     : mListenerLock("netwerk::sctp::DataChannel")
     , mListener(aListener)
     , mContext(aContext)
     , mConnection(connection)
     , mLabel(label)
     , mProtocol(protocol)
     , mState(state)
-    , mReady(false)
     , mStream(stream)
     , mPrPolicy(policy)
     , mPrValue(value)
     , mFlags(flags)
     , mIsRecvBinary(false)
     , mBufferedThreshold(0) // default from spec
     , mMainThreadEventTarget(connection->GetNeckoTarget())
     {
@@ -347,62 +400,49 @@ public:
 
   // Close this DataChannel.  Can be called multiple times.  MUST be called
   // before destroying the DataChannel (state must be CLOSED or CLOSING).
   void Close();
 
   // Set the listener (especially for channels created from the other side)
   void SetListener(DataChannelListener *aListener, nsISupports *aContext);
 
-  // Send a string
-  bool SendMsg(const nsACString &aMsg)
-    {
-      ENSURE_DATACONNECTION_RET(false);
+  // Helper for send methods that converts POSIX error codes to an ErrorResult.
+  static void SendErrnoToErrorResult(int error, ErrorResult& aRv);
 
-      if (mStream != INVALID_STREAM)
-        return (mConnection->SendMsg(mStream, aMsg) >= 0);
-      else
-        return false;
-    }
+  // Send a string
+  void SendMsg(const nsACString &aMsg, ErrorResult& aRv);
 
   // Send a binary message (TypedArray)
-  bool SendBinaryMsg(const nsACString &aMsg)
-    {
-      ENSURE_DATACONNECTION_RET(false);
-
-      if (mStream != INVALID_STREAM)
-        return (mConnection->SendBinaryMsg(mStream, aMsg) >= 0);
-      else
-        return false;
-    }
+  void SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv);
 
   // Send a binary blob
-  bool SendBinaryStream(nsIInputStream *aBlob, uint32_t msgLen)
-    {
-      ENSURE_DATACONNECTION_RET(false);
-
-      if (mStream != INVALID_STREAM)
-        return (mConnection->SendBlob(mStream, aBlob) == 0);
-      else
-        return false;
-    }
+  void SendBinaryStream(nsIInputStream *aBlob, ErrorResult& aRv);
 
   uint16_t GetType() { return mPrPolicy; }
 
   bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED); }
 
   // Amount of data buffered to send
   uint32_t GetBufferedAmount()
   {
     if (!mConnection) {
       return 0;
     }
 
     MutexAutoLock lock(mConnection->mLock);
-    return GetBufferedAmountLocked();
+    size_t buffered = GetBufferedAmountLocked();
+
+#if (SIZE_MAX > UINT32_MAX)
+    if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
+      buffered = UINT32_MAX;
+    }
+#endif
+
+    return buffered;
   }
 
 
   // Trigger amount for generating BufferedAmountLow events
   uint32_t GetBufferedAmountLowThreshold();
   void SetBufferedAmountLowThreshold(uint32_t aThreshold);
 
   // Find out state
@@ -430,75 +470,74 @@ protected:
   DataChannelListener *mListener;
   nsCOMPtr<nsISupports> mContext;
 
 private:
   friend class DataChannelOnMessageAvailable;
   friend class DataChannelConnection;
 
   nsresult AddDataToBinaryMsg(const char *data, uint32_t size);
-  uint32_t GetBufferedAmountLocked() const;
+  size_t GetBufferedAmountLocked() const;
+  bool EnsureValidStream(ErrorResult& aRv);
 
   RefPtr<DataChannelConnection> mConnection;
   nsCString mLabel;
   nsCString mProtocol;
   uint16_t mState;
-  bool     mReady;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
   uint32_t mFlags;
   uint32_t mId;
   bool mIsRecvBinary;
   size_t mBufferedThreshold;
   nsCString mRecvBuffer;
-  nsTArray<nsAutoPtr<BufferedMsg>> mBufferedData; // GUARDED_BY(mConnection->mLock)
+  nsTArray<nsAutoPtr<BufferedOutgoingMsg>> mBufferedData; // GUARDED_BY(mConnection->mLock)
   nsTArray<nsCOMPtr<nsIRunnable>> mQueuedMessages;
   nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
 };
 
 // used to dispatch notifications of incoming data to the main thread
 // Patterned on CallOnMessageAvailable in WebSockets
 // Also used to proxy other items to MainThread
 class DataChannelOnMessageAvailable : public Runnable
 {
 public:
   enum {
     ON_CONNECTION,
     ON_DISCONNECTED,
     ON_CHANNEL_CREATED,
     ON_CHANNEL_OPEN,
     ON_CHANNEL_CLOSED,
-    ON_DATA,
+    ON_DATA_STRING,
+    ON_DATA_BINARY,
     BUFFER_LOW_THRESHOLD,
     NO_LONGER_BUFFERED,
   };  /* types */
 
   DataChannelOnMessageAvailable(
     int32_t aType,
     DataChannelConnection* aConnection,
     DataChannel* aChannel,
-    nsCString& aData, // XXX this causes inefficiency
-    int32_t aLen)
+    nsCString& aData) // XXX this causes inefficiency
     : Runnable("DataChannelOnMessageAvailable")
     , mType(aType)
     , mChannel(aChannel)
     , mConnection(aConnection)
     , mData(aData)
-    , mLen(aLen)
   {
   }
 
   DataChannelOnMessageAvailable(int32_t aType, DataChannel* aChannel)
     : Runnable("DataChannelOnMessageAvailable")
     , mType(aType)
     , mChannel(aChannel)
   {
   }
-  // XXX is it safe to leave mData/mLen uninitialized?  This should only be
+  // XXX is it safe to leave mData uninitialized?  This should only be
   // used for notifications that don't use them, but I'd like more
   // bulletproof compile-time checking.
 
   DataChannelOnMessageAvailable(int32_t aType,
                                 DataChannelConnection* aConnection,
                                 DataChannel* aChannel)
     : Runnable("DataChannelOnMessageAvailable")
     , mType(aType)
@@ -520,35 +559,35 @@ public:
   {
     MOZ_ASSERT(NS_IsMainThread());
 
     // Note: calling the listeners can indirectly cause the listeners to be
     // made available for GC (by removing event listeners), especially for
     // OnChannelClosed().  We hold a ref to the Channel and the listener
     // while calling this.
     switch (mType) {
-      case ON_DATA:
+      case ON_DATA_STRING:
+      case ON_DATA_BINARY:
       case ON_CHANNEL_OPEN:
       case ON_CHANNEL_CLOSED:
       case BUFFER_LOW_THRESHOLD:
       case NO_LONGER_BUFFERED:
         {
           MutexAutoLock lock(mChannel->mListenerLock);
           if (!mChannel->mListener) {
             DATACHANNEL_LOG(("DataChannelOnMessageAvailable (%d) with null Listener!",mType));
             return NS_OK;
           }
 
           switch (mType) {
-            case ON_DATA:
-              if (mLen < 0) {
-                mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
-              } else {
-                mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
-              }
+            case ON_DATA_STRING:
+              mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
+              break;
+            case ON_DATA_BINARY:
+              mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
               break;
             case ON_CHANNEL_OPEN:
               mChannel->mListener->OnChannelConnected(mChannel->mContext);
               break;
             case ON_CHANNEL_CLOSED:
               mChannel->mListener->OnChannelClosed(mChannel->mContext);
               break;
             case BUFFER_LOW_THRESHOLD:
@@ -582,19 +621,18 @@ public:
         break;
     }
     return NS_OK;
   }
 
 private:
   ~DataChannelOnMessageAvailable() {}
 
-  int32_t                           mType;
+  int32_t                         mType;
   // XXX should use union
   RefPtr<DataChannel>             mChannel;
   RefPtr<DataChannelConnection>   mConnection;
-  nsCString                         mData;
-  int32_t                           mLen;
+  nsCString                       mData;
 };
 
 }
 
 #endif  // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
--- a/netwerk/sctp/datachannel/DataChannelProtocol.h
+++ b/netwerk/sctp/datachannel/DataChannelProtocol.h
@@ -11,37 +11,46 @@
 #define SCTP_PACKED __attribute__((packed))
 #elif defined(_MSC_VER)
 #pragma pack (push, 1)
 #define SCTP_PACKED
 #else
 #error "Unsupported compiler"
 #endif
 
-#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT          256
-#define WEBRTC_DATACHANNEL_PORT_DEFAULT             5000
-#define WEBRTC_DATACHANELL_MAX_MESSAGE_SIZE_DEFAULT 65536
+#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT                 256
+// Do not change this value!
+#define WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX           256
+#define WEBRTC_DATACHANNEL_PORT_DEFAULT                    5000
+// TODO: Bug 1381146, change once we resolve the nsCString limitation
+#define WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL          1073741823
+#define WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE_DEFAULT 65535
+// TODO: Bug 1382779, once resolved, can be increased to min(Uint8ArrayMaxSize, UINT32_MAX)
+// TODO: Bug 1381146, once resolved, can be increased to whatever we support then (hopefully
+//       SIZE_MAX) or be removed
+#define WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE         2147483637
 
-#define DATA_CHANNEL_PPID_CONTROL        50
-#define DATA_CHANNEL_PPID_BINARY         52
-#define DATA_CHANNEL_PPID_BINARY_LAST    53
-#define DATA_CHANNEL_PPID_DOMSTRING      54
-#define DATA_CHANNEL_PPID_DOMSTRING_LAST 51
+#define DATA_CHANNEL_PPID_CONTROL           50
+#define DATA_CHANNEL_PPID_BINARY_PARTIAL    52
+#define DATA_CHANNEL_PPID_BINARY            53
+#define DATA_CHANNEL_PPID_DOMSTRING_PARTIAL 54
+#define DATA_CHANNEL_PPID_DOMSTRING         51
 
 #define DATA_CHANNEL_MAX_BINARY_FRAGMENT 0x4000
 
-#define DATA_CHANNEL_FLAGS_SEND_REQ             0x00000001
-#define DATA_CHANNEL_FLAGS_SEND_RSP             0x00000002
-#define DATA_CHANNEL_FLAGS_SEND_ACK             0x00000004
-#define DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED 0x00000008
-#define DATA_CHANNEL_FLAGS_SEND_DATA            0x00000010
-#define DATA_CHANNEL_FLAGS_FINISH_OPEN          0x00000020
-#define DATA_CHANNEL_FLAGS_FINISH_RSP           0x00000040
-#define DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED  0x00000080
-#define DATA_CHANNEL_FLAGS_WAITING_ACK          0x00000100
+#define DATA_CHANNEL_FLAGS_READY                0x00000001
+#define DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED 0x00000002
+#define DATA_CHANNEL_FLAGS_FINISH_OPEN          0x00000004
+#define DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED  0x00000008
+#define DATA_CHANNEL_FLAGS_WAITING_ACK          0x00000010
+#define DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE    0x00000020
+
+#define DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE 0x01
+#define DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED  0x02
+#define DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE  0x04
 
 #define INVALID_STREAM (0xFFFF)
 // max is 0xFFFF: Streams 0 to 0xFFFE = 0xFFFF streams
 #define MAX_NUM_STREAMS (2048)
 
 struct rtcweb_datachannel_open_request {
   uint8_t  msg_type; // DATA_CHANNEL_OPEN
   uint8_t  channel_type;