Bug 1428952 - P3: Update to version 0.2.0 of AudioIPC. r?kinetik draft
authorDan Glastonbury <dan.glastonbury@gmail.com>
Fri, 03 Nov 2017 12:48:20 +1000
changeset 723029 7d43e2d2cc04e67ba6250a1f42a6bbb8ab854628
parent 723028 73b54ebac44a8ea59ad20c27e05425a8994f2d15
child 723030 d350b7a90cc3019601895dca02586417a87df8ca
push id96297
push userbmo:dglastonbury@mozilla.com
push dateMon, 22 Jan 2018 05:21:01 +0000
reviewerskinetik
bugs1428952
milestone59.0a1
Bug 1428952 - P3: Update to version 0.2.0 of AudioIPC. r?kinetik MozReview-Commit-ID: As6AdntcOog
media/audioipc/README_MOZILLA
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/cmsg.rs
media/audioipc/audioipc/src/codec.rs
media/audioipc/audioipc/src/connection.rs
media/audioipc/audioipc/src/core.rs
media/audioipc/audioipc/src/fd_passing.rs
media/audioipc/audioipc/src/frame.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/audioipc/src/rpc/client/mod.rs
media/audioipc/audioipc/src/rpc/client/proxy.rs
media/audioipc/audioipc/src/rpc/driver.rs
media/audioipc/audioipc/src/rpc/mod.rs
media/audioipc/audioipc/src/rpc/server.rs
media/audioipc/audioipc/src/shm.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/gecko.patch
media/audioipc/server/Cargo.toml
media/audioipc/server/src/channel.rs
media/audioipc/server/src/lib.rs
--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
 The source from this directory was copied from the audioipc-2
 git repository using the update.sh script.  The only changes
 made were those applied by update.sh and the addition of
 Makefile.in build files for the Mozilla build system.
 
 The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
 
-The git commit ID used was 864332fde124761da15c444e58889faf598a219f (2017-10-30 10:55:41 +1300)
+The git commit ID used was 71cc67f44b803e0288997247c060375196f1cf9b (2018-01-19 17:16:33 +1000)
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -1,24 +1,29 @@
 [package]
 name = "audioipc"
-version = "0.1.0"
+version = "0.2.0"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote Cubeb IPC"
 
 [dependencies]
 cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
 bincode = "0.8"
 bytes = "0.4"
+# rayon-core in Gecko uses futures 0.1.13
+futures = "=0.1.13"
+iovec = "0.1"
 libc = "0.2"
 log = "^0.3.6"
 memmap = "0.5.2"
-mio = "0.6.7"
-mio-uds = "0.6.4"
+scoped-tls = "0.1"
 serde = "1.*.*"
 serde_derive = "1.*.*"
+tokio-core = "0.1"
+tokio-io = "0.1"
+tokio-uds = "0.1.7"
 
 [dependencies.error-chain]
 version = "0.11.0"
-default-features = false
+default-features = false
\ No newline at end of file
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -1,153 +1,174 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 //! Various async helpers modelled after futures-rs and tokio-io.
 
-use {RecvFd, SendFd};
+use {RecvMsg, SendMsg};
 use bytes::{Buf, BufMut};
-use mio_uds;
-use std::io as std_io;
-use std::os::unix::io::RawFd;
-use std::os::unix::net;
-
-/// A convenience macro for working with `io::Result<T>` from the
-/// `std::io::Read` and `std::io::Write` traits.
-///
-/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
-/// the input type is of the `Err` variant, then `Async::NotReady` is returned if
-/// it indicates `WouldBlock` or otherwise `Err` is returned.
-#[macro_export]
-macro_rules! try_nb {
-    ($e:expr) => (match $e {
-        Ok(t) => t,
-        Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
-            return Ok(Async::NotReady)
-        }
-        Err(e) => return Err(e.into()),
-    })
-}
-
-/////////////////////////////////////////////////////////////////////////////////////////
-// Async support - Handle EWOULDBLOCK/EAGAIN from non-blocking I/O operations.
-
-/// Return type for async methods, indicates whether the operation was
-/// ready or not.
-///
-/// * `Ok(Async::Ready(t))` means that the operation has completed successfully.
-/// * `Ok(Async::NotReady)` means that the underlying system is not ready to handle operation.
-/// * `Err(e)` means that the operation has completed with the given error `e`.
-pub type AsyncResult<T, E> = Result<Async<T>, E>;
+use futures::{Async, Poll};
+use iovec::IoVec;
+use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_uds::UnixStream;
 
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub enum Async<T> {
-    /// Represents that a value is immediately ready.
-    Ready(T),
-    /// Represents that a value is not ready yet, but may be so later.
-    NotReady
-}
-
-impl<T> Async<T> {
-    pub fn is_ready(&self) -> bool {
-        match *self {
-            Async::Ready(_) => true,
-            Async::NotReady => false,
-        }
-    }
-
-    pub fn is_not_ready(&self) -> bool {
-        !self.is_ready()
-    }
-}
-
-/// Return type for an async attempt to send a value.
-///
-/// * `Ok(AsyncSend::Ready)` means that the operation has completed successfully.
-/// * `Ok(AsyncSend::NotReady(t))` means that the underlying system is not ready to handle
-///    send. returns the value that tried to be sent in `t`.
-/// * `Err(e)` means that operation has completed with the given error `e`.
-pub type AsyncSendResult<T, E> = Result<AsyncSend<T>, E>;
-
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub enum AsyncSend<T> {
-    Ready,
-    NotReady(T)
-}
-
-pub trait AsyncRecvFd: RecvFd {
-    unsafe fn prepare_uninitialized_buffer(&self, bytes: &mut [u8]) -> bool {
-        for byte in bytes.iter_mut() {
-            *byte = 0;
-        }
-
-        true
-    }
-
+pub trait AsyncRecvMsg: AsyncRead {
     /// Pull some bytes from this source into the specified `Buf`, returning
     /// how many bytes were read.
     ///
     /// The `buf` provided will have bytes read into it and the internal cursor
     /// will be advanced if any bytes were read. Note that this method typically
     /// will not reallocate the buffer provided.
-    fn recv_buf_fd<B>(&mut self, buf: &mut B) -> AsyncResult<(usize, Option<RawFd>), std_io::Error>
+    fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
     where
         Self: Sized,
-        B: BufMut,
-    {
-        if !buf.has_remaining_mut() {
-            return Ok(Async::Ready((0, None)));
-        }
-
-        unsafe {
-            let (n, fd) = {
-                let bytes = buf.bytes_mut();
-                self.prepare_uninitialized_buffer(bytes);
-                try_nb!(self.recv_fd(bytes))
-            };
-
-            buf.advance_mut(n);
-            Ok(Async::Ready((n, fd)))
-        }
-    }
+        B: BufMut;
 }
 
-impl AsyncRecvFd for net::UnixStream {}
-impl AsyncRecvFd for mio_uds::UnixStream {}
-
 /// A trait for writable objects which operated in an async fashion.
 ///
 /// This trait inherits from `std::io::Write` and indicates that an I/O object is
 /// **nonblocking**, meaning that it will return an error instead of blocking
 /// when bytes cannot currently be written, but hasn't closed. Specifically
 /// this means that the `write` function for types that implement this trait
 /// can have a few return values:
 ///
 /// * `Ok(n)` means that `n` bytes of data was immediately written .
 /// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
 ///   written from the buffer provided. The I/O object is not currently
 ///   writable but may become writable in the future.
 /// * `Err(e)` for other errors are standard I/O errors coming from the
 ///   underlying object.
-pub trait AsyncSendFd: SendFd {
+pub trait AsyncSendMsg: AsyncWrite {
     /// Write a `Buf` into this value, returning how many bytes were written.
     ///
     /// Note that this method will advance the `buf` provided automatically by
     /// the number of bytes written.
-    fn send_buf_fd<B>(&mut self, buf: &mut B, fd: Option<RawFd>) -> AsyncResult<usize, std_io::Error>
+    fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
     where
         Self: Sized,
         B: Buf,
+        C: Buf;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+impl AsyncRecvMsg for UnixStream {
+    fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
+    where
+        B: BufMut,
     {
-        if !buf.has_remaining() {
-            return Ok(Async::Ready(0));
+        if let Async::NotReady = <UnixStream>::poll_read(self) {
+            return Ok(Async::NotReady);
         }
+        let r = unsafe {
+            // The `IoVec` type can't have a 0-length size, so we create a bunch
+            // of dummy versions on the stack with 1 length which we'll quickly
+            // overwrite.
+            let b1: &mut [u8] = &mut [0];
+            let b2: &mut [u8] = &mut [0];
+            let b3: &mut [u8] = &mut [0];
+            let b4: &mut [u8] = &mut [0];
+            let b5: &mut [u8] = &mut [0];
+            let b6: &mut [u8] = &mut [0];
+            let b7: &mut [u8] = &mut [0];
+            let b8: &mut [u8] = &mut [0];
+            let b9: &mut [u8] = &mut [0];
+            let b10: &mut [u8] = &mut [0];
+            let b11: &mut [u8] = &mut [0];
+            let b12: &mut [u8] = &mut [0];
+            let b13: &mut [u8] = &mut [0];
+            let b14: &mut [u8] = &mut [0];
+            let b15: &mut [u8] = &mut [0];
+            let b16: &mut [u8] = &mut [0];
+            let mut bufs: [&mut IoVec; 16] = [
+                b1.into(),
+                b2.into(),
+                b3.into(),
+                b4.into(),
+                b5.into(),
+                b6.into(),
+                b7.into(),
+                b8.into(),
+                b9.into(),
+                b10.into(),
+                b11.into(),
+                b12.into(),
+                b13.into(),
+                b14.into(),
+                b15.into(),
+                b16.into(),
+            ];
+            let n = buf.bytes_vec_mut(&mut bufs);
+            self.recv_msg(&mut bufs[..n], cmsg.bytes_mut())
+        };
 
-        let n = try_nb!(self.send_fd(buf.bytes(), fd));
-        buf.advance(n);
-        Ok(Async::Ready(n))
+        match r {
+            Ok((n, cmsg_len, flags)) => {
+                unsafe {
+                    buf.advance_mut(n);
+                }
+                unsafe {
+                    cmsg.advance_mut(cmsg_len);
+                }
+                Ok((n, flags).into())
+            },
+            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                self.need_read();
+                Ok(Async::NotReady)
+            },
+            Err(e) => Err(e),
+        }
     }
 }
 
-impl AsyncSendFd for net::UnixStream {}
-impl AsyncSendFd for mio_uds::UnixStream {}
+impl AsyncSendMsg for UnixStream {
+    fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
+    where
+        B: Buf,
+        C: Buf,
+    {
+        if let Async::NotReady = <UnixStream>::poll_write(self) {
+            return Ok(Async::NotReady);
+        }
+        let r = {
+            // The `IoVec` type can't have a zero-length size, so create a dummy
+            // version from a 1-length slice which we'll overwrite with the
+            // `bytes_vec` method.
+            static DUMMY: &[u8] = &[0];
+            let nom = <&IoVec>::from(DUMMY);
+            let mut bufs = [
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+                nom,
+            ];
+            let n = buf.bytes_vec(&mut bufs);
+            self.send_msg(&bufs[..n], cmsg.bytes())
+        };
+        match r {
+            Ok(n) => {
+                buf.advance(n);
+                Ok(Async::Ready(n))
+            },
+            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                self.need_write();
+                Ok(Async::NotReady)
+            },
+            Err(e) => Err(e),
+        }
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/cmsg.rs
@@ -0,0 +1,187 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use bytes::{BufMut, Bytes, BytesMut};
+use libc::{self, cmsghdr};
+use std::{convert, mem, ops, slice};
+use std::os::unix::io::RawFd;
+
+#[derive(Clone, Debug)]
+pub struct Fds {
+    fds: Bytes
+}
+
+impl convert::AsRef<[RawFd]> for Fds {
+    fn as_ref(&self) -> &[RawFd] {
+        let n = self.fds.len() / mem::size_of::<RawFd>();
+        unsafe { slice::from_raw_parts(self.fds.as_ptr() as *const _, n) }
+    }
+}
+
+impl ops::Deref for Fds {
+    type Target = [RawFd];
+
+    #[inline]
+    fn deref(&self) -> &[RawFd] {
+        self.as_ref()
+    }
+}
+
+pub struct ControlMsgIter {
+    control: Bytes
+}
+
+pub fn iterator(c: Bytes) -> ControlMsgIter {
+    ControlMsgIter {
+        control: c
+    }
+}
+
+impl Iterator for ControlMsgIter {
+    type Item = Fds;
+
+    // This follows the logic in __cmsg_nxthdr from glibc
+    // /usr/include/bits/socket.h
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            let control = self.control.clone();
+            let cmsghdr_len = align(mem::size_of::<cmsghdr>());
+
+            if control.len() < cmsghdr_len {
+                // No more entries---not enough data in `control` for a
+                // complete message.
+                return None;
+            }
+
+            let cmsg: &cmsghdr = unsafe { &*(control.as_ptr() as *const _) };
+            // The offset to the next cmsghdr in control.  This must be
+            // aligned to a boundary that matches the type used to
+            // represent the length of the message.
+            let cmsg_len = cmsg.cmsg_len;
+            let next_cmsghdr = align(cmsg_len as _);
+            self.control = if next_cmsghdr > control.len() {
+                // No more entries---not enough data in `control` for a
+                // complete message.
+                Bytes::new()
+            } else {
+                control.slice_from(next_cmsghdr)
+            };
+
+            match (cmsg.cmsg_level, cmsg.cmsg_type) {
+                (libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
+                    trace!("Found SCM_RIGHTS...");
+                    return Some(Fds {
+                        fds: control.slice(cmsghdr_len, cmsg_len as _)
+                    });
+                },
+                (level, kind) => {
+                    trace!("Skipping cmsg level, {}, type={}...", level, kind);
+                },
+            }
+        }
+    }
+}
+
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Error {
+    /// Not enough space in storage to insert control mesage.
+    NoSpace
+}
+
+#[must_use]
+pub struct ControlMsgBuilder {
+    result: Result<BytesMut, Error>
+}
+
+pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
+    let buf = aligned(buf);
+    ControlMsgBuilder {
+        result: Ok(buf)
+    }
+}
+
+impl ControlMsgBuilder {
+    fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
+        self.result = self.result.and_then(align_buf).and_then(|mut cmsg| {
+            let cmsg_len = len(msg.len());
+            if cmsg.remaining_mut() < cmsg_len {
+                return Err(Error::NoSpace);
+            }
+
+            let cmsghdr = cmsghdr {
+                cmsg_len: cmsg_len as _,
+                cmsg_level: level,
+                cmsg_type: kind
+            };
+
+            let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) };
+            cmsg.put_slice(cmsghdr);
+            let mut cmsg = try!(align_buf(cmsg));
+            cmsg.put_slice(msg);
+
+            Ok(cmsg)
+        });
+
+        self
+    }
+
+    pub fn rights(self, fds: &[RawFd]) -> Self {
+        self.msg(libc::SOL_SOCKET, libc::SCM_RIGHTS, fds.as_bytes())
+    }
+
+    pub fn finish(self) -> Result<Bytes, Error> {
+        self.result.map(|mut cmsg| cmsg.take().freeze())
+    }
+}
+
+pub trait AsBytes {
+    fn as_bytes(&self) -> &[u8];
+}
+
+impl<'a, T: Sized> AsBytes for &'a [T] {
+    fn as_bytes(&self) -> &[u8] {
+        // TODO: This should account for the alignment of T
+        let byte_count = self.len() * mem::size_of::<T>();
+        unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) }
+    }
+}
+
+fn aligned(buf: &BytesMut) -> BytesMut {
+    let mut aligned_buf = buf.clone();
+    aligned_buf.reserve(buf.remaining_mut());
+    let cmsghdr_align = mem::align_of::<cmsghdr>();
+    let n = unsafe { aligned_buf.bytes_mut().as_ptr() } as usize & (cmsghdr_align - 1);
+    if n != 0 {
+        unsafe { aligned_buf.advance_mut(n) };
+        drop(aligned_buf.take());
+    }
+    aligned_buf
+}
+
+fn align_buf(mut cmsg: BytesMut) -> Result<BytesMut, Error> {
+    let offset = unsafe { cmsg.bytes_mut().as_ptr() } as usize;
+    let adjust = align(offset) - offset;
+    if cmsg.remaining_mut() < adjust {
+        return Err(Error::NoSpace);
+    }
+
+    for _ in 0..adjust {
+        cmsg.put_u8(0);
+    }
+    Ok(cmsg)
+}
+
+fn align(len: usize) -> usize {
+    let cmsghdr_align = mem::align_of::<cmsghdr>();
+    (len + cmsghdr_align - 1) & !(cmsghdr_align - 1)
+}
+
+pub fn len(len: usize) -> usize {
+    align(mem::size_of::<cmsghdr>()) + len
+}
+
+pub fn space(len: usize) -> usize {
+    align(mem::size_of::<cmsghdr>()) + align(len)
+}
--- a/media/audioipc/audioipc/src/codec.rs
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -1,141 +1,180 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 //! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
 
-use bincode::{self, Bounded, deserialize, serialize_into, serialized_size};
-use bytes::{Buf, BufMut, BytesMut, LittleEndian};
+use bincode::{self, deserialize, serialize_into, serialized_size, Bounded};
+use bytes::{BufMut, ByteOrder, BytesMut, LittleEndian};
 use serde::de::DeserializeOwned;
 use serde::ser::Serialize;
-use std::io as std_io;
-use std::io::Cursor;
-use std::mem;
+use std::fmt::Debug;
+use std::io;
+use std::marker::PhantomData;
 
 ////////////////////////////////////////////////////////////////////////////////
 // Split buffer into size delimited frames - This appears more complicated than
 // might be necessary due to handling the possibility of messages being split
 // across reads.
 
