Bug 1208371 - Forward declare MediaStreamGraph classes in MediaPipeline.h. r?bwc draft
authorAndreas Pehrson <pehrsons@gmail.com>
Tue, 26 Jan 2016 16:19:08 +0800
changeset 347668 5f0597d0bfa673223c8e832198229781d3a76e6d
parent 347667 7d5bf0c5a57665046991d775379954622553295b
child 347669 4d68cf3d9a8e7e255810b12ea6bd8c54eec232dc
push id14642
push userpehrsons@gmail.com
push dateTue, 05 Apr 2016 16:45:34 +0000
reviewersbwc
bugs1208371
milestone47.0a1
Bug 1208371 - Forward declare MediaStreamGraph classes in MediaPipeline.h. r?bwc MozReview-Commit-ID: DsDHelzMsz0
media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
media/webrtc/signaling/src/peerconnection/MediaPipelineFactory.cpp
media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
@@ -17,27 +17,29 @@
 #include "srtp.h"
 
 #if !defined(MOZILLA_EXTERNAL_LINKAGE)
 #include "VideoSegment.h"
 #include "Layers.h"
 #include "LayersLogging.h"
 #include "ImageTypes.h"
 #include "ImageContainer.h"
+#include "DOMMediaStream.h"
 #include "MediaStreamTrack.h"
 #include "VideoUtils.h"
 #ifdef WEBRTC_GONK
 #include "GrallocImages.h"
 #include "mozilla/layers/GrallocTextureClient.h"
 #endif
 #endif
 
 #include "nsError.h"
 #include "AudioSegment.h"
 #include "MediaSegment.h"
+#include "MediaPipelineFilter.h"
 #include "databuffer.h"
 #include "transportflow.h"
 #include "transportlayer.h"
 #include "transportlayerdtls.h"
 #include "transportlayerice.h"
 #include "runnable_utils.h"
 #include "libyuv/convert.h"
 #if !defined(MOZILLA_EXTERNAL_LINKAGE)
@@ -65,16 +67,54 @@ using namespace mozilla::layers;
 
 // Logging context
 MOZ_MTLOG_MODULE("mediapipeline")
 
 namespace mozilla {
 
 static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
 
+MediaPipeline::MediaPipeline(const std::string& pc,
+                             Direction direction,
+                             nsCOMPtr<nsIEventTarget> main_thread,
+                             nsCOMPtr<nsIEventTarget> sts_thread,
+                             const std::string& track_id,
+                             int level,
+                             RefPtr<MediaSessionConduit> conduit,
+                             RefPtr<TransportFlow> rtp_transport,
+                             RefPtr<TransportFlow> rtcp_transport,
+                             nsAutoPtr<MediaPipelineFilter> filter)
+  : direction_(direction),
+    track_id_(track_id),
+    level_(level),
+    conduit_(conduit),
+    rtp_(rtp_transport, rtcp_transport ? RTP : MUX),
+    rtcp_(rtcp_transport ? rtcp_transport : rtp_transport,
+          rtcp_transport ? RTCP : MUX),
+    main_thread_(main_thread),
+    sts_thread_(sts_thread),
+    rtp_packets_sent_(0),
+    rtcp_packets_sent_(0),
+    rtp_packets_received_(0),
+    rtcp_packets_received_(0),
+    rtp_bytes_sent_(0),
+    rtp_bytes_received_(0),
+    pc_(pc),
+    description_(),
+    filter_(filter),
+    rtp_parser_(webrtc::RtpHeaderParser::Create()) {
+  // To indicate rtcp-mux rtcp_transport should be nullptr.
+  // Therefore it's an error to send in the same flow for
+  // both rtp and rtcp.
+  MOZ_ASSERT(rtp_transport != rtcp_transport);
+
+  // PipelineTransport() will access this->sts_thread_; moved here for safety
+  transport_ = new PipelineTransport(this);
+}
+
 MediaPipeline::~MediaPipeline() {
   ASSERT_ON_THREAD(main_thread_);
   MOZ_MTLOG(ML_INFO, "Destroying MediaPipeline: " << description_);
 }
 
 nsresult MediaPipeline::Init() {
   ASSERT_ON_THREAD(main_thread_);
 
@@ -639,16 +679,132 @@ void MediaPipeline::PacketReceived(Trans
 
   if (IsRtp(data, len)) {
     RtpPacketReceived(layer, data, len);
   } else {
     RtcpPacketReceived(layer, data, len);
   }
 }
 
+class MediaPipelineTransmit::PipelineListener
+  : public MediaStreamTrackDirectListener
+{
+friend class MediaPipelineTransmit;
+public:
+  explicit PipelineListener(const RefPtr<MediaSessionConduit>& conduit)
+    : conduit_(conduit),
+      track_id_(TRACK_INVALID),
+      mMutex("MediaPipelineTransmit::PipelineListener"),
+      track_id_external_(TRACK_INVALID),
+      active_(false),
+      enabled_(false),
+      direct_connect_(false),
+      packetizer_(nullptr)
+#if !defined(MOZILLA_EXTERNAL_LINKAGE)
+    , last_img_(-1)
+#endif // MOZILLA_EXTERNAL_LINKAGE
+  {
+  }
+
+  ~PipelineListener()
+  {
+    if (NS_IsMainThread()) {
+      // This would happen if the cycle collector is destructing us, in which
+      // case it'd be too late to dispatch something to main thread anyway.
+      conduit_ = nullptr;
+      return;
+    }
+
+    // release conduit on mainthread.  Must use forget()!
+    nsresult rv = NS_DispatchToMainThread(new
+      ConduitDeleteEvent(conduit_.forget()));
+    MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
+    if (NS_FAILED(rv)) {
+      MOZ_CRASH();
+    }
+  }
+
+  // Dispatches setting the internal TrackID to TRACK_INVALID to the media
+  // graph thread to keep it in sync with other MediaStreamGraph operations
+  // like RemoveListener() and AddListener(). The TrackID will be updated on
+  // the next NewData() callback.
+  void UnsetTrackId(MediaStreamGraphImpl* graph);
+
+  void SetActive(bool active) { active_ = active; }
+  void SetEnabled(bool enabled) { enabled_ = enabled; }
+
+  // Implement MediaStreamTrackListener
+  void NotifyQueuedChanges(MediaStreamGraph* aGraph,
+                           StreamTime aTrackOffset,
+                           const MediaSegment& aQueuedMedia) override;
+
+  // Implement MediaStreamTrackDirectListener
+  void NotifyRealtimeTrackData(MediaStreamGraph* aGraph,
+                               StreamTime aTrackOffset,
+                               const MediaSegment& aMedia) override;
+  void NotifyDirectListenerInstalled(InstallationResult aResult) override;
+  void NotifyDirectListenerUninstalled() override;
+
+private:
+  void UnsetTrackIdImpl() {
+    MutexAutoLock lock(mMutex);
+    track_id_ = track_id_external_ = TRACK_INVALID;
+  }
+
+  void NewData(MediaStreamGraph* graph,
+               StreamTime offset,
+               const MediaSegment& media);
+
+  virtual void ProcessAudioChunk(AudioSessionConduit *conduit,
+                                 TrackRate rate, AudioChunk& chunk);
+#if !defined(MOZILLA_EXTERNAL_LINKAGE)
+  virtual void ProcessVideoChunk(VideoSessionConduit *conduit,
+                                 VideoChunk& chunk);
+#endif // MOZILLA_EXTERNAL_LINKAGE
+  RefPtr<MediaSessionConduit> conduit_;
+
+  // May be TRACK_INVALID until we see data from the track
+  TrackID track_id_; // this is the current TrackID this listener is attached to
+  Mutex mMutex;
+  // protected by mMutex
+  // May be TRACK_INVALID until we see data from the track
+  TrackID track_id_external_; // this is queried from other threads
+
+  // active is true if there is a transport to send on
+  mozilla::Atomic<bool> active_;
+  // enabled is true if the media access control permits sending
+  // actual content; when false you get black/silence
+  mozilla::Atomic<bool> enabled_;
+
+  // Written and read on the MediaStreamGraph thread
+  bool direct_connect_;
+
+  nsAutoPtr<AudioPacketizer<int16_t, int16_t>> packetizer_;
+#if !defined(MOZILLA_EXTERNAL_LINKAGE)
+  int32_t last_img_; // serial number of last Image
+#endif // MOZILLA_EXTERNAL_LINKAGE
+};
+
+MediaPipelineTransmit::MediaPipelineTransmit(
+    const std::string& pc,
+    nsCOMPtr<nsIEventTarget> main_thread,
+    nsCOMPtr<nsIEventTarget> sts_thread,
+    dom::MediaStreamTrack* domtrack,
+    const std::string& track_id,
+    int level,
+    RefPtr<MediaSessionConduit> conduit,
+    RefPtr<TransportFlow> rtp_transport,
+    RefPtr<TransportFlow> rtcp_transport,
+    nsAutoPtr<MediaPipelineFilter> filter) :
+  MediaPipeline(pc, TRANSMIT, main_thread, sts_thread, track_id, level,
+                conduit, rtp_transport, rtcp_transport, filter),
+  listener_(new PipelineListener(conduit)),
+  domtrack_(domtrack)
+{}
+
 nsresult MediaPipelineTransmit::Init() {
   AttachToTrack(track_id_);
 
   return MediaPipeline::Init();
 }
 
 void MediaPipelineTransmit::AttachToTrack(const std::string& track_id) {
   ASSERT_ON_THREAD(main_thread_);
@@ -1312,29 +1468,39 @@ void MediaPipelineTransmit::PipelineList
     return;
   }
   MOZ_MTLOG(ML_DEBUG, "Sending an I420 video frame converted from " <<
                       Stringify(surf->GetFormat()));
   conduit->SendVideoFrame(yuv, buffer_size, size.width, size.height, mozilla::kVideoI420, 0);
 }
 #endif
 
-nsresult MediaPipelineReceiveAudio::Init() {
-  ASSERT_ON_THREAD(main_thread_);
-  MOZ_MTLOG(ML_DEBUG, __FUNCTION__);
+class TrackAddedCallback {
+ public:
+  virtual void TrackAdded(TrackTicks current_ticks) = 0;
+
+  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TrackAddedCallback);
+
+ protected:
+  virtual ~TrackAddedCallback() {}
+};
+
+class GenericReceiveListener;
 
-  description_ = pc_ + "| Receive audio[";
-  description_ += track_id_;
-  description_ += "]";
+class GenericReceiveCallback : public TrackAddedCallback
+{
+ public:
+  explicit GenericReceiveCallback(GenericReceiveListener* listener)
+    : listener_(listener) {}
 
-  listener_->AddSelf(new AudioSegment());
+  void TrackAdded(TrackTicks time);
 
-  return MediaPipelineReceive::Init();
-}
-
+ private:
+  RefPtr<GenericReceiveListener> listener_;
+};
 
 // Add a track and listener on the MSG thread using the MSG command queue
 static void AddTrackAndListener(MediaStream* source,
                                 TrackID track_id, TrackRate track_rate,
                                 MediaStreamListener* listener, MediaSegment* segment,
                                 const RefPtr<TrackAddedCallback>& completed,
                                 bool queue_track) {
   // This both adds the listener and the track
@@ -1412,140 +1578,514 @@ static void AddTrackAndListener(MediaStr
   } else {
     source->AsSourceStream()->AddTrack(track_id, 0, segment,
                                        SourceMediaStream::ADDTRACK_QUEUED);
   }
   MOZ_MTLOG(ML_INFO, "Queued track-add for track id " << track_id <<
                      " on MediaStream " << source);
 }
 
