Bug 1404997 - P21. Make MediaPipelineReceiveAudio listener asynchronous. r?pehrsons draft
authorJean-Yves Avenard <jyavenard@mozilla.com>
Mon, 11 Dec 2017 00:16:24 +0100
changeset 712541 6f436d98eae9e7eba1aaf77d1a3ee25b8d576f0e
parent 712540 cfe1040e4ce4ac6076bf1f0be80729e50794cc8f
child 712542 4587bcde88c0d089c9168ccabc9e0e8c89ed704d
push id93357
push userbmo:jyavenard@mozilla.com
push dateSun, 17 Dec 2017 09:29:04 +0000
reviewerspehrsons
bugs1404997, 1424653
milestone59.0a1
Bug 1404997 - P21. Make MediaPipelineReceiveAudio listener asynchronous. r?pehrsons We keep the synchronous version that will be used in bug 1424653 MozReview-Commit-ID: JTGaRYm20ca
media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
@@ -2,67 +2,63 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  * You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 // Original author: ekr@rtfm.com
 
 #include "MediaPipeline.h"
 
-#include "MediaStreamGraphImpl.h"
-
 #include <inttypes.h>
 #include <math.h>
 
-#include "nspr.h"
-#include "srtp.h"
-
-#include "VideoSegment.h"
+#include "AudioSegment.h"
+#include "AutoTaskQueue.h"
+#include "CSFLog.h"
+#include "DOMMediaStream.h"
+#include "ImageContainer.h"
+#include "ImageTypes.h"
 #include "Layers.h"
 #include "LayersLogging.h"
-#include "ImageTypes.h"
-#include "ImageContainer.h"
+#include "MediaEngine.h"
+#include "MediaPipelineFilter.h"
+#include "MediaSegment.h"
+#include "MediaStreamGraphImpl.h"
+#include "MediaStreamListener.h"
 #include "MediaStreamTrack.h"
-#include "MediaStreamListener.h"
 #include "MediaStreamVideoSink.h"
-#include "VideoUtils.h"
+#include "RtpLogger.h"
+#include "VideoSegment.h"
 #include "VideoStreamTrack.h"
-#include "MediaEngine.h"
-
+#include "VideoUtils.h"
+#include "databuffer.h"
+#include "libyuv/convert.h"
+#include "mozilla/PeerIdentity.h"
+#include "mozilla/Preferences.h"
+#include "mozilla/SharedThreadPool.h"
+#include "mozilla/Sprintf.h"
+#include "mozilla/UniquePtr.h"
+#include "mozilla/UniquePtrExtensions.h"
+#include "mozilla/dom/RTCStatsReportBinding.h"
+#include "mozilla/gfx/Point.h"
+#include "mozilla/gfx/Types.h"
 #include "nsError.h"
-#include "AudioSegment.h"
-#include "AutoTaskQueue.h"
-#include "MediaSegment.h"
-#include "MediaPipelineFilter.h"
-#include "RtpLogger.h"
-#include "databuffer.h"
+#include "nsThreadUtils.h"
+#include "nspr.h"
+#include "runnable_utils.h"
+#include "srtp.h"
 #include "transportflow.h"
 #include "transportlayer.h"
 #include "transportlayerdtls.h"
 #include "transportlayerice.h"
-#include "runnable_utils.h"
-#include "libyuv/convert.h"
-#include "mozilla/dom/RTCStatsReportBinding.h"
-#include "mozilla/SharedThreadPool.h"
-#include "mozilla/PeerIdentity.h"
-#include "mozilla/Preferences.h"
-#include "mozilla/gfx/Point.h"
-#include "mozilla/gfx/Types.h"
-#include "mozilla/UniquePtr.h"
-#include "mozilla/UniquePtrExtensions.h"
-#include "mozilla/Sprintf.h"
 
+#include "webrtc/base/bind.h"
 #include "webrtc/common_types.h"
-#include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"
 #include "webrtc/common_video/include/video_frame_buffer.h"