-#[derive(Debug)]
-enum FrameState {
-    Head,
-    Data(usize)
-}
+pub trait Codec {
+    /// The type of items to be encoded into byte buffer
+    type In;
+
+    /// The type of items to be returned by decoding from byte buffer
+    type Out;
+
+    /// Attempts to decode a frame from the provided buffer of bytes.
+    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>;
 
-#[derive(Debug)]
-pub struct Decoder {
-    state: FrameState
-}
-
-impl Decoder {
-    pub fn new() -> Self {
-        Decoder {
-            state: FrameState::Head
+    /// A default method available to be called when there are no more bytes
+    /// available to be read from the I/O.
+    fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
+        match try!(self.decode(buf)) {
+            Some(frame) => Ok(frame),
+            None => Err(io::Error::new(
+                io::ErrorKind::Other,
+                "bytes remaining on stream"
+            ))
         }
     }
 
-    fn decode_head(&mut self, src: &mut BytesMut) -> std_io::Result<Option<usize>> {
-        let head_size = mem::size_of::<u16>();
-        if src.len() < head_size {
+    /// Encodes a frame intox the buffer provided.
+    fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
+}
+
+/// Codec based upon bincode serialization
+///
+/// Messages that have been serialized using bincode are prefixed with
+/// the length of the message to aid in deserialization, so that it's
+/// known if enough data has been received to decode a complete
+/// message.
+pub struct LengthDelimitedCodec<In, Out> {
+    state: State,
+    __in: PhantomData<In>,
+    __out: PhantomData<Out>
+}
+
+enum State {
+    Length,
+    Data(u16)
+}
+
+impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
+    fn default() -> Self {
+        LengthDelimitedCodec {
+            state: State::Length,
+            __in: PhantomData,
+            __out: PhantomData
+        }
+    }
+}
+
+impl<In, Out> LengthDelimitedCodec<In, Out> {
+    // Lengths are encoded as little endian u16
+    fn decode_length(&mut self, buf: &mut BytesMut) -> io::Result<Option<u16>> {
+        if buf.len() < 2 {
             // Not enough data
             return Ok(None);
         }
 
-        let n = {
-            let mut src = Cursor::new(&mut *src);
-
-            // match endianess
-            let n = src.get_uint::<LittleEndian>(head_size);
-
-            if n > u64::from(u16::max_value()) {
-                return Err(std_io::Error::new(
-                    std_io::ErrorKind::InvalidData,
-                    "frame size too big"
-                ));
-            }
-
-            // The check above ensures there is no overflow
-            n as usize
-        };
+        let n = LittleEndian::read_u16(buf.as_ref());
 
         // Consume the length field
-        let _ = src.split_to(head_size);
+        let _ = buf.split_to(2);
 
         Ok(Some(n))
     }
 
-    fn decode_data(&self, n: usize, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+    fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result<Option<Out>>
+    where
+        Out: DeserializeOwned + Debug
+    {
         // At this point, the buffer has already had the required capacity
         // reserved. All there is to do is read.
-        if src.len() < n {
+        let n = n as usize;
+        if buf.len() < n {
             return Ok(None);
         }
 
-        Ok(Some(src.split_to(n)))
+        let buf = buf.split_to(n).freeze();
+
+        trace!("Attempting to decode");
+        let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
+            bincode::ErrorKind::IoError(e) => e,
+            _ => io::Error::new(io::ErrorKind::Other, *e)
+        }));
+
+        trace!("... Decoded {:?}", msg);
+        Ok(Some(msg))
     }
+}
 
-    pub fn split_frame(&mut self, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
+where
+    In: Serialize + Debug,
+    Out: DeserializeOwned + Debug
+{
+    type In = In;
+    type Out = Out;
+
+    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
         let n = match self.state {
-            FrameState::Head => {
-                match try!(self.decode_head(src)) {
+            State::Length => {
+                match try!(self.decode_length(buf)) {
                     Some(n) => {
-                        self.state = FrameState::Data(n);
+                        self.state = State::Data(n);
 
                         // Ensure that the buffer has enough space to read the
                         // incoming payload
-                        src.reserve(n);
+                        buf.reserve(n as usize);
 
                         n
                     },
-                    None => return Ok(None),
+                    None => return Ok(None)
                 }
             },
-            FrameState::Data(n) => n,
+            State::Data(n) => n
         };
 
-        match try!(self.decode_data(n, src)) {
+        match try!(self.decode_data(buf, n)) {
             Some(data) => {
                 // Update the decode state
-                self.state = FrameState::Head;
+                self.state = State::Length;
 
                 // Make sure the buffer has enough space to read the next head
-                src.reserve(mem::size_of::<u16>());
+                buf.reserve(2);
 
                 Ok(Some(data))
             },
-            None => Ok(None),
+            None => Ok(None)
         }
     }
 
-    pub fn decode<ITEM: DeserializeOwned>(&mut self, src: &mut BytesMut) -> Result<Option<ITEM>, bincode::Error> {
-        match try!(self.split_frame(src)) {
-            Some(buf) => deserialize::<ITEM>(buf.as_ref()).and_then(|t| Ok(Some(t))),
-            None => Ok(None),
+    fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
+        trace!("Attempting to encode");
+        let encoded_len = serialized_size(&item);
+        if encoded_len > 8 * 1024 {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "encoded message too big"
+            ));
         }
-    }
-}
+
+        buf.reserve((encoded_len + 2) as usize);
+
+        buf.put_u16::<LittleEndian>(encoded_len as u16);
 
-impl Default for Decoder {
-    fn default() -> Self {
-        Self::new()
+        if let Err(e) =
+            serialize_into::<_, Self::In, _>(&mut buf.writer(), &item, Bounded(encoded_len))
+        {
+            match *e {
+                bincode::ErrorKind::IoError(e) => return Err(e),
+                _ => return Err(io::Error::new(io::ErrorKind::Other, *e))
+            }
+        }
+
+        Ok(())
     }
 }
-
-pub fn encode<ITEM: Serialize>(dst: &mut BytesMut, item: &ITEM) -> Result<(), bincode::Error> {
-    let head_len = mem::size_of::<u16>() as u64;
-    let item_len = serialized_size(item);
-
-    if head_len + item_len > u64::from(u16::max_value()) {
-        return Err(Box::new(bincode::ErrorKind::IoError(std_io::Error::new(
-            std_io::ErrorKind::InvalidInput,
-            "frame too big"
-        ))));
-    }
-
-    let n = (head_len + item_len) as usize;
-    dst.reserve(n);
-    dst.put_u16::<LittleEndian>(item_len as u16);
-    serialize_into(&mut dst.writer(), item, Bounded(item_len))
-}
deleted file mode 100644
--- a/media/audioipc/audioipc/src/connection.rs
+++ /dev/null
@@ -1,206 +0,0 @@
-use {AutoCloseFd, RecvFd, SendFd};
-use async::{Async, AsyncRecvFd};
-use bytes::{BufMut, BytesMut};
-use codec::{Decoder, encode};
-use errors::*;
-use mio::{Poll, PollOpt, Ready, Token};
-use mio::event::Evented;
-use mio::unix::EventedFd;
-use serde::de::DeserializeOwned;
-use serde::ser::Serialize;
-use std::collections::VecDeque;
-use std::error::Error;
-use std::fmt::Debug;
-use std::io::{self, Read};
-use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net;
-use std::os::unix::prelude::*;
-
-// Because of the trait implementation rules in Rust, this needs to be
-// a wrapper class to allow implementation of a trait from another
-// crate on a struct from yet another crate.
-//
-// This class is effectively net::UnixStream.
-
-#[derive(Debug)]
-pub struct Connection {
-    stream: net::UnixStream,
-    recv_buffer: BytesMut,
-    recv_fd: VecDeque<AutoCloseFd>,
-    send_buffer: BytesMut,
-    decoder: Decoder
-}
-
-impl Connection {
-    pub fn new(stream: net::UnixStream) -> Connection {
-        info!("Create new connection");
-        stream.set_nonblocking(false).unwrap();
-        Connection {
-            stream: stream,
-            recv_buffer: BytesMut::with_capacity(1024),
-            recv_fd: VecDeque::new(),
-            send_buffer: BytesMut::with_capacity(1024),
-            decoder: Decoder::new()
-        }
-    }
-
-    /// Creates an unnamed pair of connected sockets.
-    ///
-    /// Returns two `Connection`s which are connected to each other.
-    ///
-    /// # Examples
-    ///
-    /// ```no_run
-    /// use audioipc::Connection;
-    ///
-    /// let (conn1, conn2) = match Connection::pair() {
-    ///     Ok((conn1, conn2)) => (conn1, conn2),
-    ///     Err(e) => {
-    ///         println!("Couldn't create a pair of connections: {:?}", e);
-    ///         return
-    ///     }
-    /// };
-    /// ```
-    pub fn pair() -> io::Result<(Connection, Connection)> {
-        let (s1, s2) = net::UnixStream::pair()?;
-        Ok((Connection::new(s1), Connection::new(s2)))
-    }
-
-    pub fn take_fd(&mut self) -> Option<RawFd> {
-        self.recv_fd.pop_front().map(|fd| fd.into_raw_fd())
-    }
-
-    pub fn receive<RT>(&mut self) -> Result<RT>
-    where
-        RT: DeserializeOwned + Debug,
-    {
-        self.receive_with_fd()
-    }
-
-    pub fn receive_with_fd<RT>(&mut self) -> Result<RT>
-    where
-        RT: DeserializeOwned + Debug,
-    {
-        trace!("received_with_fd...");
-        loop {
-            trace!("   recv_buffer = {:?}", self.recv_buffer);
-            if !self.recv_buffer.is_empty() {
-                let r = self.decoder.decode(&mut self.recv_buffer);
-                trace!("receive {:?}", r);
-                match r {
-                    Ok(Some(r)) => return Ok(r),
-                    Ok(None) => {
-                        /* Buffer doesn't contain enough data for a complete
-                         * message, so need to enter recv_buf_fd to get more. */
-                    },
-                    Err(e) => return Err(e).chain_err(|| "Failed to deserialize message"),
-                }
-            }
-
-            // Otherwise, try to read more data and try again. Make sure we've
-            // got room for at least one byte to read to ensure that we don't
-            // get a spurious 0 that looks like EOF
-
-            // The decoder.decode should have reserved an amount for
-            // the next bit it needs to read.  Check that we reserved
-            // enough space for, at least the 2 byte size prefix.
-            assert!(self.recv_buffer.remaining_mut() > 2);
-
-            // TODO: Read until block, EOF, or error.
-            // TODO: Switch back to recv_fd.
-            match self.stream.recv_buf_fd(&mut self.recv_buffer) {
-                Ok(Async::Ready((0, _))) => return Err(ErrorKind::Disconnected.into()),
-                // TODO: Handle partial read?
-                Ok(Async::Ready((_, fd))) => {
-                    trace!(
-                        "   recv_buf_fd: recv_buffer: {:?}, recv_fd: {:?}, fd: {:?}",
-                        self.recv_buffer,
-                        self.recv_fd,
-                        fd
-                    );
-                    if let Some(fd) = fd {
-                        self.recv_fd.push_back(
-                            unsafe { AutoCloseFd::from_raw_fd(fd) }
-                        );
-                    }
-                },
-                Ok(Async::NotReady) => bail!("Socket should be blocking."),
-                // TODO: Handle dropped message.
-                // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
-                Err(e) => bail!("stream recv_buf_fd returned: {}", e.description()),
-            }
-        }
-    }
-
-    pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
-    where
-        ST: Serialize + Debug,
-    {
-        self.send_with_fd::<ST, Connection>(msg, None)
-    }
-
-    pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
-    where
-        ST: Serialize + Debug,
-        FD: IntoRawFd + Debug,
-    {
-        trace!("send_with_fd {:?}, {:?}", msg, fd_to_send);
-        try!(encode(&mut self.send_buffer, &msg));
-        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
-        let send = self.send_buffer.take().freeze();
-        self.stream.send_fd(send.as_ref(), fd_to_send).chain_err(
-            || "Failed to send message with fd"
-        )
-    }
-}
-
-impl Evented for Connection {
-    fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
-        EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
-    }
-
-    fn reregister(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
-        EventedFd(&self.stream.as_raw_fd()).reregister(poll, token, events, opts)
-    }
-
-    fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        EventedFd(&self.stream.as_raw_fd()).deregister(poll)
-    }
-}
-
-impl Read for Connection {
-    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
-        self.stream.read(bytes)
-    }
-}
-
-// TODO: Is this required?
-impl<'a> Read for &'a Connection {
-    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
-        (&self.stream).read(bytes)
-    }
-}
-
-impl RecvFd for Connection {
-    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-        self.stream.recv_fd(buf_to_recv)
-    }
-}
-
-impl FromRawFd for Connection {
-    unsafe fn from_raw_fd(fd: RawFd) -> Connection {
-        Connection::new(net::UnixStream::from_raw_fd(fd))
-    }
-}
-
-impl IntoRawFd for Connection {
-    fn into_raw_fd(self) -> RawFd {
-        self.stream.into_raw_fd()
-    }
-}
-
-impl SendFd for Connection {
-    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
-        self.stream.send_fd(buf_to_send, fd_to_send)
-    }
-}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/core.rs
@@ -0,0 +1,102 @@
+// Ease accessing reactor::Core handles.
+
+use futures::{Future, IntoFuture};
+use futures::sync::oneshot;
+use std::{fmt, io, thread};
+use std::sync::mpsc;
+use tokio_core::reactor::{Core, Handle, Remote};
+
+scoped_thread_local! {
+    static HANDLE: Handle
+}
+
+pub fn handle() -> Handle {
+    HANDLE.with(|handle| handle.clone())
+}
+
+pub fn spawn<F>(f: F)
+where
+    F: Future<Item = (), Error = ()> + 'static
+{
+    HANDLE.with(|handle| handle.spawn(f))
+}
+
+pub fn spawn_fn<F, R>(f: F)
+where
+    F: FnOnce() -> R + 'static,
+    R: IntoFuture<Item = (), Error = ()> + 'static
+{
+    HANDLE.with(|handle| handle.spawn_fn(f))
+}
+
+struct Inner {
+    join: thread::JoinHandle<()>,
+    shutdown: oneshot::Sender<()>
+}
+
+pub struct CoreThread {
+    inner: Option<Inner>,
+    remote: Remote
+}
+
+impl CoreThread {
+    pub fn remote(&self) -> Remote {
+        self.remote.clone()
+    }
+}
+
+impl Drop for CoreThread {
+    fn drop(&mut self) {
+        trace!("Shutting down {:?}", self);
+        if let Some(inner) = self.inner.take() {
+            let _ = inner.shutdown.send(());
+            drop(inner.join.join());
+        }
+    }
+}
+
+impl fmt::Debug for CoreThread {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // f.debug_tuple("CoreThread").field(&"...").finish()
+        f.debug_tuple("CoreThread").field(&self.remote).finish()
+    }
+}
+
+pub fn spawn_thread<S, F>(name: S, f: F) -> io::Result<CoreThread>
+where
+    S: Into<String>,
+    F: FnOnce() -> io::Result<()> + Send + 'static
+{
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
+
+    let join = try!(thread::Builder::new().name(name.into()).spawn(move || {
+        let mut core = Core::new().expect("Failed to create reactor::Core");
+        let handle = core.handle();
+        let remote = handle.remote().clone();
+        drop(remote_tx.send(remote));
+
+        drop(HANDLE.set(&handle, || {
+            f().and_then(|_| {
+                let _ = core.run(shutdown_rx);
+                Ok(())
+            })
+        }));
+        trace!("thread shutdown...");
+    }));
+
+    let remote = try!(remote_rx.recv().or_else(|_| {
+        Err(io::Error::new(
+            io::ErrorKind::Other,
+            "Failed to receive remote handle from spawned thread"
+        ))
+    }));
+
+    Ok(CoreThread {
+        inner: Some(Inner {
+            join: join,
+            shutdown: shutdown_tx
+        }),
+        remote: remote
+    })
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -0,0 +1,364 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use async::{AsyncRecvMsg, AsyncSendMsg};
+use bytes::{Bytes, BytesMut, IntoBuf};
+use cmsg;
+use codec::Codec;
+use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
+use libc;
+use messages::AssocRawFd;
+use std::{fmt, io, mem};
+use std::collections::VecDeque;
+use std::os::unix::io::RawFd;
+
+const INITIAL_CAPACITY: usize = 1024;
+const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
+const FDS_CAPACITY: usize = 16;
+
+struct IncomingFds {
+    cmsg: BytesMut,
+    recv_fds: Option<cmsg::ControlMsgIter>
+}
+
+impl IncomingFds {
+    pub fn new(c: usize) -> Self {
+        let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>());
+        IncomingFds {
+            cmsg: BytesMut::with_capacity(capacity),
+            recv_fds: None
+        }
+    }
+
+    pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
+        loop {
+            let fds = self.recv_fds
+                .as_mut()
+                .and_then(|recv_fds| recv_fds.next())
+                .and_then(|fds| Some(clone_into_array(&fds)));
+
+            if fds.is_some() {
+                return fds;
+            }
+
+            if self.cmsg.is_empty() {
+                return None;
+            }
+
+            self.recv_fds = Some(cmsg::iterator(self.cmsg.take().freeze()));
+        }
+    }
+
+    pub fn cmsg(&mut self) -> &mut BytesMut {
+        self.cmsg.reserve(cmsg::space(mem::size_of::<[RawFd; 3]>()));
+        &mut self.cmsg
+    }
+}
+
+#[derive(Debug)]
+struct Frame {
+    msgs: Bytes,
+    fds: Option<Bytes>
+}
+
+/// A unified `Stream` and `Sink` interface over an I/O object, using
+/// the `Codec` trait to encode and decode the payload.
+pub struct FramedWithFds<A, C> {
+    io: A,
+    codec: C,
+    // Stream
+    read_buf: BytesMut,
+    incoming_fds: IncomingFds,
+    is_readable: bool,
+    eof: bool,
+    // Sink
+    frames: VecDeque<Frame>,
+    write_buf: BytesMut,
+    outgoing_fds: BytesMut
+}
+
+impl<A, C> FramedWithFds<A, C>
+where
+    A: AsyncSendMsg
+{
+    // If there is a buffered frame, try to write it to `A`
+    fn do_write(&mut self) -> Poll<(), io::Error> {
+        debug!("do_write...");
+        // Create a frame from any pending message in `write_buf`.
+        if !self.write_buf.is_empty() {
+            self.set_frame(None);
+        }
+
+        trace!("pending frames: {:?}", self.frames);
+
+        let mut processed = 0;
+
+        loop {
+            let n = match self.frames.front() {
+                Some(frame) => {
+                    trace!("sending msg {:?}, fds {:?}", frame.msgs, frame.fds);
+                    let mut msgs = frame.msgs.clone().into_buf();
+                    let mut fds = match frame.fds {
+                        Some(ref fds) => fds.clone(),
+                        None => Bytes::new()
+                    }.into_buf();
+                    try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
+                },
+                _ => {
+                    // No pending frames.
+                    return Ok(().into());
+                }
+            };
+
+            match self.frames.pop_front() {
+                Some(mut frame) => {
+                    processed += 1;
+
+                    // Close any fds that have been sent. The fds are
+                    // encoded in cmsg format inside frame.fds. Use
+                    // the cmsg iterator to access msg and extract
+                    // RawFds.
+                    frame.fds.take().and_then(|cmsg| {
+                        for fds in cmsg::iterator(cmsg) {
+                            close_fds(&*fds)
+                        }
+                        Some(())
+                    });
+
+                    if n != frame.msgs.len() {
+                        // If only part of the message was sent then
+                        // re-queue the remaining message at the head
+                        // of the queue. (Don't need to resend the fds
+                        // since they've been sent with the first
+                        // part.)
+                        drop(frame.msgs.split_to(n));
+                        self.frames.push_front(frame);
+                        break;
+                    }
+                },
+                _ => panic!()
+            }
+        }
+        debug!("process {} frames", processed);
+
+        trace!("pending frames: {:?}", self.frames);
+
+        Ok(().into())
+    }
+
+    fn set_frame(&mut self, fds: Option<Bytes>) {
+        if self.write_buf.is_empty() {
+            assert!(fds.is_none());
+            trace!("set_frame: No pending messages...");
+            return;
+        }
+
+        let msgs = self.write_buf.take().freeze();
+        trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
+
+        self.frames.push_back(Frame {
+            msgs,
+            fds
+        });
+    }
+}
+
+impl<A, C> Stream for FramedWithFds<A, C>
+where
+    A: AsyncRecvMsg,
+    C: Codec,
+    C::Out: AssocRawFd
+{
+    type Item = C::Out;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            // Repeatedly call `decode` or `decode_eof` as long as it is
+            // "readable". Readable is defined as not having returned `None`. If
+            // the upstream has returned EOF, and the decoder is no longer
+            // readable, it can be assumed that the decoder will never become
+            // readable again, at which point the stream is terminated.
+            if self.is_readable {
+                if self.eof {
+                    let mut item = try!(self.codec.decode_eof(&mut self.read_buf));
+                    item.take_fd(|| self.incoming_fds.take_fds());
+                    return Ok(Some(item).into());
+                }
+
+                trace!("attempting to decode a frame");
+
+                if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) {
+                    trace!("frame decoded from buffer");
+                    item.take_fd(|| self.incoming_fds.take_fds());
+                    return Ok(Some(item).into());
+                }
+
+                self.is_readable = false;
+            }
+
+            assert!(!self.eof);
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+            let (n, _) = try_ready!(
+                self.io
+                    .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())
+            );
+
+            // if flags != 0 {
+            //     error!("recv_msg_buf: flags = {:x}", flags)
+            // }
+
+            if n == 0 {
+                self.eof = true;
+            }
+
+            self.is_readable = true;
+        }
+    }
+}
+
+impl<A, C> Sink for FramedWithFds<A, C>
+where
+    A: AsyncSendMsg,
+    C: Codec,
+    C::In: AssocRawFd + fmt::Debug
+{
+    type SinkItem = C::In;
+    type SinkError = io::Error;
+
+    fn start_send(
+        &mut self,
+        item: Self::SinkItem
+    ) -> StartSend<Self::SinkItem, Self::SinkError> {
+        trace!("start_send: item={:?}", item);
+
+        // If the buffer is already over BACKPRESSURE_THRESHOLD,
+        // then attempt to flush it. If after flush it's *still*
+        // over BACKPRESSURE_THRESHOLD, then reject the send.
+        if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+            try!(self.poll_complete());
+            if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+                return Ok(AsyncSink::NotReady(item));
+            }
+        }
+
+        let fds = item.fd();
+        try!(self.codec.encode(item, &mut self.write_buf));
+        let fds = fds.and_then(|fds| {
+            cmsg::builder(&mut self.outgoing_fds)
+                .rights(&fds[..])
+                .finish()
+                .ok()
+        });
+
+        trace!("item fds: {:?}", fds);
+
+        if fds.is_some() {
+            // Enforce splitting sends on messages that contain file
+            // descriptors.
+            self.set_frame(fds);
+        }
+
+        Ok(AsyncSink::Ready)
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        trace!("flushing framed transport");
+
+        try_ready!(self.do_write());
+
+        try_nb!(self.io.flush());
+
+        trace!("framed transport flushed");
+        Ok(().into())
+    }
+
+    fn close(&mut self) -> Poll<(), Self::SinkError> {
+        try_ready!(self.poll_complete());
+        self.io.shutdown()
+    }
+}
+
+pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
+    FramedWithFds {
+        io: io,
+        codec: codec,
+        read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+        incoming_fds: IncomingFds::new(FDS_CAPACITY),
+        is_readable: false,
+        eof: false,
+        frames: VecDeque::new(),
+        write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+        outgoing_fds: BytesMut::with_capacity(
+            FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>())
+        )
+    }
+}
+
+fn write_zero() -> io::Error {
+    io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
+}
+
+fn clone_into_array<A, T>(slice: &[T]) -> A
+where
+    A: Sized + Default + AsMut<[T]>,
+    T: Clone
+{
+    let mut a = Default::default();
+    <A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
+    a
+}
+
+fn close_fds(fds: &[RawFd]) {
+    for fd in fds {
+        unsafe {
+            libc::close(*fd);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::BufMut;
+
+    const CMSG_BYTES: &[u8] =
+        b"\x1c\0\0\0\0\0\0\0\x01\0\0\0\x01\0\0\02\0\0\0[\0\0\0\\\0\0\0\xe5\xe5\xe5\xe5";
+
+    #[test]
+    fn single_cmsg() {
+        let mut incoming = super::IncomingFds::new(16);
+
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        assert!(incoming.take_fds().is_some());
+        assert!(incoming.take_fds().is_none());
+    }
+
+    #[test]
+    fn multiple_cmsg_1() {
+        let mut incoming = super::IncomingFds::new(16);
+
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        assert!(incoming.take_fds().is_some());
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        assert!(incoming.take_fds().is_some());
+        assert!(incoming.take_fds().is_none());
+    }
+
+    #[test]
+    fn multiple_cmsg_2() {
+        let mut incoming = super::IncomingFds::new(16);
+
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        assert!(incoming.take_fds().is_some());
+        incoming.cmsg().put_slice(CMSG_BYTES);
+        assert!(incoming.take_fds().is_some());
+        assert!(incoming.take_fds().is_some());
+        assert!(incoming.take_fds().is_none());
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/frame.rs
@@ -0,0 +1,164 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use bytes::{Buf, Bytes, BytesMut, IntoBuf};
+use codec::Codec;
+use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
+use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+const INITIAL_CAPACITY: usize = 1024;
+const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
+
+/// A unified `Stream` and `Sink` interface over an I/O object, using
+/// the `Codec` trait to encode and decode the payload.
+pub struct Framed<A, C> {
+    io: A,
+    codec: C,
+    read_buf: BytesMut,
+    write_buf: BytesMut,
+    frame: Option<<Bytes as IntoBuf>::Buf>,
+    is_readable: bool,
+    eof: bool
+}
+
+impl<A, C> Framed<A, C>
+where
+    A: AsyncWrite,
+{
+    // If there is a buffered frame, try to write it to `A`
+    fn do_write(&mut self) -> Poll<(), io::Error> {
+        loop {
+            if self.frame.is_none() {
+                self.set_frame();
+            }
+
+            if self.frame.is_none() {
+                return Ok(().into());
+            }
+
+            let done = {
+                let frame = self.frame.as_mut().unwrap();
+                try_ready!(self.io.write_buf(frame));
+                !frame.has_remaining()
+            };
+
+            if done {
+                self.frame = None;
+            }
+        }
+    }
+
+    fn set_frame(&mut self) {
+        if self.write_buf.is_empty() {
+            return;
+        }
+
+        debug_assert!(self.frame.is_none());
+
+        self.frame = Some(self.write_buf.take().freeze().into_buf());
+    }
+}
+
+impl<A, C> Stream for Framed<A, C>
+where
+    A: AsyncRead,
+    C: Codec,
+{
+    type Item = C::Out;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            // Repeatedly call `decode` or `decode_eof` as long as it is
+            // "readable". Readable is defined as not having returned `None`. If
+            // the upstream has returned EOF, and the decoder is no longer
+            // readable, it can be assumed that the decoder will never become
+            // readable again, at which point the stream is terminated.
+            if self.is_readable {
+                if self.eof {
+                    let frame = try!(self.codec.decode_eof(&mut self.read_buf));
+                    return Ok(Some(frame).into());
+                }
+
+                trace!("attempting to decode a frame");
+
+                if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) {
+                    trace!("frame decoded from buffer");
+                    return Ok(Some(frame).into());
+                }
+
+                self.is_readable = false;
+            }
+
+            assert!(!self.eof);
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+            if try_ready!(self.io.read_buf(&mut self.read_buf)) == 0 {
+                self.eof = true;
+            }
+
+            self.is_readable = true;
+        }
+    }
+}
+
+impl<A, C> Sink for Framed<A, C>
+where
+    A: AsyncWrite,
+    C: Codec,
+{
+    type SinkItem = C::In;
+    type SinkError = io::Error;
+
+    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        // If the buffer is already over BACKPRESSURE_THRESHOLD,
+        // then attempt to flush it. If after flush it's *still*
+        // over BACKPRESSURE_THRESHOLD, then reject the send.
+        if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+            try!(self.poll_complete());
+            if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+                return Ok(AsyncSink::NotReady(item));
+            }
+        }
+
+        try!(self.codec.encode(item, &mut self.write_buf));
+        Ok(AsyncSink::Ready)
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        trace!("flushing framed transport");
+
+        try_ready!(self.do_write());
+
+        try_nb!(self.io.flush());
+
+        trace!("framed transport flushed");
+        Ok(().into())
+    }
+
+    fn close(&mut self) -> Poll<(), Self::SinkError> {
+        try_ready!(self.poll_complete());
+        self.io.shutdown()
+    }
+}
+
+fn write_zero() -> io::Error {
+    io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
+}
+
+pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
+    Framed {
+        io: io,
+        codec: codec,
+        read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+        write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+        frame: None,
+        is_readable: false,
+        eof: false
+    }
+}
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -12,71 +12,74 @@ extern crate error_chain;
 extern crate log;
 
 #[macro_use]
 extern crate serde_derive;
 
 extern crate bincode;
 extern crate bytes;
 extern crate cubeb_core;