-void GenericReceiveListener::AddSelf(MediaSegment* segment) {
-  RefPtr<TrackAddedCallback> callback = new GenericReceiveCallback(this);
-  AddTrackAndListener(source_, track_id_, track_rate_, this, segment, callback,
-                      queue_track_);
+class GenericReceiveListener : public MediaStreamListener
+{
+ public:
+  GenericReceiveListener(SourceMediaStream *source, TrackID track_id,
+                         TrackRate track_rate, bool queue_track)
+    : source_(source),
+      track_id_(track_id),
+      track_rate_(track_rate),
+      played_ticks_(0),
+      queue_track_(queue_track),
+      principal_handle_(PRINCIPAL_HANDLE_NONE) {}
+
+  virtual ~GenericReceiveListener() {}
+
+  void AddSelf(MediaSegment* segment)
+  {
+    RefPtr<TrackAddedCallback> callback = new GenericReceiveCallback(this);
+    AddTrackAndListener(source_, track_id_, track_rate_, this, segment, callback,
+                        queue_track_);
+  }
+
+  void SetPlayedTicks(TrackTicks time) {
+    played_ticks_ = time;
+  }
+
+  void EndTrack() {
+    source_->EndTrack(track_id_);
+  }
+
+#ifndef USE_FAKE_MEDIA_STREAMS
+  // Must be called on the main thread
+  void SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
+  {
+    class Message : public ControlMessage
+    {
+    public:
+      Message(GenericReceiveListener* listener,
+              MediaStream* stream,
+              const PrincipalHandle& principal_handle)
+        : ControlMessage(stream),
+          listener_(listener),
+          principal_handle_(principal_handle)
+      {}
+
+      void Run() override {
+        listener_->SetPrincipalHandle_msg(principal_handle_);
+      }
+
+      RefPtr<GenericReceiveListener> listener_;
+      PrincipalHandle principal_handle_;
+    };
+
+    source_->GraphImpl()->AppendMessage(MakeUnique<Message>(this, source_, principal_handle));
+  }
+
+  // Must be called on the MediaStreamGraph thread
+  void SetPrincipalHandle_msg(const PrincipalHandle& principal_handle)
+  {
+    principal_handle_ = principal_handle;
+  }
+#endif // USE_FAKE_MEDIA_STREAMS
+
+ protected:
+  SourceMediaStream *source_;
+  TrackID track_id_;
+  TrackRate track_rate_;
+  TrackTicks played_ticks_;
+  bool queue_track_;
+  PrincipalHandle principal_handle_;
+};
+
+void GenericReceiveCallback::TrackAdded(TrackTicks time)
+{
+  listener_->SetPlayedTicks(time);
+}
+
+MediaPipelineReceive::MediaPipelineReceive(
+    const std::string& pc,
+    nsCOMPtr<nsIEventTarget> main_thread,
+    nsCOMPtr<nsIEventTarget> sts_thread,
+    SourceMediaStream *stream,
+    const std::string& track_id,
+    int level,
+    RefPtr<MediaSessionConduit> conduit,
+    RefPtr<TransportFlow> rtp_transport,
+    RefPtr<TransportFlow> rtcp_transport,
+    nsAutoPtr<MediaPipelineFilter> filter) :
+  MediaPipeline(pc, RECEIVE, main_thread, sts_thread,
+                track_id, level, conduit, rtp_transport,
+                rtcp_transport, filter),
+  stream_(stream),
+  segments_added_(0)
+{
+  MOZ_ASSERT(stream_);
+}
+
+MediaPipelineReceive::~MediaPipelineReceive()
+{
+  MOZ_ASSERT(!stream_);  // Check that we have shut down already.
+}
+
+class MediaPipelineReceiveAudio::PipelineListener
+  : public GenericReceiveListener
+{
+public:
+  PipelineListener(SourceMediaStream * source, TrackID track_id,
+                   const RefPtr<MediaSessionConduit>& conduit,
+                   bool queue_track)
+    : GenericReceiveListener(source, track_id, DEFAULT_SAMPLE_RATE, queue_track), // XXX rate assumption
+      conduit_(conduit)
+  {
+    MOZ_ASSERT(track_rate_%100 == 0);
+  }
+
+  ~PipelineListener()
+  {
+    if (NS_IsMainThread()) {
+      // This would happen if the cycle collector is destructing us, in which
+      // case it'd be too late to dispatch something to main thread anyway.
+      conduit_ = nullptr;
+      return;
+    }
+
+    // release conduit on mainthread.  Must use forget()!
+    nsresult rv = NS_DispatchToMainThread(new
+      ConduitDeleteEvent(conduit_.forget()));
+    MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
+    if (NS_FAILED(rv)) {
+      MOZ_CRASH();
+    }
+  }
+
+  // Implement MediaStreamListener
+  void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
+                                StreamTime offset,
+                                uint32_t events,
+                                const MediaSegment& queued_media,
+                                MediaStream* input_stream,
+                                TrackID input_tid) override {}
+
+  void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override
+  {
+    MOZ_ASSERT(source_);
+    if (!source_) {
+      MOZ_MTLOG(ML_ERROR, "NotifyPull() called from a non-SourceMediaStream");
+      return;
+    }
+
+    // This comparison is done in total time to avoid accumulated roundoff errors.
+    while (source_->TicksToTimeRoundDown(track_rate_, played_ticks_) <
+           desired_time) {
+      // Max size given stereo is 480*2*2 = 1920 (48KHz)
+      const size_t AUDIO_SAMPLE_BUFFER_MAX = 1920;
+      MOZ_ASSERT((track_rate_/100)*sizeof(uint16_t) * 2 <= AUDIO_SAMPLE_BUFFER_MAX);
+
+      int16_t scratch_buffer[AUDIO_SAMPLE_BUFFER_MAX];
+
+      int samples_length;
+
+      // This fetches 10ms of data, either mono or stereo
+      MediaConduitErrorCode err =
+          static_cast<AudioSessionConduit*>(conduit_.get())->GetAudioFrame(
+              scratch_buffer,
+              track_rate_,
+              0,  // TODO(ekr@rtfm.com): better estimate of "capture" (really playout) delay
+              samples_length);
+
+      if (err != kMediaConduitNoError) {
+        // Insert silence on conduit/GIPS failure (extremely unlikely)
+        MOZ_MTLOG(ML_ERROR, "Audio conduit failed (" << err
+                  << ") to return data @ " << played_ticks_
+                  << " (desired " << desired_time << " -> "
+                  << source_->StreamTimeToSeconds(desired_time) << ")");
+        samples_length = track_rate_/100; // if this is not enough we'll loop and provide more
+        PodArrayZero(scratch_buffer);
+      }
+
+      MOZ_ASSERT(samples_length * sizeof(uint16_t) < AUDIO_SAMPLE_BUFFER_MAX);
+
+      MOZ_MTLOG(ML_DEBUG, "Audio conduit returned buffer of length "
+                << samples_length);
+
+      RefPtr<SharedBuffer> samples = SharedBuffer::Create(samples_length * sizeof(uint16_t));
+      int16_t *samples_data = static_cast<int16_t *>(samples->Data());
+      AudioSegment segment;
+      // We derive the number of channels of the stream from the number of samples
+      // the AudioConduit gives us, considering it gives us packets of 10ms and we
+      // know the rate.
+      uint32_t channelCount = samples_length / (track_rate_ / 100);
+      AutoTArray<int16_t*,2> channels;
+      AutoTArray<const int16_t*,2> outputChannels;
+      size_t frames = samples_length / channelCount;
+
+      channels.SetLength(channelCount);
+
+      size_t offset = 0;
+      for (size_t i = 0; i < channelCount; i++) {
+        channels[i] = samples_data + offset;
+        offset += frames;
+      }
+
+      DeinterleaveAndConvertBuffer(scratch_buffer,
+                                   frames,
+                                   channelCount,
+                                   channels.Elements());
+
+      outputChannels.AppendElements(channels);
+
+      segment.AppendFrames(samples.forget(), outputChannels, frames,
+                           principal_handle_);
+
+      // Handle track not actually added yet or removed/finished
+      if (source_->AppendToTrack(track_id_, &segment)) {
+        played_ticks_ += frames;
+      } else {
+        MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
+        // we can't un-read the data, but that's ok since we don't want to
+        // buffer - but don't i-loop!
+        return;
+      }
+    }
+  }
+
+private:
+  RefPtr<MediaSessionConduit> conduit_;
+};
+
+MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
+    const std::string& pc,
+    nsCOMPtr<nsIEventTarget> main_thread,
+    nsCOMPtr<nsIEventTarget> sts_thread,
+    SourceMediaStream* stream,
+    const std::string& media_stream_track_id,
+    TrackID numeric_track_id,
+    int level,
+    RefPtr<AudioSessionConduit> conduit,
+    RefPtr<TransportFlow> rtp_transport,
+    RefPtr<TransportFlow> rtcp_transport,
+    nsAutoPtr<MediaPipelineFilter> filter,
+    bool queue_track) :
+  MediaPipelineReceive(pc, main_thread, sts_thread,
+                       stream, media_stream_track_id, level, conduit,
+                       rtp_transport, rtcp_transport, filter),
+  listener_(new PipelineListener(stream, numeric_track_id, conduit,
+                                 queue_track))
+{}
+
+void MediaPipelineReceiveAudio::DetachMedia()
+{
+  ASSERT_ON_THREAD(main_thread_);
+  if (stream_) {
+    stream_->RemoveListener(listener_);
+    stream_ = nullptr;
+  }
+}
+
+nsresult MediaPipelineReceiveAudio::Init() {
+  ASSERT_ON_THREAD(main_thread_);
+  MOZ_MTLOG(ML_DEBUG, __FUNCTION__);
+
+  description_ = pc_ + "| Receive audio[";
+  description_ += track_id_;
+  description_ += "]";
+
+  listener_->AddSelf(new AudioSegment());
+
+  return MediaPipelineReceive::Init();
 }
 
 #ifndef USE_FAKE_MEDIA_STREAMS
