Bug 1407940 - P2. Only ever access mTaskQueue in a thread-safe fashion. r?gerald,r?alwu draft
authorJean-Yves Avenard <jyavenard@mozilla.com>
Thu, 12 Oct 2017 13:30:47 +0200
changeset 679941 d926331edbdbd091059e61381b198cf341fd6c1c
parent 679940 9c73216638e2de9fa6736d9a019e88ec204ff0cc
child 679942 49da5834393d693195e9e01564ebabfba2460fba
child 680143 080371b74d5a77b30665ca905cde94245a92a5ab
child 681015 0698370831a17e3f453e220722b88fa2f4740fad
push id84351
push userbmo:jyavenard@mozilla.com
push dateFri, 13 Oct 2017 07:58:00 +0000
reviewersgerald, alwu
bugs1407940
milestone58.0a1
Bug 1407940 - P2. Only ever access mTaskQueue in a thread-safe fashion. r?gerald,r?alwu mTaskQueue is only read on the main thread, but read and written on the demuxer's taskqueue. We need to ensure that accesses are synchronised. MozReview-Commit-ID: Gbc15iYgZOe
dom/media/mediasource/TrackBuffersManager.cpp
dom/media/mediasource/TrackBuffersManager.h
--- a/dom/media/mediasource/TrackBuffersManager.cpp
+++ b/dom/media/mediasource/TrackBuffersManager.cpp
@@ -91,28 +91,31 @@ TrackBuffersManager::TrackBuffersManager
   : mInputBuffer(new MediaByteBuffer)
   , mBufferFull(false)
   , mFirstInitializationSegmentReceived(false)
   , mNewMediaSegmentStarted(false)
   , mActiveTrack(false)
   , mType(aType)
   , mParser(ContainerParser::CreateForMIMEType(aType))
   , mProcessedInput(0)
-  , mTaskQueue(aParentDecoder->GetDemuxer()->GetTaskQueue())
-  , mParentDecoder(
-      new nsMainThreadPtrHolder<MediaSourceDecoder>(
-        "TrackBuffersManager::mParentDecoder", aParentDecoder, false /* strict */))
+  , mParentDecoder(new nsMainThreadPtrHolder<MediaSourceDecoder>(
+      "TrackBuffersManager::mParentDecoder",
+      aParentDecoder,
+      false /* strict */))
   , mAbstractMainThread(aParentDecoder->AbstractMainThread())
   , mEnded(false)
-  , mVideoEvictionThreshold(Preferences::GetUint("media.mediasource.eviction_threshold.video",
-                                                 100 * 1024 * 1024))
-  , mAudioEvictionThreshold(Preferences::GetUint("media.mediasource.eviction_threshold.audio",
-                                                 20 * 1024 * 1024))
+  , mVideoEvictionThreshold(
+      Preferences::GetUint("media.mediasource.eviction_threshold.video",
+                           100 * 1024 * 1024))
+  , mAudioEvictionThreshold(
+      Preferences::GetUint("media.mediasource.eviction_threshold.audio",
+                           20 * 1024 * 1024))
   , mEvictionState(EvictionState::NO_EVICTION_NEEDED)
   , mMutex("TrackBuffersManager")
+  , mTaskQueue(aParentDecoder->GetDemuxer()->GetTaskQueue())
 {
   MOZ_ASSERT(NS_IsMainThread(), "Must be instanciated on the main thread");
 }
 
 TrackBuffersManager::~TrackBuffersManager()
 {
   ShutdownDemuxers();
 }