+#[macro_use]
+extern crate futures;
+extern crate iovec;
 extern crate libc;
 extern crate memmap;
-extern crate mio;
-extern crate mio_uds;
 extern crate serde;
+#[macro_use]
+extern crate scoped_tls;
+extern crate tokio_core;
+#[macro_use]
+extern crate tokio_io;
+extern crate tokio_uds;
 
 pub mod async;
+pub mod cmsg;
 pub mod codec;
-mod connection;
 pub mod errors;
+pub mod fd_passing;
+pub mod frame;
+pub mod rpc;
+pub mod core;
 pub mod messages;
 mod msg;
 pub mod shm;
 
-pub use connection::*;
+use iovec::IoVec;
+
+#[cfg(target_os = "linux")]
+use libc::MSG_CMSG_CLOEXEC;
 pub use messages::{ClientMessage, ServerMessage};
 
 use std::env::temp_dir;
 use std::io;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::os::unix::net;
 use std::path::PathBuf;
+#[cfg(not(target_os = "linux"))]
+const MSG_CMSG_CLOEXEC: libc::c_int = 0;
 
 // Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
 // We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
 // UnixStream type.
-pub trait RecvFd {
-    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+pub trait RecvMsg {
+    fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>;
 }
 
-pub trait SendFd {
-    fn send_fd(&mut self, bytes: &[u8], fd: Option<RawFd>) -> io::Result<(usize)>;
+pub trait SendMsg {
+    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
 }
 
-impl RecvFd for net::UnixStream {
-    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+impl<T: AsRawFd> RecvMsg for T {
+    fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> {
+        msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
     }
 }
 
-impl RecvFd for mio_uds::UnixStream {
-    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
-    }
-}
-
-impl SendFd for net::UnixStream {
-    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
-        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
-    }
-}
-
-impl SendFd for mio_uds::UnixStream {
-    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
-        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+impl<T: AsRawFd> SendMsg for T {
+    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
+        msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 
 #[derive(Debug)]
 pub struct AutoCloseFd(RawFd);
 
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -1,16 +1,17 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use cubeb_core::{self, ffi};
 use std::ffi::{CStr, CString};
 use std::os::raw::c_char;
+use std::os::unix::io::RawFd;
 use std::ptr;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Device {
     pub output_name: Option<Vec<u8>>,
     pub input_name: Option<Vec<u8>>
 }
 
@@ -164,29 +165,33 @@ fn dup_str(s: *const c_char) -> Option<V
     } else {
         let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
         Some(vec)
     }
 }
 
 fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
     match v {
-        Some(v) => {
-            match CString::new(v) {
-                Ok(s) => s.into_raw(),
-                Err(_) => {
-                    debug!("Failed to convert bytes to CString");
-                    ptr::null()
-                },
+        Some(v) => match CString::new(v) {
+            Ok(s) => s.into_raw(),
+            Err(_) => {
+                debug!("Failed to convert bytes to CString");
+                ptr::null()
             }
         },
-        None => ptr::null(),
+        None => ptr::null()
     }
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamCreate {
+    pub token: usize,
+    pub fds: [RawFd; 3]
+}
+
 // Client -> Server messages.
 // TODO: Callbacks should be different messages types so
 // ServerConn::process_msg doesn't have a catch-all case.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ServerMessage {
     ClientConnect,
     ClientDisconnect,
 
@@ -202,47 +207,81 @@ pub enum ServerMessage {
 
     StreamStart(usize),
     StreamStop(usize),
     StreamResetDefaultDevice(usize),
     StreamGetPosition(usize),
     StreamGetLatency(usize),
     StreamSetVolume(usize, f32),
     StreamSetPanning(usize, f32),
-    StreamGetCurrentDevice(usize),
-
-    StreamDataCallback(isize),
-    StreamStateCallback
+    StreamGetCurrentDevice(usize)
 }
 
 // Server -> Client messages.
 // TODO: Streams need id.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ClientMessage {
     ClientConnected,
     ClientDisconnected,
 
     ContextBackendId(),
     ContextMaxChannelCount(u32),
     ContextMinLatency(u32),
     ContextPreferredSampleRate(u32),
     ContextPreferredChannelLayout(ffi::cubeb_channel_layout),
     ContextEnumeratedDevices(Vec<DeviceInfo>),
 
-    StreamCreated(usize), /*(RawFd)*/
-    StreamCreatedInputShm, /*(RawFd)*/
-    StreamCreatedOutputShm, /*(RawFd)*/
+    StreamCreated(StreamCreate),
     StreamDestroyed,
 
     StreamStarted,
     StreamStopped,
     StreamDefaultDeviceReset,
     StreamPosition(u64),
     StreamLatency(u32),
     StreamVolumeSet,
     StreamPanningSet,
     StreamCurrentDevice(Device),
 
-    StreamDataCallback(isize, usize),
-    StreamStateCallback(ffi::cubeb_state),
-
     Error(ffi::cubeb_error_code)
 }
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackReq {
+    Data(isize, usize),
+    State(ffi::cubeb_state)
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackResp {
+    Data(isize),
+    State
+}
+
+pub trait AssocRawFd {
+    fn fd(&self) -> Option<[RawFd; 3]> {
+        None
+    }
+    fn take_fd<F>(&mut self, _: F)
+    where
+        F: FnOnce() -> Option<[RawFd; 3]>
+    {
+    }
+}
+
+impl AssocRawFd for ServerMessage {}
+impl AssocRawFd for ClientMessage {
+    fn fd(&self) -> Option<[RawFd; 3]> {
+        match *self {
+            ClientMessage::StreamCreated(ref data) => Some(data.fds),
+            _ => None
+        }
+    }
+
+    fn take_fd<F>(&mut self, f: F)
+    where
+        F: FnOnce() -> Option<[RawFd; 3]>
+    {
+        if let ClientMessage::StreamCreated(ref mut data) = *self {
+            data.fds = f().unwrap();
+        }
+    }
+}
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -1,170 +1,80 @@
+use iovec::IoVec;
+use iovec::unix as iovec;
 use libc;
-use std::io;
-use std::mem;
+use std::{cmp, io, mem, ptr};
 use std::os::unix::io::RawFd;
-use std::ptr;
 
 fn cvt(r: libc::ssize_t) -> io::Result<usize> {
     if r == -1 {
         Err(io::Error::last_os_error())
     } else {
         Ok(r as usize)
     }
 }
 
 // Convert return of -1 into error message, handling retry on EINTR
 fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
     loop {
         match cvt(f()) {
             Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
-            other => return other,
+            other => return other
         }
     }
 }
 
-// Note: The following fields must be laid out together, the OS expects them
-// to be part of a single allocation.
-#[repr(C)]
-struct CmsgSpace {
-    cmsghdr: libc::cmsghdr,
-    #[cfg(not(target_os = "macos"))]
-    __padding: [usize; 0],
-    data: libc::c_int,
-}
-
-#[cfg(not(target_os = "macos"))]
-fn cmsg_align(len: usize) -> usize {
-    let align_bytes = mem::size_of::<usize>() - 1;
-    (len + align_bytes) & !align_bytes
-}
-
-#[cfg(target_os = "macos")]
-fn cmsg_align(len: usize) -> usize {
-    len
-}
-
-fn cmsg_space() -> usize {
-    mem::size_of::<CmsgSpace>()
-}
+pub fn recv_msg_with_flags(
+    socket: RawFd,
+    bufs: &mut [&mut IoVec],
+    cmsg: &mut [u8],
+    flags: libc::c_int
+) -> io::Result<(usize, usize, libc::c_int)> {
+    let slice = iovec::as_os_slice_mut(bufs);
+    let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+    let (control, controllen) = if cmsg.is_empty() {
+        (ptr::null_mut(), 0)
+    } else {
+        (cmsg.as_ptr() as *mut _, cmsg.len())
+    };
 
-fn cmsg_len() -> usize {
-    cmsg_align(mem::size_of::<libc::cmsghdr>()) + mem::size_of::<libc::c_int>()
-}
-
-pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
-    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
-    let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
-
-    msghdr.msg_iov = &mut iovec as *mut _;
-    msghdr.msg_iovlen = 1;
-    if fd_to_send.is_some() {
-        msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-        msghdr.msg_controllen = cmsg_space() as _;
-    }
+    msghdr.msg_name = ptr::null_mut();
+    msghdr.msg_namelen = 0;
+    msghdr.msg_iov = slice.as_mut_ptr();
+    msghdr.msg_iovlen = len as _;
+    msghdr.msg_control = control;
+    msghdr.msg_controllen = controllen as _;
 
-    iovec.iov_base = if to_send.is_empty() {
-        // Empty Vecs have a non-null pointer.
-        ptr::null_mut()
-    } else {
-        to_send.as_ptr() as *const _ as *mut _
-    };
-    iovec.iov_len = to_send.len();
+    let n = try!(cvt_r(|| unsafe {
+        libc::recvmsg(socket, &mut msghdr as *mut _, flags)
+    }));
 
-    cmsg.cmsghdr.cmsg_len = cmsg_len() as _;
-    cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
-    cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
-
-    cmsg.data = fd_to_send.unwrap_or(-1);
-
-    cvt_r(|| unsafe { libc::sendmsg(fd, &msghdr, 0) })
+    let controllen = msghdr.msg_controllen as usize;
+    Ok((n, controllen, msghdr.msg_flags))
 }
 
-pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-    let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
-    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
-    let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
-
-    msghdr.msg_iov = &mut iovec as *mut _;
-    msghdr.msg_iovlen = 1;
-    msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-    msghdr.msg_controllen = cmsg_space() as _;
-
-    iovec.iov_base = if to_recv.is_empty() {
-        // Empty Vecs have a non-null pointer.
-        ptr::null_mut()
+pub fn send_msg_with_flags(
+    socket: RawFd,
+    bufs: &[&IoVec],
+    cmsg: &[u8],
+    flags: libc::c_int
+) -> io::Result<usize> {
+    let slice = iovec::as_os_slice(bufs);
+    let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+    let (control, controllen) = if cmsg.is_empty() {
+        (ptr::null_mut(), 0)
     } else {
-        to_recv.as_ptr() as *const _ as *mut _
-    };
-    iovec.iov_len = to_recv.len();
-
-    let result = try!(cvt_r(|| unsafe { libc::recvmsg(fd, &mut msghdr, 0) }));
-    let fd = if msghdr.msg_controllen == cmsg_space() as _ &&
-        cmsg.cmsghdr.cmsg_len == cmsg_len() as _ &&
-        cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
-        cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
-        Some(cmsg.data)
-    } else {
-        None
+        (cmsg.as_ptr() as *mut _, cmsg.len())
     };
 
-    Ok((result as usize, fd))
-}
-
-#[cfg(test)]
-mod tests {
-    use libc;
-    use std::mem;
-    use std::os::unix::net::UnixStream;
-    use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
-    use std::io::{Read, Write};
-    use super::{cmsg_len, cmsg_space, sendmsg, recvmsg};
+    let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+    msghdr.msg_name = ptr::null_mut();
+    msghdr.msg_namelen = 0;
+    msghdr.msg_iov = slice.as_ptr() as *mut _;
+    msghdr.msg_iovlen = len as _;
+    msghdr.msg_control = control;
+    msghdr.msg_controllen = controllen as _;
 
-    #[test]
-    fn portable_sizes() {
-        if cfg!(all(target_os = "linux", target_pointer_width = "64")) {
-            assert_eq!(mem::size_of::<libc::cmsghdr>(), 16);
-            assert_eq!(cmsg_len(), 20);
-            assert_eq!(cmsg_space(), 24);
-        } else if cfg!(all(target_os = "linux", target_pointer_width = "32")) {
-            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
-            assert_eq!(cmsg_len(), 16);
-            assert_eq!(cmsg_space(), 16);
-        } else if cfg!(target_os = "macos") {
-            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
-            assert_eq!(cmsg_len(), 16);
-            assert_eq!(cmsg_space(), 16);
-        } else if cfg!(target_pointer_width = "64") {
-            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
-            assert_eq!(cmsg_len(), 20);
-            assert_eq!(cmsg_space(), 24);
-        } else {
-            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
-            assert_eq!(cmsg_len(), 16);
-            assert_eq!(cmsg_space(), 16);
-        }
-    }
-
-    #[test]
-    fn fd_passing() {
-        let (tx, rx) = UnixStream::pair().unwrap();
-
-        let (send_tx, mut send_rx) = UnixStream::pair().unwrap();
-
-        let fd = send_tx.into_raw_fd();
-        assert_eq!(sendmsg(tx.as_raw_fd(), b"a", Some(fd)).unwrap(), 1);
-        unsafe { libc::close(fd) };
-
-        let mut buf = [0u8];
-        let (got, fd) = recvmsg(rx.as_raw_fd(), &mut buf).unwrap();
-        assert_eq!(got, 1);
-        assert_eq!(&buf, b"a");
-
-        let mut send_tx = unsafe { UnixStream::from_raw_fd(fd.unwrap()) };
-        assert_eq!(send_tx.write(b"b").unwrap(), 1);
-
-        let mut buf = [0u8];
-        assert_eq!(send_rx.read(&mut buf).unwrap(), 1);
-        assert_eq!(&buf, b"b");
-    }
+    cvt_r(|| unsafe {
+        libc::sendmsg(socket, &msghdr as *const _, flags)
+    })
 }
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/client/mod.rs
@@ -0,0 +1,158 @@
+// This is a derived version of simple/pipeline/client.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+//   contain simple request/response messages:
+//   * Remove `Error` types,
+//   * Remove `bind_transport` fn & `BindTransport` type,
+//   * Remove all "Lift"ing functionality.
+//   * Remove `Service` trait since audioipc doesn't use `tokio_service`
+//     crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll, Sink, Stream};
+use futures::sync::oneshot;
+use rpc::Handler;
+use rpc::driver::Driver;
+use std::collections::VecDeque;
+use std::io;
+use tokio_core::reactor::Handle;
+
+mod proxy;
+
+pub use self::proxy::{ClientProxy, Response};
+
+pub fn bind_client<C>(
+    transport: C::Transport,
+    handle: &Handle
+) -> proxy::ClientProxy<C::Request, C::Response>
+where
+    C: Client
+{
+    let (tx, rx) = proxy::channel();
+
+    let fut = {
+        let handler = ClientHandler::<C> {
+            transport: transport,
+            requests: rx,
+            in_flight: VecDeque::with_capacity(32)
+        };
+        Driver::new(handler)
+    };
+
+    // Spawn the RPC driver into task
+    handle.spawn(Box::new(fut.map_err(|_| ())));
+
+    tx
+}
+
+pub trait Client: 'static {
+    /// Request
+    type Request: 'static;
+
+    /// Response
+    type Response: 'static;
+
+    /// The message transport, which works with async I/O objects of type `A`
+    type Transport: 'static
+        + Stream<Item = Self::Response, Error = io::Error>
+        + Sink<SinkItem = Self::Request, SinkError = io::Error>;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ClientHandler<C>
+where
+    C: Client
+{
+    transport: C::Transport,
+    requests: proxy::Receiver<C::Request, C::Response>,
+    in_flight: VecDeque<oneshot::Sender<C::Response>>
+}
+
+impl<C> Handler for ClientHandler<C>
+where
+    C: Client
+{
+    type In = C::Response;
+    type Out = C::Request;
+    type Transport = C::Transport;
+
+    fn transport(&mut self) -> &mut Self::Transport {
+        &mut self.transport
+    }
+
+    fn consume(&mut self, response: Self::In) -> io::Result<()> {
+        trace!("ClientHandler::consume");
+        if let Some(complete) = self.in_flight.pop_front() {
+            drop(complete.send(response));
+        } else {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "request / response mismatch"
+            ));
+        }
+
+        Ok(())
+    }
+
+    /// Produce a message
+    fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+        trace!("ClientHandler::produce");
+
+        // Try to get a new request
+        match self.requests.poll() {
+            Ok(Async::Ready(Some((request, complete)))) => {
+                trace!("  --> received request");
+
+                // Track complete handle
+                self.in_flight.push_back(complete);
+
+                Ok(Some(request).into())
+            },
+            Ok(Async::Ready(None)) => {
+                trace!("  --> client dropped");
+                Ok(None.into())
+            },
+            Ok(Async::NotReady) => {
+                trace!("  --> not ready");
+                Ok(Async::NotReady)
+            },
+            Err(_) => unreachable!()
+        }
+    }
+
+    /// RPC currently in flight
+    fn has_in_flight(&self) -> bool {
+        !self.in_flight.is_empty()
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs
@@ -0,0 +1,140 @@
+// This is a derived version of client_proxy.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client_proxy.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+//   crate.
+// * Remove `RefCell` from `ClientProxy` since cubeb is called from
+//   multiple threads. `mpsc::UnboundedSender` is thread safe.
+// * Simplify the interface to just request (`R`) and response (`Q`)
+//   removing error (`E`).
+// * Remove the `Envelope` type.
+// * Renamed `pair` to `channel` to represent that an `rpc::channel`
+//   is being created.
+//
+// Original License:
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll};
+use futures::sync::{mpsc, oneshot};
+use std::fmt;
+use std::io;
+
+/// Message used to dispatch requests to the task managing the
+/// client connection.
+pub type Request<R, Q> = (R, oneshot::Sender<Q>);
+
+/// Receive requests submitted to the client
+pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
+
+/// Response future returned from a client
+pub struct Response<Q> {
+    inner: oneshot::Receiver<Q>
+}
+
+pub struct ClientProxy<R, Q> {
+    tx: mpsc::UnboundedSender<Request<R, Q>>
+}
+
+impl<R, Q> Clone for ClientProxy<R, Q> {
+    fn clone(&self) -> Self {
+        ClientProxy {
+            tx: self.tx.clone()
+        }
+    }
+}
+
+pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
+    // Create a channel to send the requests to client-side of rpc.
+    let (tx, rx) = mpsc::unbounded();
+
+    // Wrap the `tx` part in ClientProxy so the rpc call interface
+    // can be implemented.
+    let client = ClientProxy {
+        tx
+    };
+
+    (client, rx)
+}
+
+impl<R, Q> ClientProxy<R, Q> {
+    pub fn call(&self, request: R) -> Response<Q> {
+        // The response to returned from the rpc client task over a
+        // oneshot channel.
+        let (tx, rx) = oneshot::channel();
+
+        // If send returns an Err, its because the other side has been dropped.
+        // By ignoring it, we are just dropping the `tx`, which will mean the
+        // rx will return Canceled when polled. In turn, that is translated
+        // into a BrokenPipe, which conveys the proper error.
+        let _ = self.tx.send((request, tx));
+
+        Response {
+            inner: rx
+        }
+    }
+}
+
+impl<R, Q> fmt::Debug for ClientProxy<R, Q>
+where
+    R: fmt::Debug,
+    Q: fmt::Debug
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "ClientProxy {{ ... }}")
+    }
+}
+
+impl<Q> Future for Response<Q> {
+    type Item = Q;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Q, io::Error> {
+        match self.inner.poll() {
+            Ok(Async::Ready(res)) => Ok(Async::Ready(res)),
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            // Convert oneshot::Canceled into io::Error
+            Err(_) => {
+                let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
+                Err(e)
+            }
+        }
+    }
+}
+
+impl<Q> fmt::Debug for Response<Q>
+where
+    Q: fmt::Debug
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Response {{ ... }}")
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/driver.rs
@@ -0,0 +1,171 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
+use rpc::Handler;
+use std::fmt;
+use std::io;
+
+pub struct Driver<T>
+where
+    T: Handler
+{
+    // Glue
+    handler: T,
+
+    // True as long as the connection has more request frames to read.
+    run: bool,
+
+    // True when the transport is fully flushed
+    is_flushed: bool
+}
+
+impl<T> Driver<T>
+where
+    T: Handler
+{
+    /// Create a new rpc driver with the given service and transport.
+    pub fn new(handler: T) -> Driver<T> {
+        Driver {
+            handler: handler,
+            run: true,
+            is_flushed: true
+        }
+    }
+
+    /// Returns true if the driver has nothing left to do
+    fn is_done(&self) -> bool {
+        !self.run && self.is_flushed && !self.has_in_flight()
+    }
+
+    /// Process incoming messages off the transport.
+    fn receive_incoming(&mut self) -> io::Result<()> {
+        while self.run {
+            if let Async::Ready(req) = try!(self.handler.transport().poll()) {
+                try!(self.process_incoming(req));
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+
+    /// Process an incoming message
+    fn process_incoming(&mut self, req: Option<T::In>) -> io::Result<()> {
+        trace!("process_incoming");
+        // At this point, the service & transport are ready to process the
+        // request, no matter what it is.
+        match req {
+            Some(message) => {
+                trace!("received message");
+
+                if let Err(e) = self.handler.consume(message) {
+                    // TODO: Should handler be infalliable?
+                    panic!("unimplemented error handling: {:?}", e);
+                }
+            },
+            None => {
+                trace!("received None");
+                // At this point, we just return. This works
+                // because poll with be called again and go
+                // through the receive-cycle again.
+                self.run = false;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Send outgoing messages to the transport.
+    fn send_outgoing(&mut self) -> io::Result<()> {
+        trace!("send_responses");
+        loop {
+            match try!(self.handler.produce()) {
+                Async::Ready(Some(message)) => {
+                    trace!("  --> got message");
+                    try!(self.process_outgoing(message));
+                },
+                Async::Ready(None) => {
+                    trace!("  --> got None");
+                    // The service is done with the connection.
+                    break;
+                },
+                // Nothing to dispatch
+                Async::NotReady => break
+            }
+        }
+
+        Ok(())
+    }
+
+    fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
+        trace!("process_outgoing");
+        try!(assert_send(&mut self.handler.transport(), message));
+
+        Ok(())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.is_flushed = try!(self.handler.transport().poll_complete()).is_ready();
+
+        // TODO:
+        Ok(())
+    }
+
+    fn has_in_flight(&self) -> bool {
+        self.handler.has_in_flight()
+    }
+}
+
+impl<T> Future for Driver<T>
+where
+    T: Handler
+{
+    type Item = ();
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<(), Self::Error> {
+        trace!("rpc::Driver::tick");
+
+        // First read off data from the socket
+        try!(self.receive_incoming());
+
+        // Handle completed responses
+        try!(self.send_outgoing());
+
+        // Try flushing buffered writes
+        try!(self.flush());
+
+        if self.is_done() {
+            return Ok(().into());
+        }
+
+        // Tick again later
+        Ok(Async::NotReady)
+    }
+}
+
+fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
+    match try!(s.start_send(item)) {
+        AsyncSink::Ready => Ok(()),
+        AsyncSink::NotReady(_) => panic!(
+            "sink reported itself as ready after `poll_ready` but was \
+             then unable to accept a message"
+        )
+    }
+}
+
+impl<T> fmt::Debug for Driver<T>
+where
+    T: Handler + fmt::Debug
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("rpc::Handler")
+            .field("handler", &self.handler)
+            .field("run", &self.run)
+            .field("is_flushed", &self.is_flushed)
+            .finish()
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/mod.rs
@@ -0,0 +1,36 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use futures::{Poll, Sink, Stream};
+use std::io;
+
+mod driver;
+mod client;
+mod server;
+
+pub use self::client::{bind_client, Client, ClientProxy, Response};
+pub use self::server::{bind_server, Server};
+
+pub trait Handler {
+    /// Message type read from transport
+    type In;
+    /// Message type written to transport
+    type Out;
+    type Transport: 'static
+        + Stream<Item = Self::In, Error = io::Error>
+        + Sink<SinkItem = Self::Out, SinkError = io::Error>;
+
+    /// Mutable reference to the transport
+    fn transport(&mut self) -> &mut Self::Transport;
+
+    /// Consume a request
+    fn consume(&mut self, message: Self::In) -> io::Result<()>;
+
+    /// Produce a response
+    fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error>;
+
+    /// RPC currently in flight
+    fn has_in_flight(&self) -> bool;
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/server.rs
@@ -0,0 +1,178 @@
+// This is a derived version of simple/pipeline/server.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of server.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+//   contain simple request/response messages:
+//   * Remove `Error` types,
+//   * Remove `bind_transport` fn & `BindTransport` type,
+//   * Remove all "Lift"ing functionality.
+//   * Remove `Service` trait since audioipc doesn't use `tokio_service`
+//     crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll, Sink, Stream};
+use rpc::Handler;
+use rpc::driver::Driver;
+use std::collections::VecDeque;
+use std::io;
+use tokio_core::reactor::Handle;
+
+/// Bind an async I/O object `io` to the `server`.
+pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
+where
+    S: Server
+{
+    let fut = {
+        let handler = ServerHandler {
+            server: server,
+            transport: transport,
+            in_flight: VecDeque::with_capacity(32)
+        };
+        Driver::new(handler)
+    };
+
+    // Spawn the RPC driver into task
+    handle.spawn(Box::new(fut.map_err(|_| ())))
+}
+
+pub trait Server: 'static {
+    /// Request
+    type Request: 'static;
+
+    /// Response
+    type Response: 'static;
+
+    /// Future
+    type Future: Future<Item = Self::Response, Error = ()>;
+
+    /// The message transport, which works with async I/O objects of
+    /// type `A`.
+    type Transport: 'static
+        + Stream<Item = Self::Request, Error = io::Error>
+        + Sink<SinkItem = Self::Response, SinkError = io::Error>;
+
+    /// Process the request and return the response asynchronously.
+    fn process(&mut self, req: Self::Request) -> Self::Future;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ServerHandler<S>
+where
+    S: Server
+{
+    // The service handling the connection
+    server: S,
+    // The transport responsible for sending/receving messages over the wire
+    transport: S::Transport,
+    // FIFO of "in flight" responses to requests.
+    in_flight: VecDeque<InFlight<S::Future>>
+}
+
+impl<S> Handler for ServerHandler<S>
+where
+    S: Server
+{
+    type In = S::Request;
+    type Out = S::Response;
+    type Transport = S::Transport;
+
+    /// Mutable reference to the transport
+    fn transport(&mut self) -> &mut Self::Transport {
+        &mut self.transport
+    }
+
+    /// Consume a message
+    fn consume(&mut self, request: Self::In) -> io::Result<()> {
+        trace!("ServerHandler::consume");
+        let response = self.server.process(request);
+        self.in_flight.push_back(InFlight::Active(response));
+
+        // TODO: Should the error be handled differently?
+        Ok(())
+    }
+
+    /// Produce a message
+    fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+        trace!("ServerHandler::produce");
+
+        // Make progress on pending responses
+        for pending in &mut self.in_flight {
+            pending.poll();
+        }
+
+        // Is the head of the queue ready?
+        match self.in_flight.front() {
+            Some(&InFlight::Done(_)) => {},
+            _ => {
+                trace!("  --> not ready");
+                return Ok(Async::NotReady);
+            }
+        }
+
+        // Return the ready response
+        match self.in_flight.pop_front() {
+            Some(InFlight::Done(res)) => {
+                trace!("  --> received response");
+                Ok(Async::Ready(Some(res)))
+            },
+            _ => panic!()
+        }
+    }
+
+    /// RPC currently in flight
+    fn has_in_flight(&self) -> bool {
+        !self.in_flight.is_empty()
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+enum InFlight<F: Future<Error = ()>> {
+    Active(F),
+    Done(F::Item)
+}
+
+impl<F: Future<Error = ()>> InFlight<F> {
+    fn poll(&mut self) {
+        let res = match *self {
+            InFlight::Active(ref mut f) => match f.poll() {
+                Ok(Async::Ready(e)) => e,
+                Err(_) => unreachable!(),
+                Ok(Async::NotReady) => return
+            },
+            _ => return
+        };
+        *self = InFlight::Done(res);
+    }
+}
--- a/media/audioipc/audioipc/src/shm.rs
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -1,31 +1,35 @@
+use errors::*;
+use memmap::{Mmap, MmapViewSync, Protection};
+use std::fs::{remove_file, File, OpenOptions};
 use std::path::Path;
-use std::fs::{remove_file, OpenOptions, File};
 use std::sync::atomic;
-use memmap::{Mmap, Protection};
-use errors::*;
 
 pub struct SharedMemReader {
-    mmap: Mmap,
+    mmap: Mmap
 }
 
 impl SharedMemReader {
     pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
-        let file = OpenOptions::new().read(true)
-                                     .write(true)
-                                     .create_new(true)
-                                     .open(path)?;
+        let file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create_new(true)
+            .open(path)?;
         let _ = remove_file(path);
         file.set_len(size as u64)?;
         let mmap = Mmap::open(&file, Protection::Read)?;
         assert_eq!(mmap.len(), size);
-        Ok((SharedMemReader {
-            mmap
-        }, file))
+        Ok((
+            SharedMemReader {
+                mmap
+            },
+            file
+        ))
     }
 
     pub fn read(&self, buf: &mut [u8]) -> Result<()> {
         if buf.is_empty() {
             return Ok(());
         }
         // TODO: Track how much is in the shm area.
         if buf.len() <= self.mmap.len() {
@@ -37,59 +41,74 @@ impl SharedMemReader {
             Ok(())
         } else {
             bail!("mmap size");
         }
     }
 }
 
 pub struct SharedMemSlice {
-    mmap: Mmap,
+    view: MmapViewSync
 }
 
 impl SharedMemSlice {
-    pub fn from(file: File, size: usize) -> Result<SharedMemSlice> {
-        let mmap = Mmap::open(&file, Protection::Read)?;
+    pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> {
+        let mmap = Mmap::open(file, Protection::Read)?;
         assert_eq!(mmap.len(), size);
+        let view = mmap.into_view_sync();
         Ok(SharedMemSlice {
-            mmap
+            view
         })
     }
 
     pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
         if size == 0 {
             return Ok(&[]);
         }
         // TODO: Track how much is in the shm area.
-        if size <= self.mmap.len() {
+        if size <= self.view.len() {
             atomic::fence(atomic::Ordering::Acquire);
-            let buf = unsafe { &self.mmap.as_slice()[..size] };
+            let buf = unsafe { &self.view.as_slice()[..size] };
             Ok(buf)
         } else {
             bail!("mmap size");
         }
     }
+
+    /// Clones the view of the memory map.
+    ///
+    /// The underlying memory map is shared, and thus the caller must ensure that the memory
+    /// underlying the view is not illegally aliased.
+    pub unsafe fn clone_view(&self) -> Self {
+        SharedMemSlice {
+            view: self.view.clone()
+        }
+    }
 }
 
 pub struct SharedMemWriter {
-    mmap: Mmap,
+    mmap: Mmap
 }
 
 impl SharedMemWriter {
     pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
-        let file = OpenOptions::new().read(true)
-                                     .write(true)
-                                     .create_new(true)
-                                     .open(path)?;
+        let file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create_new(true)
+            .open(path)?;
         let _ = remove_file(path);
         file.set_len(size as u64)?;
         let mmap = Mmap::open(&file, Protection::ReadWrite)?;
-        Ok((SharedMemWriter {
-            mmap
-        }, file))
+        Ok((
+            SharedMemWriter {
+                mmap
+            },
+            file
+        ))
     }
 
     pub fn write(&mut self, buf: &[u8]) -> Result<()> {
         if buf.is_empty() {
             return Ok(());
         }
         // TODO: Track how much is in the shm area.
         if buf.len() <= self.mmap.len() {
@@ -100,34 +119,46 @@ impl SharedMemWriter {
             Ok(())
         } else {
             bail!("mmap size");
         }
     }
 }
 
 pub struct SharedMemMutSlice {
-    mmap: Mmap,
+    view: MmapViewSync
 }
 
 impl SharedMemMutSlice {
-    pub fn from(file: File, size: usize) -> Result<SharedMemMutSlice> {
-        let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+    pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> {
+        let mmap = Mmap::open(file, Protection::ReadWrite)?;
         assert_eq!(mmap.len(), size);
+        let view = mmap.into_view_sync();
         Ok(SharedMemMutSlice {
-            mmap
+            view
         })
     }
 
     pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
         if size == 0 {
             return Ok(&mut []);
         }
         // TODO: Track how much is in the shm area.
-        if size <= self.mmap.len() {
-            let buf = unsafe { &mut self.mmap.as_mut_slice()[..size] };
+        if size <= self.view.len() {
+            let buf = unsafe { &mut self.view.as_mut_slice()[..size] };
             atomic::fence(atomic::Ordering::Release);
             Ok(buf)
         } else {
             bail!("mmap size");
         }
     }
+
+    /// Clones the view of the memory map.
+    ///
+    /// The underlying memory map is shared, and thus the caller must
+    /// ensure that the memory underlying the view is not illegally
+    /// aliased.
+    pub unsafe fn clone_view(&self) -> Self {
+        SharedMemMutSlice {
+            view: self.view.clone()
+        }
+    }
 }
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,12 +1,21 @@
 [package]
 name = "audioipc-client"
-version = "0.1.0"
-authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+version = "0.2.0"
+authors = [
+        "Matthew Gregan <kinetik@flim.org>",
+        "Dan Glastonbury <dan.glastonbury@gmail.com>"
+        ]
 description = "Cubeb Backend for talking to remote cubeb server."
 
 [dependencies]
 audioipc = { path="../audioipc" }
-cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
-cubeb-core = { path="../../cubeb-rs/cubeb-core" }
+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+# rayon-core in Gecko uses futures 0.1.13
+futures = { version="=0.1.13", default-features=false, features=["use_std"] }
+# futures-cpupool 0.1.5 matches futures 0.1.13
+futures-cpupool = { version="=0.1.5", default-features=false }
 libc = "0.2"
 log = "^0.3.6"
+tokio-core = "0.1"
+tokio-uds = "0.1.7"
\ No newline at end of file
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,102 +1,171 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use ClientStream;
 use assert_not_in_callback;
-use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::{messages, ClientMessage, ServerMessage};
+use audioipc::{core, rpc};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
 use cubeb_backend::{Context, Ops};
-use cubeb_core::{DeviceId, DeviceType, Error, ErrorCode, Result, StreamParams, ffi};
+use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams};
 use cubeb_core::binding::Binding;
+use futures::Future;
+use futures_cpupool::{self, CpuPool};
+use libc;
+use std::{fmt, io, mem};
 use std::ffi::{CStr, CString};
-use std::mem;
 use std::os::raw::c_void;
-use std::os::unix::net::UnixStream;
 use std::os::unix::io::FromRawFd;
-use std::sync::{Mutex, MutexGuard};
+use std::os::unix::net;
+use std::sync::mpsc;
 use stream;
-use libc;
+use tokio_core::reactor::{Handle, Remote};
+use tokio_uds::UnixStream;
+
+struct CubebClient;
 
-#[derive(Debug)]
-pub struct ClientContext {
-    _ops: *const Ops,
-    connection: Mutex<Connection>
+impl rpc::Client for CubebClient {
+    type Request = ServerMessage;
+    type Response = ClientMessage;
+    type Transport =
+        FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
 
+macro_rules! t(
+    ($e:expr) => (
+        match $e {
+            Ok(e) => e,
+            Err(_) => return Err(Error::default())
+        }
+    ));
+
 pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
 
+pub struct ClientContext {
+    _ops: *const Ops,
+    rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
+    core: core::CoreThread,
+    cpu_pool: CpuPool
+}
+
 impl ClientContext {
     #[doc(hidden)]
-    pub fn connection(&self) -> MutexGuard<Connection> {
-        self.connection.lock().unwrap()
+    pub fn remote(&self) -> Remote {
+        self.core.remote()
+    }
+
+    #[doc(hidden)]
+    pub fn rpc(&self) -> rpc::ClientProxy<ServerMessage, ClientMessage> {
+        self.rpc.clone()
+    }
+
+    #[doc(hidden)]
+    pub fn cpu_pool(&self) -> CpuPool {
+        self.cpu_pool.clone()
     }
 }
 
 // TODO: encapsulate connect, etc inside audioipc.
-fn open_server_stream() -> Result<UnixStream> {
+fn open_server_stream() -> Result<net::UnixStream> {
     unsafe {
         if let Some(fd) = super::G_SERVER_FD {
-            return Ok(UnixStream::from_raw_fd(fd));
+            return Ok(net::UnixStream::from_raw_fd(fd));
         }
 
         Err(Error::default())
     }
 }
 
 impl Context for ClientContext {
     fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+        fn bind_and_send_client(
+            stream: UnixStream,
+            handle: &Handle,
+            tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>
+        ) -> Option<()> {
+            let transport = framed_with_fds(stream, Default::default());
+            let rpc = rpc::bind_client::<CubebClient>(transport, handle);
+            // If send fails then the rx end has closed
+            // which is unlikely here.
+            let _ = tx_rpc.send(rpc);
+            Some(())
+        }
+
         assert_not_in_callback();
+
+        let (tx_rpc, rx_rpc) = mpsc::channel();
+
+        let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
+            let handle = core::handle();
+
+            open_server_stream().ok()
+                .and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
+                .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
+                .ok_or_else(|| io::Error::new(
+                    io::ErrorKind::Other,
+                    "Failed to open stream and create rpc."
+                ))
+        }));
+
+        let rpc = t!(rx_rpc.recv());
+
+        let cpupool = futures_cpupool::Builder::new()
+            .name_prefix("AudioIPC")
+            .create();
+
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
-            connection: Mutex::new(Connection::new(open_server_stream()?))
+            rpc: rpc,
+            core: core,
+            cpu_pool: cpupool
         });
         Ok(Box::into_raw(ctx) as *mut _)
     }
 
     fn backend_id(&self) -> &'static CStr {
         assert_not_in_callback();
         unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
     }
 
     fn max_channel_count(&self) -> Result<u32> {
         assert_not_in_callback();
-        let mut conn = self.connection();
-        send_recv!(conn, ContextGetMaxChannelCount => ContextMaxChannelCount())
+        send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
     }
 
     fn min_latency(&self, params: &StreamParams) -> Result<u32> {
         assert_not_in_callback();
         let params = messages::StreamParams::from(unsafe { &*params.raw() });
-        let mut conn = self.connection();
-        send_recv!(conn, ContextGetMinLatency(params) => ContextMinLatency())
+        send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
     }
 
     fn preferred_sample_rate(&self) -> Result<u32> {
         assert_not_in_callback();
-        let mut conn = self.connection();
-        send_recv!(conn, ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+        send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
     }
 
     fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
         assert_not_in_callback();
-        let mut conn = self.connection();
-        send_recv!(conn, ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+        send_recv!(self.rpc(),
+                   ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
     }
 
     fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
         assert_not_in_callback();
-        let mut conn = self.connection();
-        let v: Vec<ffi::cubeb_device_info> =
-            match send_recv!(conn, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
-                Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
-                Err(e) => return Err(e),
-            };
+        let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
+                             ContextGetDeviceEnumeration(devtype.bits()) =>
+                             ContextEnumeratedDevices())
+        {
+            Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
+            Err(e) => return Err(e)
+        };
         let vs = v.into_boxed_slice();
         let coll = ffi::cubeb_device_collection {
             count: vs.len(),
             device: vs.as_ptr()
         };
         // Giving away the memory owned by vs.  Don't free it!
         // Reclaimed in `device_collection_destroy`.
         mem::forget(vs);
@@ -135,30 +204,32 @@ impl Context for ClientContext {
         input_device: DeviceId,
         input_stream_params: Option<&ffi::cubeb_stream_params>,
         output_device: DeviceId,
         output_stream_params: Option<&ffi::cubeb_stream_params>,
         latency_frame: u32,
         // These params aren't sent to the server
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
-        user_ptr: *mut c_void,
+        user_ptr: *mut c_void
     ) -> Result<*mut ffi::cubeb_stream> {
         assert_not_in_callback();
 
-        fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
+        fn opt_stream_params(
+            p: Option<&ffi::cubeb_stream_params>
+        ) -> Option<messages::StreamParams> {
             match p {
                 Some(raw) => Some(messages::StreamParams::from(raw)),
-                None => None,
+                None => None
             }
         }
 
         let stream_name = match stream_name {
             Some(s) => Some(s.to_bytes().to_vec()),
-            None => None,
+            None => None
         };
 
         let input_stream_params = opt_stream_params(input_stream_params);
         let output_stream_params = opt_stream_params(output_stream_params);
 
         let init_params = messages::StreamInitParams {
             stream_name: stream_name,
             input_device: input_device.raw() as _,
@@ -169,36 +240,37 @@ impl Context for ClientContext {
         };
         stream::init(self, init_params, data_callback, state_callback, user_ptr)
     }
 
     fn register_device_collection_changed(
         &self,
         _dev_type: DeviceType,
         _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
-        _user_ptr: *mut c_void,
+        _user_ptr: *mut c_void
     ) -> Result<()> {
         assert_not_in_callback();
         Ok(())
     }
 }
 
 impl Drop for ClientContext {
     fn drop(&mut self) {
-        let mut conn = self.connection();
         info!("ClientContext drop...");
-        let r = conn.send(ServerMessage::ClientDisconnect);
-        if r.is_err() {
-            debug!("ClientContext::Drop send error={:?}", r);
-        } else {
-            let r = conn.receive();
-            if let Ok(ClientMessage::ClientDisconnected) = r {
-            } else {
-                debug!("ClientContext::Drop receive error={:?}", r);
-            }
-        }
+        let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
         unsafe {
             if super::G_SERVER_FD.is_some() {
                 libc::close(super::G_SERVER_FD.take().unwrap());
             }
         }
     }
 }
+
+impl fmt::Debug for ClientContext {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("ClientContext")
+            .field("_ops", &self._ops)
+            .field("rpc", &self.rpc)
+            .field("core", &self.core)
+            .field("cpu_pool", &"...")
+            .finish()
+    }
+}
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -1,20 +1,24 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details.
 
 extern crate audioipc;
-extern crate cubeb_core;
 #[macro_use]
 extern crate cubeb_backend;
+extern crate cubeb_core;
+extern crate futures;
+extern crate futures_cpupool;
 extern crate libc;
 #[macro_use]
 extern crate log;
+extern crate tokio_core;
+extern crate tokio_uds;
 
 #[macro_use]
 mod send_recv;
 mod context;
 mod stream;
 
 use context::ClientContext;
 use cubeb_backend::capi;
@@ -37,17 +41,21 @@ fn assert_not_in_callback() {
         assert_eq!(*b.borrow(), false);
     });
 }
 
 static mut G_SERVER_FD: Option<RawFd> = None;
 
 #[no_mangle]
 /// Entry point from C code.
-pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char, server_connection: c_int) -> c_int {
+pub unsafe extern "C" fn audioipc_client_init(
+    c: *mut *mut ffi::cubeb,
+    context_name: *const c_char,
+    server_connection: c_int
+) -> c_int {
     // TODO: Windows portability (for fd).
     // TODO: Better way to pass extra parameters to Context impl.
     if G_SERVER_FD.is_some() {
         panic!("audioipc client's server connection already initialized.");
     }
     if server_connection >= 0 {
         G_SERVER_FD = Some(server_connection);
     }
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -1,66 +1,68 @@
+use cubeb_core::Error;
+use cubeb_core::ffi;
+
+#[doc(hidden)]
+pub fn _err<E>(e: E) -> Error
+where
+    E: Into<Option<ffi::cubeb_error_code>>
+{
+    match e.into() {
+        Some(e) => unsafe { Error::from_raw(e) },
+        None => Error::new()
+    }
+}
+
 #[macro_export]
 macro_rules! send_recv {
-    ($conn:expr, $smsg:ident => $rmsg:ident) => {{
-        send_recv!(__send $conn, $smsg);
-        send_recv!(__recv $conn, $rmsg)
+    ($rpc:expr, $smsg:ident => $rmsg:ident) => {{
+        let resp = send_recv!(__send $rpc, $smsg);
+        send_recv!(__recv resp, $rmsg)
     }};
-    ($conn:expr, $smsg:ident => $rmsg:ident()) => {{
-        send_recv!(__send $conn, $smsg);
-        send_recv!(__recv $conn, $rmsg __result)
+    ($rpc:expr, $smsg:ident => $rmsg:ident()) => {{
+        let resp = send_recv!(__send $rpc, $smsg);
+        send_recv!(__recv resp, $rmsg __result)
     }};
-    ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
-        send_recv!(__send $conn, $smsg, $($a),*);
-        send_recv!(__recv $conn, $rmsg)
+    ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
+        let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+        send_recv!(__recv resp, $rmsg)
     }};
-    ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
-        send_recv!(__send $conn, $smsg, $($a),*);
-        send_recv!(__recv $conn, $rmsg __result)
+    ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
+        let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+        send_recv!(__recv resp, $rmsg __result)
     }};
     //
-    (__send $conn:expr, $smsg:ident) => ({
-        let r = $conn.send(ServerMessage::$smsg);
-        if r.is_err() {
-            debug!("send error - got={:?}", r);
-            return Err(ErrorCode::Error.into());
-        }
+    (__send $rpc:expr, $smsg:ident) => ({
+        $rpc.call(ServerMessage::$smsg)
+    });
+    (__send $rpc:expr, $smsg:ident, $($a:expr),*) => ({
+        $rpc.call(ServerMessage::$smsg($($a),*))
     });
-    (__send $conn:expr, $smsg:ident, $($a:expr),*) => ({
-        let r = $conn.send(ServerMessage::$smsg($($a),*));
-        if r.is_err() {
-            debug!("send error - got={:?}", r);
-            return Err(ErrorCode::Error.into());
-        }
-    });
-    (__recv $conn:expr, $rmsg:ident) => ({
-        match $conn.receive() {
+    (__recv $resp:expr, $rmsg:ident) => ({
+        match $resp.wait() {
             Ok(ClientMessage::$rmsg) => Ok(()),
-            // Macro can see ErrorCode but not Error? I don't understand.
-            // ::cubeb_core::Error is fragile but this isn't general purpose code.
-            Ok(ClientMessage::Error(e)) => Err(unsafe { ::cubeb_core::Error::from_raw(e) }),
+            Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
             Ok(m) => {
-                debug!("receive unexpected message - got={:?}", m);
-                Err(ErrorCode::Error.into())
+                debug!("received wrong message - got={:?}", m);
+                Err($crate::send_recv::_err(None))
             },
             Err(e) => {
-                debug!("receive error - got={:?}", e);
-                Err(ErrorCode::Error.into())
+                debug!("received error from rpc - got={:?}", e);
+                Err($crate::send_recv::_err(None))
             },
         }
     });
-    (__recv $conn:expr, $rmsg:ident __result) => ({
-        match $conn.receive() {
+    (__recv $resp:expr, $rmsg:ident __result) => ({
+        match $resp.wait() {
             Ok(ClientMessage::$rmsg(v)) => Ok(v),
-            // Macro can see ErrorCode but not Error? I don't understand.
-            // ::cubeb_core::Error is fragile but this isn't general purpose code.
-            Ok(ClientMessage::Error(e)) => Err(unsafe { ::cubeb_core::Error::from_raw(e) }),
+            Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
             Ok(m) => {
-                debug!("receive unexpected message - got={:?}", m);
-                Err(ErrorCode::Error.into())
+                debug!("received wrong message - got={:?}", m);
+                Err($crate::send_recv::_err(None))
             },
             Err(e) => {
-                debug!("receive error - got={:?}", e);
-                Err(ErrorCode::Error.into())
+                debug!("received error - got={:?}", e);
+                Err($crate::send_recv::_err(None))
             },
         }
     })
 }
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -1,277 +1,225 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
+use {assert_not_in_callback, set_in_callback};
 use ClientContext;
-use {set_in_callback, assert_not_in_callback};
-use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
+use audioipc::rpc;
 use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
 use cubeb_backend::Stream;
-use cubeb_core::{ErrorCode, Result, ffi};
+use cubeb_core::{ffi, Result};
+use futures::Future;
+use futures_cpupool::{CpuFuture, CpuPool};
 use std::ffi::CString;
 use std::fs::File;
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
+use std::os::unix::net;
 use std::ptr;
-use std::thread;
+use std::sync::mpsc;
+use tokio_uds::UnixStream;
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
 pub struct ClientStream<'ctx> {
     // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
     context: &'ctx ClientContext,
-    token: usize,
-    join_handle: Option<thread::JoinHandle<()>>
+    token: usize
 }
 
-fn stream_thread(
-    mut conn: Connection,
-    input_shm: &SharedMemSlice,
-    mut output_shm: SharedMemMutSlice,
+struct CallbackServer {
+    input_shm: SharedMemSlice,
+    output_shm: SharedMemMutSlice,
     data_cb: ffi::cubeb_data_callback,
     state_cb: ffi::cubeb_state_callback,
     user_ptr: usize,
-) {
-    loop {
-        let r = match conn.receive::<ClientMessage>() {
-            Ok(r) => r,
-            Err(e) => {
-                debug!("stream_thread: Failed to receive message: {:?}", e);
-                return;
-            },
-        };
+    cpu_pool: CpuPool
+}
 
-        match r {
-            ClientMessage::StreamDestroyed => {
-                info!("stream_thread: Shutdown callback thread.");
-                return;
-            },
-            ClientMessage::StreamDataCallback(nframes, frame_size) => {
-                trace!(
+impl rpc::Server for CallbackServer {
+    type Request = CallbackReq;
+    type Response = CallbackResp;
+    type Future = CpuFuture<Self::Response, ()>;
+    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+    fn process(&mut self, req: Self::Request) -> Self::Future {
+        match req {
+            CallbackReq::Data(nframes, frame_size) => {
+                info!(
                     "stream_thread: Data Callback: nframes={} frame_size={}",
                     nframes,
                     frame_size
                 );
-                // TODO: This is proof-of-concept. Make it better.
-                let input_ptr: *const u8 = input_shm
-                    .get_slice(nframes as usize * frame_size)
-                    .unwrap()
-                    .as_ptr();
-                let output_ptr: *mut u8 = output_shm
-                    .get_mut_slice(nframes as usize * frame_size)
-                    .unwrap()
-                    .as_mut_ptr();
-                set_in_callback(true);
-                let nframes = data_cb(
-                    ptr::null_mut(),
-                    user_ptr as *mut c_void,
-                    input_ptr as *const _,
-                    output_ptr as *mut _,
-                    nframes as _
-                );
-                set_in_callback(false);
-                let r = conn.send(ServerMessage::StreamDataCallback(nframes as isize));
-                if r.is_err() {
-                    debug!("stream_thread: Failed to send StreamDataCallback: {:?}", r);
-                    return;
-                }
+
+                // Clone values that need to be moved into the cpu pool thread.
+                let input_shm = unsafe { self.input_shm.clone_view() };
+                let mut output_shm = unsafe { self.output_shm.clone_view() };
+                let user_ptr = self.user_ptr;
+                let cb = self.data_cb;
+
+                self.cpu_pool.spawn_fn(move || {
+                    // TODO: This is proof-of-concept. Make it better.
+                    let input_ptr: *const u8 = input_shm
+                        .get_slice(nframes as usize * frame_size)
+                        .unwrap()
+                        .as_ptr();
+                    let output_ptr: *mut u8 = output_shm
+                        .get_mut_slice(nframes as usize * frame_size)
+                        .unwrap()
+                        .as_mut_ptr();
+
+                    set_in_callback(true);
+                    let nframes = cb(
+                        ptr::null_mut(),
+                        user_ptr as *mut c_void,
+                        input_ptr as *const _,
+                        output_ptr as *mut _,
+                        nframes as _
+                    );
+                    set_in_callback(false);
+
+                    Ok(CallbackResp::Data(nframes as isize))
+                })
             },
-            ClientMessage::StreamStateCallback(state) => {
+            CallbackReq::State(state) => {
                 info!("stream_thread: State Callback: {:?}", state);
-                set_in_callback(true);
-                state_cb(ptr::null_mut(), user_ptr as *mut _, state);
-                set_in_callback(false);
-                let r = conn.send(ServerMessage::StreamStateCallback);
-                if r.is_err() {
-                    debug!("stream_thread: Failed to send StreamStateCallback: {:?}", r);
-                    return;
-                }
-            },
-            m => {
-                info!("Unexpected ClientMessage: {:?}", m);
-            },
+                let user_ptr = self.user_ptr;
+                let cb = self.state_cb;
+                self.cpu_pool.spawn_fn(move || {
+                    set_in_callback(true);
+                    cb(ptr::null_mut(), user_ptr as *mut _, state);
+                    set_in_callback(false);
+
+                    Ok(CallbackResp::State)
+                })
+            }
         }
     }
 }
 
 impl<'ctx> ClientStream<'ctx> {
     fn init(
         ctx: &'ctx ClientContext,
         init_params: messages::StreamInitParams,
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
-        user_ptr: *mut c_void,
+        user_ptr: *mut c_void
     ) -> Result<*mut ffi::cubeb_stream> {
         assert_not_in_callback();
-        let mut conn = ctx.connection();
+
+        let rpc = ctx.rpc();
+        let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
+
+        trace!("token = {}, fds = {:?}", data.token, data.fds);
 
-        let r = conn.send(ServerMessage::StreamInit(init_params));
-        if r.is_err() {
-            debug!("ClientStream::init: Failed to send StreamInit: {:?}", r);
-            return Err(ErrorCode::Error.into());
-        }
+        let stm = data.fds[0];
+        let stream = unsafe { net::UnixStream::from_raw_fd(stm) };
 
-        let r = match conn.receive_with_fd::<ClientMessage>() {
-            Ok(r) => r,
-            Err(_) => return Err(ErrorCode::Error.into()),
-        };
+        let input = data.fds[1];
+        let input_file = unsafe { File::from_raw_fd(input) };
+        let input_shm = SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap();
 
-        let (token, conn2) = match r {
-            ClientMessage::StreamCreated(tok) => {
-                let fd = conn.take_fd();
-                if fd.is_none() {
-                    debug!("Missing fd!");
-                    return Err(ErrorCode::Error.into());
-                }
-                (tok, unsafe { Connection::from_raw_fd(fd.unwrap()) })
-            },
-            m => {
-                debug!("Unexpected message: {:?}", m);
-                return Err(ErrorCode::Error.into());
-            },
-        };
+        let output = data.fds[2];
+        let output_file = unsafe { File::from_raw_fd(output) };
+        let output_shm = SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap();
+
+        let user_data = user_ptr as usize;
+
+        let cpu_pool = ctx.cpu_pool();
 
-        // TODO: It'd be nicer to receive these two fds as part of
-        // StreamCreated, but that requires changing sendmsg/recvmsg to
-        // support multiple fds.
-        let r = match conn.receive_with_fd::<ClientMessage>() {
-            Ok(r) => r,
-            Err(_) => return Err(ErrorCode::Error.into()),
+        let server = CallbackServer {
+            input_shm: input_shm,
+            output_shm: output_shm,
+            data_cb: data_callback,
+            state_cb: state_callback,
+            user_ptr: user_data,
+            cpu_pool: cpu_pool
         };
 
-        let input_file = match r {
-            ClientMessage::StreamCreatedInputShm => {
-                let fd = conn.take_fd();
-                if fd.is_none() {
-                    debug!("Missing fd!");
-                    return Err(ErrorCode::Error.into());
-                }
-                unsafe { File::from_raw_fd(fd.unwrap()) }
-            },
-            m => {
-                debug!("Unexpected message: {:?}", m);
-                return Err(ErrorCode::Error.into());
-            },
-        };
-
-        let input_shm = SharedMemSlice::from(input_file, SHM_AREA_SIZE).unwrap();
-
-        let r = match conn.receive_with_fd::<ClientMessage>() {
-            Ok(r) => r,
-            Err(_) => return Err(ErrorCode::Error.into()),
-        };
-
-        let output_file = match r {
-            ClientMessage::StreamCreatedOutputShm => {
-                let fd = conn.take_fd();
-                if fd.is_none() {
-                    debug!("Missing fd!");
-                    return Err(ErrorCode::Error.into());
-                }
-                unsafe { File::from_raw_fd(fd.unwrap()) }
-            },
-            m => {
-                debug!("Unexpected message: {:?}", m);
-                return Err(ErrorCode::Error.into());
-            },
-        };
-
-        let output_shm = SharedMemMutSlice::from(output_file, SHM_AREA_SIZE).unwrap();
-
-        let user_data = user_ptr as usize;
-        let join_handle = thread::spawn(move || {
-            stream_thread(
-                conn2,
-                &input_shm,
-                output_shm,
-                data_callback,
-                state_callback,
-                user_data
-            )
+        let (wait_tx, wait_rx) = mpsc::channel();
+        ctx.remote().spawn(move |handle| {
+            let stream = UnixStream::from_stream(stream, handle).unwrap();
+            let transport = framed(stream, Default::default());
+            rpc::bind_server(transport, server, handle);
+            wait_tx.send(()).unwrap();
+            Ok(())
         });
+        wait_rx.recv().unwrap();
 
         Ok(Box::into_raw(Box::new(ClientStream {
             context: ctx,
-            token: token,
-            join_handle: Some(join_handle)
+            token: data.token
         })) as _)
     }
 }
 
 impl<'ctx> Drop for ClientStream<'ctx> {
     fn drop(&mut self) {
-        let mut conn = self.context.connection();
-        let r = conn.send(ServerMessage::StreamDestroy(self.token));
-        if r.is_err() {
-            debug!("ClientStream::Drop send error={:?}", r);
-        } else {
-            let r = conn.receive();
-            if let Ok(ClientMessage::StreamDestroyed) = r {
-            } else {
-                debug!("ClientStream::Drop receive error={:?}", r);
-            }
-        }
-        // XXX: This is guaranteed to wait forever if the send failed.
-        self.join_handle.take().unwrap().join().unwrap();
+        trace!("ClientStream drop...");
+        let rpc = self.context.rpc();
+        let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
     }
 }
 
 impl<'ctx> Stream for ClientStream<'ctx> {
     fn start(&self) -> Result<()> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamStart(self.token) => StreamStarted)
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamStart(self.token) => StreamStarted)
     }
 
     fn stop(&self) -> Result<()> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamStop(self.token) => StreamStopped)
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamStop(self.token) => StreamStopped)
     }
 
     fn reset_default_device(&self) -> Result<()> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
     }
 
     fn position(&self) -> Result<u64> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamGetPosition(self.token) => StreamPosition())
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
     }
 
     fn latency(&self) -> Result<u32> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamGetLatency(self.token) => StreamLatency())
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
     }
 
     fn set_volume(&self, volume: f32) -> Result<()> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamSetVolume(self.token, volume) => StreamVolumeSet)
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
     }
 
     fn set_panning(&self, panning: f32) -> Result<()> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        send_recv!(conn, StreamSetPanning(self.token, panning) => StreamPanningSet)
+        let rpc = self.context.rpc();
+        send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet)
     }
 
     fn current_device(&self) -> Result<*const ffi::cubeb_device> {
         assert_not_in_callback();
-        let mut conn = self.context.connection();
-        match send_recv!(conn, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+        let rpc = self.context.rpc();
+        match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
             Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
-            Err(e) => Err(e),
+            Err(e) => Err(e)
         }
     }
 
     fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
         assert_not_in_callback();
         // It's all unsafe...
         if !device.is_null() {
             unsafe {
@@ -285,24 +233,24 @@ impl<'ctx> Stream for ClientStream<'ctx>
             }
         }
         Ok(())
     }
 
     // TODO: How do we call this back? On what thread?
     fn register_device_changed_callback(
         &self,
-        _device_changed_callback: ffi::cubeb_device_changed_callback,
+        _device_changed_callback: ffi::cubeb_device_changed_callback
     ) -> Result<()> {
         assert_not_in_callback();
         Ok(())
     }
 }
 
 pub fn init(
     ctx: &ClientContext,
     init_params: messages::StreamInitParams,
     data_callback: ffi::cubeb_data_callback,
     state_callback: ffi::cubeb_state_callback,
-    user_ptr: *mut c_void,
+    user_ptr: *mut c_void
 ) -> Result<*mut ffi::cubeb_stream> {
     ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
 }
new file mode 100644
--- /dev/null
+++ b/media/audioipc/gecko.patch
@@ -0,0 +1,60 @@
+From 2c1d5189aedc7f90b603807a36a57254dc24471c Mon Sep 17 00:00:00 2001
+From: Dan Glastonbury <dglastonbury@mozilla.com>
+Date: Fri, 19 Jan 2018 11:56:58 +1000
+Subject: gecko: Change paths to vendored crates.
+
+
+diff --git a/media/audioipc/Cargo.toml b/media/audioipc/Cargo.toml
+index ede6064..d0a1979 100644
+--- a/media/audioipc/Cargo.toml
++++ b/media/audioipc/Cargo.toml
+@@ -1,2 +1,2 @@
+ [workspace]
+-members = ["audioipc", "client", "server", "ipctest"]
++members = ["audioipc", "client", "server"]
+diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml
+index 669c6ff..308cb5c 100644
+--- a/media/audioipc/audioipc/Cargo.toml
++++ b/media/audioipc/audioipc/Cargo.toml
+@@ -8,7 +8,7 @@ authors = [
+ description = "Remote Cubeb IPC"
+ 
+ [dependencies]
+-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+ bincode = "0.8"
+ bytes = "0.4"
+ # rayon-core in Gecko uses futures 0.1.13
+diff --git a/client/Cargo.toml b/client/Cargo.toml
+index c81b19a..9e3f8a5 100644
+--- a/media/audioipc/client/Cargo.toml
++++ b/media/audioipc/client/Cargo.toml
+@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
+ 
+ [dependencies]
+ audioipc = { path="../audioipc" }
+-cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
+-cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
++cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+ # rayon-core in Gecko uses futures 0.1.13
+ futures = { version="=0.1.13", default-features=false, features=["use_std"] }
+ # futures-cpupool 0.1.5 matches futures 0.1.13
+diff --git a/server/Cargo.toml b/server/Cargo.toml
+index 5b79b83..01463be 100644
+--- a/media/audioipc/server/Cargo.toml
++++ b/media/audioipc/server/Cargo.toml
+@@ -9,8 +9,8 @@ description = "Remote cubeb server"
+ 
+ [dependencies]
+ audioipc = { path = "../audioipc" }
+-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+-cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.1" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
++cubeb = { path = "../../cubeb-rs/cubeb-api" }
+ bytes = "0.4"
+ lazycell = "^0.4"
+ libc = "0.2"
+-- 
+2.10.2
+
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -1,21 +1,26 @@
 [package]
 name = "audioipc-server"
-version = "0.1.0"
-authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+version = "0.2.0"
+authors = [
+        "Matthew Gregan <kinetik@flim.org>",
+        "Dan Glastonbury <dan.glastonbury@gmail.com>"
+        ]
 description = "Remote cubeb server"
 
 [dependencies]
 audioipc = { path = "../audioipc" }
 cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
 cubeb = { path = "../../cubeb-rs/cubeb-api" }
 bytes = "0.4"
 lazycell = "^0.4"
 libc = "0.2"
 log = "^0.3.6"
-mio = "0.6.7"
-mio-uds = "0.6.4"
 slab = "0.3.0"
+# rayon-core in Gecko uses futures 0.1.13
+futures = "=0.1.13"
+tokio-core = "0.1"
+tokio-uds = "0.1.7"
 
 [dependencies.error-chain]
 version = "0.11.0"
-default-features = false
+default-features = false
\ No newline at end of file
deleted file mode 100644
--- a/media/audioipc/server/src/channel.rs
+++ /dev/null
@@ -1,394 +0,0 @@
-/* NOTE: Imported from https://github.com/carllerche/mio with minimal
-   modifications to build locally.  License and copyright per
-   https://raw.githubusercontent.com/carllerche/mio/master/LICENSE */
-
-//! Thread safe communication channel implementing `Evented`
-
-#![allow(unused_imports, dead_code, deprecated, missing_debug_implementations)]
-
-use mio::{Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
-use lazycell::{LazyCell, AtomicLazyCell};
-use std::any::Any;
-use std::fmt;
-use std::io;
-use std::error;
-use std::sync::{mpsc, Arc};
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-/// Creates a new asynchronous channel, where the `Receiver` can be registered
-/// with `Poll`.
-pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
-    let (tx_ctl, rx_ctl) = ctl_pair();
-    let (tx, rx) = mpsc::channel();
-
-    let tx = Sender {
-        tx: tx,
-        ctl: tx_ctl,
-    };
-
-    let rx = Receiver {
-        rx: rx,
-        ctl: rx_ctl,
-    };
-
-    (tx, rx)
-}
-
-/// Creates a new synchronous, bounded channel where the `Receiver` can be
-/// registered with `Poll`.
-pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
-    let (tx_ctl, rx_ctl) = ctl_pair();
-    let (tx, rx) = mpsc::sync_channel(bound);
-
-    let tx = SyncSender {
-        tx: tx,
-        ctl: tx_ctl,
-    };
-
-    let rx = Receiver {
-        rx: rx,
-        ctl: rx_ctl,
-    };
-
-    (tx, rx)
-}
-
-pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
-    let inner = Arc::new(Inner {
-        pending: AtomicUsize::new(0),
-        senders: AtomicUsize::new(1),
-        set_readiness: AtomicLazyCell::new(),
-    });
-
-    let tx = SenderCtl {
-        inner: inner.clone(),
-    };
-
-    let rx = ReceiverCtl {
-        registration: LazyCell::new(),
-        inner: inner,
-    };
-
-    (tx, rx)
-}
-
-/// Tracks messages sent on a channel in order to update readiness.
-pub struct SenderCtl {
-    inner: Arc<Inner>,
-}
-
-/// Tracks messages received on a channel in order to track readiness.
-pub struct ReceiverCtl {
-    registration: LazyCell<Registration>,
-    inner: Arc<Inner>,
-}
-
-pub struct Sender<T> {
-    tx: mpsc::Sender<T>,
-    ctl: SenderCtl,
-}
-
-pub struct SyncSender<T> {
-    tx: mpsc::SyncSender<T>,
-    ctl: SenderCtl,
-}
-
-pub struct Receiver<T> {
-    rx: mpsc::Receiver<T>,
-    ctl: ReceiverCtl,
-}
-
-pub enum SendError<T> {
-    Io(io::Error),
-    Disconnected(T),
-}
-
-pub enum TrySendError<T> {
-    Io(io::Error),
-    Full(T),
-    Disconnected(T),
-}
-
-struct Inner {
-    // The number of outstanding messages for the receiver to read
-    pending: AtomicUsize,
-    // The number of sender handles
-    senders: AtomicUsize,
-    // The set readiness handle
-    set_readiness: AtomicLazyCell<SetReadiness>,
-}
-
-impl<T> Sender<T> {
-    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
-        self.tx.send(t)
-            .map_err(SendError::from)
-            .and_then(|_| {
-                self.ctl.inc()?;
-                Ok(())
-            })
-    }
-}
-
-impl<T> Clone for Sender<T> {
-    fn clone(&self) -> Sender<T> {
-        Sender {
-            tx: self.tx.clone(),
-            ctl: self.ctl.clone(),
-        }
-    }
-}
-
-impl<T> SyncSender<T> {
-    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
-        self.tx.send(t)
-            .map_err(From::from)
-            .and_then(|_| {
-                self.ctl.inc()?;
-                Ok(())
-            })
-    }
-
-    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
-        self.tx.try_send(t)
-            .map_err(From::from)
-            .and_then(|_| {
-                self.ctl.inc()?;
-                Ok(())
-            })
-    }
-}
-
-impl<T> Clone for SyncSender<T> {
-    fn clone(&self) -> SyncSender<T> {
-        SyncSender {
-            tx: self.tx.clone(),
-            ctl: self.ctl.clone(),
-        }
-    }
-}
-
-impl<T> Receiver<T> {
-    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
-        self.rx.try_recv().and_then(|res| {
-            let _ = self.ctl.dec();
-            Ok(res)
-        })
-    }
-}
-
-impl<T> Evented for Receiver<T> {
-    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
-        self.ctl.register(poll, token, interest, opts)
-    }
-
-    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
-        self.ctl.reregister(poll, token, interest, opts)
-    }
-
-    fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        self.ctl.deregister(poll)
-    }
-}
-
-/*
- *
- * ===== SenderCtl / ReceiverCtl =====
- *
- */
-
-impl SenderCtl {
-    /// Call to track that a message has been sent
-    pub fn inc(&self) -> io::Result<()> {
-        let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
-
-        if 0 == cnt {
-            // Toggle readiness to readable
-            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
-                set_readiness.set_readiness(Ready::readable())?;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl Clone for SenderCtl {
-    fn clone(&self) -> SenderCtl {
-        self.inner.senders.fetch_add(1, Ordering::Relaxed);
-        SenderCtl { inner: self.inner.clone() }
-    }
-}
-
-impl Drop for SenderCtl {
-    fn drop(&mut self) {
-        if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
-            let _ = self.inc();
-        }
-    }
-}
-
-impl ReceiverCtl {
-    pub fn dec(&self) -> io::Result<()> {
-        let first = self.inner.pending.load(Ordering::Acquire);
-
-        if first == 1 {
-            // Unset readiness
-            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
-                set_readiness.set_readiness(Ready::empty())?;
-            }
-        }
-
-        // Decrement
-        let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
-
-        if first == 1 && second > 1 {
-            // There are still pending messages. Since readiness was
-            // previously unset, it must be reset here
-            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
-                set_readiness.set_readiness(Ready::readable())?;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl Evented for ReceiverCtl {
-    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
-        if self.registration.borrow().is_some() {
-            return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
-        }
-
-        let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
-
-
-        if self.inner.pending.load(Ordering::Relaxed) > 0 {
-            // TODO: Don't drop readiness
-            let _ = set_readiness.set_readiness(Ready::readable());
-        }
-
-        self.registration.fill(registration).ok().expect("unexpected state encountered");
-        self.inner.set_readiness.fill(set_readiness).ok().expect("unexpected state encountered");
-
-        Ok(())
-    }
-
-    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
-        match self.registration.borrow() {
-            Some(registration) => registration.update(poll, token, interest, opts),
-            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
-        }
-    }
-
-    fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        match self.registration.borrow() {
-            Some(registration) => registration.deregister(poll),
-            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
-        }
-    }
-}
-
-/*
- *
- * ===== Error conversions =====
- *
- */
-
-impl<T> From<mpsc::SendError<T>> for SendError<T> {
-    fn from(src: mpsc::SendError<T>) -> SendError<T> {
-        SendError::Disconnected(src.0)
-    }
-}
-
-impl<T> From<io::Error> for SendError<T> {
-    fn from(src: io::Error) -> SendError<T> {
-        SendError::Io(src)
-    }
-}
-
-impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
-    fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
-        match src {
-            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
-            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
-        }
-    }
-}
-
-impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
-    fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
-        TrySendError::Disconnected(src.0)
-    }
-}
-
-impl<T> From<io::Error> for TrySendError<T> {
-    fn from(src: io::Error) -> TrySendError<T> {
-        TrySendError::Io(src)
-    }
-}
-
-/*
- *
- * ===== Implement Error, Debug and Display for Errors =====
- *
- */
-
-impl<T: Any> error::Error for SendError<T> {
-    fn description(&self) -> &str {
-        match self {
-            &SendError::Io(ref io_err) => io_err.description(),
-            &SendError::Disconnected(..) => "Disconnected",
-        }
-    }
-}
-
-impl<T: Any> error::Error for TrySendError<T> {
-    fn description(&self) -> &str {
-        match self {
-            &TrySendError::Io(ref io_err) => io_err.description(),
-            &TrySendError::Full(..) => "Full",
-            &TrySendError::Disconnected(..) => "Disconnected",
-        }
-    }
-}
-
-impl<T> fmt::Debug for SendError<T> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        format_send_error(self, f)
-    }
-}
-
-impl<T> fmt::Display for SendError<T> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        format_send_error(self, f)
-    }
-}
-
-impl<T> fmt::Debug for TrySendError<T> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        format_try_send_error(self, f)
-    }
-}
-
-impl<T> fmt::Display for TrySendError<T> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        format_try_send_error(self, f)
-    }
-}
-
-#[inline]
-fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &SendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &SendError::Disconnected(..) => write!(f, "Disconnected"),
-    }
-}
-
-#[inline]
-fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &TrySendError::Full(..) => write!(f, "Full"),
-        &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
-    }
-}
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -3,81 +3,99 @@ extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 extern crate audioipc;
 extern crate bytes;
 extern crate cubeb;
 extern crate cubeb_core;