-void GenericReceiveListener::SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
+void MediaPipelineReceiveAudio::SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
 {
-  class Message : public ControlMessage
-  {
-  public:
-    Message(GenericReceiveListener* listener,
-            MediaStream* stream,
-            const PrincipalHandle& principal_handle)
-      : ControlMessage(stream), listener_(listener), principal_handle_(principal_handle)
-    {}
-
-    void Run() override {
-      listener_->SetPrincipalHandle_msg(principal_handle_);
-    }
-
-    RefPtr<GenericReceiveListener> listener_;
-    PrincipalHandle principal_handle_;
-  };
-
-  source_->GraphImpl()->AppendMessage(MakeUnique<Message>(this, source_, principal_handle));
-}
-
-void GenericReceiveListener::SetPrincipalHandle_msg(const PrincipalHandle& principal_handle)
-{
-  principal_handle_ = principal_handle;
+  listener_->SetPrincipalHandle_m(principal_handle);
 }
 #endif // USE_FAKE_MEDIA_STREAMS
 
-MediaPipelineReceiveAudio::PipelineListener::PipelineListener(
-    SourceMediaStream * source, TrackID track_id,
-    const RefPtr<MediaSessionConduit>& conduit, bool queue_track)
-  : GenericReceiveListener(source, track_id, DEFAULT_SAMPLE_RATE, queue_track), // XXX rate assumption
-    conduit_(conduit)
-{
-  MOZ_ASSERT(track_rate_%100 == 0);
-}
+class MediaPipelineReceiveVideo::PipelineListener
+  : public GenericReceiveListener {
+public:
+  PipelineListener(SourceMediaStream * source, TrackID track_id,
+                   bool queue_track)
+    : GenericReceiveListener(source, track_id, source->GraphRate(), queue_track),
+      width_(640),
+      height_(480),
+#if defined(MOZILLA_INTERNAL_API)
+      image_container_(),
+      image_(),
+#endif
+      monitor_("Video PipelineListener")
+  {
+#if !defined(MOZILLA_EXTERNAL_LINKAGE)
+    image_container_ = LayerManager::CreateImageContainer();
+#endif
+  }
+
+
+  // Implement MediaStreamListener
+  void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
+                                StreamTime offset,
+                                uint32_t events,
+                                const MediaSegment& queued_media,
+                                MediaStream* input_stream,
+                                TrackID input_tid) override {}
+  void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override
+  {
+    ReentrantMonitorAutoEnter enter(monitor_);
+
+  #if defined(MOZILLA_INTERNAL_API)
+    RefPtr<Image> image = image_;
+    // our constructor sets track_rate_ to the graph rate
+    MOZ_ASSERT(track_rate_ == source_->GraphRate());
+  #endif
 
-void MediaPipelineReceiveAudio::PipelineListener::
-NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) {
-  MOZ_ASSERT(source_);
-  if (!source_) {
-    MOZ_MTLOG(ML_ERROR, "NotifyPull() called from a non-SourceMediaStream");
-    return;
+  #if defined(MOZILLA_INTERNAL_API)
+    StreamTime delta = desired_time - played_ticks_;
+
+    // Don't append if we've already provided a frame that supposedly
+    // goes past the current aDesiredTime Doing so means a negative
+    // delta and thus messes up handling of the graph
+    if (delta > 0) {
+      VideoSegment segment;
+      segment.AppendFrame(image.forget(), delta, IntSize(width_, height_),
+                          principal_handle_);
+      // Handle track not actually added yet or removed/finished
+      if (source_->AppendToTrack(track_id_, &segment)) {
+        played_ticks_ = desired_time;
+      } else {
+        MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
+        return;
+      }
+    }
+  #endif
+  }
+
+  // Accessors for external writes from the renderer
+  void FrameSizeChange(unsigned int width,
+                       unsigned int height,
+                       unsigned int number_of_streams) {
+    ReentrantMonitorAutoEnter enter(monitor_);
+
+    width_ = width;
+    height_ = height;
+  }
+
+  void RenderVideoFrame(const unsigned char* buffer,
+                        size_t buffer_size,
+                        uint32_t time_stamp,
+                        int64_t render_time,
+                        const RefPtr<layers::Image>& video_image)
+  {
+    RenderVideoFrame(buffer, buffer_size, width_, (width_ + 1) >> 1,
+                     time_stamp, render_time, video_image);
   }
 
-  // This comparison is done in total time to avoid accumulated roundoff errors.
-  while (source_->TicksToTimeRoundDown(track_rate_, played_ticks_) <
-         desired_time) {
-    // Max size given stereo is 480*2*2 = 1920 (48KHz)
-    const size_t AUDIO_SAMPLE_BUFFER_MAX = 1920;
-    MOZ_ASSERT((track_rate_/100)*sizeof(uint16_t) * 2 <= AUDIO_SAMPLE_BUFFER_MAX);
+  void RenderVideoFrame(const unsigned char* buffer,
+                        size_t buffer_size,
+                        uint32_t y_stride,
+                        uint32_t cbcr_stride,
+                        uint32_t time_stamp,
+                        int64_t render_time,
+                        const RefPtr<layers::Image>& video_image)
+  {
+#ifdef MOZILLA_INTERNAL_API
+    ReentrantMonitorAutoEnter enter(monitor_);
+#endif // MOZILLA_INTERNAL_API
 
-    int16_t scratch_buffer[AUDIO_SAMPLE_BUFFER_MAX];
+#if defined(MOZILLA_INTERNAL_API)
+    if (buffer) {
+      // Create a video frame using |buffer|.
+#ifdef MOZ_WIDGET_GONK
+      RefPtr<PlanarYCbCrImage> yuvImage = new GrallocImage();
+#else
+      RefPtr<PlanarYCbCrImage> yuvImage = image_container_->CreatePlanarYCbCrImage();
+#endif
+      uint8_t* frame = const_cast<uint8_t*>(static_cast<const uint8_t*> (buffer));
 
-    int samples_length;
+      PlanarYCbCrData yuvData;
+      yuvData.mYChannel = frame;
+      yuvData.mYSize = IntSize(y_stride, height_);
+      yuvData.mYStride = y_stride;
+      yuvData.mCbCrStride = cbcr_stride;
+      yuvData.mCbChannel = frame + height_ * yuvData.mYStride;
+      yuvData.mCrChannel = yuvData.mCbChannel + ((height_ + 1) >> 1) * yuvData.mCbCrStride;
+      yuvData.mCbCrSize = IntSize(yuvData.mCbCrStride, (height_ + 1) >> 1);
+      yuvData.mPicX = 0;
+      yuvData.mPicY = 0;
+      yuvData.mPicSize = IntSize(width_, height_);
+      yuvData.mStereoMode = StereoMode::MONO;
 
-    // This fetches 10ms of data, either mono or stereo
-    MediaConduitErrorCode err =
-        static_cast<AudioSessionConduit*>(conduit_.get())->GetAudioFrame(
-            scratch_buffer,
-            track_rate_,
-            0,  // TODO(ekr@rtfm.com): better estimate of "capture" (really playout) delay
-            samples_length);
+      if (!yuvImage->SetData(yuvData)) {
+        MOZ_ASSERT(false);
+        return;
+      }
+
+      image_ = yuvImage;
+    }
+#ifdef WEBRTC_GONK
+    else {
+      // Decoder produced video frame that can be appended to the track directly.
+      MOZ_ASSERT(video_image);
+      image_ = video_image;
+    }
+#endif // WEBRTC_GONK
+#endif // MOZILLA_INTERNAL_API
+  }
 
-    if (err != kMediaConduitNoError) {
-      // Insert silence on conduit/GIPS failure (extremely unlikely)
-      MOZ_MTLOG(ML_ERROR, "Audio conduit failed (" << err
-                << ") to return data @ " << played_ticks_
-                << " (desired " << desired_time << " -> "
-                << source_->StreamTimeToSeconds(desired_time) << ")");
-      samples_length = track_rate_/100; // if this is not enough we'll loop and provide more
-      PodArrayZero(scratch_buffer);
-    }
+private:
+  int width_;
+  int height_;
+#if defined(MOZILLA_INTERNAL_API)
+  RefPtr<layers::ImageContainer> image_container_;
+  RefPtr<layers::Image> image_;
+#endif
+  mozilla::ReentrantMonitor monitor_; // Monitor for processing WebRTC frames.
+                                      // Protects image_ against:
+                                      // - Writing from the GIPS thread
+                                      // - Reading from the MSG thread
+};
 
-    MOZ_ASSERT(samples_length * sizeof(uint16_t) < AUDIO_SAMPLE_BUFFER_MAX);
-
-    MOZ_MTLOG(ML_DEBUG, "Audio conduit returned buffer of length "
-              << samples_length);
+class MediaPipelineReceiveVideo::PipelineRenderer : public VideoRenderer
+{
+public:
+  explicit PipelineRenderer(MediaPipelineReceiveVideo *pipeline) :
+    pipeline_(pipeline) {}
 
-    RefPtr<SharedBuffer> samples = SharedBuffer::Create(samples_length * sizeof(uint16_t));
-    int16_t *samples_data = static_cast<int16_t *>(samples->Data());
-    AudioSegment segment;
-    // We derive the number of channels of the stream from the number of samples
-    // the AudioConduit gives us, considering it gives us packets of 10ms and we
-    // know the rate.
-    uint32_t channelCount = samples_length / (track_rate_ / 100);
-    AutoTArray<int16_t*,2> channels;
-    AutoTArray<const int16_t*,2> outputChannels;
-    size_t frames = samples_length / channelCount;
+  void Detach() { pipeline_ = nullptr; }
+
+  // Implement VideoRenderer
+  void FrameSizeChange(unsigned int width,
+                       unsigned int height,
+                       unsigned int number_of_streams) override
+  {
+    pipeline_->listener_->FrameSizeChange(width, height, number_of_streams);
+  }
 
-    channels.SetLength(channelCount);
+  void RenderVideoFrame(const unsigned char* buffer,
+                        size_t buffer_size,
+                        uint32_t time_stamp,
+                        int64_t render_time,
+                        const ImageHandle& handle) override
+  {
+    pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
+                                           time_stamp, render_time,
+                                           handle.GetImage());
+  }
 
-    size_t offset = 0;
-    for (size_t i = 0; i < channelCount; i++) {
-      channels[i] = samples_data + offset;
-      offset += frames;
-    }
+  void RenderVideoFrame(const unsigned char* buffer,
+                        size_t buffer_size,
+                        uint32_t y_stride,
+                        uint32_t cbcr_stride,
+                        uint32_t time_stamp,
+                        int64_t render_time,
+                        const ImageHandle& handle) override
+  {
+    pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
+                                           y_stride, cbcr_stride,
+                                           time_stamp, render_time,
+                                           handle.GetImage());
+  }
 
