Bug 1399760. P3 - keep ID of the loading channel so we check whether the data callback is from an old channel.
The load ID works as follows:
1. A load ID is passed to MediaCacheStream::NotifyDataStarted()
when loading a new channel.
2. Each MediaCacheStream::NotifyDataReceived() call is also associated
with a load ID from which the data is received.
3. If |mLoadID != aLoadID| tests to be true in NotifyDataReceived(), it means
the data is from an old channel and should be discarded.
4. MediaCache::Update() reset mLoadID for the stream before calling
CacheClientSeek() to prevent data from the old channel from being
stored to the wrong position.
MozReview-Commit-ID: 9kBoublLlln
--- a/dom/media/MediaCache.cpp
+++ b/dom/media/MediaCache.cpp
@@ -1382,16 +1382,20 @@ MediaCache::Update()
NS_ASSERTION(stream->mIsTransportSeekable || desiredOffset == 0,
"Trying to seek in a non-seekable stream!");
// Round seek offset down to the start of the block. This is essential
// because we don't want to think we have part of a block already
// in mPartialBlockBuffer.
stream->mChannelOffset =
OffsetToBlockIndexUnchecked(desiredOffset) * BLOCK_SIZE;
actions[i] = stream->mCacheSuspended ? SEEK_AND_RESUME : SEEK;
+ // mChannelOffset is updated to a new position. We don't want data from
+ // the old channel to be written to the wrong position. 0 is a sentinel
+ // value which will not match any ID passed to NotifyDataReceived().
+ stream->mLoadID = 0;
} else if (enableReading && stream->mCacheSuspended) {
actions[i] = RESUME;
} else if (!enableReading && !stream->mCacheSuspended) {
actions[i] = SUSPEND;
}
}
#ifdef DEBUG
mInUpdate = false;
@@ -1849,67 +1853,88 @@ MediaCache::NoteSeek(MediaCacheStream* a
}
}
}
void
MediaCacheStream::NotifyDataLength(int64_t aLength)
{
NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+ LOG("Stream %p DataLength: %" PRId64, this, aLength);
ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
mStreamLength = aLength;
}
void
-MediaCacheStream::NotifyDataStarted(int64_t aOffset)
+MediaCacheStream::NotifyDataStarted(uint32_t aLoadID, int64_t aOffset)
{
NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+ MOZ_ASSERT(aLoadID > 0);
+ LOG("Stream %p DataStarted: %" PRId64 " aLoadID=%u", this, aOffset, aLoadID);
ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
NS_WARNING_ASSERTION(aOffset == mChannelOffset,
"Server is giving us unexpected offset");
MOZ_ASSERT(aOffset >= 0);
mChannelOffset = aOffset;
if (mStreamLength >= 0) {
// If we started reading at a certain offset, then for sure
// the stream is at least that long.
mStreamLength = std::max(mStreamLength, mChannelOffset);
}
+ mLoadID = aLoadID;
}
void
MediaCacheStream::UpdatePrincipal(nsIPrincipal* aPrincipal)
{
MOZ_ASSERT(NS_IsMainThread());
MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
while (MediaCacheStream* stream = iter.Next()) {
if (nsContentUtils::CombineResourcePrincipals(&stream->mPrincipal,
aPrincipal)) {
stream->mClient->CacheClientNotifyPrincipalChanged();
}
}
}
void
-MediaCacheStream::NotifyDataReceived(int64_t aSize, const char* aData)
+MediaCacheStream::NotifyDataReceived(uint32_t aLoadID,
+ int64_t aSize,
+ const char* aData)
{
+ MOZ_ASSERT(aLoadID > 0);
// This might happen off the main thread.
// It is safe to read mClosed without holding the monitor because this
// function is guaranteed to happen before Close().
MOZ_DIAGNOSTIC_ASSERT(!mClosed);
ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
+ LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64 " aLoadID=%u",
+ this,
+ mChannelOffset,
+ aSize,
+ aLoadID);
+
+ // TODO: For now NotifyDataReceived() always runs on the main thread. This
+ // assertion is to make sure our load ID algorithm doesn't go wrong. Remove it
+ // when OMT data delievery is enabled.
+ MOZ_DIAGNOSTIC_ASSERT(mLoadID == aLoadID);
+
+ if (mLoadID != aLoadID) {
+ // mChannelOffset is updated to a new position when loading a new channel.
+ // We should discard the data coming from the old channel so it won't be
+ // stored to the wrong positoin.
+ return;
+ }
int64_t size = aSize;
const char* data = aData;
- LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64,
- this, mChannelOffset, aSize);
-
// We process the data one block (or part of a block) at a time
while (size > 0) {
uint32_t blockIndex = OffsetToBlockIndexUnchecked(mChannelOffset);
int32_t blockOffset = int32_t(mChannelOffset - blockIndex*BLOCK_SIZE);
int32_t chunkSize = std::min<int64_t>(BLOCK_SIZE - blockOffset, size);
if (blockOffset == 0) {
// We've just started filling this buffer so now is a good time
--- a/dom/media/MediaCache.h
+++ b/dom/media/MediaCache.h
@@ -251,23 +251,23 @@ public:
void NotifyDataLength(int64_t aLength);
// Notifies the cache that a load has begun. We pass the offset
// because in some cases the offset might not be what the cache
// requested. In particular we might unexpectedly start providing
// data at offset 0. This need not be called if the offset is the
// offset that the cache requested in
// ChannelMediaResource::CacheClientSeek. This can be called at any
// time by the client, not just after a CacheClientSeek.
- void NotifyDataStarted(int64_t aOffset);
+ void NotifyDataStarted(uint32_t aLoadID, int64_t aOffset);
// Notifies the cache that data has been received. The stream already
// knows the offset because data is received in sequence and
// the starting offset is known via NotifyDataStarted or because
// the cache requested the offset in
// ChannelMediaResource::CacheClientSeek, or because it defaulted to 0.
- void NotifyDataReceived(int64_t aSize, const char* aData);
+ void NotifyDataReceived(uint32_t aLoadID, int64_t aSize, const char* aData);
// Notifies the cache that the current bytes should be written to disk.
// Called on the main thread.
void FlushPartialBlock();
// Notifies the cache that the channel has closed with the given status.
void NotifyDataEnded(nsresult aStatus);
// Notifies the stream that the channel is reopened. The stream should
// reset variables such as |mDidNotifyDataEnded|.
@@ -489,16 +489,19 @@ private:
uint32_t mPinCount;
// The status used when we did CacheClientNotifyDataEnded. Only valid
// when mDidNotifyDataEnded is true.
nsresult mNotifyDataEndedStatus;
// The last reported read mode
ReadMode mCurrentMode;
// True if some data in mPartialBlockBuffer has been read as metadata
bool mMetadataInPartialBlockBuffer;
+ // The load ID of the current channel. Used to check whether the data is
+ // coming from an old channel and should be discarded.
+ uint32_t mLoadID = 0;
// The following field is protected by the cache's monitor but are
// only written on the main thread.
// Data received for the block containing mChannelOffset. Data needs
// to wait here so we can write back a complete block. The first
// mChannelOffset%BLOCK_SIZE bytes have been filled in with good data,
// the rest are garbage.
--- a/dom/media/MediaResource.cpp
+++ b/dom/media/MediaResource.cpp
@@ -141,17 +141,17 @@ nsresult
ChannelMediaResource::Listener::OnDataAvailable(nsIRequest* aRequest,
nsISupports* aContext,
nsIInputStream* aStream,
uint64_t aOffset,
uint32_t aCount)
{
// This might happen off the main thread.
MOZ_DIAGNOSTIC_ASSERT(mResource);
- return mResource->OnDataAvailable(aRequest, aStream, aCount);
+ return mResource->OnDataAvailable(mLoadID, aStream, aCount);
}
nsresult
ChannelMediaResource::Listener::AsyncOnChannelRedirect(
nsIChannel* aOld,
nsIChannel* aNew,
uint32_t aFlags,
nsIAsyncVerifyRedirectCallback* cb)
@@ -313,17 +313,17 @@ ChannelMediaResource::OnStartRequest(nsI
// and the server isn't sending Accept-Ranges:bytes then we don't
// support seeking. We also can't seek in compressed streams.
seekable = !isCompressed && acceptsRanges;
} else {
// Not an HTTP channel. Assume data will be sent from position zero.
startOffset = 0;
}
- mCacheStream.NotifyDataStarted(startOffset);
+ mCacheStream.NotifyDataStarted(mLoadID, startOffset);
mCacheStream.SetTransportSeekable(seekable);
mChannelStatistics.Start();
mReopenOnError = false;
mSuspendAgent.UpdateSuspendedStatusIfNeeded();
// Fires an initial progress event.
owner->DownloadProgressed();
@@ -440,52 +440,56 @@ ChannelMediaResource::OnChannelRedirect(
{
mChannel = aNew;
mSuspendAgent.NotifyChannelOpened(mChannel);
return SetupChannelHeaders(aOffset);
}
nsresult
ChannelMediaResource::CopySegmentToCache(nsIInputStream* aInStream,
- void* aResource,
+ void* aClosure,
const char* aFromSegment,
uint32_t aToOffset,
uint32_t aCount,
uint32_t* aWriteCount)
{
- ChannelMediaResource* res = static_cast<ChannelMediaResource*>(aResource);
- res->mCacheStream.NotifyDataReceived(aCount, aFromSegment);
+ Closure* closure = static_cast<Closure*>(aClosure);
+ closure->mResource->mCacheStream.NotifyDataReceived(
+ closure->mLoadID, aCount, aFromSegment);
*aWriteCount = aCount;
return NS_OK;
}
nsresult
-ChannelMediaResource::OnDataAvailable(nsIRequest* aRequest,
+ChannelMediaResource::OnDataAvailable(uint32_t aLoadID,
nsIInputStream* aStream,
uint32_t aCount)
{
// This might happen off the main thread.
- NS_ASSERTION(mChannel.get() == aRequest, "Wrong channel!");
+ // Don't assert |mChannel.get() == aRequest| since reading mChannel here off
+ // the main thread is a data race.
// Update principals before putting the data in the cache. This is important,
// we want to make sure all principals are updated before any consumer can see
// the new data.
// TODO: Handle the case where OnDataAvailable() runs off the main thread.
UpdatePrincipal();
RefPtr<ChannelMediaResource> self = this;
nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
"ChannelMediaResource::OnDataAvailable",
[self, aCount]() { self->mChannelStatistics.AddBytes(aCount); });
mCallback->AbstractMainThread()->Dispatch(r.forget());
+ Closure closure{ aLoadID, this };
uint32_t count = aCount;
while (count > 0) {
uint32_t read;
- nsresult rv = aStream->ReadSegments(CopySegmentToCache, this, count, &read);
+ nsresult rv =
+ aStream->ReadSegments(CopySegmentToCache, &closure, count, &read);
if (NS_FAILED(rv))
return rv;
NS_ASSERTION(read > 0, "Read 0 bytes while data was available?");
count -= read;
}
return NS_OK;
}
@@ -506,30 +510,30 @@ ChannelMediaResource::Open(nsIStreamList
}
nsresult rv = mCacheStream.Init(cl);
if (NS_FAILED(rv)) {
return rv;
}
MOZ_ASSERT(GetOffset() == 0, "Who set offset already?");
- mListener = new Listener(this, 0);
+ mListener = new Listener(this, 0, ++mLoadID);
*aStreamListener = mListener;
NS_ADDREF(*aStreamListener);
return NS_OK;
}
nsresult
ChannelMediaResource::OpenChannel(int64_t aOffset)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mChannel);
MOZ_ASSERT(!mListener, "Listener should have been removed by now");
- mListener = new Listener(this, aOffset);
+ mListener = new Listener(this, aOffset, ++mLoadID);
nsresult rv = mChannel->SetNotificationCallbacks(mListener.get());
NS_ENSURE_SUCCESS(rv, rv);
rv = SetupChannelHeaders(aOffset);
NS_ENSURE_SUCCESS(rv, rv);
rv = mChannel->AsyncOpen2(mListener);
NS_ENSURE_SUCCESS(rv, rv);
--- a/dom/media/MediaResource.h
+++ b/dom/media/MediaResource.h
@@ -503,44 +503,46 @@ public:
class Listener final
: public nsIStreamListener
, public nsIInterfaceRequestor
, public nsIChannelEventSink
, public nsIThreadRetargetableStreamListener
{
~Listener() {}
public:
- Listener(ChannelMediaResource* aResource, int64_t aOffset)
+ Listener(ChannelMediaResource* aResource, int64_t aOffset, uint32_t aLoadID)
: mResource(aResource)
, mOffset(aOffset)
+ , mLoadID(aLoadID)
{}
NS_DECL_ISUPPORTS
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMLISTENER
NS_DECL_NSICHANNELEVENTSINK
NS_DECL_NSIINTERFACEREQUESTOR
NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
void Revoke() { mResource = nullptr; }
private:
RefPtr<ChannelMediaResource> mResource;
const int64_t mOffset;
+ const uint32_t mLoadID;
};
friend class Listener;
nsresult GetCachedRanges(MediaByteRangeSet& aRanges) override;
protected:
bool IsSuspendedByCache();
// These are called on the main thread by Listener.
nsresult OnStartRequest(nsIRequest* aRequest, int64_t aRequestOffset);
nsresult OnStopRequest(nsIRequest* aRequest, nsresult aStatus);
- nsresult OnDataAvailable(nsIRequest* aRequest,
+ nsresult OnDataAvailable(uint32_t aLoadID,
nsIInputStream* aStream,
uint32_t aCount);
nsresult OnChannelRedirect(nsIChannel* aOld,
nsIChannel* aNew,
uint32_t aFlags,
int64_t aOffset);
// Opens the channel, using an HTTP byte range request to start at aOffset
@@ -559,25 +561,33 @@ protected:
// Parses 'Content-Range' header and returns results via parameters.
// Returns error if header is not available, values are not parse-able or
// values are out of range.
nsresult ParseContentRangeHeader(nsIHttpChannel * aHttpChan,
int64_t& aRangeStart,
int64_t& aRangeEnd,
int64_t& aRangeTotal);
+ struct Closure
+ {
+ uint32_t mLoadID;
+ ChannelMediaResource* mResource;
+ };
+
static nsresult CopySegmentToCache(nsIInputStream* aInStream,
- void* aResource,
+ void* aClosure,
const char* aFromSegment,
uint32_t aToOffset,
uint32_t aCount,
uint32_t* aWriteCount);
// Main thread access only
RefPtr<Listener> mListener;
+ // A mono-increasing integer to uniquely identify the channel we are loading.
+ uint32_t mLoadID = 0;
// When this flag is set, if we get a network error we should silently
// reopen the stream.
bool mReopenOnError;
// Any thread access
MediaCacheStream mCacheStream;
MediaChannelStatistics mChannelStatistics;