+extern crate futures;
 extern crate lazycell;
 extern crate libc;
-extern crate mio;
-extern crate mio_uds;
 extern crate slab;
+extern crate tokio_core;
+extern crate tokio_uds;
 
-use audioipc::AutoCloseFd;
-use audioipc::async::{Async, AsyncRecvFd, AsyncSendFd};
-use audioipc::codec::{Decoder, encode};
-use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamInitParams, StreamParams};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::core;
+use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage,
+                         StreamCreate, StreamInitParams, StreamParams};
+use audioipc::rpc;
 use audioipc::shm::{SharedMemReader, SharedMemWriter};
-use bytes::{Bytes, BytesMut};
 use cubeb_core::binding::Binding;
 use cubeb_core::ffi;
-use mio::{Ready, Token};
-use mio_uds::UnixStream;
-use std::{slice, thread};
-use std::collections::{HashSet, VecDeque};
+use futures::Future;
+use futures::future::{self, FutureResult};
+use futures::sync::oneshot;
+use std::{ptr, slice};
+use std::cell::RefCell;
 use std::convert::From;
-use std::io::Cursor;
+use std::error::Error;
 use std::os::raw::c_void;
+use std::os::unix::net;
 use std::os::unix::prelude::*;
-use std::sync::{Mutex, MutexGuard};
-
-mod channel;
+use tokio_core::reactor::Remote;
+use tokio_uds::UnixStream;
 
 pub mod errors {
     error_chain! {
         links {
             AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
         }
         foreign_links {
             Cubeb(::cubeb_core::Error);
             Io(::std::io::Error);
+            Canceled(::futures::sync::oneshot::Canceled);
         }
     }
 }
 
 use errors::*;
 