-    DeinterleaveAndConvertBuffer(scratch_buffer,
-                                 frames,
-                                 channelCount,
-                                 channels.Elements());
+private:
+  MediaPipelineReceiveVideo *pipeline_;  // Raw pointer to avoid cycles
+};
 
-    outputChannels.AppendElements(channels);
-
-    segment.AppendFrames(samples.forget(), outputChannels, frames,
-                         principal_handle_);
 
-    // Handle track not actually added yet or removed/finished
-    if (source_->AppendToTrack(track_id_, &segment)) {
-      played_ticks_ += frames;
-    } else {
-      MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
-      // we can't un-read the data, but that's ok since we don't want to
-      // buffer - but don't i-loop!
-      return;
-    }
+MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
+    const std::string& pc,
+    nsCOMPtr<nsIEventTarget> main_thread,
+    nsCOMPtr<nsIEventTarget> sts_thread,
+    SourceMediaStream *stream,
+    const std::string& media_stream_track_id,
+    TrackID numeric_track_id,
+    int level,
+    RefPtr<VideoSessionConduit> conduit,
+    RefPtr<TransportFlow> rtp_transport,
+    RefPtr<TransportFlow> rtcp_transport,
+    nsAutoPtr<MediaPipelineFilter> filter,
+    bool queue_track) :
+  MediaPipelineReceive(pc, main_thread, sts_thread,
+                       stream, media_stream_track_id, level, conduit,
+                       rtp_transport, rtcp_transport, filter),
+  renderer_(new PipelineRenderer(this)),
+  listener_(new PipelineListener(stream, numeric_track_id, queue_track))
+{}
+
+void MediaPipelineReceiveVideo::DetachMedia()
+{
+  ASSERT_ON_THREAD(main_thread_);
+
+  listener_->EndTrack();
+  // stop generating video and thus stop invoking the PipelineRenderer
+  // and PipelineListener - the renderer has a raw ptr to the Pipeline to
+  // avoid cycles, and the render callbacks are invoked from a different
+  // thread so simple null-checks would cause TSAN bugs without locks.
+  static_cast<VideoSessionConduit*>(conduit_.get())->DetachRenderer();
+  if (stream_) {
+    stream_->RemoveListener(listener_);
+    stream_ = nullptr;
   }
 }
 
 nsresult MediaPipelineReceiveVideo::Init() {
   ASSERT_ON_THREAD(main_thread_);
   MOZ_MTLOG(ML_DEBUG, __FUNCTION__);
 
   description_ = pc_ + "| Receive video[";
@@ -1558,119 +2098,16 @@ nsresult MediaPipelineReceiveVideo::Init
 
   // Always happens before we can DetachMedia()
   static_cast<VideoSessionConduit *>(conduit_.get())->
       AttachRenderer(renderer_);
 
   return MediaPipelineReceive::Init();
 }
 