-#include "webrtc/base/bind.h"
-
-#include "nsThreadUtils.h"
-
-#include "CSFLog.h"
+#include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"
 
 // Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
 // 48KHz)
 #define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
 static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2
                <= AUDIO_SAMPLE_BUFFER_MAX_BYTES,
                "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
 
@@ -465,17 +461,17 @@ protected:
                         buffer_size.value(),
                         size.width,
                         size.height,
                         mozilla::kVideoI420,
                         0);
   }
 
   Atomic<int32_t, Relaxed> mLength;
-  RefPtr<AutoTaskQueue> mTaskQueue;
+  const RefPtr<AutoTaskQueue> mTaskQueue;
 
   // Written and read from the queueing thread (normally MSG).
   int32_t mLastImage;           // serial number of last Image
   TimeStamp mDisabledFrameSent; // The time we sent the last disabled frame.
 #ifdef DEBUG
   uint32_t mThrottleCount;
   uint32_t mThrottleRecord;
 #endif
@@ -2176,76 +2172,98 @@ MediaPipelineReceive::~MediaPipelineRece
 class MediaPipelineReceiveAudio::PipelineListener
   : public GenericReceiveListener
 {
 public:
   PipelineListener(dom::MediaStreamTrack* aTrack,
                    const RefPtr<MediaSessionConduit>& aConduit)
     : GenericReceiveListener(aTrack)
     , mConduit(aConduit)
+    , mSource(mTrack->GetInputStream()->AsSourceStream())
+    , mTrackId(mTrack->GetInputTrackId())
+    , mRate(mSource ? mSource->GraphRate() : 0)
+    , mTaskQueue(new AutoTaskQueue(
+        SharedThreadPool::Get(NS_LITERAL_CSTRING("PipelineAudioListener"))))
     , mLastLog(0)
   {
+    MOZ_ASSERT(mSource);
+  }
+
+  // Implement MediaStreamListener
+  void NotifyPull(MediaStreamGraph* aGraph,
+                  StreamTime aDesiredTime) override
+  {
+    if (!mSource) {
+      CSFLogError(LOGTAG, "NotifyPull() called from a non-SourceMediaStream");
+      return;
+    }
+    NotifyPullImpl(aDesiredTime);
   }
 
+  RefPtr<SourceMediaStream::NotifyPullPromise> AsyncNotifyPull(
+    MediaStreamGraph* aGraph,
+    StreamTime aDesiredTime) override
+  {
+    if (!mSource) {
+      CSFLogError(LOGTAG, "NotifyPull() called from a non-SourceMediaStream");
+      return SourceMediaStream::NotifyPullPromise::CreateAndReject(true,
+                                                                   __func__);
+    }
+    RefPtr<PipelineListener> self = this;
+    return InvokeAsync(mTaskQueue, __func__, [self, aDesiredTime]() {
+      self->NotifyPullImpl(aDesiredTime);
+      return SourceMediaStream::NotifyPullPromise::CreateAndResolve(true,
+                                                                    __func__);
+    });
+  }
+
+private:
   ~PipelineListener()
   {
     if (!NS_IsMainThread()) {
       // release conduit on mainthread.  Must use forget()!
       nsresult rv =
         NS_DispatchToMainThread(new ConduitDeleteEvent(mConduit.forget()));
-      MOZ_ASSERT(!NS_FAILED(rv), "Could not dispatch conduit shutdown to main");
-      if (NS_FAILED(rv)) {
-        MOZ_CRASH();
-      }
+      MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
     } else {
       mConduit = nullptr;
     }
   }
 
-  // Implement MediaStreamListener
-  void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) override
+  void NotifyPullImpl(StreamTime aDesiredTime)
   {
-    RefPtr<SourceMediaStream> source =
-      mTrack->GetInputStream()->AsSourceStream();
-    MOZ_ASSERT(source);
-    if (!source) {
-      CSFLogError(LOGTAG, "NotifyPull() called from a non-SourceMediaStream");
-      return;
-    }
-
-    const TrackRate rate = aGraph->GraphRate();
-    uint32_t samplesPer10ms = rate / 100;
+    uint32_t samplesPer10ms = mRate / 100;
     // Determine how many frames we need.
     // As we get frames from conduit_ at the same rate as the graph's rate,
     // the number of frames needed straightfully determined.
     TrackTicks framesNeeded = aDesiredTime - mPlayedTicks;
 
     while (framesNeeded >= 0) {
       int16_t scratchBuffer[AUDIO_SAMPLE_BUFFER_MAX_BYTES / sizeof(int16_t)];
 
       int samplesLength;
 
       // This fetches 10ms of data, either mono or stereo
       MediaConduitErrorCode err =
         static_cast<AudioSessionConduit*>(mConduit.get())
           ->GetAudioFrame(scratchBuffer,
-                          rate,
+                          mRate,
                           0, // TODO(ekr@rtfm.com): better estimate of "capture"
                              // (really playout) delay
                           samplesLength);
 
       if (err != kMediaConduitNoError) {
         // Insert silence on conduit/GIPS failure (extremely unlikely)
         CSFLogError(LOGTAG,
                     "Audio conduit failed (%d) to return data @ %" PRId64
                     " (desired %" PRId64 " -> %f)",
                     err,
                     mPlayedTicks,
                     aDesiredTime,
-                    source->StreamTimeToSeconds(aDesiredTime));
+                    mSource->StreamTimeToSeconds(aDesiredTime));
         // if this is not enough we'll loop and provide more
         samplesLength = samplesPer10ms;
         PodArrayZero(scratchBuffer);
       }
 
       MOZ_ASSERT(samplesLength * sizeof(uint16_t) <=
                  AUDIO_SAMPLE_BUFFER_MAX_BYTES);
 
@@ -2276,44 +2294,47 @@ public:
         scratchBuffer, frames, channelCount, channels.Elements());
 
       outputChannels.AppendElements(channels);
 
       segment.AppendFrames(
         samples.forget(), outputChannels, frames, mPrincipalHandle);
 
       // Handle track not actually added yet or removed/finished