+thread_local!(static CONTEXT_KEY: RefCell<Option<cubeb::Result<cubeb::Context>>> = RefCell::new(None));
+
+fn with_local_context<T, F>(f: F) -> T
+where
+    F: FnOnce(&cubeb::Result<cubeb::Context>) -> T
+{
+    CONTEXT_KEY.with(|k| {
+        let mut context = k.borrow_mut();
+        if context.is_none() {
+            *context = Some(cubeb::Context::init("AudioIPC Server", None));
+        }
+        f(context.as_ref().unwrap())
+    })
+}
+
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
-// The size in which the server conns slab is grown.
-const SERVER_CONN_CHUNK_SIZE: usize = 16;
-
 // The size in which the stream slab is grown.
 const STREAM_CONN_CHUNK_SIZE: usize = 64;
 
+struct CallbackClient;
+
+impl rpc::Client for CallbackClient {
+    type Request = CallbackReq;
+    type Response = CallbackResp;
+    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+}
+
 // TODO: this should forward to the client.
 struct Callback {
     /// Size of input frame in bytes
     input_frame_size: u16,
     /// Size of output frame in bytes
     output_frame_size: u16,
-    connection: Mutex<audioipc::Connection>,
     input_shm: SharedMemWriter,
-    output_shm: SharedMemReader
-}
-
-impl Callback {
-    #[doc(hidden)]
-    fn connection(&self) -> MutexGuard<audioipc::Connection> {
-        self.connection.lock().unwrap()
-    }
+    output_shm: SharedMemReader,
+    rpc: rpc::ClientProxy<CallbackReq, CallbackResp>
 }
 
 impl cubeb::StreamCallback for Callback {
     type Frame = u8;
 
     fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
         trace!("Stream data callback: {} {}", input.len(), output.len());
 
@@ -89,340 +107,232 @@ impl cubeb::StreamCallback for Callback 
         let real_output = unsafe {
             let size_bytes = output.len() * self.output_frame_size as usize;
             trace!("Resize output to {}", size_bytes);
             slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
         };
 
         self.input_shm.write(real_input).unwrap();
 
-        let mut conn = self.connection();
-        let r = conn.send(ClientMessage::StreamDataCallback(
-            output.len() as isize,
-            self.output_frame_size as usize
-        ));
-        if r.is_err() {
-            debug!("data_callback: Failed to send to client - got={:?}", r);
-            return -1;
-        }
+        let r = self.rpc
+            .call(CallbackReq::Data(
+                output.len() as isize,
+                self.output_frame_size as usize
+            ))
+            .wait();
 
-        let r = conn.receive();
         match r {
-            Ok(ServerMessage::StreamDataCallback(cb_result)) => {
-                if cb_result >= 0 {
-                    let len = cb_result as usize * self.output_frame_size as usize;
-                    self.output_shm.read(&mut real_output[..len]).unwrap();
-                    cb_result
-                } else {
-                    cb_result
-                }
+            Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 {
+                let len = cb_result as usize * self.output_frame_size as usize;
+                self.output_shm.read(&mut real_output[..len]).unwrap();
+                cb_result
+            } else {
+                cb_result
             },
             _ => {
                 debug!("Unexpected message {:?} during data_callback", r);
                 -1
-            },
+            }
         }
     }
 
     fn state_callback(&mut self, state: cubeb::State) {
         info!("Stream state callback: {:?}", state);
         // TODO: Share this conversion with the same code in cubeb-rs?
         let state = match state {
             cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
             cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
             cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
-            cubeb::State::Error => ffi::CUBEB_STATE_ERROR,
+            cubeb::State::Error => ffi::CUBEB_STATE_ERROR
         };
-        let mut conn = self.connection();
-        let r = conn.send(ClientMessage::StreamStateCallback(state));
-        if r.is_err() {
-            debug!("state_callback: Failed to send to client - got={:?}", r);
-        }
 
-        // Bug 1414623 - We need to block on an ACK from the client
-        // side to make state_callback synchronous. If not, then there
-        // exists a race on cubeb_stream_stop()/cubeb_stream_destroy()
-        // in Gecko that results in a UAF.
-        let r = conn.receive();
+        let r = self.rpc.call(CallbackReq::State(state)).wait();
+
         match r {
-            Ok(ServerMessage::StreamStateCallback) => {},
+            Ok(CallbackResp::State) => {},
             _ => {
-                debug!("Unexpected message {:?} during state_callback", r);
-            },
-        }
-    }
-}
-
-impl Drop for Callback {
-    fn drop(&mut self) {
-        let mut conn = self.connection();
-        let r = conn.send(ClientMessage::StreamDestroyed);
-        if r.is_err() {
-            debug!("Callback::drop failed to send StreamDestroyed = {:?}", r);
-        }
+                debug!("Unexpected message {:?} during callback", r);
+            }
+        };
     }
 }
 