-MediaPipelineReceiveVideo::PipelineListener::PipelineListener(
-  SourceMediaStream* source, TrackID track_id, bool queue_track)
-  : GenericReceiveListener(source, track_id, source->GraphRate(), queue_track),
-    width_(640),
-    height_(480),
-#if defined(MOZILLA_INTERNAL_API)
-    image_container_(),
-    image_(),
-#endif
-    monitor_("Video PipelineListener") {
-#if !defined(MOZILLA_EXTERNAL_LINKAGE)
-  image_container_ = LayerManager::CreateImageContainer();
-#endif
-}
-
-void MediaPipelineReceiveVideo::PipelineListener::RenderVideoFrame(
-    const unsigned char* buffer,
-    size_t buffer_size,
-    uint32_t time_stamp,
-    int64_t render_time,
-    const RefPtr<Image>& video_image) {
-  RenderVideoFrame(buffer, buffer_size, width_, (width_ + 1) >> 1,
-                   time_stamp, render_time, video_image);
+#ifndef USE_FAKE_MEDIA_STREAMS
+void MediaPipelineReceiveVideo::SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
+{
+  listener_->SetPrincipalHandle_m(principal_handle);
 }
-
-void MediaPipelineReceiveVideo::PipelineListener::RenderVideoFrame(
-    const unsigned char* buffer,
-    size_t buffer_size,
-    uint32_t y_stride,
-    uint32_t cbcr_stride,
-    uint32_t time_stamp,
-    int64_t render_time,
-    const RefPtr<Image>& video_image) {
-
-#ifdef MOZILLA_INTERNAL_API
-  ReentrantMonitorAutoEnter enter(monitor_);
-#endif // MOZILLA_INTERNAL_API
-
-#if defined(MOZILLA_INTERNAL_API)
-  if (buffer) {
-    // Create a video frame using |buffer|.
-#ifdef MOZ_WIDGET_GONK
-    RefPtr<PlanarYCbCrImage> yuvImage = new GrallocImage();
-#else
-    RefPtr<PlanarYCbCrImage> yuvImage = image_container_->CreatePlanarYCbCrImage();
-#endif
-    uint8_t* frame = const_cast<uint8_t*>(static_cast<const uint8_t*> (buffer));
-
-    PlanarYCbCrData yuvData;
-    yuvData.mYChannel = frame;
-    yuvData.mYSize = IntSize(y_stride, height_);
-    yuvData.mYStride = y_stride;
-    yuvData.mCbCrStride = cbcr_stride;
-    yuvData.mCbChannel = frame + height_ * yuvData.mYStride;
-    yuvData.mCrChannel = yuvData.mCbChannel + ((height_ + 1) >> 1) * yuvData.mCbCrStride;
-    yuvData.mCbCrSize = IntSize(yuvData.mCbCrStride, (height_ + 1) >> 1);
-    yuvData.mPicX = 0;
-    yuvData.mPicY = 0;
-    yuvData.mPicSize = IntSize(width_, height_);
-    yuvData.mStereoMode = StereoMode::MONO;
-
-    if (!yuvImage->SetData(yuvData)) {
-      MOZ_ASSERT(false);
-      return;
-    }
-
-    image_ = yuvImage;
-  }
-#ifdef WEBRTC_GONK
-  else {
-    // Decoder produced video frame that can be appended to the track directly.
-    MOZ_ASSERT(video_image);
-    image_ = video_image;
-  }
-#endif // WEBRTC_GONK
-#endif // MOZILLA_INTERNAL_API
-}
-
-void MediaPipelineReceiveVideo::PipelineListener::
-NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) {
-  ReentrantMonitorAutoEnter enter(monitor_);
-
-#if defined(MOZILLA_INTERNAL_API)
-  RefPtr<Image> image = image_;
-  // our constructor sets track_rate_ to the graph rate
-  MOZ_ASSERT(track_rate_ == source_->GraphRate());
-#endif
-
-#if defined(MOZILLA_INTERNAL_API)
-  StreamTime delta = desired_time - played_ticks_;
-
-  // Don't append if we've already provided a frame that supposedly
-  // goes past the current aDesiredTime Doing so means a negative
-  // delta and thus messes up handling of the graph
-  if (delta > 0) {
-    VideoSegment segment;
-    segment.AppendFrame(image.forget(), delta, IntSize(width_, height_),
-                        principal_handle_);
-    // Handle track not actually added yet or removed/finished
-    if (source_->AppendToTrack(track_id_, &segment)) {
-      played_ticks_ = desired_time;
-    } else {
-      MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
-      return;
-    }
-  }
-#endif
-}
-
+#endif // USE_FAKE_MEDIA_STREAMS
 
 }  // end namespace
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
@@ -7,41 +7,42 @@
 
 #ifndef mediapipeline_h__
 #define mediapipeline_h__
 
 #include "sigslot.h"
 
 #ifdef USE_FAKE_MEDIA_STREAMS
 #include "FakeMediaStreams.h"
