Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r?froydnj draft
authorPaul Adenot <paul@paul.cx>
Fri, 13 Apr 2018 17:14:05 +0200
changeset 800601 37d365b1948acbf4a306a9518fee5d4f10a2164e
parent 798691 d36cd8bdbc5c0df1d1d7a167f5fedb95c3a3648e
push id111420
push userpaul@paul.cx
push dateMon, 28 May 2018 15:53:51 +0000
reviewersfroydnj
bugs1454385
milestone62.0a1
Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r?froydnj MozReview-Commit-ID: 6Dq0GQtYgv2
mfbt/SPSCQueue.h
mfbt/moz.build
mfbt/tests/TestSPSCQueue.cpp
mfbt/tests/moz.build
testing/cppunittest.ini
new file mode 100644
--- /dev/null
+++ b/mfbt/SPSCQueue.h
@@ -0,0 +1,428 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/* Single producer single consumer lock-free and wait-free queue. */
+
+#ifndef mozilla_LockFreeQueue_h
+#define mozilla_LockFreeQueue_h
+
+#include "mozilla/Assertions.h"
+#include "mozilla/Attributes.h"
+#include "mozilla/PodOperations.h"
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <thread>
+
+namespace mozilla {
+
+namespace details {
+template<typename T, bool IsPod = std::is_trivial<T>::value>
+struct MemoryOperations {
+  /**
+   * This allows zeroing (using memset) or default-constructing a number of
+   * elements calling the constructors if necessary.
+   */
+  static void ConstructDefault(T* aDestination, size_t aCount);
+  /**
+   * This allows either moving (if T supports it) or copying a number of
+   * elements from a `aSource` pointer to a `aDestination` pointer.
+   * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
+   * constructors and destructors are called in a loop.
+   */
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
+};
+
+template<typename T>
+struct MemoryOperations<T, true>
+{
+  static void ConstructDefault(T* aDestination, size_t aCount)
+  {
+    PodZero(aDestination, aCount);
+  }
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount)
+  {
+    PodCopy(aDestination, aSource, aCount);
+  }
+};
+
+template<typename T>
+struct MemoryOperations<T, false>
+{
+  static void ConstructDefault(T* aDestination, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      aDestination[i] = T();
+    }
+  }
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount)
+  {
+    std::move(aSource, aSource + aCount, aDestination);
+  }
+};
+}
+
+
+/**
+ * This data structure allows producing data from one thread, and consuming it
+ * on another thread, safely and without explicit synchronization.
+ *
+ * The role for the producer and the consumer must be constant, i.e., the
+ * producer should always be on one thread and the consumer should always be on
+ * another thread.
+ *
+ * Some words about the inner workings of this class:
+ * - Capacity is fixed. Only one allocation is performed, in the constructor.
+ *   When reading and writing, the return value of the method allows checking if
+ *   the ring buffer is empty or full.
+ * - We always keep the read index at least one element ahead of the write
+ *   index, so we can distinguish between an empty and a full ring buffer: an
+ *   empty ring buffer is when the write index is at the same position as the
+ *   read index. A full buffer is when the write index is exactly one position
+ *   before the read index.
+ * - We synchronize updates to the read index after having read the data, and
+ *   the write index after having written the data. This means that the each
+ *   thread can only touch a portion of the buffer that is not touched by the
+ *   other thread.
+ * - Callers are expected to provide buffers. When writing to the queue,
+ *   elements are copied into the internal storage from the buffer passed in.
+ *   When reading from the queue, the user is expected to provide a buffer.
+ *   Because this is a ring buffer, data might not be contiguous in memory;
+ *   providing an external buffer to copy into is an easy way to have linear
+ *   data for further processing.
+ */
+template<typename T>
+class SPSCRingBufferBase
+{
+public:
+  /**
+   * Constructor for a ring buffer.
+   *
+   * This performs an allocation on the heap, but is the only allocation that
+   * will happen for the life time of a `SPSCRingBufferBase`.
+   *
+   * @param Capacity The maximum number of element this ring buffer will hold.
+   */
+  explicit
+  SPSCRingBufferBase(int aCapacity)
+    : mReadIndex(0)
+    , mWriteIndex(0)
+    /* One more element to distinguish from empty and full buffer. */
+    , mCapacity(aCapacity + 1)
+  {
+    MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2,
+              "buffer too large for the type of index used.");
+    MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max());
+
+    mData = std::make_unique<T[]>(StorageCapacity());
+
+    std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
+  }
+  /**
+   * Push `aCount` zero or default constructed elements in the array.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param count The number of elements to enqueue.
+   * @return The number of element enqueued.
+   */
+  MOZ_MUST_USE
+  int EnqueueDefault(int aCount) {
+    return Enqueue(nullptr, aCount);
+  }
+  /**
+   * @brief Put an element in the queue.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param element The element to put in the queue.
+   *
+   * @return 1 if the element was inserted, 0 otherwise.
+   */
+  MOZ_MUST_USE
+  int Enqueue(T& aElement) {
+    return Enqueue(&aElement, 1);
+  }
+  /**
+   * Push `aCount` elements in the ring buffer.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param elements a pointer to a buffer containing at least `count` elements.
+   * If `elements` is nullptr, zero or default constructed elements are enqueud.
+   * @param count The number of elements to read from `elements`
+   * @return The number of elements successfully coped from `elements` and
+   * inserted into the ring buffer.
+   */
+  MOZ_MUST_USE
+  int Enqueue(T* aElements, int aCount)
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mProducerId);
+#endif
+
+    int rdIdx = mReadIndex.load(std::memory_order::memory_order_acquire);
+    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_relaxed);
+
+    if (IsFull(rdIdx, wrIdx)) {
+      return 0;
+    }
+
+    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);
+
+    /* First part, from the write index to the end of the array. */
+    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
+    /* Second part, from the beginning of the array */
+    int secondPart = toWrite - firstPart;
+
+    if (aElements) {
+      details::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements, firstPart);
+      details::MemoryOperations<T>::MoveOrCopy(mData.get(), aElements + firstPart, secondPart);
+    } else {
+      details::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx, firstPart);
+      details::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
+    }
+
+    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
+                      std::memory_order::memory_order_release);
+
+    return toWrite;
+  }
+  /**
+   * Retrieve at most `count` elements from the ring buffer, and copy them to
+   * `elements`, if non-null.
+   *
+   * Only safely called on the consumer side.
+   *
+   * @param elements A pointer to a buffer with space for at least `count`
+   * elements. If `elements` is `nullptr`, `count` element will be discarded.
+   * @param count The maximum number of elements to Dequeue.
+   * @return The number of elements written to `elements`.
+   */
+  MOZ_MUST_USE
+  int Dequeue(T* elements, int count)
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mConsumerId);
+#endif
+
+    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_acquire);
+    int rdIdx = mReadIndex.load(std::memory_order::memory_order_relaxed);
+
+    if (IsEmpty(rdIdx, wrIdx)) {
+      return 0;
+    }
+
+    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), count);
+
+    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
+    int secondPart = toRead - firstPart;
+
+    if (elements) {
+      details::MemoryOperations<T>::MoveOrCopy(elements, mData.get() + rdIdx, firstPart);
+      details::MemoryOperations<T>::MoveOrCopy(elements + firstPart, mData.get(), secondPart);
+    }
+
+    mReadIndex.store(IncrementIndex(rdIdx, toRead),
+                     std::memory_order::memory_order_release);
+
+    return toRead;
+  }
+  /**
+   * Get the number of available elements for consuming.
+   *
+   * Only safely called on the consumer thread. This can be less than the actual
+   * number of elements in the queue, since the mWriteIndex is updated at the
+   * very end of the Enqueue method on the producer thread, but consequently
+   * always returns a number of elements such that a call to Dequeue return this
+   * number of elements.
+   *
+   * @return The number of available elements for reading.
+   */
+  int AvailableRead() const
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mConsumerId);
+#endif
+    return AvailableReadInternal(
+      mReadIndex.load(std::memory_order::memory_order_relaxed),
+      mWriteIndex.load(std::memory_order::memory_order_relaxed));
+  }
+  /**
+   * Get the number of available elements for writing.
+   *
+   * Only safely called on the producer thread. This can be less than than the
+   * actual number of slots that are available, because mReadIndex is update at
+   * the very end of the Deque method. It always returns a number such that a
+   * call to Enqueue with this number will succeed in enqueuing this number of
+   * elements.
+   *
+   * @return The number of empty slots in the buffer, available for writing.
+   */
+  int AvailableWrite() const
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mProducerId);
+#endif
+    return AvailableWriteInternal(
+      mReadIndex.load(std::memory_order::memory_order_relaxed),
+      mWriteIndex.load(std::memory_order::memory_order_relaxed));
+  }
+  /**
+   * Get the total Capacity, for this ring buffer.
+   *
+   * Can be called safely on any thread.
+   *
+   * @return The maximum Capacity of this ring buffer.
+   */
+  int Capacity() const { return StorageCapacity() - 1; }
+  /**
+   * Reset the consumer and producer thread identifier, in case the threads are
+   * being changed. This has to be externally synchronized. This is no-op when
+   * asserts are disabled.
+   */
+  void ResetThreadIds()
+  {
+#ifdef DEBUG
+    mConsumerId = mProducerId = std::thread::id();
+#endif
+  }
+private:
+  /** Return true if the ring buffer is empty.
+   *
+   * This can be called from the consumer or the producer thread.
+   *
+   * @param aReadIndex the read index to consider
+   * @param writeIndex the write index to consider
+   * @return true if the ring buffer is empty, false otherwise.
+   **/
+  bool IsEmpty(int aReadIndex, int aWriteIndex) const
+  {
+    return aWriteIndex == aReadIndex;
+  }
+  /** Return true if the ring buffer is full.
+   *
+   * This happens if the write index is exactly one element behind the read
+   * index.
+   *
+   * This can be called from the consummer or the producer thread.
+   *
+   * @param aReadIndex the read index to consider
+   * @param writeIndex the write index to consider
+   * @return true if the ring buffer is full, false otherwise.
+   **/
+  bool IsFull(int aReadIndex, int aWriteIndex) const
+  {
+    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
+  }
+  /**
+   * Return the size of the storage. It is one more than the number of elements
+   * that can be stored in the buffer.
+   *
+   * This can be called from any thread.
+   *
+   * @return the number of elements that can be stored in the buffer.
+   */
+  int StorageCapacity() const { return mCapacity; }
+  /**
+   * Returns the number of elements available for reading.
+   *
+   * This can be called from the consummer or producer thread, but see the
+   * comment in `AvailableRead`.
+   *
+   * @return the number of available elements for reading.
+   */
+  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const
+  {
+    if (aWriteIndex >= aReadIndex) {
+      return aWriteIndex - aReadIndex;
+    } else {
+      return aWriteIndex + StorageCapacity() - aReadIndex;
+    }
+  }
+  /**
+   * Returns the number of empty elements, available for writing.
+   *
+   * This can be called from the consummer or producer thread, but see the
+   * comment in `AvailableWrite`.
+   *
+   * @return the number of elements that can be written into the array.
+   */
+  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const
+  {
+    /* We subtract one element here to always keep at least one sample
+     * free in the buffer, to distinguish between full and empty array. */
+    int rv = aReadIndex - aWriteIndex - 1;
+    if (aWriteIndex >= aReadIndex) {
+      rv += StorageCapacity();
+    }
+    return rv;
+  }
+  /**
+   * Increments an index, wrapping it around the storage.
+   *
+   * Incrementing `mWriteIndex` can be done on the producer thread.
+   * Incrementing `mReadIndex` can be done on the consummer thread.
+   *
+   * @param index a reference to the index to increment.
+   * @param increment the number by which `index` is incremented.
+   * @return the new index.
+   */
+  int IncrementIndex(int aIndex, int aIncrement) const
+  {
+    MOZ_ASSERT(aIncrement >= 0 &&
+               aIncrement < StorageCapacity() &&
+               aIndex < StorageCapacity());
+    return (aIndex + aIncrement) % StorageCapacity();
+  }
+  /**
+   * @brief This allows checking that Enqueue (resp. Dequeue) are always
+   * called by the right thread.
+   *
+   * The role of the thread are assigned the first time they call Enqueue or
+   * Dequeue, and cannot change, except when ResetThreadIds is called..
+   *
+   * @param id the id of the thread that has called the calling method first.
+   */
+#ifdef DEBUG
+  static void AssertCorrectThread(std::thread::id& aId)
+  {
+    if (aId == std::thread::id()) {
+      aId = std::this_thread::get_id();
+      return;
+    }
+    MOZ_ASSERT(aId == std::this_thread::get_id());
+  }
+#endif
+  /** Index at which the oldest element is. */
+  std::atomic<int> mReadIndex;
+  /** Index at which to write new elements. `mWriteIndex` is always at
+   * least one element ahead of `mReadIndex`. */
+  std::atomic<int> mWriteIndex;
+  /** Maximum number of elements that can be stored in the ring buffer. */
+  const int mCapacity;
+  /** Data storage, of size `mCapacity + 1` */
+  std::unique_ptr<T[]> mData;
+#ifdef DEBUG
+  /** The id of the only thread that is allowed to read from the queue. */
+  mutable std::thread::id mConsumerId;
+  /** The id of the only thread that is allowed to write from the queue. */
+  mutable std::thread::id mProducerId;
+#endif
+};
+
+/**
+ * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
+ * from two threads, one producer, one consumer (that never change role),
+ * without explicit synchronization.
+ */
+template<typename T>
+using SPSCQueue = SPSCRingBufferBase<T>;
+
+} // namespace mozilla
+
+#endif // mozilla_LockFreeQueue_h
--- a/mfbt/moz.build
+++ b/mfbt/moz.build
@@ -83,16 +83,17 @@ EXPORTS.mozilla = [
     'ScopeExit.h',
     'SegmentedVector.h',
     'SHA1.h',
     'SharedLibrary.h',
     'SmallPointerArray.h',
     'Span.h',
     'SplayTree.h',
     'Sprintf.h',
+    'SPSCQueue.h',
     'StaticAnalysisFunctions.h',
     'TaggedAnonymousMemory.h',
     'TemplateLib.h',
     'TextUtils.h',
     'ThreadLocal.h',
     'ThreadSafeWeakPtr.h',
     'ToString.h',
     'Tuple.h',
new file mode 100644
--- /dev/null
+++ b/mfbt/tests/TestSPSCQueue.cpp
@@ -0,0 +1,251 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/SPSCQueue.h"
+#include "mozilla/PodOperations.h"
+#include <vector>
+#include <iostream>
+#include <thread>
+#include <chrono>
+#include <memory>
+#include <string>
+
+using namespace mozilla;
+
+/* Generate a monotonically increasing sequence of numbers. */
+template<typename T>
+class SequenceGenerator
+{
+public:
+  SequenceGenerator()
+  { }
+  void Get(T * aElements, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      aElements[i] = static_cast<T>(mIndex);
+      mIndex++;
+    }
+  }
+  void Rewind(size_t aCount)
+  {
+    mIndex -= aCount;
+  }
+private:
+  size_t mIndex = 0;
+};
+
+/* Checks that a sequence is monotonically increasing. */
+template<typename T>
+class SequenceVerifier
+{
+public:
+  SequenceVerifier()
+  { }
+  void Check(T * aElements, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      if (aElements[i] != static_cast<T>(mIndex)) {
+        std::cerr << "Element " << i << " is different. Expected "
+          << static_cast<T>(mIndex) << ", got " << aElements[i]
+          << "." << std::endl;
+        MOZ_RELEASE_ASSERT(false);
+      }
+      mIndex++;
+    }
+  }
+private:
+  size_t mIndex = 0;
+};
+
+const int BLOCK_SIZE = 127;
+
+template<typename T>
+void TestRing(int capacity)
+{
+  SPSCQueue<T> buf(capacity);
+  std::unique_ptr<T[]> seq(new T[capacity]);
+  SequenceGenerator<T> gen;
+  SequenceVerifier<T> checker;
+
+  int iterations = 1002;
+
+  while(iterations--) {
+    gen.Get(seq.get(), BLOCK_SIZE);
+    int rv = buf.Enqueue(seq.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE);
+    PodZero(seq.get(), BLOCK_SIZE);
+    rv = buf.Dequeue(seq.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE);
+    checker.Check(seq.get(), BLOCK_SIZE);
+  }
+}
+
+template<typename T>
+void TestRingMultiThread(int capacity)
+{
+  SPSCQueue<T> buf(capacity);
+  SequenceVerifier<T> checker;
+  std::unique_ptr<T[]> outBuffer(new T[capacity]);
+
+  std::thread t([&buf, capacity] {
+    int iterations = 1002;
+    std::unique_ptr<T[]> inBuffer(new T[capacity]);
+    SequenceGenerator<T> gen;
+
+    while(iterations--) {
+      std::this_thread::sleep_for(std::chrono::microseconds(10));
+      gen.Get(inBuffer.get(), BLOCK_SIZE);
+      int rv = buf.Enqueue(inBuffer.get(), BLOCK_SIZE);
+      MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE);
+      if (rv != BLOCK_SIZE) {
+        gen.Rewind(BLOCK_SIZE - rv);
+      }
+    }
+  });
+
+  int remaining = 1002;
+
+  while(remaining--) {
+    std::this_thread::sleep_for(std::chrono::microseconds(10));
+    int rv = buf.Dequeue(outBuffer.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE);
+    checker.Check(outBuffer.get(), rv);
+  }
+
+  t.join();
+}
+
+template<typename T>
+void BasicAPITest(T& ring)
+{
+  MOZ_RELEASE_ASSERT(ring.Capacity() == 128);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128);
+
+  int rv = ring.EnqueueDefault(63);
+
+  MOZ_RELEASE_ASSERT(rv == 63);
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 63);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 65);
+
+  rv = ring.EnqueueDefault(65);
+
+  MOZ_RELEASE_ASSERT(rv == 65);
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 128);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 0);
+
+  rv = ring.Dequeue(nullptr, 63);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 65);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 63);
+
+  rv = ring.Dequeue(nullptr, 65);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128);
+}
+
+const size_t RING_BUFFER_SIZE = 128;
+const size_t ENQUEUE_SIZE = RING_BUFFER_SIZE / 2;
+
+void TestResetAPI() {
+  SPSCQueue<float> ring(RING_BUFFER_SIZE);
+  std::thread t([&ring] {
+    std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
+    int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
+    MOZ_RELEASE_ASSERT(rv > 0);
+  });
+
+  t.join();
+
+  ring.ResetThreadIds();
+
+  // Enqueue with a different thread. We have reset the thread ID
+  // in the ring buffer, this should work.
+  std::thread t2([&ring] {
+    std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
+    int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
+    MOZ_RELEASE_ASSERT(rv > 0);
+  });
+
+  t2.join();
+}
+
+void
+TestMove()
+{
+  const size_t ELEMENT_COUNT = 16;
+  struct Thing {
+    Thing()
+      : mStr("")
+    { }
+    explicit
+    Thing(const std::string& aStr)
+      :mStr(aStr)
+    { }
+    Thing(Thing&& aOtherThing)
+    {
+      mStr = std::move(aOtherThing.mStr);
+      // aOtherThing.mStr.clear();
+    }
+    Thing& operator=(Thing&& aOtherThing)
+    {
+      mStr = std::move(aOtherThing.mStr);
+      return *this;
+    }
+    std::string mStr;
+  };
+
+  std::vector<Thing> vec_in;
+  std::vector<Thing> vec_out;
+
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    vec_in.push_back(Thing(std::to_string(i)));
+    vec_out.push_back(Thing());
+  }
+
+  SPSCQueue<Thing> queue(ELEMENT_COUNT);
+
+  int rv = queue.Enqueue(&vec_in[0], ELEMENT_COUNT);
+  MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT);
+
+  // Check that we've moved the std::string into the queue.
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    MOZ_RELEASE_ASSERT(vec_in[i].mStr.empty());
+  }
+
+  rv = queue.Dequeue(&vec_out[0], ELEMENT_COUNT);
+  MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT);
+
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    MOZ_RELEASE_ASSERT(std::stoul(vec_out[i].mStr) == i);
+  }
+}
+
+int main()
+{
+  const int minCapacity = 199;
+  const int maxCapacity = 1277;
+  const int capacityIncrement = 27;
+
+  SPSCQueue<float> q1(128);
+  BasicAPITest(q1);
+  SPSCQueue<char> q2(128);
+  BasicAPITest(q2);
+
+  for (uint32_t i = minCapacity; i < maxCapacity; i+=capacityIncrement) {
+    TestRing<uint32_t>(i);
+    TestRingMultiThread<uint32_t>(i);
+    TestRing<float>(i);
+    TestRingMultiThread<float>(i);
+  }
+
+  TestResetAPI();
+  TestMove();
+
+  return 0;
+}
--- a/mfbt/tests/moz.build
+++ b/mfbt/tests/moz.build
@@ -46,16 +46,17 @@ CppUnitTests([
     'TestResult',
     'TestRollingMean',
     'TestSaturate',
     'TestScopeExit',
     'TestSegmentedVector',
     'TestSHA1',
     'TestSmallPointerArray',
     'TestSplayTree',
+    'TestSPSCQueue',
     'TestTemplateLib',
     'TestTextUtils',
     'TestThreadSafeWeakPtr',
     'TestTuple',
     'TestTypedEnum',
     'TestTypeTraits',
     'TestUniquePtr',
     'TestVariant',
--- a/testing/cppunittest.ini
+++ b/testing/cppunittest.ini
@@ -37,16 +37,18 @@ skip-if = os == 'android' # Bug 1147630
 [TestRefPtr]
 [TestRollingMean]
 [TestScopeExit]
 [TestSegmentedVector]
 [TestSHA1]
 [TestSmallPointerArray]
 [TestSaturate]
 [TestSplayTree]
+[TestSPSCQueue]
+skip-if = os == 'linux' # Bug 1464084
 [TestSyncRunnable]
 [TestTemplateLib]
 [TestTuple]
 [TestTypeTraits]
 [TestTypedEnum]
 [TestUniquePtr]
 [TestVariant]
 [TestVector]