-type Slab<T> = slab::Slab<T, Token>;
 type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
 
-// TODO: Server command token must be outside range used by server.connections slab.
-// usize::MAX is already used internally in mio.
-const CMD: Token = Token(std::usize::MAX - 1);
-
-struct ServerConn {
-    //connection: audioipc::Connection,
-    io: UnixStream,
-    token: Option<Token>,
-    streams: StreamSlab,
-    decoder: Decoder,
-    recv_buffer: BytesMut,
-    send_buffer: BytesMut,
-    pending_send: VecDeque<(Bytes, Option<AutoCloseFd>)>,
-    device_ids: HashSet<usize>
+pub struct CubebServer {
+    cb_remote: Remote,
+    streams: StreamSlab
 }
 
-impl ServerConn {
-    fn new(io: UnixStream) -> ServerConn {
-        let mut sc = ServerConn {
-            io: io,
-            token: None,
-            // TODO: Handle increasing slab size. Pick a good default size.
-            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
-            decoder: Decoder::new(),
-            recv_buffer: BytesMut::with_capacity(4096),
-            send_buffer: BytesMut::with_capacity(4096),
-            pending_send: VecDeque::new(),
-            device_ids: HashSet::new()
-        };
-        sc.device_ids.insert(0); // nullptr is always a valid (default) device id.
-        sc
-    }
-
-    fn process_read(&mut self, context: &Result<cubeb::Context>) -> Result<Ready> {
-        // According to *something*, processing non-blocking stream
-        // should attempt to read until EWOULDBLOCK is returned.
-        while let Async::Ready((n, fd)) = try!(self.io.recv_buf_fd(&mut self.recv_buffer)) {
-            trace!("Received {} bytes and fd {:?}", n, fd);
+impl rpc::Server for CubebServer {
+    type Request = ServerMessage;
+    type Response = ClientMessage;
+    type Future = FutureResult<Self::Response, ()>;
+    type Transport =
+        FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
 
-            // Reading 0 signifies EOF
-            if n == 0 {
-                return Err(
-                    ::errors::ErrorKind::AudioIPC(::audioipc::errors::ErrorKind::Disconnected).into()
-                );
-            }
-
-            if let Some(fd) = fd {
-                trace!("Unexpectedly received an fd from client.");
-                let _ = unsafe { AutoCloseFd::from_raw_fd(fd) };
-            }
+    fn process(&mut self, req: Self::Request) -> Self::Future {
+        let resp = with_local_context(|context| match *context {
+            Err(_) => error(cubeb::Error::new()),
+            Ok(ref context) => self.process_msg(context, &req)
+        });
+        future::ok(resp)
+    }
+}
 
-            // Process all the complete messages contained in
-            // send.recv_buffer.  It's possible that a read might not
-            // return a complete message, so self.decoder.decode
-            // returns Ok(None).
-            loop {
-                match self.decoder.decode::<ServerMessage>(&mut self.recv_buffer) {
-                    Ok(Some(msg)) => {
-                        info!("ServerConn::process: got {:?}", msg);
-                        try!(self.process_msg(&msg, context));
-                    },
-                    Ok(None) => {
-                        break;
-                    },
-                    Err(e) => {
-                        return Err(e).chain_err(|| "Failed to decoder ServerMessage");
-                    },
-                }
-            }
+impl CubebServer {
+    pub fn new(cb_remote: Remote) -> Self {
+        CubebServer {
+            cb_remote: cb_remote,
+            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE)
         }
-
-        // Send any pending responses to client.
-        self.flush_pending_send()
     }
 
     // Process a request coming from the client.
-    fn process_msg(&mut self, msg: &ServerMessage, context: &Result<cubeb::Context>) -> Result<()> {
-        let resp: ClientMessage = if let Ok(ref context) = *context {
-            if let ServerMessage::StreamInit(ref params) = *msg {
-                return self.process_stream_init(context, params);
-            };
+    fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
+        let resp: ClientMessage = match *msg {
+            ServerMessage::ClientConnect => panic!("already connected"),
 
-            match *msg {
-                ServerMessage::ClientConnect => {
-                    panic!("already connected");
-                },
+            ServerMessage::ClientDisconnect => {
+                // TODO:
+                //self.connection.client_disconnect();
+                ClientMessage::ClientDisconnected
+            },
 
-                ServerMessage::ClientDisconnect => {
-                    // TODO:
-                    //self.connection.client_disconnect();
-                    ClientMessage::ClientDisconnected
-                },
+            ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
 
-                ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
+            ServerMessage::ContextGetMaxChannelCount => context
+                .max_channel_count()
+                .map(ClientMessage::ContextMaxChannelCount)
+                .unwrap_or_else(error),
 
-                ServerMessage::ContextGetMaxChannelCount => {
-                    context
-                        .max_channel_count()
-                        .map(ClientMessage::ContextMaxChannelCount)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::ContextGetMinLatency(ref params) => {
+                let format = cubeb::SampleFormat::from(params.format);
+                let layout = cubeb::ChannelLayout::from(params.layout);
 
-                ServerMessage::ContextGetMinLatency(ref params) => {
-                    let format = cubeb::SampleFormat::from(params.format);
-                    let layout = cubeb::ChannelLayout::from(params.layout);
-
-                    let params = cubeb::StreamParamsBuilder::new()
-                        .format(format)
-                        .rate(u32::from(params.rate))
-                        .channels(u32::from(params.channels))
-                        .layout(layout)
-                        .take();
+                let params = cubeb::StreamParamsBuilder::new()
+                    .format(format)
+                    .rate(u32::from(params.rate))
+                    .channels(u32::from(params.channels))
+                    .layout(layout)
+                    .take();
 
-                    context
-                        .min_latency(&params)
-                        .map(ClientMessage::ContextMinLatency)
-                        .unwrap_or_else(error)
-                },
+                context
+                    .min_latency(&params)
+                    .map(ClientMessage::ContextMinLatency)
+                    .unwrap_or_else(error)
+            },
 
-                ServerMessage::ContextGetPreferredSampleRate => {
-                    context
-                        .preferred_sample_rate()
-                        .map(ClientMessage::ContextPreferredSampleRate)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::ContextGetPreferredSampleRate => context
+                .preferred_sample_rate()
+                .map(ClientMessage::ContextPreferredSampleRate)
+                .unwrap_or_else(error),
 
-                ServerMessage::ContextGetPreferredChannelLayout => {
-                    context
-                        .preferred_channel_layout()
-                        .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::ContextGetPreferredChannelLayout => context
+                .preferred_channel_layout()
+                .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
+                .unwrap_or_else(error),
 
-                ServerMessage::ContextGetDeviceEnumeration(device_type) => {
-                    context
-                        .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
-                        .map(|devices| {
-                            let v: Vec<DeviceInfo> = devices.iter()
-                                                            .map(|i| i.raw().into())
-                                                            .collect();
-                            for i in &v {
-                                self.device_ids.insert(i.devid);
-                            }
-                            ClientMessage::ContextEnumeratedDevices(v)
-                        })
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
+                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+                .map(|devices| {
+                    let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+                    ClientMessage::ContextEnumeratedDevices(v)
+                })
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
+                .unwrap_or_else(|_| error(cubeb::Error::new())),
 
-                ServerMessage::StreamInit(_) => {
-                    panic!("StreamInit should have already been handled.");
-                },
-
-                ServerMessage::StreamDestroy(stm_tok) => {
-                    self.streams.remove(stm_tok);
-                    ClientMessage::StreamDestroyed
-                },
+            ServerMessage::StreamDestroy(stm_tok) => {
+                self.streams.remove(stm_tok);
+                ClientMessage::StreamDestroyed
+            },
 
-                ServerMessage::StreamStart(stm_tok) => {
-                    let _ = self.streams[stm_tok].start();
-                    ClientMessage::StreamStarted
-                },
+            ServerMessage::StreamStart(stm_tok) => {
+                let _ = self.streams[stm_tok].start();
+                ClientMessage::StreamStarted
+            },
 
-                ServerMessage::StreamStop(stm_tok) => {
-                    let _ = self.streams[stm_tok].stop();
-                    ClientMessage::StreamStopped
-                },
+            ServerMessage::StreamStop(stm_tok) => {
+                let _ = self.streams[stm_tok].stop();
+                ClientMessage::StreamStopped
+            },
 
-                ServerMessage::StreamGetPosition(stm_tok) => {
-                    self.streams[stm_tok]
-                        .position()
-                        .map(ClientMessage::StreamPosition)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::StreamResetDefaultDevice(stm_tok) => {
+                let _ = self.streams[stm_tok].reset_default_device();
+                ClientMessage::StreamDefaultDeviceReset
+            },
 
-                ServerMessage::StreamGetLatency(stm_tok) => {
-                    self.streams[stm_tok]
-                        .latency()
-                        .map(ClientMessage::StreamLatency)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
+                .position()
+                .map(ClientMessage::StreamPosition)
+                .unwrap_or_else(error),
 
-                ServerMessage::StreamSetVolume(stm_tok, volume) => {
-                    self.streams[stm_tok]
-                        .set_volume(volume)
-                        .map(|_| ClientMessage::StreamVolumeSet)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
+                .latency()
+                .map(ClientMessage::StreamLatency)
+                .unwrap_or_else(error),
 
-                ServerMessage::StreamSetPanning(stm_tok, panning) => {
-                    self.streams[stm_tok]
-                        .set_panning(panning)
-                        .map(|_| ClientMessage::StreamPanningSet)
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
+                .set_volume(volume)
+                .map(|_| ClientMessage::StreamVolumeSet)
+                .unwrap_or_else(error),
 
-                ServerMessage::StreamGetCurrentDevice(stm_tok) => {
-                    self.streams[stm_tok]
-                        .current_device()
-                        .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
-                        .unwrap_or_else(error)
-                },
+            ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
+                .set_panning(panning)
+                .map(|_| ClientMessage::StreamPanningSet)
+                .unwrap_or_else(error),
 
-                _ => {
-                    bail!("Unexpected Message");
-                },
-            }
-        } else {
-            error(cubeb::Error::new())
+            ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
+                .current_device()
+                .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
+                .unwrap_or_else(error)
         };
 
         debug!("process_msg: req={:?}, resp={:?}", msg, resp);
 
-        self.queue_message(resp)
+        resp
     }
 
     // Stream init is special, so it's been separated from process_msg.
-    fn process_stream_init(&mut self, context: &cubeb::Context, params: &StreamInitParams) -> Result<()> {
+    fn process_stream_init(
+        &mut self,
+        context: &cubeb::Context,
+        params: &StreamInitParams
+    ) -> Result<ClientMessage> {
         fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
             params.and_then(|p| {
                 let raw = ffi::cubeb_stream_params::from(p);
                 Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
             })
         }
 
         fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
             params
                 .map(|p| {
                     let sample_size = match p.format() {
-                        cubeb::SampleFormat::S16LE |
-                        cubeb::SampleFormat::S16BE |
-                        cubeb::SampleFormat::S16NE => 2,
-                        cubeb::SampleFormat::Float32LE |
-                        cubeb::SampleFormat::Float32BE |
-                        cubeb::SampleFormat::Float32NE => 4,
+                        cubeb::SampleFormat::S16LE
+                        | cubeb::SampleFormat::S16BE
+                        | cubeb::SampleFormat::S16NE => 2,
+                        cubeb::SampleFormat::Float32LE
+                        | cubeb::SampleFormat::Float32BE
+                        | cubeb::SampleFormat::Float32NE => 4
                     };
                     let channel_count = p.channels() as u16;
                     sample_size * channel_count
                 })
                 .unwrap_or(0u16)
         }
 
-
-        if !self.device_ids.contains(&params.input_device) {
-            bail!("Invalid input_device passed to stream_init");
-        }
         // TODO: Yuck!
-        let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
-
-        if !self.device_ids.contains(&params.output_device) {
-            bail!("Invalid output_device passed to stream_init");
-        }
-        // TODO: Yuck!
-        let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
-
+        let input_device =
+            unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+        let output_device =
+            unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
         let latency = params.latency_frames;
         let mut builder = cubeb::StreamInitOptionsBuilder::new();
         builder
             .input_device(input_device)
             .output_device(output_device)
             .latency(latency);
 
         if let Some(ref stream_name) = params.stream_name {
@@ -436,405 +346,172 @@ impl ServerConn {
         if let Some(ref osp) = output_stream_params {
             builder.output_stream_param(osp);
         }
         let params = builder.take();
 
         let input_frame_size = frame_size_in_bytes(input_stream_params);
         let output_frame_size = frame_size_in_bytes(output_stream_params);
 
-        let (conn1, conn2) = audioipc::Connection::pair()?;
-        info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+        let (stm1, stm2) = net::UnixStream::pair()?;
+        info!("Created callback pair: {:?}-{:?}", stm1, stm2);
+        let (input_shm, input_file) =
+            SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+        let (output_shm, output_file) =
+            SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
 
-        let (input_shm, input_file) = SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
-        let (output_shm, output_file) = SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+        // This code is currently running on the Client/Server RPC
+        // handling thread.  We need to move the registration of the
+        // bind_client to the callback RPC handling thread.  This is
+        // done by spawning a future on cb_remote.
+
+        let id = core::handle().id();
 
-        let err = match context.stream_init(
-            &params,
-            Callback {
-                input_frame_size: input_frame_size,
-                output_frame_size: output_frame_size,
-                connection: Mutex::new(conn2),
-                input_shm: input_shm,
-                output_shm: output_shm
-            }
-        ) {
-            Ok(stream) => {
+        let (tx, rx) = oneshot::channel();
+        self.cb_remote.spawn(move |handle| {
+            // Ensure we're running on a loop different to the one
+            // invoking spawn_fn.
+            assert_ne!(id, handle.id());
+            let stream = UnixStream::from_stream(stm2, handle).unwrap();
+            let transport = framed(stream, Default::default());
+            let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
+            drop(tx.send(rpc));
+            Ok(())
+        });
+
+        let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
+            Ok(rpc) => rpc,
+            Err(_) => bail!("Failed to create callback rpc.")
+        };
+
+        context
+            .stream_init(
+                &params,
+                Callback {
+                    input_frame_size: input_frame_size,
+                    output_frame_size: output_frame_size,
+                    input_shm: input_shm,
+                    output_shm: output_shm,
+                    rpc: rpc
+                }
+            )
+            .and_then(|stream| {
                 if !self.streams.has_available() {
                     trace!(
                         "server connection ran out of stream slots. reserving {} more.",
                         STREAM_CONN_CHUNK_SIZE
                     );
                     self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
                 }
 
                 let stm_tok = match self.streams.vacant_entry() {
                     Some(entry) => {
-                        debug!(
-                            "Registering stream {:?}",
-                            entry.index(),
-                        );
+                        debug!("Registering stream {:?}", entry.index(),);
 
                         entry.insert(stream).index()
                     },
                     None => {
                         // TODO: Turn into error
-                        panic!("Failed to insert stream into slab. No entries");
-                    },
+                        panic!("Failed to insert stream into slab. No entries")
+                    }
                 };
 
-                try!(self.queue_init_messages(
-                    stm_tok,
-                    conn1,
-                    input_file,
-                    output_file
-                ));
-                None
-            },
-            Err(e) => Some(error(e)),
-        };
-
-        if let Some(err) = err {
-            try!(self.queue_message(err))
-        }
-
-        Ok(())
-    }
-
-    fn queue_init_messages<T, U, V>(&mut self, stm_tok: usize, conn: T, input_file: U, output_file: V) -> Result<()>
-    where
-        T: IntoRawFd,
-        U: IntoRawFd,
-        V: IntoRawFd,
-    {
-        try!(self.queue_message_fd(
-            ClientMessage::StreamCreated(stm_tok),
-            conn
-        ));
-        try!(self.queue_message_fd(
-            ClientMessage::StreamCreatedInputShm,
-            input_file
-        ));
-        try!(self.queue_message_fd(
-            ClientMessage::StreamCreatedOutputShm,
-            output_file
-        ));
-        Ok(())
-    }
-
-    fn queue_message(&mut self, msg: ClientMessage) -> Result<()> {
-        debug!("queue_message: {:?}", msg);
-        encode::<ClientMessage>(&mut self.send_buffer, &msg).or_else(|e| {
-            Err(e).chain_err(|| "Failed to encode msg into send buffer")
-        })
-    }
-
-    // Since send_fd supports sending one RawFd at a time, queuing a
-    // message with a RawFd forces use to take the current send_buffer
-    // and move it pending queue.
-    fn queue_message_fd<FD: IntoRawFd>(&mut self, msg: ClientMessage, fd: FD) -> Result<()> {
-        let fd = fd.into_raw_fd();
-        debug!("queue_message_fd: {:?} {:?}", msg, fd);
-        try!(self.queue_message(msg));
-        self.take_pending_send(Some(fd));
-        Ok(())
-    }
-
-    // Take the current messages in the send_buffer and move them to
-    // pending queue.
-    fn take_pending_send(&mut self, fd: Option<RawFd>) {
-        let pending = self.send_buffer.take().freeze();
-        debug!("take_pending_send: ({:?} {:?})", pending, fd);
-        self.pending_send.push_back((
-            pending,
-            fd.map(|fd| unsafe { AutoCloseFd::from_raw_fd(fd) })
-        ));
-    }
-
-    // Process the pending queue and send them to client.
-    fn flush_pending_send(&mut self) -> Result<Ready> {
-        debug!("flush_pending_send");
-        // take any pending messages in the send buffer.
-        if !self.send_buffer.is_empty() {
-            self.take_pending_send(None);
-        }
-
-        trace!("pending queue: {:?}", self.pending_send);
-
-        let mut result = Ready::readable();
-        let mut processed = 0;
-
-        for &mut (ref mut buf, ref mut fd) in &mut self.pending_send {
-            trace!("sending buf {:?}, fd {:?}", buf, fd);
-            let r = {
-                let mut src = Cursor::new(buf.as_ref());
-                let fd = match *fd {
-                    Some(ref fd) => Some(fd.as_raw_fd()),
-                    None => None,
-                };
-                try!(self.io.send_buf_fd(&mut src, fd))
-            };
-            match r {
-                Async::Ready(n) if n == buf.len() => {
-                    processed += 1;
-                },
-                Async::Ready(n) => {
-                    let _ = buf.split_to(n);
-                    let _ = fd.take();
-                    result.insert(Ready::writable());
-                    break;
-                },
-                Async::NotReady => {
-                    result.insert(Ready::writable());
-                },
-            }
-        }
-
-        debug!("processed {} buffers", processed);
-
-        self.pending_send = self.pending_send.split_off(processed);
-
-        trace!("pending queue: {:?}", self.pending_send);
-
-        Ok(result)
+                Ok(ClientMessage::StreamCreated(StreamCreate {
+                    token: stm_tok,
+                    fds: [
+                        stm1.into_raw_fd(),
+                        input_file.into_raw_fd(),
+                        output_file.into_raw_fd(),
+                    ]
+                }))
+            })
+            .map_err(|e| e.into())
     }
 }
 
-pub struct Server {
-    // Ok(None)      - Server hasn't tried to create cubeb::Context.
-    // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
-    // Err(_)        - Server has tried and failed to create cubeb::Context.
-    //                 Don't try again.
-    context: Option<Result<cubeb::Context>>,
-    conns: Slab<ServerConn>,
-    rx: channel::Receiver<Command>,
-    tx: std::sync::mpsc::Sender<Response>,
+struct ServerWrapper {
+    core_thread: core::CoreThread,
+    callback_thread: core::CoreThread
 }
 
-impl Drop for Server {
-    fn drop(&mut self) {
-        // ServerConns rely on the cubeb context, so we must free them
-        // explicitly before the context is dropped.
-        if !self.conns.is_empty() {
-            debug!("dropping Server with {} live ServerConns", self.conns.len());
-            self.conns.clear();
-        }
+fn run() -> Result<ServerWrapper> {
+    trace!("Starting up cubeb audio server event loop thread...");
+
+    let callback_thread = try!(
+        core::spawn_thread("AudioIPC Callback RPC", || {
+            trace!("Starting up cubeb audio callback event loop thread...");
+            Ok(())
+        }).or_else(|e| {
+            debug!(
+                "Failed to start cubeb audio callback event loop thread: {:?}",
+                e.description()
+            );
+            Err(e)
+        })
+    );
+
+    let core_thread = try!(
+        core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| {
+            debug!(
+                "Failed to cubeb audio core event loop thread: {:?}",
+                e.description()
+            );
+            Err(e)
+        })
+    );
+
+    Ok(ServerWrapper {
+        core_thread: core_thread,
+        callback_thread: callback_thread
+    })
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_start() -> *mut c_void {
+    match run() {
+        Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
+        Err(_) => ptr::null_mut() as *mut _
     }
 }
 
-impl Server {
-    pub fn new(rx: channel::Receiver<Command>,
-               tx: std::sync::mpsc::Sender<Response>) -> Server {
-        Server {
-            context: None,
-            conns: slab::Slab::with_capacity(SERVER_CONN_CHUNK_SIZE),
-            rx: rx,
-            tx: tx,
-        }
-    }
-
-    fn handle_new_connection(&mut self, poll: &mut mio::Poll, client_socket: UnixStream) -> Result<()> {
-        if !self.conns.has_available() {
-            trace!(
-                "server ran out of connection slots. reserving {} more.",
-                SERVER_CONN_CHUNK_SIZE
-            );
-            self.conns.reserve_exact(SERVER_CONN_CHUNK_SIZE);
-        }
-
-        let token = match self.conns.vacant_entry() {
-            Some(entry) => {
-                debug!("registering {:?}", entry.index());
-                let cxn = ServerConn::new(client_socket);
-                entry.insert(cxn).index()
-            },
-            None => {
-                panic!("failed to insert connection");
-            },
-        };
+#[no_mangle]
+pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
+    let (wait_tx, wait_rx) = oneshot::channel();
+    let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };
 
-        // Register the connection
-        self.conns[token].token = Some(token);
-        poll.register(
-            &self.conns[token].io,
-            token,
-            mio::Ready::readable(),
-            mio::PollOpt::edge() | mio::PollOpt::oneshot()
-        ).unwrap();
-        /*
-        let r = self.conns[token].receive();
-        debug!("received {:?}", r);
-        let r = self.conns[token].send(ClientMessage::ClientConnected);
-        debug!("sent {:?} (ClientConnected)", r);
-         */
-
-        // Since we have a connection try creating a cubeb context. If
-        // it fails, mark the failure with Err.
-        if self.context.is_none() {
-            self.context = Some(cubeb::Context::init("AudioIPC Server", None).or_else(|e| {
-                Err(e).chain_err(|| "Unable to create cubeb context.")
-            }))
-        }
-
-        Ok(())
-    }
-
-    fn new_connection(&mut self, poll: &mut mio::Poll) -> Result<AutoCloseFd> {
-        let (s1, s2) = UnixStream::pair()?;
-        self.handle_new_connection(poll, s1)?;
-        unsafe {
-            Ok(audioipc::AutoCloseFd::from_raw_fd(s2.into_raw_fd()))
-        }
-    }
-
-    pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
-        let mut events = mio::Events::with_capacity(16);
+    let cb_remote = wrapper.callback_thread.remote();
 
-        match poll.poll(&mut events, None) {
-            Ok(_) => {},
-            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => return Ok(()),
-            Err(e) => warn!("server poll error: {}", e),
-        }
-
-        for event in events.iter() {
-            match event.token() {
-                CMD => {
-                    match self.rx.try_recv().unwrap() {
-                        Command::Quit => {
-                            info!("Quitting Audio Server loop");
-                            self.tx.send(Response::Quit).unwrap();
-                            bail!("quit");
-                        },
-                        Command::NewConnection => {
-                            info!("Creating new connection");
-                            let fd = self.new_connection(poll)?;
-                            self.tx.send(Response::Connection(fd)).unwrap();
-                        }
-                    }
-                },
-                token => {
-                    trace!("token {:?} ready", token);
-
-                    let context = self.context.as_ref().expect(
-                        "Shouldn't receive a message before accepting connection."
-                    );
-
-                    let mut readiness = Ready::readable();
+    // We create a pair of connected unix domain sockets. One socket is
+    // registered with the reactor core, the other is returned to the
+    // caller.
+    net::UnixStream::pair()
+        .and_then(|(sock1, sock2)| {
+            // Spawn closure to run on same thread as reactor::Core
+            // via remote handle.
+            wrapper.core_thread.remote().spawn(|handle| {
+                trace!("Incoming connection");
+                UnixStream::from_stream(sock2, handle)
+                    .and_then(|sock| {
+                        let transport = framed_with_fds(sock, Default::default());
+                        rpc::bind_server(transport, CubebServer::new(cb_remote), handle);
+                        Ok(())
+                    })
+                    .map_err(|_| ())
+                    // Notify waiting thread that sock2 has been registered.
+                    .and_then(|_| wait_tx.send(()))
+            });
+            // Wait for notification that sock2 has been registered
+            // with reactor::Core.
+            let _ = wait_rx.wait();
+            Ok(sock1.into_raw_fd())
+        })
+        .unwrap_or(-1)
+}
 
-                    if event.readiness().is_readable() {
-                        let r = self.conns[token].process_read(context);
-                        trace!("got {:?}", r);
-
-                        if let Err(e) = r {
-                            debug!("dropped client {:?} due to error {:?}", token, e);
-                            self.conns.remove(token);
-                            continue;
-                        }
-                    };
-
-                    if event.readiness().is_writable() {
-                        let r = self.conns[token].flush_pending_send();
-                        trace!("got {:?}", r);
-
-                        match r {
-                            Ok(r) => readiness = r,
-                            Err(e) => {
-                                debug!("dropped client {:?} due to error {:?}", token, e);
-                                self.conns.remove(token);
-                                continue;
-                            },
-                        }
-                    };
-
-                    poll.reregister(
-                        &self.conns[token].io,
-                        token,
-                        readiness,
-                        mio::PollOpt::edge() | mio::PollOpt::oneshot()
-                    ).unwrap();
-                },
-            }
-        }
-
-        Ok(())
-    }
+#[no_mangle]
+pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
+    let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
+    drop(wrapper);
 }
 
 fn error(error: cubeb::Error) -> ClientMessage {
     ClientMessage::Error(error.raw_code())
 }
-
-struct ServerWrapper {
-    thread_handle: std::thread::JoinHandle<()>,
-    tx: channel::Sender<Command>,
-    rx: std::sync::mpsc::Receiver<Response>,
-}
-
-#[derive(Debug)]
-pub enum Command {
-    Quit,
-    NewConnection,
-}
-
-#[derive(Debug)]
-pub enum Response {
-    Quit,
-    Connection(AutoCloseFd)
-}
-
-impl ServerWrapper {
-    fn shutdown(self) {
-        let _ = self.tx.send(Command::Quit);
-        let r = self.rx.recv();
-        match r {
-            Ok(Response::Quit) => info!("server quit response received"),
-            e => warn!("unexpected response to server quit: {:?}", e),
-        };
-        self.thread_handle.join().unwrap();
-    }
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_start() -> *mut c_void {
-    let (tx, rx) = channel::channel();
-    let (tx2, rx2) = std::sync::mpsc::channel();
-
-    let handle = thread::spawn(move || {
-        let mut poll = mio::Poll::new().unwrap();
-        let mut server = Server::new(rx, tx2);
-
-        poll.register(&server.rx, CMD, mio::Ready::readable(), mio::PollOpt::edge())
-            .unwrap();
-
-        loop {
-            if server.poll(&mut poll).is_err() {
-                return;
-            }
-        }
-    });
-
-    let wrapper = ServerWrapper {
-        thread_handle: handle,
-        tx: tx,
-        rx: rx2,
-    };
-
-    Box::into_raw(Box::new(wrapper)) as *mut _
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
-    let wrapper: &mut ServerWrapper = unsafe { &mut *(p as *mut _) };
-
-    wrapper.tx.send(Command::NewConnection).unwrap();
-
-    if let Response::Connection(fd) = wrapper.rx.recv().unwrap() {
-        return fd.into_raw_fd();
-    }
-
-    -1
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
-    let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
-    wrapper.shutdown();
-}