-#else
-#include "DOMMediaStream.h"
-#include "MediaStreamGraph.h"
-#include "VideoUtils.h"
 #endif
 #include "MediaConduitInterface.h"
-#include "MediaPipelineFilter.h"
-#include "AudioSegment.h"
 #include "mozilla/ReentrantMonitor.h"
 #include "mozilla/Atomics.h"
 #include "SrtpFlow.h"
 #include "databuffer.h"
 #include "runnable_utils.h"
 #include "transportflow.h"
 #include "AudioPacketizer.h"
-
-#if defined(MOZILLA_INTERNAL_API)
-#include "VideoSegment.h"
-#endif
+#include "StreamBuffer.h"
 
 #include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h"
 
+class nsIPrincipal;
+
 namespace mozilla {
+class MediaPipelineFilter;
+class PeerIdentity;
 
-class PeerIdentity;
+#ifndef USE_FAKE_MEDIA_STREAMS
+namespace dom {
+  class MediaStreamTrack;
+} // namespace dom
+
+class SourceMediaStream;
+#endif // USE_FAKE_MEDIA_STREAMS
 
 // A class that represents the pipeline of audio and video
 // The dataflow looks like:
 //
 // TRANSMIT
 // CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network
 //
 // RECEIVE
@@ -77,44 +78,17 @@ class MediaPipeline : public sigslot::ha
                 Direction direction,
                 nsCOMPtr<nsIEventTarget> main_thread,
                 nsCOMPtr<nsIEventTarget> sts_thread,
                 const std::string& track_id,
                 int level,
                 RefPtr<MediaSessionConduit> conduit,
                 RefPtr<TransportFlow> rtp_transport,
                 RefPtr<TransportFlow> rtcp_transport,
-                nsAutoPtr<MediaPipelineFilter> filter)
-      : direction_(direction),
-        track_id_(track_id),
-        level_(level),
-        conduit_(conduit),
-        rtp_(rtp_transport, rtcp_transport ? RTP : MUX),
-        rtcp_(rtcp_transport ? rtcp_transport : rtp_transport,
-              rtcp_transport ? RTCP : MUX),
-        main_thread_(main_thread),
-        sts_thread_(sts_thread),
-        rtp_packets_sent_(0),
-        rtcp_packets_sent_(0),
-        rtp_packets_received_(0),
-        rtcp_packets_received_(0),
-        rtp_bytes_sent_(0),
-        rtp_bytes_received_(0),
-        pc_(pc),
-        description_(),
-        filter_(filter),
-        rtp_parser_(webrtc::RtpHeaderParser::Create()) {
-      // To indicate rtcp-mux rtcp_transport should be nullptr.
-      // Therefore it's an error to send in the same flow for
-      // both rtp and rtcp.
-      MOZ_ASSERT(rtp_transport != rtcp_transport);
-
-      // PipelineTransport() will access this->sts_thread_; moved here for safety
-      transport_ = new PipelineTransport(this);
-    }
+                nsAutoPtr<MediaPipelineFilter> filter);
 
   // Must be called on the STS thread.  Must be called after ShutdownMedia_m().
   void DetachTransport_s();
 
   // Must be called on the main thread.
   void ShutdownMedia_m()
   {
     ASSERT_ON_THREAD(main_thread_);
@@ -296,83 +270,16 @@ class MediaPipeline : public sigslot::ha
   nsAutoPtr<webrtc::RtpHeaderParser> rtp_parser_;
 
  private:
   nsresult Init_s();
 
   bool IsRtp(const unsigned char *data, size_t len);
 };
 
-class GenericReceiveListener : public MediaStreamListener
-{
- public:
-  GenericReceiveListener(SourceMediaStream *source, TrackID track_id,
-                         TrackRate track_rate, bool queue_track)
-    : source_(source),
-      track_id_(track_id),
-      track_rate_(track_rate),
-      played_ticks_(0),
-      queue_track_(queue_track),
-      principal_handle_(PRINCIPAL_HANDLE_NONE) {}
-
-  virtual ~GenericReceiveListener() {}
-
-  void AddSelf(MediaSegment* segment);
-
-  void SetPlayedTicks(TrackTicks time) {
-    played_ticks_ = time;
-  }
-
-  void EndTrack() {
-    source_->EndTrack(track_id_);
-  }
-
-#ifndef USE_FAKE_MEDIA_STREAMS
-  // Must be called on the main thread
-  void SetPrincipalHandle_m(const PrincipalHandle& aPrincipal);
-
-  // Must be called on the MediaStreamGraph thread
-  void SetPrincipalHandle_msg(const PrincipalHandle& aPrincipal);
-#endif // USE_FAKE_MEDIA_STREAMS
-
- protected:
-  SourceMediaStream *source_;
-  TrackID track_id_;
-  TrackRate track_rate_;
-  TrackTicks played_ticks_;
-  bool queue_track_;
-  PrincipalHandle principal_handle_;
-};
-
-class TrackAddedCallback {
- public:
-  virtual void TrackAdded(TrackTicks current_ticks) = 0;
-
-  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TrackAddedCallback);
-
- protected:
-  virtual ~TrackAddedCallback() {}
-};
-
-class GenericReceiveListener;
-
-class GenericReceiveCallback : public TrackAddedCallback
-{
- public:
-  explicit GenericReceiveCallback(GenericReceiveListener* listener)
-    : listener_(listener) {}
-
-  void TrackAdded(TrackTicks time) {
-    listener_->SetPlayedTicks(time);
-  }
-
- private:
-  RefPtr<GenericReceiveListener> listener_;
-};
-
 class ConduitDeleteEvent: public nsRunnable
 {
 public:
   explicit ConduitDeleteEvent(already_AddRefed<MediaSessionConduit> aConduit) :
     mConduit(aConduit) {}
 
   /* we exist solely to proxy release of the conduit */
   NS_IMETHOD Run() { return NS_OK; }
@@ -389,22 +296,17 @@ public:
                         nsCOMPtr<nsIEventTarget> main_thread,
                         nsCOMPtr<nsIEventTarget> sts_thread,
                         dom::MediaStreamTrack* domtrack,
                         const std::string& track_id,
                         int level,
                         RefPtr<MediaSessionConduit> conduit,
                         RefPtr<TransportFlow> rtp_transport,
                         RefPtr<TransportFlow> rtcp_transport,
-                        nsAutoPtr<MediaPipelineFilter> filter) :
-      MediaPipeline(pc, TRANSMIT, main_thread, sts_thread, track_id, level,
-                    conduit, rtp_transport, rtcp_transport, filter),
-      listener_(new PipelineListener(conduit)),
-      domtrack_(domtrack)
-  {}
+                        nsAutoPtr<MediaPipelineFilter> filter);
 
   // Initialize (stuff here may fail)
   nsresult Init() override;
 
   virtual void AttachToTrack(const std::string& track_id);
 
   // written and used from MainThread
   bool IsVideo() const override;