-      if (source->AppendToTrack(mTrack->GetInputTrackId(), &segment)) {
+      if (mSource->AppendToTrack(mTrackId, &segment)) {
         framesNeeded -= frames;
         mPlayedTicks += frames;
         if (MOZ_LOG_TEST(AudioLogModule(), LogLevel::Debug)) {
-          if (mPlayedTicks > mLastLog + rate) { // ~ 1 second
-            MOZ_LOG(
-              AudioLogModule(),
-              LogLevel::Debug,
-              ("%p: Inserting %zu samples into track %d, total = %" PRIu64,
-               (void*)this,
-               frames,
-               mTrack->GetInputTrackId(),
-               mPlayedTicks));
+          if (mPlayedTicks > mLastLog + mRate) {
+            MOZ_LOG(AudioLogModule(),
+                    LogLevel::Debug,
+                    ("%p: Inserting samples into track %d, total = "
+                     "%" PRIu64,
+                     (void*)this,
+                     mTrackId,
+                     mPlayedTicks));
             mLastLog = mPlayedTicks;
           }
         }
       } else {
         CSFLogError(LOGTAG, "AppendToTrack failed");
         // we can't un-read the data, but that's ok since we don't want to
         // buffer - but don't i-loop!
-        return;
+        break;
       }
     }
   }
 
-private:
   RefPtr<MediaSessionConduit> mConduit;
-  TrackTicks mLastLog; // mPlayedTicks when we last logged
+  const RefPtr<SourceMediaStream> mSource;
+  const TrackID mTrackId;
+  const TrackRate mRate;
+  const RefPtr<AutoTaskQueue> mTaskQueue;
+  // Graph's current sampling rate
+  TrackTicks mLastLog = 0; // mPlayedTicks when we last logged
 };
 
 MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
   const std::string& aPc,
   nsCOMPtr<nsIEventTarget> aMainThread,
   nsCOMPtr<nsIEventTarget> aStsThread,
   RefPtr<AudioSessionConduit> aConduit,
   dom::MediaStreamTrack* aTrack)