@@ -122,18 +125,22 @@ TrackBuffersManager::AppendData(already_
                                 const SourceBufferAttributes& aAttributes)
 {
   MOZ_ASSERT(NS_IsMainThread());
   RefPtr<MediaByteBuffer> data(aData);
   MSE_DEBUG("Appending %zu bytes", data->Length());
 
   mEnded = false;
 
-  return InvokeAsync(GetTaskQueue(), this, __func__,
-    &TrackBuffersManager::DoAppendData, data.forget(), aAttributes);
+  return InvokeAsync(static_cast<AbstractThread*>(GetTaskQueueSafe().get()),
+                     this,
+                     __func__,
+                     &TrackBuffersManager::DoAppendData,
+                     data.forget(),
+                     aAttributes);
 }
 
 RefPtr<TrackBuffersManager::AppendPromise>
 TrackBuffersManager::DoAppendData(already_AddRefed<MediaByteBuffer> aData,
                                   const SourceBufferAttributes& aAttributes)
 {
   RefPtr<AppendBufferTask> task = new AppendBufferTask(Move(aData), aAttributes);
   RefPtr<AppendPromise> p = task->mPromise.Ensure(__func__);
@@ -143,42 +150,50 @@ TrackBuffersManager::DoAppendData(alread
 }
 
 void
 TrackBuffersManager::QueueTask(SourceBufferTask* aTask)
 {
   // The source buffer is a wrapped native, it would be unlinked twice and so
   // the TrackBuffersManager::Detach() would also be called twice. Since the
   // detach task has been done before, we could ignore this task.
-  if (!GetTaskQueue()) {
+  RefPtr<AutoTaskQueue> taskQueue = GetTaskQueueSafe();
+  if (!taskQueue) {
     MOZ_ASSERT(aTask->GetType() == SourceBufferTask::Type::Detach,
                "only detach task could happen here!");
     MSE_DEBUG("Could not queue the task '%s' without task queue",
               aTask->GetTypeName());
     return;
   }
 
-  if (!OnTaskQueue()) {
-    GetTaskQueue()->Dispatch(NewRunnableMethod<RefPtr<SourceBufferTask>>(
+  if (!taskQueue->IsCurrentThreadIn()) {
+    taskQueue->Dispatch(NewRunnableMethod<RefPtr<SourceBufferTask>>(
       "TrackBuffersManager::QueueTask",
       this,
       &TrackBuffersManager::QueueTask,
       aTask));
     return;
   }
-  MOZ_ASSERT(OnTaskQueue());
   mQueue.Push(aTask);
   ProcessTasks();
 }
 
 void
 TrackBuffersManager::ProcessTasks()
 {
+  // ProcessTask is always called OnTaskQueue, however it is possible that it is
+  // called once again after a first Detach task has run, in which case
+  // mTaskQueue would be null.
+  // This can happen under two conditions:
+  // 1- Two Detach tasks were queued in a row due to a double cycle collection.
+  // 2- An call to ProcessTasks() had queued another run of ProcessTasks while
+  //    a Detach task is pending.
+  // We handle these two cases by aborting early.
   // A second Detach task was queued, prior the first one running, ignore it.
-  if (!GetTaskQueue()) {
+  if (!mTaskQueue) {
     RefPtr<SourceBufferTask> task = mQueue.Pop();
     if (!task) {
       return;
     }
     MOZ_RELEASE_ASSERT(task->GetType() == SourceBufferTask::Type::Detach,
                        "only detach task could happen here!");
     MSE_DEBUG("Could not process the task '%s' after detached",
               task->GetTypeName());
@@ -229,27 +244,27 @@ TrackBuffersManager::ProcessTasks()
     case Type::Abort:
       // not handled yet, and probably never.
       break;
     case Type::Reset:
       CompleteResetParserState();
       break;
     case Type::Detach:
       mCurrentInputBuffer = nullptr;
-      mTaskQueue = nullptr;
       MOZ_DIAGNOSTIC_ASSERT(mQueue.Length() == 0,
                             "Detach task must be the last");
       mVideoTracks.Reset();
       mAudioTracks.Reset();
       ShutdownDemuxers();
+      ResetTaskQueue();
       return;
     default:
       NS_WARNING("Invalid Task");
   }
-  GetTaskQueue()->Dispatch(
+  TaskQueueFromTaskQueue()->Dispatch(
     NewRunnableMethod("TrackBuffersManager::ProcessTasks",
                       this,
                       &TrackBuffersManager::ProcessTasks));
 }
 
 // The MSE spec requires that we abort the current SegmentParserLoop
 // which is then followed by a call to ResetParserState.
 // However due to our asynchronous design this causes inherent difficulties.
@@ -290,17 +305,19 @@ TrackBuffersManager::ResetParserState(So
 RefPtr<TrackBuffersManager::RangeRemovalPromise>
 TrackBuffersManager::RangeRemoval(TimeUnit aStart, TimeUnit aEnd)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MSE_DEBUG("From %.2f to %.2f", aStart.ToSeconds(), aEnd.ToSeconds());
 
   mEnded = false;
 
-  return InvokeAsync(GetTaskQueue(), this, __func__,
+  return InvokeAsync(static_cast<AbstractThread*>(GetTaskQueueSafe().get()),
+                     this,
+                     __func__,
                      &TrackBuffersManager::CodedFrameRemovalWithPromise,
                      TimeInterval(aStart, aEnd));
 }
 
 TrackBuffersManager::EvictDataResult
 TrackBuffersManager::EvictData(const TimeUnit& aPlaybackTime, int64_t aSize)
 {
   MOZ_ASSERT(NS_IsMainThread());
@@ -770,26 +787,27 @@ TrackBuffersManager::SegmentParserLoop()
           NeedMoreData();
           return;
         }
       }
 
       // 3. If the input buffer contains one or more complete coded frames, then run the coded frame processing algorithm.
       RefPtr<TrackBuffersManager> self = this;
       CodedFrameProcessing()
-        ->Then(GetTaskQueue(), __func__,
-               [self] (bool aNeedMoreData) {
+        ->Then(TaskQueueFromTaskQueue(),
+               __func__,
+               [self](bool aNeedMoreData) {
                  self->mProcessingRequest.Complete();
                  if (aNeedMoreData) {
                    self->NeedMoreData();
                  } else {
                    self->ScheduleSegmentParserLoop();
                  }
                },
-               [self] (const MediaResult& aRejectValue) {
+               [self](const MediaResult& aRejectValue) {
                  self->mProcessingRequest.Complete();
                  self->RejectAppend(aRejectValue, __func__);
                })
         ->Track(mProcessingRequest);
       return;
     }
   }
 }
@@ -820,17 +838,18 @@ TrackBuffersManager::RejectAppend(const 
   mSourceBufferAttributes = nullptr;
   mCurrentTask = nullptr;
   ProcessTasks();
 }
 
 void
 TrackBuffersManager::ScheduleSegmentParserLoop()
 {
-  GetTaskQueue()->Dispatch(
+  MOZ_ASSERT(OnTaskQueue());
+  TaskQueueFromTaskQueue()->Dispatch(
     NewRunnableMethod("TrackBuffersManager::SegmentParserLoop",
                       this,
                       &TrackBuffersManager::SegmentParserLoop));
 }
 
 void
 TrackBuffersManager::ShutdownDemuxers()
 {
@@ -869,30 +888,32 @@ TrackBuffersManager::CreateDemuxerforMIM
 #endif
   NS_WARNING("Not supported (yet)");
 }
 
 // We reset the demuxer by creating a new one and initializing it.
 void
 TrackBuffersManager::ResetDemuxingState()
 {
+  MOZ_ASSERT(OnTaskQueue());
   MOZ_ASSERT(mParser && mParser->HasInitData());
   RecreateParser(true);
   mCurrentInputBuffer = new SourceBufferResource();
   // The demuxer isn't initialized yet ; we don't want to notify it
   // that data has been appended yet ; so we simply append the init segment
   // to the resource.
   mCurrentInputBuffer->AppendData(mParser->InitData());
   CreateDemuxerforMIMEType();
   if (!mInputDemuxer) {
     RejectAppend(NS_ERROR_FAILURE, __func__);
     return;
   }
   mInputDemuxer->Init()
-    ->Then(GetTaskQueue(), __func__,
+    ->Then(TaskQueueFromTaskQueue(),
+           __func__,
            this,
            &TrackBuffersManager::OnDemuxerResetDone,
            &TrackBuffersManager::OnDemuxerInitFailed)
     ->Track(mDemuxerInitRequest);
 }
 
 void
 TrackBuffersManager::OnDemuxerResetDone(const MediaResult& aResult)
@@ -953,16 +974,17 @@ TrackBuffersManager::AppendDataToCurrent
   MOZ_ASSERT(mCurrentInputBuffer);
   mCurrentInputBuffer->AppendData(aData);
   mInputDemuxer->NotifyDataArrived();
 }
 
 void
 TrackBuffersManager::InitializationSegmentReceived()
 {
+  MOZ_ASSERT(OnTaskQueue());
   MOZ_ASSERT(mParser->HasCompleteInitData());
 
   int64_t endInit = mParser->InitSegmentRange().mEnd;
   if (mInputBuffer->Length() > mProcessedInput ||
       int64_t(mProcessedInput - mInputBuffer->Length()) > endInit) {
     // Something is not quite right with the data appended. Refuse it.
     RejectAppend(MediaResult(NS_ERROR_FAILURE,
                              "Invalid state following initialization segment"),
@@ -984,17 +1006,18 @@ TrackBuffersManager::InitializationSegme
   }
   CreateDemuxerforMIMEType();
   if (!mInputDemuxer) {
     NS_WARNING("TODO type not supported");
     RejectAppend(NS_ERROR_DOM_NOT_SUPPORTED_ERR, __func__);
     return;
   }
   mInputDemuxer->Init()
-    ->Then(GetTaskQueue(), __func__,
+    ->Then(TaskQueueFromTaskQueue(),
+           __func__,
            this,
            &TrackBuffersManager::OnDemuxerInitDone,
            &TrackBuffersManager::OnDemuxerInitFailed)
     ->Track(mDemuxerInitRequest);
 }
 
 void
 TrackBuffersManager::OnDemuxerInitDone(const MediaResult& aResult)
@@ -1287,17 +1310,19 @@ void
 TrackBuffersManager::DoDemuxVideo()
 {
   MOZ_ASSERT(OnTaskQueue());
   if (!HasVideo()) {
     DoDemuxAudio();
     return;
   }
   mVideoTracks.mDemuxer->GetSamples(-1)
-    ->Then(GetTaskQueue(), __func__, this,
+    ->Then(TaskQueueFromTaskQueue(),
+           __func__,
+           this,
            &TrackBuffersManager::OnVideoDemuxCompleted,
            &TrackBuffersManager::OnVideoDemuxFailed)
     ->Track(mVideoTracks.mDemuxRequest);
 }
 
 void
 TrackBuffersManager::MaybeDispatchEncryptedEvent(
   const nsTArray<RefPtr<MediaRawData>>& aSamples)
@@ -1329,17 +1354,19 @@ void
 TrackBuffersManager::DoDemuxAudio()
 {
   MOZ_ASSERT(OnTaskQueue());
   if (!HasAudio()) {
     CompleteCodedFrameProcessing();
     return;
   }
   mAudioTracks.mDemuxer->GetSamples(-1)
-    ->Then(GetTaskQueue(), __func__, this,
+    ->Then(TaskQueueFromTaskQueue(),
+           __func__,
+           this,
            &TrackBuffersManager::OnAudioDemuxCompleted,
            &TrackBuffersManager::OnAudioDemuxFailed)
     ->Track(mAudioTracks.mDemuxRequest);
 }
 
 void
 TrackBuffersManager::OnAudioDemuxCompleted(RefPtr<MediaTrackDemuxer::SamplesHolder> aSamples)
 {
--- a/dom/media/mediasource/TrackBuffersManager.h
+++ b/dom/media/mediasource/TrackBuffersManager.h
@@ -5,16 +5,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef MOZILLA_TRACKBUFFERSMANAGER_H_
 #define MOZILLA_TRACKBUFFERSMANAGER_H_
 
 #include "mozilla/Atomics.h"
 #include "mozilla/Maybe.h"
 #include "mozilla/Mutex.h"
+#include "mozilla/NotNull.h"
 #include "AutoTaskQueue.h"
 
 #include "MediaContainerType.h"
 #include "MediaData.h"
 #include "MediaDataDemuxer.h"
 #include "MediaResult.h"
 #include "MediaSourceDecoder.h"
 #include "SourceBufferTask.h"
@@ -450,26 +451,39 @@ private:
       default:
         return mAudioTracks;
     }
   }
   TrackData mVideoTracks;
   TrackData mAudioTracks;
 
   // TaskQueue methods and objects.
-  AbstractThread* GetTaskQueue() const
+  RefPtr<AutoTaskQueue> GetTaskQueueSafe() const
   {
+    MutexAutoLock mut(mMutex);
     return mTaskQueue;
   }
+  NotNull<AbstractThread*> TaskQueueFromTaskQueue() const
+  {
+#ifdef DEBUG
+    RefPtr<AutoTaskQueue> taskQueue = GetTaskQueueSafe();
+    MOZ_ASSERT(taskQueue && taskQueue->IsCurrentThreadIn());
+#endif
+    return WrapNotNull(mTaskQueue.get());
+  }
   bool OnTaskQueue() const
   {
-    MOZ_RELEASE_ASSERT(GetTaskQueue());
-    return GetTaskQueue()->IsCurrentThreadIn();
+    auto taskQueue = TaskQueueFromTaskQueue();
+    return taskQueue->IsCurrentThreadIn();
   }
-  RefPtr<AutoTaskQueue> mTaskQueue;
+  void ResetTaskQueue()
+  {
+    MutexAutoLock mut(mMutex);
+    mTaskQueue = nullptr;
+  }
 
   // SourceBuffer Queues and running context.
   SourceBufferTaskQueue mQueue;
   void QueueTask(SourceBufferTask* aTask);
   void ProcessTasks();
   // Set if the TrackBuffersManager is currently processing a task.
   // At this stage, this task is always a AppendBufferTask.
   RefPtr<SourceBufferTask> mCurrentTask;
@@ -502,16 +516,20 @@ private:
     NO_EVICTION_NEEDED,
     EVICTION_NEEDED,
     EVICTION_COMPLETED,
   };
   Atomic<EvictionState> mEvictionState;
 
   // Monitor to protect following objects accessed across multiple threads.
   mutable Mutex mMutex;
+  // mTaskQueue is only ever written after construction on the task queue.
+  // As such, it can be accessed while on task queue without the need for the
+  // mutex.
+  RefPtr<AutoTaskQueue> mTaskQueue;
   // Stable audio and video track time ranges.
   media::TimeIntervals mVideoBufferedRanges;
   media::TimeIntervals mAudioBufferedRanges;
   // MediaInfo of the first init segment read.
   MediaInfo mInfo;
 };
 
 } // namespace mozilla