@@ -425,114 +327,18 @@ public:
   nsresult TransportReady_s(TransportInfo &info) override;
 
   // Replace a track with a different one
   // In non-compliance with the likely final spec, allow the new
   // track to be part of a different stream (since we don't support
   // multiple tracks of a type in a stream yet).  bug 1056650
   virtual nsresult ReplaceTrack(dom::MediaStreamTrack& domtrack);
 
-
   // Separate class to allow ref counting
-  class PipelineListener : public MediaStreamTrackDirectListener {
-   friend class MediaPipelineTransmit;
-   public:
-    explicit PipelineListener(const RefPtr<MediaSessionConduit>& conduit)
-      : conduit_(conduit),
-        track_id_(TRACK_INVALID),
-        mMutex("MediaPipelineTransmit::PipelineListener"),
-        track_id_external_(TRACK_INVALID),
-        active_(false),
-        enabled_(false),
-        direct_connect_(false),
-        packetizer_(nullptr)
-#if !defined(MOZILLA_EXTERNAL_LINKAGE)
-        , last_img_(-1)
-#endif // MOZILLA_INTERNAL_API
-    {
-    }
-
-    ~PipelineListener()
-    {
-      if (NS_IsMainThread()) {
-        // This would happen if the cycle collector is destructing us, in which
-        // case it'd be too late to dispatch something to main thread anyway.
-        conduit_ = nullptr;
-        return;
-      }
-
-      // release conduit on mainthread.  Must use forget()!
-      nsresult rv = NS_DispatchToMainThread(new
-        ConduitDeleteEvent(conduit_.forget()));
-      MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
-      if (NS_FAILED(rv)) {
-        MOZ_CRASH();
-      }
-    }
-
-    // Dispatches setting the internal TrackID to TRACK_INVALID to the media
-    // graph thread to keep it in sync with other MediaStreamGraph operations
-    // like RemoveListener() and AddListener(). The TrackID will be updated on
-    // the next NewData() callback.
-    void UnsetTrackId(MediaStreamGraphImpl* graph);
-
-    void SetActive(bool active) { active_ = active; }
-    void SetEnabled(bool enabled) { enabled_ = enabled; }
-
-    // Implement MediaStreamTrackListener
-    void NotifyQueuedChanges(MediaStreamGraph* aGraph,
-                             StreamTime aTrackOffset,
-                             const MediaSegment& aQueuedMedia) override;
-
-    // Implement MediaStreamTrackDirectListener
-    void NotifyRealtimeTrackData(MediaStreamGraph* aGraph,
-                                 StreamTime aTrackOffset,
-                                 const MediaSegment& aMedia) override;
-    void NotifyDirectListenerInstalled(InstallationResult aResult) override;
-    void NotifyDirectListenerUninstalled() override;
-
-   private:
-    void UnsetTrackIdImpl() {
-      MutexAutoLock lock(mMutex);
-      track_id_ = track_id_external_ = TRACK_INVALID;
-    }
-
-    void NewData(MediaStreamGraph* graph,
-                 StreamTime offset,
-                 const MediaSegment& media);
-
-    virtual void ProcessAudioChunk(AudioSessionConduit *conduit,
-                                   TrackRate rate, AudioChunk& chunk);
-#if !defined(MOZILLA_EXTERNAL_LINKAGE)
-    virtual void ProcessVideoChunk(VideoSessionConduit *conduit,
-                                   VideoChunk& chunk);
-#endif
-    RefPtr<MediaSessionConduit> conduit_;
-
-    // May be TRACK_INVALID until we see data from the track
-    TrackID track_id_; // this is the current TrackID this listener is attached to
-    Mutex mMutex;
-    // protected by mMutex
-    // May be TRACK_INVALID until we see data from the track
-    TrackID track_id_external_; // this is queried from other threads
-
-    // active is true if there is a transport to send on
-    mozilla::Atomic<bool> active_;
-    // enabled is true if the media access control permits sending
-    // actual content; when false you get black/silence
-    mozilla::Atomic<bool> enabled_;
-
-    // Written and read on the MediaStreamGraph thread
-    bool direct_connect_;
-
-    nsAutoPtr<AudioPacketizer<int16_t, int16_t>> packetizer_;
-#if !defined(MOZILLA_EXTERNAL_LINKAGE)
-    int32_t last_img_; // serial number of last Image
-#endif // MOZILLA_INTERNAL_API
-  };
+  class PipelineListener;
 
  private:
   RefPtr<PipelineListener> listener_;
   dom::MediaStreamTrack* domtrack_;
 };
 
 
 // A specialization of pipeline for reading from the network and
@@ -544,36 +350,27 @@ class MediaPipelineReceive : public Medi
                        nsCOMPtr<nsIEventTarget> main_thread,
                        nsCOMPtr<nsIEventTarget> sts_thread,
                        SourceMediaStream *stream,
                        const std::string& track_id,
                        int level,
                        RefPtr<MediaSessionConduit> conduit,
                        RefPtr<TransportFlow> rtp_transport,
                        RefPtr<TransportFlow> rtcp_transport,
-                       nsAutoPtr<MediaPipelineFilter> filter) :
-      MediaPipeline(pc, RECEIVE, main_thread, sts_thread,
-                    track_id, level, conduit, rtp_transport,
-                    rtcp_transport, filter),
-      stream_(stream),
-      segments_added_(0) {
-    MOZ_ASSERT(stream_);
-  }
+                       nsAutoPtr<MediaPipelineFilter> filter);
 
   int segments_added() const { return segments_added_; }
 
 #ifndef USE_FAKE_MEDIA_STREAMS
   // Sets the PrincipalHandle we set on the media chunks produced by this
   // pipeline. Must be called on the main thread.
   virtual void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) = 0;
 #endif // USE_FAKE_MEDIA_STREAMS
  protected:
-  ~MediaPipelineReceive() {
-    MOZ_ASSERT(!stream_);  // Check that we have shut down already.
-  }
+  ~MediaPipelineReceive();
 
   RefPtr<SourceMediaStream> stream_;
   int segments_added_;
 
  private:
 };
 
 
@@ -592,80 +389,30 @@ class MediaPipelineReceiveAudio : public
                             // unique within a single DOMMediaStream, which is
                             // used by MediaStreamGraph
                             TrackID numeric_track_id,
                             int level,
                             RefPtr<AudioSessionConduit> conduit,
                             RefPtr<TransportFlow> rtp_transport,
                             RefPtr<TransportFlow> rtcp_transport,
                             nsAutoPtr<MediaPipelineFilter> filter,
-                            bool queue_track) :
-      MediaPipelineReceive(pc, main_thread, sts_thread,
-                           stream, media_stream_track_id, level, conduit,
-                           rtp_transport, rtcp_transport, filter),
-      listener_(new PipelineListener(stream, numeric_track_id, conduit,
-                                     queue_track)) {
-  }
+                            bool queue_track);
 
-  void DetachMedia() override {
-    ASSERT_ON_THREAD(main_thread_);
-    if (stream_) {
-      stream_->RemoveListener(listener_);
-      stream_ = nullptr;
-    }
-  }
+  void DetachMedia() override;
 
   nsresult Init() override;
   bool IsVideo() const override { return false; }
 
 #ifndef USE_FAKE_MEDIA_STREAMS
-  void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) override
-  {
-    listener_->SetPrincipalHandle_m(principal_handle);
-  }
+  void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) override;
 #endif // USE_FAKE_MEDIA_STREAMS
 
  private:
   // Separate class to allow ref counting
-  class PipelineListener : public GenericReceiveListener {
-   public:
-    PipelineListener(SourceMediaStream * source, TrackID track_id,
-                     const RefPtr<MediaSessionConduit>& conduit,
-                     bool queue_track);
-
-    ~PipelineListener()
-    {
-      if (NS_IsMainThread()) {
-        // This would happen if the cycle collector is destructing us, in which
-        // case it'd be too late to dispatch something to main thread anyway.
-        conduit_ = nullptr;
-        return;
-      }
-
-      // release conduit on mainthread.  Must use forget()!
-      nsresult rv = NS_DispatchToMainThread(new
-        ConduitDeleteEvent(conduit_.forget()));
-      MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
-      if (NS_FAILED(rv)) {
-        MOZ_CRASH();
-      }
-    }
-
-    // Implement MediaStreamListener
-    void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
-                                  StreamTime offset,
-                                  uint32_t events,
-                                  const MediaSegment& queued_media,
-                                  MediaStream* input_stream,
-                                  TrackID input_tid) override {}
-    void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override;
-
-   private:
-    RefPtr<MediaSessionConduit> conduit_;
-  };
+  class PipelineListener;
 
   RefPtr<PipelineListener> listener_;
 };
 
 
 // A specialization of pipeline for reading from the network and
 // rendering video.
 class MediaPipelineReceiveVideo : public MediaPipelineReceive {
@@ -681,144 +428,34 @@ class MediaPipelineReceiveVideo : public
                             // unique within a single DOMMediaStream, which is
                             // used by MediaStreamGraph
                             TrackID numeric_track_id,
                             int level,
                             RefPtr<VideoSessionConduit> conduit,
                             RefPtr<TransportFlow> rtp_transport,
                             RefPtr<TransportFlow> rtcp_transport,
                             nsAutoPtr<MediaPipelineFilter> filter,
-                            bool queue_track) :
-      MediaPipelineReceive(pc, main_thread, sts_thread,
-                           stream, media_stream_track_id, level, conduit,
-                           rtp_transport, rtcp_transport, filter),
-      renderer_(new PipelineRenderer(this)),
-      listener_(new PipelineListener(stream, numeric_track_id, queue_track)) {
-  }
+                            bool queue_track);
 
   // Called on the main thread.
-  void DetachMedia() override {
-    ASSERT_ON_THREAD(main_thread_);
-
-    listener_->EndTrack();
-    // stop generating video and thus stop invoking the PipelineRenderer
-    // and PipelineListener - the renderer has a raw ptr to the Pipeline to
-    // avoid cycles, and the render callbacks are invoked from a different
-    // thread so simple null-checks would cause TSAN bugs without locks.
-    static_cast<VideoSessionConduit*>(conduit_.get())->DetachRenderer();
-    if (stream_) {
-      stream_->RemoveListener(listener_);
-      stream_ = nullptr;
-    }
-  }
+  void DetachMedia() override;
 
   nsresult Init() override;
   bool IsVideo() const override { return true; }
 
 #ifndef USE_FAKE_MEDIA_STREAMS
-  void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) override
-  {
-    listener_->SetPrincipalHandle_m(principal_handle);
-  }
+  void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) override;
 #endif // USE_FAKE_MEDIA_STREAMS
 
  private:
-  class PipelineRenderer : public VideoRenderer {
-   public:
-    explicit PipelineRenderer(MediaPipelineReceiveVideo *pipeline) :
-      pipeline_(pipeline) {}
-
-    void Detach() { pipeline_ = nullptr; }
-
-    // Implement VideoRenderer
-    void FrameSizeChange(unsigned int width,
-                         unsigned int height,
-                         unsigned int number_of_streams) override {
-      pipeline_->listener_->FrameSizeChange(width, height, number_of_streams);
-    }
-
-    void RenderVideoFrame(const unsigned char* buffer,
-                          size_t buffer_size,
-                          uint32_t time_stamp,
-                          int64_t render_time,
-                          const ImageHandle& handle) override {
-      pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
-                                             time_stamp, render_time,
-                                             handle.GetImage());
-    }
-
-    void RenderVideoFrame(const unsigned char* buffer,
-                          size_t buffer_size,
-                          uint32_t y_stride,
-                          uint32_t cbcr_stride,
-                          uint32_t time_stamp,
-                          int64_t render_time,
-                          const ImageHandle& handle) override {
-      pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
-                                             y_stride, cbcr_stride,
-                                             time_stamp, render_time,
-                                             handle.GetImage());
-    }
-
-   private:
-    MediaPipelineReceiveVideo *pipeline_;  // Raw pointer to avoid cycles
-  };
+  class PipelineRenderer;
+  friend class PipelineRenderer;
 
   // Separate class to allow ref counting
-  class PipelineListener : public GenericReceiveListener {
-   public:
-    PipelineListener(SourceMediaStream * source, TrackID track_id,
-                     bool queue_track);
-
-    // Implement MediaStreamListener
-    void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
-                                  StreamTime offset,
-                                  uint32_t events,
-                                  const MediaSegment& queued_media,
-                                  MediaStream* input_stream,
-                                  TrackID input_tid) override {}
-    void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override;
-
-    // Accessors for external writes from the renderer
-    void FrameSizeChange(unsigned int width,
-                         unsigned int height,
-                         unsigned int number_of_streams) {
-      ReentrantMonitorAutoEnter enter(monitor_);
-
-      width_ = width;
-      height_ = height;
-    }
-
-    void RenderVideoFrame(const unsigned char* buffer,
-                          size_t buffer_size,
-                          uint32_t time_stamp,
-                          int64_t render_time,
-                          const RefPtr<layers::Image>& video_image);
-    void RenderVideoFrame(const unsigned char* buffer,
-                          size_t buffer_size,
-                          uint32_t y_stride,
-                          uint32_t cbcr_stride,
-                          uint32_t time_stamp,
-                          int64_t render_time,
-                          const RefPtr<layers::Image>& video_image);
-
-   private:
-    int width_;
-    int height_;
-#if defined(MOZILLA_INTERNAL_API)
-    RefPtr<layers::ImageContainer> image_container_;
-    RefPtr<layers::Image> image_;
-#endif
-    mozilla::ReentrantMonitor monitor_; // Monitor for processing WebRTC frames.
-                                        // Protects image_ against:
-                                        // - Writing from the GIPS thread
-                                        // - Reading from the MSG thread
-  };
-
-  friend class PipelineRenderer;
+  class PipelineListener;
 
   RefPtr<PipelineRenderer> renderer_;
   RefPtr<PipelineListener> listener_;
 };
 
 
-}  // end namespace
+}  // namespace mozilla
 #endif
--- a/media/webrtc/signaling/src/peerconnection/MediaPipelineFactory.cpp
+++ b/media/webrtc/signaling/src/peerconnection/MediaPipelineFactory.cpp
@@ -4,16 +4,17 @@
 
 #include "logging.h"
 #include "nsIGfxInfo.h"
 #include "nsServiceManagerUtils.h"
 
 #include "PeerConnectionImpl.h"
 #include "PeerConnectionMedia.h"
 #include "MediaPipelineFactory.h"
+#include "MediaPipelineFilter.h"
 #include "transportflow.h"
 #include "transportlayer.h"
 #include "transportlayerdtls.h"
 #include "transportlayerice.h"
 
 #include "signaling/src/jsep/JsepTrack.h"
 #include "signaling/src/jsep/JsepTransport.h"
 #include "signaling/src/common/PtrVector.h"
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
@@ -23,16 +23,19 @@
 #include "signaling/src/jsep/JsepSession.h"
 #include "signaling/src/jsep/JsepTransport.h"
 
 #ifdef USE_FAKE_STREAMS
 #include "DOMMediaStream.h"
 #include "FakeMediaStreams.h"
 #else
 #include "MediaSegment.h"
+#ifdef MOZILLA_INTERNAL_API
+#include "MediaStreamGraph.h"
+#endif
 #endif
 
 #include "nsNetCID.h"
 #include "nsNetUtil.h"
 #include "nsIURI.h"
 #include "nsIScriptSecurityManager.h"
 #include "nsICancelable.h"
 #include "nsIDocument.h"