Bug 1440538 - P2: Update audioipc to commit 933fb48. r?kinetik draft
authorDan Glastonbury <dan.glastonbury@gmail.com>
Tue, 13 Feb 2018 14:39:06 +1000
changeset 758836 66e7dea34f6e05d9b007e9611dbb3efbf8b1ab94
parent 758835 70a17dd2f8090f411c81b4e924238b408e6d806c
child 758837 161cc5e219c3545047c03dea1d49b7a466626e26
push id100192
push userbmo:dglastonbury@mozilla.com
push dateFri, 23 Feb 2018 04:06:09 +0000
reviewerskinetik
bugs1440538
milestone60.0a1
Bug 1440538 - P2: Update audioipc to commit 933fb48. r?kinetik MozReview-Commit-ID: 29VUZKxz3xR
media/audioipc/README_MOZILLA
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/cmsg.rs
media/audioipc/audioipc/src/codec.rs
media/audioipc/audioipc/src/core.rs
media/audioipc/audioipc/src/errors.rs
media/audioipc/audioipc/src/fd_passing.rs
media/audioipc/audioipc/src/frame.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/audioipc/src/rpc/client/mod.rs
media/audioipc/audioipc/src/rpc/client/proxy.rs
media/audioipc/audioipc/src/rpc/driver.rs
media/audioipc/audioipc/src/rpc/server.rs
media/audioipc/audioipc/src/shm.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/gecko.patch
media/audioipc/server/Cargo.toml
media/audioipc/server/src/lib.rs
--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
 The source from this directory was copied from the audioipc-2
 git repository using the update.sh script.  The only changes
 made were those applied by update.sh and the addition of
 Makefile.in build files for the Mozilla build system.
 
 The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
 
-The git commit ID used was d7798606aa590ef402344b7a519a0053725a9805 (2018-01-27 09:07:03 +1000)
+The git commit ID used was 933fb48b252a10569ba8d598541577c6f2dc308f (2018-02-21 17:13:04 +1000)
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -3,17 +3,17 @@ name = "audioipc"
 version = "0.2.1"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote Cubeb IPC"
 
 [dependencies]
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+cubeb = "0.4"
 bincode = "0.8"
 bytes = "0.4"
 # rayon-core in Gecko uses futures 0.1.13
 futures = "=0.1.13"
 iovec = "0.1"
 libc = "0.2"
 log = "^0.3.6"
 memmap = "0.5.2"
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -108,21 +108,21 @@ impl AsyncRecvMsg for UnixStream {
             Ok((n, cmsg_len, flags)) => {
                 unsafe {
                     buf.advance_mut(n);
                 }
                 unsafe {
                     cmsg.advance_mut(cmsg_len);
                 }
                 Ok((n, flags).into())
-            },
+            }
             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                 self.need_read();
                 Ok(Async::NotReady)
-            },
+            }
             Err(e) => Err(e),
         }
     }
 }
 
 impl AsyncSendMsg for UnixStream {
     fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
     where
@@ -134,41 +134,26 @@ impl AsyncSendMsg for UnixStream {
         }
         let r = {
             // The `IoVec` type can't have a zero-length size, so create a dummy
             // version from a 1-length slice which we'll overwrite with the
             // `bytes_vec` method.
             static DUMMY: &[u8] = &[0];
             let nom = <&IoVec>::from(DUMMY);
             let mut bufs = [
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
-                nom,
+                nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom
             ];
             let n = buf.bytes_vec(&mut bufs);
             self.send_msg(&bufs[..n], cmsg.bytes())
         };
         match r {
             Ok(n) => {
                 buf.advance(n);
                 Ok(Async::Ready(n))
-            },
+            }
             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                 self.need_write();
                 Ok(Async::NotReady)
-            },
+            }
             Err(e) => Err(e),
         }
     }
 }
--- a/media/audioipc/audioipc/src/cmsg.rs
+++ b/media/audioipc/audioipc/src/cmsg.rs
@@ -5,17 +5,17 @@
 
 use bytes::{BufMut, Bytes, BytesMut};
 use libc::{self, cmsghdr};
 use std::{convert, mem, ops, slice};
 use std::os::unix::io::RawFd;
 
 #[derive(Clone, Debug)]
 pub struct Fds {
-    fds: Bytes
+    fds: Bytes,
 }
 
 impl convert::AsRef<[RawFd]> for Fds {
     fn as_ref(&self) -> &[RawFd] {
         let n = self.fds.len() / mem::size_of::<RawFd>();
         unsafe { slice::from_raw_parts(self.fds.as_ptr() as *const _, n) }
     }
 }
@@ -25,23 +25,21 @@ impl ops::Deref for Fds {
 
     #[inline]
     fn deref(&self) -> &[RawFd] {
         self.as_ref()
     }
 }
 
 pub struct ControlMsgIter {
-    control: Bytes
+    control: Bytes,
 }
 
 pub fn iterator(c: Bytes) -> ControlMsgIter {
-    ControlMsgIter {
-        control: c
-    }
+    ControlMsgIter { control: c }
 }
 
 impl Iterator for ControlMsgIter {
     type Item = Fds;
 
     // This follows the logic in __cmsg_nxthdr from glibc
     // /usr/include/bits/socket.h
     fn next(&mut self) -> Option<Self::Item> {
@@ -68,60 +66,60 @@ impl Iterator for ControlMsgIter {
             } else {
                 control.slice_from(next_cmsghdr)
             };
 
             match (cmsg.cmsg_level, cmsg.cmsg_type) {
                 (libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
                     trace!("Found SCM_RIGHTS...");
                     return Some(Fds {
-                        fds: control.slice(cmsghdr_len, cmsg_len as _)
+                        fds: control.slice(cmsghdr_len, cmsg_len as _),
                     });
-                },
+                }
                 (level, kind) => {
                     trace!("Skipping cmsg level, {}, type={}...", level, kind);
-                },
+                }
             }
         }
     }
 }
 
 #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
 pub enum Error {
     /// Not enough space in storage to insert control mesage.
-    NoSpace
+    NoSpace,
 }
 
 #[must_use]
 pub struct ControlMsgBuilder {
-    result: Result<BytesMut, Error>
+    result: Result<BytesMut, Error>,
 }
 
 pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
     let buf = aligned(buf);
-    ControlMsgBuilder {
-        result: Ok(buf)
-    }
+    ControlMsgBuilder { result: Ok(buf) }
 }
 
 impl ControlMsgBuilder {
     fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
         self.result = self.result.and_then(align_buf).and_then(|mut cmsg| {
             let cmsg_len = len(msg.len());
             if cmsg.remaining_mut() < cmsg_len {
                 return Err(Error::NoSpace);
             }
 
             let cmsghdr = cmsghdr {
                 cmsg_len: cmsg_len as _,
                 cmsg_level: level,
-                cmsg_type: kind
+                cmsg_type: kind,
             };
 
-            let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) };
+            let cmsghdr = unsafe {
+                slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
+            };
             cmsg.put_slice(cmsghdr);
             let mut cmsg = try!(align_buf(cmsg));
             cmsg.put_slice(msg);
 
             Ok(cmsg)
         });
 
         self
--- a/media/audioipc/audioipc/src/codec.rs
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -30,48 +30,48 @@ pub trait Codec {
 
     /// A default method available to be called when there are no more bytes
     /// available to be read from the I/O.
     fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
         match try!(self.decode(buf)) {
             Some(frame) => Ok(frame),
             None => Err(io::Error::new(
                 io::ErrorKind::Other,
-                "bytes remaining on stream"
-            ))
+                "bytes remaining on stream",
+            )),
         }
     }
 
     /// Encodes a frame intox the buffer provided.
     fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
 }
 
 /// Codec based upon bincode serialization
 ///
 /// Messages that have been serialized using bincode are prefixed with
 /// the length of the message to aid in deserialization, so that it's
 /// known if enough data has been received to decode a complete
 /// message.
 pub struct LengthDelimitedCodec<In, Out> {
     state: State,
     __in: PhantomData<In>,
-    __out: PhantomData<Out>
+    __out: PhantomData<Out>,
 }
 
 enum State {
     Length,
-    Data(u16)
+    Data(u16),
 }
 
 impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
     fn default() -> Self {
         LengthDelimitedCodec {
             state: State::Length,
             __in: PhantomData,
-            __out: PhantomData
+            __out: PhantomData,
         }
     }
 }
 
 impl<In, Out> LengthDelimitedCodec<In, Out> {
     // Lengths are encoded as little endian u16
     fn decode_length(&mut self, buf: &mut BytesMut) -> io::Result<Option<u16>> {
         if buf.len() < 2 {
@@ -84,97 +84,97 @@ impl<In, Out> LengthDelimitedCodec<In, O
         // Consume the length field
         let _ = buf.split_to(2);
 
         Ok(Some(n))
     }
 
     fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result<Option<Out>>
     where
-        Out: DeserializeOwned + Debug
+        Out: DeserializeOwned + Debug,
     {
         // At this point, the buffer has already had the required capacity
         // reserved. All there is to do is read.
         let n = n as usize;
         if buf.len() < n {
             return Ok(None);
         }
 
         let buf = buf.split_to(n).freeze();
 
         trace!("Attempting to decode");
         let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
             bincode::ErrorKind::IoError(e) => e,
-            _ => io::Error::new(io::ErrorKind::Other, *e)
+            _ => io::Error::new(io::ErrorKind::Other, *e),
         }));
 
         trace!("... Decoded {:?}", msg);
         Ok(Some(msg))
     }
 }
 
 impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
 where
     In: Serialize + Debug,
-    Out: DeserializeOwned + Debug
+    Out: DeserializeOwned + Debug,
 {
     type In = In;
     type Out = Out;
 
     fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
         let n = match self.state {
             State::Length => {
                 match try!(self.decode_length(buf)) {
                     Some(n) => {
                         self.state = State::Data(n);
 
                         // Ensure that the buffer has enough space to read the
                         // incoming payload
                         buf.reserve(n as usize);
 
                         n
-                    },
-                    None => return Ok(None)
+                    }
+                    None => return Ok(None),
                 }
-            },
-            State::Data(n) => n
+            }
+            State::Data(n) => n,
         };
 
         match try!(self.decode_data(buf, n)) {
             Some(data) => {
                 // Update the decode state
                 self.state = State::Length;
 
                 // Make sure the buffer has enough space to read the next head
                 buf.reserve(2);
 
                 Ok(Some(data))
-            },
-            None => Ok(None)
+            }
+            None => Ok(None),
         }
     }
 
     fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
         trace!("Attempting to encode");
         let encoded_len = serialized_size(&item);
         if encoded_len > 8 * 1024 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
-                "encoded message too big"
+                "encoded message too big",
             ));
         }
 
         buf.reserve((encoded_len + 2) as usize);
 
         buf.put_u16::<LittleEndian>(encoded_len as u16);
 
         if let Err(e) =
             serialize_into::<_, Self::In, _>(&mut buf.writer(), &item, Bounded(encoded_len))
         {
             match *e {
                 bincode::ErrorKind::IoError(e) => return Err(e),
-                _ => return Err(io::Error::new(io::ErrorKind::Other, *e))
+                _ => return Err(io::Error::new(io::ErrorKind::Other, *e)),
             }
         }
 
         Ok(())
     }
 }
--- a/media/audioipc/audioipc/src/core.rs
+++ b/media/audioipc/audioipc/src/core.rs
@@ -11,37 +11,37 @@ scoped_thread_local! {
 }
 
 pub fn handle() -> Handle {
     HANDLE.with(|handle| handle.clone())
 }
 
 pub fn spawn<F>(f: F)
 where
-    F: Future<Item = (), Error = ()> + 'static
+    F: Future<Item = (), Error = ()> + 'static,
 {
     HANDLE.with(|handle| handle.spawn(f))
 }
 
 pub fn spawn_fn<F, R>(f: F)
 where
     F: FnOnce() -> R + 'static,
-    R: IntoFuture<Item = (), Error = ()> + 'static
+    R: IntoFuture<Item = (), Error = ()> + 'static,
 {
     HANDLE.with(|handle| handle.spawn_fn(f))
 }
 
 struct Inner {
     join: thread::JoinHandle<()>,
-    shutdown: oneshot::Sender<()>
+    shutdown: oneshot::Sender<()>,
 }
 
 pub struct CoreThread {
     inner: Option<Inner>,
-    remote: Remote
+    remote: Remote,
 }
 
 impl CoreThread {
     pub fn remote(&self) -> Remote {
         self.remote.clone()
     }
 }
 
@@ -60,17 +60,17 @@ impl fmt::Debug for CoreThread {
         // f.debug_tuple("CoreThread").field(&"...").finish()
         f.debug_tuple("CoreThread").field(&self.remote).finish()
     }
 }
 
 pub fn spawn_thread<S, F>(name: S, f: F) -> io::Result<CoreThread>
 where
     S: Into<String>,
-    F: FnOnce() -> io::Result<()> + Send + 'static
+    F: FnOnce() -> io::Result<()> + Send + 'static,
 {
     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
     let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
 
     let join = try!(thread::Builder::new().name(name.into()).spawn(move || {
         let mut core = Core::new().expect("Failed to create reactor::Core");
         let handle = core.handle();
         let remote = handle.remote().clone();
@@ -80,23 +80,21 @@ where
             f().and_then(|_| {
                 let _ = core.run(shutdown_rx);
                 Ok(())
             })
         }));
         trace!("thread shutdown...");
     }));
 
-    let remote = try!(remote_rx.recv().or_else(|_| {
-        Err(io::Error::new(
-            io::ErrorKind::Other,
-            "Failed to receive remote handle from spawned thread"
-        ))
-    }));
+    let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new(
+        io::ErrorKind::Other,
+        "Failed to receive remote handle from spawned thread"
+    ))));
 
     Ok(CoreThread {
         inner: Some(Inner {
             join: join,
-            shutdown: shutdown_tx
+            shutdown: shutdown_tx,
         }),
-        remote: remote
+        remote: remote,
     })
 }
--- a/media/audioipc/audioipc/src/errors.rs
+++ b/media/audioipc/audioipc/src/errors.rs
@@ -1,17 +1,17 @@
 use bincode;
-use cubeb_core;
+use cubeb;
 use std;
 
 error_chain! {
     // Maybe replace with chain_err to improve the error info.
     foreign_links {
         Bincode(bincode::Error);
         Io(std::io::Error);
-        Cubeb(cubeb_core::Error);
+        Cubeb(cubeb::Error);
     }
 
     // Replace bail!(str) with explicit errors.
     errors {
         Disconnected
     }
 }
--- a/media/audioipc/audioipc/src/fd_passing.rs
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -15,25 +15,25 @@ use std::collections::VecDeque;
 use std::os::unix::io::RawFd;
 
 const INITIAL_CAPACITY: usize = 1024;
 const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
 const FDS_CAPACITY: usize = 16;
 
 struct IncomingFds {
     cmsg: BytesMut,
-    recv_fds: Option<cmsg::ControlMsgIter>
+    recv_fds: Option<cmsg::ControlMsgIter>,
 }
 
 impl IncomingFds {
     pub fn new(c: usize) -> Self {
         let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>());
         IncomingFds {
             cmsg: BytesMut::with_capacity(capacity),
-            recv_fds: None
+            recv_fds: None,
         }
     }
 
     pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
         loop {
             let fds = self.recv_fds
                 .as_mut()
                 .and_then(|recv_fds| recv_fds.next())
@@ -55,38 +55,38 @@ impl IncomingFds {
         self.cmsg.reserve(cmsg::space(mem::size_of::<[RawFd; 3]>()));
         &mut self.cmsg
     }
 }
 
 #[derive(Debug)]
 struct Frame {
     msgs: Bytes,
-    fds: Option<Bytes>
+    fds: Option<Bytes>,
 }
 
 /// A unified `Stream` and `Sink` interface over an I/O object, using
 /// the `Codec` trait to encode and decode the payload.
 pub struct FramedWithFds<A, C> {
     io: A,
     codec: C,
     // Stream
     read_buf: BytesMut,
     incoming_fds: IncomingFds,
     is_readable: bool,
     eof: bool,
     // Sink
     frames: VecDeque<Frame>,
     write_buf: BytesMut,
-    outgoing_fds: BytesMut
+    outgoing_fds: BytesMut,
 }
 
 impl<A, C> FramedWithFds<A, C>
 where
-    A: AsyncSendMsg
+    A: AsyncSendMsg,
 {
     // If there is a buffered frame, try to write it to `A`
     fn do_write(&mut self) -> Poll<(), io::Error> {
         debug!("do_write...");
         // Create a frame from any pending message in `write_buf`.
         if !self.write_buf.is_empty() {
             self.set_frame(None);
         }
@@ -97,20 +97,20 @@ where
 
         loop {
             let n = match self.frames.front() {
                 Some(frame) => {
                     trace!("sending msg {:?}, fds {:?}", frame.msgs, frame.fds);
                     let mut msgs = frame.msgs.clone().into_buf();
                     let mut fds = match frame.fds {
                         Some(ref fds) => fds.clone(),
-                        None => Bytes::new()
+                        None => Bytes::new(),
                     }.into_buf();
                     try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
-                },
+                }
                 _ => {
                     // No pending frames.
                     return Ok(().into());
                 }
             };
 
             match self.frames.pop_front() {
                 Some(mut frame) => {
@@ -132,18 +132,18 @@ where
                         // re-queue the remaining message at the head
                         // of the queue. (Don't need to resend the fds
                         // since they've been sent with the first
                         // part.)
                         drop(frame.msgs.split_to(n));
                         self.frames.push_front(frame);
                         break;
                     }
-                },
-                _ => panic!()
+                }
+                _ => panic!(),
             }
         }
         debug!("process {} frames", processed);
 
         trace!("pending frames: {:?}", self.frames);
 
         Ok(().into())
     }
@@ -153,28 +153,25 @@ where
             assert!(fds.is_none());
             trace!("set_frame: No pending messages...");
             return;
         }
 
         let msgs = self.write_buf.take().freeze();
         trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
 
-        self.frames.push_back(Frame {
-            msgs,
-            fds
-        });
+        self.frames.push_back(Frame { msgs, fds });
     }
 }
 
 impl<A, C> Stream for FramedWithFds<A, C>
 where
     A: AsyncRecvMsg,
     C: Codec,
-    C::Out: AssocRawFd
+    C::Out: AssocRawFd,
 {
     type Item = C::Out;
     type Error = io::Error;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
         loop {
             // Repeatedly call `decode` or `decode_eof` as long as it is
             // "readable". Readable is defined as not having returned `None`. If
@@ -221,25 +218,22 @@ where
         }
     }
 }
 
 impl<A, C> Sink for FramedWithFds<A, C>
 where
     A: AsyncSendMsg,
     C: Codec,
-    C::In: AssocRawFd + fmt::Debug
+    C::In: AssocRawFd + fmt::Debug,
 {
     type SinkItem = C::In;
     type SinkError = io::Error;
 
-    fn start_send(
-        &mut self,
-        item: Self::SinkItem
-    ) -> StartSend<Self::SinkItem, Self::SinkError> {
+    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         trace!("start_send: item={:?}", item);
 
         // If the buffer is already over BACKPRESSURE_THRESHOLD,
         // then attempt to flush it. If after flush it's *still*
         // over BACKPRESSURE_THRESHOLD, then reject the send.
         if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
             try!(self.poll_complete());
             if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
@@ -290,29 +284,29 @@ pub fn framed_with_fds<A, C>(io: A, code
         codec: codec,
         read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         incoming_fds: IncomingFds::new(FDS_CAPACITY),
         is_readable: false,
         eof: false,
         frames: VecDeque::new(),
         write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         outgoing_fds: BytesMut::with_capacity(
-            FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>())
-        )
+            FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()),
+        ),
     }
 }
 
 fn write_zero() -> io::Error {
     io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
 }
 
 fn clone_into_array<A, T>(slice: &[T]) -> A
 where
     A: Sized + Default + AsMut<[T]>,
-    T: Clone
+    T: Clone,
 {
     let mut a = Default::default();
     <A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
     a
 }
 
 fn close_fds(fds: &[RawFd]) {
     for fd in fds {
--- a/media/audioipc/audioipc/src/frame.rs
+++ b/media/audioipc/audioipc/src/frame.rs
@@ -16,17 +16,17 @@ const BACKPRESSURE_THRESHOLD: usize = 4 
 /// the `Codec` trait to encode and decode the payload.
 pub struct Framed<A, C> {
     io: A,
     codec: C,
     read_buf: BytesMut,
     write_buf: BytesMut,
     frame: Option<<Bytes as IntoBuf>::Buf>,
     is_readable: bool,
-    eof: bool
+    eof: bool,
 }
 
 impl<A, C> Framed<A, C>
 where
     A: AsyncWrite,
 {
     // If there is a buffered frame, try to write it to `A`
     fn do_write(&mut self) -> Poll<(), io::Error> {
@@ -154,11 +154,11 @@ fn write_zero() -> io::Error {
 pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
     Framed {
         io: io,
         codec: codec,
         read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         frame: None,
         is_readable: false,
-        eof: false
+        eof: false,
     }
 }
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -1,35 +1,34 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 #![allow(dead_code)] // TODO: Remove.
-
 #![recursion_limit = "1024"]
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 #[macro_use]
 extern crate serde_derive;
 
 extern crate bincode;
 extern crate bytes;
-extern crate cubeb_core;
+extern crate cubeb;
 #[macro_use]
 extern crate futures;
 extern crate iovec;
 extern crate libc;
 extern crate memmap;
-extern crate serde;
 #[macro_use]
 extern crate scoped_tls;
+extern crate serde;
 extern crate tokio_core;
 #[macro_use]
 extern crate tokio_io;
 extern crate tokio_uds;
 
 pub mod async;
 pub mod cmsg;
 pub mod codec;
@@ -38,41 +37,47 @@ pub mod fd_passing;
 pub mod frame;
 pub mod rpc;
 pub mod core;
 pub mod messages;
 mod msg;
 pub mod shm;
 
 use iovec::IoVec;
-
 #[cfg(target_os = "linux")]
 use libc::MSG_CMSG_CLOEXEC;
 pub use messages::{ClientMessage, ServerMessage};
-
 use std::env::temp_dir;
 use std::io;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::path::PathBuf;
 #[cfg(not(target_os = "linux"))]
 const MSG_CMSG_CLOEXEC: libc::c_int = 0;
 
 // Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
 // We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
 // UnixStream type.
 pub trait RecvMsg {
-    fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>;
+    fn recv_msg(
+        &mut self,
+        iov: &mut [&mut IoVec],
+        cmsg: &mut [u8],
+    ) -> io::Result<(usize, usize, i32)>;
 }
 
 pub trait SendMsg {
     fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
 }
 
 impl<T: AsRawFd> RecvMsg for T {
-    fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> {
+    fn recv_msg(
+        &mut self,
+        iov: &mut [&mut IoVec],
+        cmsg: &mut [u8],
+    ) -> io::Result<(usize, usize, i32)> {
         msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
     }
 }
 
 impl<T: AsRawFd> SendMsg for T {
     fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
         msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
     }
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -1,48 +1,48 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
-use cubeb_core::{self, ffi};
+use cubeb::{self, ffi};
 use std::ffi::{CStr, CString};
-use std::os::raw::c_char;
+use std::os::raw::{c_char, c_int, c_uint};
 use std::os::unix::io::RawFd;
 use std::ptr;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Device {
     pub output_name: Option<Vec<u8>>,
-    pub input_name: Option<Vec<u8>>
+    pub input_name: Option<Vec<u8>>,
 }
 
-impl<'a> From<cubeb_core::Device<'a>> for Device {
-    fn from(info: cubeb_core::Device) -> Self {
+impl<'a> From<&'a cubeb::DeviceRef> for Device {
+    fn from(info: &'a cubeb::DeviceRef) -> Self {
         Self {
             output_name: info.output_name_bytes().map(|s| s.to_vec()),
-            input_name: info.input_name_bytes().map(|s| s.to_vec())
+            input_name: info.input_name_bytes().map(|s| s.to_vec()),
         }
     }
 }
 
 impl From<ffi::cubeb_device> for Device {
     fn from(info: ffi::cubeb_device) -> Self {
         Self {
             output_name: dup_str(info.output_name),
-            input_name: dup_str(info.input_name)
+            input_name: dup_str(info.input_name),
         }
     }
 }
 
 impl From<Device> for ffi::cubeb_device {
     fn from(info: Device) -> Self {
         Self {
             output_name: opt_str(info.output_name),
-            input_name: opt_str(info.input_name)
+            input_name: opt_str(info.input_name),
         }
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct DeviceInfo {
     pub devid: usize,
     pub device_id: Option<Vec<u8>>,
@@ -57,21 +57,22 @@ pub struct DeviceInfo {
     pub format: ffi::cubeb_device_fmt,
     pub default_format: ffi::cubeb_device_fmt,
     pub max_channels: u32,
     pub default_rate: u32,
     pub max_rate: u32,
     pub min_rate: u32,
 
     pub latency_lo: u32,
-    pub latency_hi: u32
+    pub latency_hi: u32,
 }
 
-impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
-    fn from(info: &'a ffi::cubeb_device_info) -> Self {
+impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo {
+    fn from(info: &'a cubeb::DeviceInfoRef) -> Self {
+        let info = unsafe { &*info.as_ptr() };
         DeviceInfo {
             devid: info.devid as _,
             device_id: dup_str(info.device_id),
             friendly_name: dup_str(info.friendly_name),
             group_id: dup_str(info.group_id),
             vendor_name: dup_str(info.vendor_name),
 
             device_type: info.device_type,
@@ -81,17 +82,17 @@ impl<'a> From<&'a ffi::cubeb_device_info
             format: info.format,
             default_format: info.default_format,
             max_channels: info.max_channels,
             default_rate: info.default_rate,
             max_rate: info.max_rate,
             min_rate: info.min_rate,
 
             latency_lo: info.latency_lo,
-            latency_hi: info.latency_hi
+            latency_hi: info.latency_hi,
         }
     }
 }
 
 impl From<DeviceInfo> for ffi::cubeb_device_info {
     fn from(info: DeviceInfo) -> Self {
         ffi::cubeb_device_info {
             devid: info.devid as _,
@@ -107,92 +108,72 @@ impl From<DeviceInfo> for ffi::cubeb_dev
             format: info.format,
             default_format: info.default_format,
             max_channels: info.max_channels,
             default_rate: info.default_rate,
             max_rate: info.max_rate,
             min_rate: info.min_rate,
 
             latency_lo: info.latency_lo,
-            latency_hi: info.latency_hi
+            latency_hi: info.latency_hi,
         }
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
 pub struct StreamParams {
-    pub format: u32,
-    pub rate: u16,
-    pub channels: u8,
-    pub layout: i32,
-    pub prefs: i32
+    pub format: ffi::cubeb_sample_format,
+    pub rate: c_uint,
+    pub channels: c_uint,
+    pub layout: ffi::cubeb_channel_layout,
+    pub prefs: ffi::cubeb_stream_prefs,
 }
 
-impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
-    fn from(params: &'a ffi::cubeb_stream_params) -> Self {
-        assert!(params.channels <= u32::from(u8::max_value()));
-
-        StreamParams {
-            format: params.format,
-            rate: params.rate as u16,
-            channels: params.channels as u8,
-            layout: params.layout,
-            prefs: params.prefs
-        }
-    }
-}
-
-impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
-    fn from(params: &StreamParams) -> Self {
-        ffi::cubeb_stream_params {
-            format: params.format,
-            rate: u32::from(params.rate),
-            channels: u32::from(params.channels),
-            layout: params.layout,
-            prefs: params.prefs
-        }
+impl<'a> From<&'a cubeb::StreamParamsRef> for StreamParams {
+    fn from(x: &cubeb::StreamParamsRef) -> StreamParams {
+        unsafe { *(x.as_ptr() as *mut StreamParams) }
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StreamInitParams {
     pub stream_name: Option<Vec<u8>>,
     pub input_device: usize,
     pub input_stream_params: Option<StreamParams>,
     pub output_device: usize,
     pub output_stream_params: Option<StreamParams>,
-    pub latency_frames: u32
+    pub latency_frames: u32,
 }
 
 fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
     if s.is_null() {
         None
     } else {
         let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
         Some(vec)
     }
 }
 
-fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
+fn opt_str(v: Option<Vec<u8>>) -> *mut c_char {
     match v {
         Some(v) => match CString::new(v) {
             Ok(s) => s.into_raw(),
             Err(_) => {
                 debug!("Failed to convert bytes to CString");
-                ptr::null()
+                ptr::null_mut()
             }
         },
-        None => ptr::null()
+        None => ptr::null_mut(),
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StreamCreate {
     pub token: usize,
-    pub fds: [RawFd; 3]
+    pub fds: [RawFd; 3],
 }
 
 // Client -> Server messages.
 // TODO: Callbacks should be different messages types so
 // ServerConn::process_msg doesn't have a catch-all case.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ServerMessage {
     ClientConnect,
@@ -210,17 +191,17 @@ pub enum ServerMessage {
 
     StreamStart(usize),
     StreamStop(usize),
     StreamResetDefaultDevice(usize),
     StreamGetPosition(usize),
     StreamGetLatency(usize),
     StreamSetVolume(usize, f32),
     StreamSetPanning(usize, f32),
-    StreamGetCurrentDevice(usize)
+    StreamGetCurrentDevice(usize),
 }
 
 // Server -> Client messages.
 // TODO: Streams need id.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ClientMessage {
     ClientConnected,
     ClientDisconnected,
@@ -239,52 +220,84 @@ pub enum ClientMessage {
     StreamStopped,
     StreamDefaultDeviceReset,
     StreamPosition(u64),
     StreamLatency(u32),
     StreamVolumeSet,
     StreamPanningSet,
     StreamCurrentDevice(Device),
 
-    Error(ffi::cubeb_error_code)
+    Error(c_int),
 }
 
 #[derive(Debug, Deserialize, Serialize)]
 pub enum CallbackReq {
     Data(isize, usize),
-    State(ffi::cubeb_state)
+    State(ffi::cubeb_state),
 }
 
 #[derive(Debug, Deserialize, Serialize)]
 pub enum CallbackResp {
     Data(isize),
-    State
+    State,
 }
 
 pub trait AssocRawFd {
     fn fd(&self) -> Option<[RawFd; 3]> {
         None
     }
     fn take_fd<F>(&mut self, _: F)
     where
-        F: FnOnce() -> Option<[RawFd; 3]>
+        F: FnOnce() -> Option<[RawFd; 3]>,
     {
     }
 }
 
 impl AssocRawFd for ServerMessage {}
 impl AssocRawFd for ClientMessage {
     fn fd(&self) -> Option<[RawFd; 3]> {
         match *self {
             ClientMessage::StreamCreated(ref data) => Some(data.fds),
-            _ => None
+            _ => None,
         }
     }
 
     fn take_fd<F>(&mut self, f: F)
     where
-        F: FnOnce() -> Option<[RawFd; 3]>
+        F: FnOnce() -> Option<[RawFd; 3]>,
     {
         if let ClientMessage::StreamCreated(ref mut data) = *self {
             data.fds = f().unwrap();
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::StreamParams;
+    use cubeb::ffi;
+    use std::mem;
+
+    #[test]
+    fn stream_params_size_check() {
+        assert_eq!(
+            mem::size_of::<StreamParams>(),
+            mem::size_of::<ffi::cubeb_stream_params>()
+        )
+    }
+
+    #[test]
+    fn stream_params_from() {
+        let mut raw = ffi::cubeb_stream_params::default();
+        raw.format = ffi::CUBEB_SAMPLE_FLOAT32BE;
+        raw.rate = 96_000;
+        raw.channels = 32;
+        raw.layout = ffi::CUBEB_LAYOUT_3F1_LFE;
+        raw.prefs = ffi::CUBEB_STREAM_PREF_LOOPBACK;
+        let wrapped = ::cubeb::StreamParams::from(raw);
+        let params = StreamParams::from(wrapped.as_ref());
+        assert_eq!(params.format, raw.format);
+        assert_eq!(params.rate, raw.rate);
+        assert_eq!(params.channels, raw.channels);
+        assert_eq!(params.layout, raw.layout);
+        assert_eq!(params.prefs, raw.prefs);
+    }
+}
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -11,27 +11,27 @@ fn cvt(r: libc::ssize_t) -> io::Result<u
         Ok(r as usize)
     }
 }
 
 // Convert return of -1 into error message, handling retry on EINTR
 fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
     loop {
         match cvt(f()) {
-            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
-            other => return other
+            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+            other => return other,
         }
     }
 }
 
 pub fn recv_msg_with_flags(
     socket: RawFd,
     bufs: &mut [&mut IoVec],
     cmsg: &mut [u8],
-    flags: libc::c_int
+    flags: libc::c_int,
 ) -> io::Result<(usize, usize, libc::c_int)> {
     let slice = iovec::as_os_slice_mut(bufs);
     let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
     let (control, controllen) = if cmsg.is_empty() {
         (ptr::null_mut(), 0)
     } else {
         (cmsg.as_ptr() as *mut _, cmsg.len())
     };
@@ -51,17 +51,17 @@ pub fn recv_msg_with_flags(
     let controllen = msghdr.msg_controllen as usize;
     Ok((n, controllen, msghdr.msg_flags))
 }
 
 pub fn send_msg_with_flags(
     socket: RawFd,
     bufs: &[&IoVec],
     cmsg: &[u8],
-    flags: libc::c_int
+    flags: libc::c_int,
 ) -> io::Result<usize> {
     let slice = iovec::as_os_slice(bufs);
     let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
     let (control, controllen) = if cmsg.is_empty() {
         (ptr::null_mut(), 0)
     } else {
         (cmsg.as_ptr() as *mut _, cmsg.len())
     };
@@ -69,12 +69,10 @@ pub fn send_msg_with_flags(
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
     msghdr.msg_name = ptr::null_mut();
     msghdr.msg_namelen = 0;
     msghdr.msg_iov = slice.as_ptr() as *mut _;
     msghdr.msg_iovlen = len as _;
     msghdr.msg_control = control;
     msghdr.msg_controllen = controllen as _;
 
-    cvt_r(|| unsafe {
-        libc::sendmsg(socket, &msghdr as *const _, flags)
-    })
+    cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) })
 }
--- a/media/audioipc/audioipc/src/rpc/client/mod.rs
+++ b/media/audioipc/audioipc/src/rpc/client/mod.rs
@@ -48,28 +48,28 @@ use std::io;
 use tokio_core::reactor::Handle;
 
 mod proxy;
 
 pub use self::proxy::{ClientProxy, Response};
 
 pub fn bind_client<C>(
     transport: C::Transport,
-    handle: &Handle
+    handle: &Handle,
 ) -> proxy::ClientProxy<C::Request, C::Response>
 where
-    C: Client
+    C: Client,
 {
     let (tx, rx) = proxy::channel();
 
     let fut = {
         let handler = ClientHandler::<C> {
             transport: transport,
             requests: rx,
-            in_flight: VecDeque::with_capacity(32)
+            in_flight: VecDeque::with_capacity(32),
         };
         Driver::new(handler)
     };
 
     // Spawn the RPC driver into task
     handle.spawn(Box::new(fut.map_err(|_| ())));
 
     tx
@@ -87,43 +87,43 @@ pub trait Client: 'static {
         + Stream<Item = Self::Response, Error = io::Error>
         + Sink<SinkItem = Self::Request, SinkError = io::Error>;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 
 struct ClientHandler<C>
 where
-    C: Client
+    C: Client,
 {
     transport: C::Transport,
     requests: proxy::Receiver<C::Request, C::Response>,
-    in_flight: VecDeque<oneshot::Sender<C::Response>>
+    in_flight: VecDeque<oneshot::Sender<C::Response>>,
 }
 
 impl<C> Handler for ClientHandler<C>
 where
-    C: Client
+    C: Client,
 {
     type In = C::Response;
     type Out = C::Request;
     type Transport = C::Transport;
 
     fn transport(&mut self) -> &mut Self::Transport {
         &mut self.transport
     }
 
     fn consume(&mut self, response: Self::In) -> io::Result<()> {
         trace!("ClientHandler::consume");
         if let Some(complete) = self.in_flight.pop_front() {
             drop(complete.send(response));
         } else {
             return Err(io::Error::new(
                 io::ErrorKind::Other,
-                "request / response mismatch"
+                "request / response mismatch",
             ));
         }
 
         Ok(())
     }
 
     /// Produce a message
     fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
@@ -133,26 +133,26 @@ where
         match self.requests.poll() {
             Ok(Async::Ready(Some((request, complete)))) => {
                 trace!("  --> received request");
 
                 // Track complete handle
                 self.in_flight.push_back(complete);
 
                 Ok(Some(request).into())
-            },
+            }
             Ok(Async::Ready(None)) => {
                 trace!("  --> client dropped");
                 Ok(None.into())
-            },
+            }
             Ok(Async::NotReady) => {
                 trace!("  --> not ready");
                 Ok(Async::NotReady)
-            },
-            Err(_) => unreachable!()
+            }
+            Err(_) => unreachable!(),
         }
     }
 
     /// RPC currently in flight
     fn has_in_flight(&self) -> bool {
         !self.in_flight.is_empty()
     }
 }
--- a/media/audioipc/audioipc/src/rpc/client/proxy.rs
+++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs
@@ -52,66 +52,62 @@ use std::io;
 /// client connection.
 pub type Request<R, Q> = (R, oneshot::Sender<Q>);
 
 /// Receive requests submitted to the client
 pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
 
 /// Response future returned from a client
 pub struct Response<Q> {
-    inner: oneshot::Receiver<Q>
+    inner: oneshot::Receiver<Q>,
 }
 
 pub struct ClientProxy<R, Q> {
-    tx: mpsc::UnboundedSender<Request<R, Q>>
+    tx: mpsc::UnboundedSender<Request<R, Q>>,
 }
 
 impl<R, Q> Clone for ClientProxy<R, Q> {
     fn clone(&self) -> Self {
         ClientProxy {
-            tx: self.tx.clone()
+            tx: self.tx.clone(),
         }
     }
 }
 
 pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
     // Create a channel to send the requests to client-side of rpc.
     let (tx, rx) = mpsc::unbounded();
 
     // Wrap the `tx` part in ClientProxy so the rpc call interface
     // can be implemented.
-    let client = ClientProxy {
-        tx
-    };
+    let client = ClientProxy { tx };
 
     (client, rx)
 }
 
 impl<R, Q> ClientProxy<R, Q> {
     pub fn call(&self, request: R) -> Response<Q> {
         // The response to returned from the rpc client task over a
         // oneshot channel.
         let (tx, rx) = oneshot::channel();
 
         // If send returns an Err, its because the other side has been dropped.
         // By ignoring it, we are just dropping the `tx`, which will mean the
         // rx will return Canceled when polled. In turn, that is translated
         // into a BrokenPipe, which conveys the proper error.
         let _ = self.tx.send((request, tx));
 
-        Response {
-            inner: rx
-        }
+        Response { inner: rx }
     }
 }
 
 impl<R, Q> fmt::Debug for ClientProxy<R, Q>
 where
     R: fmt::Debug,
-    Q: fmt::Debug
+    Q: fmt::Debug,
 {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "ClientProxy {{ ... }}")
     }
 }
 
 impl<Q> Future for Response<Q> {
     type Item = Q;
@@ -127,14 +123,14 @@ impl<Q> Future for Response<Q> {
                 Err(e)
             }
         }
     }
 }
 
 impl<Q> fmt::Debug for Response<Q>
 where
-    Q: fmt::Debug
+    Q: fmt::Debug,
 {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "Response {{ ... }}")
     }
 }
--- a/media/audioipc/audioipc/src/rpc/driver.rs
+++ b/media/audioipc/audioipc/src/rpc/driver.rs
@@ -5,38 +5,38 @@
 
 use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
 use rpc::Handler;
 use std::fmt;
 use std::io;
 
 pub struct Driver<T>
 where
-    T: Handler
+    T: Handler,
 {
     // Glue
     handler: T,
 
     // True as long as the connection has more request frames to read.
     run: bool,
 
     // True when the transport is fully flushed
-    is_flushed: bool
+    is_flushed: bool,
 }
 
 impl<T> Driver<T>
 where
-    T: Handler
+    T: Handler,
 {
     /// Create a new rpc driver with the given service and transport.
     pub fn new(handler: T) -> Driver<T> {
         Driver {
             handler: handler,
             run: true,
-            is_flushed: true
+            is_flushed: true,
         }
     }
 
     /// Returns true if the driver has nothing left to do
     fn is_done(&self) -> bool {
         !self.run && self.is_flushed && !self.has_in_flight()
     }
 
@@ -60,17 +60,17 @@ where
         match req {
             Some(message) => {
                 trace!("received message");
 
                 if let Err(e) = self.handler.consume(message) {
                     // TODO: Should handler be infalliable?
                     panic!("unimplemented error handling: {:?}", e);
                 }
-            },
+            }
             None => {
                 trace!("received None");
                 // At this point, we just return. This works
                 // because poll with be called again and go
                 // through the receive-cycle again.
                 self.run = false;
             }
         }
@@ -81,24 +81,24 @@ where
     /// Send outgoing messages to the transport.
     fn send_outgoing(&mut self) -> io::Result<()> {
         trace!("send_responses");
         loop {
             match try!(self.handler.produce()) {
                 Async::Ready(Some(message)) => {
                     trace!("  --> got message");
                     try!(self.process_outgoing(message));
-                },
+                }
                 Async::Ready(None) => {
                     trace!("  --> got None");
                     // The service is done with the connection.
                     break;
-                },
+                }
                 // Nothing to dispatch
-                Async::NotReady => break
+                Async::NotReady => break,
             }
         }
 
         Ok(())
     }
 
     fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
         trace!("process_outgoing");
@@ -116,17 +116,17 @@ where
 
     fn has_in_flight(&self) -> bool {
         self.handler.has_in_flight()
     }
 }
 
 impl<T> Future for Driver<T>
 where
-    T: Handler
+    T: Handler,
 {
     type Item = ();
     type Error = io::Error;
 
     fn poll(&mut self) -> Poll<(), Self::Error> {
         trace!("rpc::Driver::tick");
 
         // First read off data from the socket
@@ -148,23 +148,23 @@ where
 }
 
 fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
     match try!(s.start_send(item)) {
         AsyncSink::Ready => Ok(()),
         AsyncSink::NotReady(_) => panic!(
             "sink reported itself as ready after `poll_ready` but was \
              then unable to accept a message"
-        )
+        ),
     }
 }
 
 impl<T> fmt::Debug for Driver<T>
 where
-    T: Handler + fmt::Debug
+    T: Handler + fmt::Debug,
 {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("rpc::Handler")
             .field("handler", &self.handler)
             .field("run", &self.run)
             .field("is_flushed", &self.is_flushed)
             .finish()
     }
--- a/media/audioipc/audioipc/src/rpc/server.rs
+++ b/media/audioipc/audioipc/src/rpc/server.rs
@@ -44,23 +44,23 @@ use rpc::Handler;
 use rpc::driver::Driver;
 use std::collections::VecDeque;
 use std::io;
 use tokio_core::reactor::Handle;
 
 /// Bind an async I/O object `io` to the `server`.
 pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
 where
-    S: Server
+    S: Server,
 {
     let fut = {
         let handler = ServerHandler {
             server: server,
             transport: transport,
-            in_flight: VecDeque::with_capacity(32)
+            in_flight: VecDeque::with_capacity(32),
         };
         Driver::new(handler)
     };
 
     // Spawn the RPC driver into task
     handle.spawn(Box::new(fut.map_err(|_| ())))
 }
 
@@ -79,34 +79,33 @@ pub trait Server: 'static {
     type Transport: 'static
         + Stream<Item = Self::Request, Error = io::Error>
         + Sink<SinkItem = Self::Response, SinkError = io::Error>;
 
     /// Process the request and return the response asynchronously.
     fn process(&mut self, req: Self::Request) -> Self::Future;
 }
 
-
 ////////////////////////////////////////////////////////////////////////////////
 
 struct ServerHandler<S>
 where
-    S: Server
+    S: Server,
 {
     // The service handling the connection
     server: S,
     // The transport responsible for sending/receving messages over the wire
     transport: S::Transport,
     // FIFO of "in flight" responses to requests.
-    in_flight: VecDeque<InFlight<S::Future>>
+    in_flight: VecDeque<InFlight<S::Future>>,
 }
 
 impl<S> Handler for ServerHandler<S>
 where
-    S: Server
+    S: Server,
 {
     type In = S::Request;
     type Out = S::Response;
     type Transport = S::Transport;
 
     /// Mutable reference to the transport
     fn transport(&mut self) -> &mut Self::Transport {
         &mut self.transport
@@ -128,51 +127,51 @@ where
 
         // Make progress on pending responses
         for pending in &mut self.in_flight {
             pending.poll();
         }
 
         // Is the head of the queue ready?
         match self.in_flight.front() {
-            Some(&InFlight::Done(_)) => {},
+            Some(&InFlight::Done(_)) => {}
             _ => {
                 trace!("  --> not ready");
                 return Ok(Async::NotReady);
             }
         }
 
         // Return the ready response
         match self.in_flight.pop_front() {
             Some(InFlight::Done(res)) => {
                 trace!("  --> received response");
                 Ok(Async::Ready(Some(res)))
-            },
-            _ => panic!()
+            }
+            _ => panic!(),
         }
     }
 
     /// RPC currently in flight
     fn has_in_flight(&self) -> bool {
         !self.in_flight.is_empty()
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 
 enum InFlight<F: Future<Error = ()>> {
     Active(F),
-    Done(F::Item)
+    Done(F::Item),
 }
 
 impl<F: Future<Error = ()>> InFlight<F> {
     fn poll(&mut self) {
         let res = match *self {
             InFlight::Active(ref mut f) => match f.poll() {
                 Ok(Async::Ready(e)) => e,
                 Err(_) => unreachable!(),
-                Ok(Async::NotReady) => return
+                Ok(Async::NotReady) => return,
             },
-            _ => return
+            _ => return,
         };
         *self = InFlight::Done(res);
     }
 }
--- a/media/audioipc/audioipc/src/shm.rs
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -1,35 +1,30 @@
 use errors::*;
 use memmap::{Mmap, MmapViewSync, Protection};
 use std::fs::{remove_file, File, OpenOptions};
 use std::path::Path;
 use std::sync::atomic;
 
 pub struct SharedMemReader {
-    mmap: Mmap
+    mmap: Mmap,
 }
 
 impl SharedMemReader {
     pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
         let file = OpenOptions::new()
             .read(true)
             .write(true)
             .create_new(true)
             .open(path)?;
         let _ = remove_file(path);
         file.set_len(size as u64)?;
         let mmap = Mmap::open(&file, Protection::Read)?;
         assert_eq!(mmap.len(), size);
-        Ok((
-            SharedMemReader {
-                mmap
-            },
-            file
-        ))
+        Ok((SharedMemReader { mmap }, file))
     }
 
     pub fn read(&self, buf: &mut [u8]) -> Result<()> {
         if buf.is_empty() {
             return Ok(());
         }
         // TODO: Track how much is in the shm area.
         if buf.len() <= self.mmap.len() {
@@ -41,27 +36,25 @@ impl SharedMemReader {
             Ok(())
         } else {
             bail!("mmap size");
         }
     }
 }
 
 pub struct SharedMemSlice {
-    view: MmapViewSync
+    view: MmapViewSync,
 }
 
 impl SharedMemSlice {
     pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> {
         let mmap = Mmap::open(file, Protection::Read)?;
         assert_eq!(mmap.len(), size);
         let view = mmap.into_view_sync();
-        Ok(SharedMemSlice {
-            view
-        })
+        Ok(SharedMemSlice { view })
     }
 
     pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
         if size == 0 {
             return Ok(&[]);
         }
         // TODO: Track how much is in the shm area.
         if size <= self.view.len() {
@@ -74,41 +67,36 @@ impl SharedMemSlice {
     }
 
     /// Clones the view of the memory map.
     ///
     /// The underlying memory map is shared, and thus the caller must ensure that the memory
     /// underlying the view is not illegally aliased.
     pub unsafe fn clone_view(&self) -> Self {
         SharedMemSlice {
-            view: self.view.clone()
+            view: self.view.clone(),
         }
     }
 }
 
 pub struct SharedMemWriter {
-    mmap: Mmap
+    mmap: Mmap,
 }
 
 impl SharedMemWriter {
     pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
         let file = OpenOptions::new()
             .read(true)
             .write(true)
             .create_new(true)
             .open(path)?;
         let _ = remove_file(path);
         file.set_len(size as u64)?;
         let mmap = Mmap::open(&file, Protection::ReadWrite)?;
-        Ok((
-            SharedMemWriter {
-                mmap
-            },
-            file
-        ))
+        Ok((SharedMemWriter { mmap }, file))
     }
 
     pub fn write(&mut self, buf: &[u8]) -> Result<()> {
         if buf.is_empty() {
             return Ok(());
         }
         // TODO: Track how much is in the shm area.
         if buf.len() <= self.mmap.len() {
@@ -119,27 +107,25 @@ impl SharedMemWriter {
             Ok(())
         } else {
             bail!("mmap size");
         }
     }
 }
 
 pub struct SharedMemMutSlice {
-    view: MmapViewSync
+    view: MmapViewSync,
 }
 
 impl SharedMemMutSlice {
     pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> {
         let mmap = Mmap::open(file, Protection::ReadWrite)?;
         assert_eq!(mmap.len(), size);
         let view = mmap.into_view_sync();
-        Ok(SharedMemMutSlice {
-            view
-        })
+        Ok(SharedMemMutSlice { view })
     }
 
     pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
         if size == 0 {
             return Ok(&mut []);
         }
         // TODO: Track how much is in the shm area.
         if size <= self.view.len() {
@@ -153,12 +139,12 @@ impl SharedMemMutSlice {
 
     /// Clones the view of the memory map.
     ///
     /// The underlying memory map is shared, and thus the caller must
     /// ensure that the memory underlying the view is not illegally
     /// aliased.
     pub unsafe fn clone_view(&self) -> Self {
         SharedMemMutSlice {
-            view: self.view.clone()
+            view: self.view.clone(),
         }
     }
 }
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,21 +1,21 @@
 [package]
 name = "audioipc-client"
-version = "0.2.0"
+version = "0.3.0"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Cubeb Backend for talking to remote cubeb server."
 
 [dependencies]
 audioipc = { path="../audioipc" }
-cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+cubeb-backend = "0.4"
+foreign-types = "0.3"
 # rayon-core in Gecko uses futures 0.1.13
 futures = { version="=0.1.13", default-features=false, features=["use_std"] }
 # futures-cpupool 0.1.5 matches futures 0.1.13
 futures-cpupool = { version="=0.1.5", default-features=false }
 libc = "0.2"
 log = "^0.3.6"
 tokio-core = "0.1"
 tokio-uds = "0.1.7"
\ No newline at end of file
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -4,56 +4,54 @@
 // accompanying file LICENSE for details
 
 use ClientStream;
 use assert_not_in_callback;
 use audioipc::{messages, ClientMessage, ServerMessage};
 use audioipc::{core, rpc};
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
-use cubeb_backend::{Context, Ops};
-use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams};
-use cubeb_core::binding::Binding;
+use cubeb_backend::{ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId,
+                    DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef};
 use futures::Future;
 use futures_cpupool::{self, CpuPool};
 use libc;
-use std::{fmt, io, mem};
+use std::{fmt, io, mem, ptr};
 use std::ffi::{CStr, CString};
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
 use std::os::unix::net;
 use std::sync::mpsc;
 use stream;
 use tokio_core::reactor::{Handle, Remote};
 use tokio_uds::UnixStream;
 
 struct CubebClient;
 
 impl rpc::Client for CubebClient {
     type Request = ServerMessage;
     type Response = ClientMessage;
-    type Transport =
-        FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
 
 macro_rules! t(
     ($e:expr) => (
         match $e {
             Ok(e) => e,
             Err(_) => return Err(Error::default())
         }
     ));
 
 pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
 
 pub struct ClientContext {
     _ops: *const Ops,
     rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
     core: core::CoreThread,
-    cpu_pool: CpuPool
+    cpu_pool: CpuPool,
 }
 
 impl ClientContext {
     #[doc(hidden)]
     pub fn remote(&self) -> Remote {
         self.core.remote()
     }
 
@@ -74,192 +72,202 @@ fn open_server_stream() -> Result<net::U
         if let Some(fd) = super::G_SERVER_FD {
             return Ok(net::UnixStream::from_raw_fd(fd));
         }
 
         Err(Error::default())
     }
 }
 
-impl Context for ClientContext {
-    fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+impl ContextOps for ClientContext {
+    fn init(_context_name: Option<&CStr>) -> Result<Context> {
         fn bind_and_send_client(
             stream: UnixStream,
             handle: &Handle,
-            tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>
+            tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
         ) -> Option<()> {
             let transport = framed_with_fds(stream, Default::default());
             let rpc = rpc::bind_client::<CubebClient>(transport, handle);
             // If send fails then the rx end has closed
             // which is unlikely here.
             let _ = tx_rpc.send(rpc);
             Some(())
         }
 
         assert_not_in_callback();
 
         let (tx_rpc, rx_rpc) = mpsc::channel();
 
         let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
             let handle = core::handle();
 
-            open_server_stream().ok()
+            open_server_stream()
+                .ok()
                 .and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
                 .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
-                .ok_or_else(|| io::Error::new(
-                    io::ErrorKind::Other,
-                    "Failed to open stream and create rpc."
-                ))
+                .ok_or_else(|| {
+                    io::Error::new(
+                        io::ErrorKind::Other,
+                        "Failed to open stream and create rpc.",
+                    )
+                })
         }));
 
         let rpc = t!(rx_rpc.recv());
 
         let cpupool = futures_cpupool::Builder::new()
             .name_prefix("AudioIPC")
             .create();
 
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
             rpc: rpc,
             core: core,
-            cpu_pool: cpupool
+            cpu_pool: cpupool,
         });
-        Ok(Box::into_raw(ctx) as *mut _)
+        Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
     }
 
-    fn backend_id(&self) -> &'static CStr {
+    fn backend_id(&mut self) -> &'static CStr {
         assert_not_in_callback();
         unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
     }
 
-    fn max_channel_count(&self) -> Result<u32> {
+    fn max_channel_count(&mut self) -> Result<u32> {
         assert_not_in_callback();
         send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
     }
 
-    fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+    fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
         assert_not_in_callback();
-        let params = messages::StreamParams::from(unsafe { &*params.raw() });
+        let params = messages::StreamParams::from(params.as_ref());
         send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
     }
 
-    fn preferred_sample_rate(&self) -> Result<u32> {
+    fn preferred_sample_rate(&mut self) -> Result<u32> {
         assert_not_in_callback();
         send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
     }
 
-    fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
+    fn preferred_channel_layout(&mut self) -> Result<ChannelLayout> {
         assert_not_in_callback();
         send_recv!(self.rpc(),
                    ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+            .map(|l| {
+            ChannelLayout::from(l)
+        })
     }
 
-    fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+    fn enumerate_devices(
+        &mut self,
+        devtype: DeviceType,
+        collection: &DeviceCollectionRef,
+    ) -> Result<()> {
         assert_not_in_callback();
         let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
                              ContextGetDeviceEnumeration(devtype.bits()) =>
                              ContextEnumeratedDevices())
         {
             Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
-            Err(e) => return Err(e)
+            Err(e) => return Err(e),
         };
-        let vs = v.into_boxed_slice();
-        let coll = ffi::cubeb_device_collection {
-            count: vs.len(),
-            device: vs.as_ptr()
-        };
+        let mut vs = v.into_boxed_slice();
+        let coll = unsafe { &mut *collection.as_ptr() };
+        coll.device = vs.as_mut_ptr();
+        coll.count = vs.len();
         // Giving away the memory owned by vs.  Don't free it!
         // Reclaimed in `device_collection_destroy`.
         mem::forget(vs);
-        Ok(coll)
+        Ok(())
     }
 
-    fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+    fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
         assert_not_in_callback();
         unsafe {
-            let coll = &*collection;
+            let coll = &mut *collection.as_ptr();
             let mut devices = Vec::from_raw_parts(
                 coll.device as *mut ffi::cubeb_device_info,
                 coll.count,
-                coll.count
+                coll.count,
             );
             for dev in &mut devices {
                 if !dev.device_id.is_null() {
                     let _ = CString::from_raw(dev.device_id as *mut _);
                 }
                 if !dev.group_id.is_null() {
                     let _ = CString::from_raw(dev.group_id as *mut _);
                 }
                 if !dev.vendor_name.is_null() {
                     let _ = CString::from_raw(dev.vendor_name as *mut _);
                 }
                 if !dev.friendly_name.is_null() {
                     let _ = CString::from_raw(dev.friendly_name as *mut _);
                 }
             }
+            coll.device = ptr::null_mut();
+            coll.count = 0;
+            Ok(())
         }
     }
 
     fn stream_init(
-        &self,
+        &mut self,
         stream_name: Option<&CStr>,
         input_device: DeviceId,
-        input_stream_params: Option<&ffi::cubeb_stream_params>,
+        input_stream_params: Option<&StreamParamsRef>,
         output_device: DeviceId,
-        output_stream_params: Option<&ffi::cubeb_stream_params>,
+        output_stream_params: Option<&StreamParamsRef>,
         latency_frame: u32,
         // These params aren't sent to the server
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
-        user_ptr: *mut c_void
-    ) -> Result<*mut ffi::cubeb_stream> {
+        user_ptr: *mut c_void,
+    ) -> Result<Stream> {
         assert_not_in_callback();
 
-        fn opt_stream_params(
-            p: Option<&ffi::cubeb_stream_params>
-        ) -> Option<messages::StreamParams> {
+        fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option<messages::StreamParams> {
             match p {
-                Some(raw) => Some(messages::StreamParams::from(raw)),
-                None => None
+                Some(p) => Some(messages::StreamParams::from(p)),
+                None => None,
             }
         }
 
         let stream_name = match stream_name {
             Some(s) => Some(s.to_bytes().to_vec()),
-            None => None
+            None => None,
         };
 
         let input_stream_params = opt_stream_params(input_stream_params);
         let output_stream_params = opt_stream_params(output_stream_params);
 
         let init_params = messages::StreamInitParams {
             stream_name: stream_name,
-            input_device: input_device.raw() as _,
+            input_device: input_device as usize,
             input_stream_params: input_stream_params,
-            output_device: output_device.raw() as _,
+            output_device: output_device as usize,
             output_stream_params: output_stream_params,
-            latency_frames: latency_frame
+            latency_frames: latency_frame,
         };
         stream::init(self, init_params, data_callback, state_callback, user_ptr)
     }
 
     fn register_device_collection_changed(
-        &self,
+        &mut self,
         _dev_type: DeviceType,
         _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
-        _user_ptr: *mut c_void
+        _user_ptr: *mut c_void,
     ) -> Result<()> {
         assert_not_in_callback();
         Ok(())
     }
 }
 
 impl Drop for ClientContext {
     fn drop(&mut self) {
-        info!("ClientContext drop...");
+        debug!("ClientContext drop...");
         let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
         unsafe {
             if super::G_SERVER_FD.is_some() {
                 libc::close(super::G_SERVER_FD.take().unwrap());
             }
         }
     }
 }
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -1,33 +1,32 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details.
 
 extern crate audioipc;
 #[macro_use]
 extern crate cubeb_backend;
-extern crate cubeb_core;
+extern crate foreign_types;
 extern crate futures;
 extern crate futures_cpupool;
 extern crate libc;
 #[macro_use]
 extern crate log;
 extern crate tokio_core;
 extern crate tokio_uds;
 
 #[macro_use]
 mod send_recv;
 mod context;
 mod stream;
 
 use context::ClientContext;
-use cubeb_backend::capi;
-use cubeb_core::ffi;
+use cubeb_backend::{capi, ffi};
 use std::os::raw::{c_char, c_int};
 use std::os::unix::io::RawFd;
 use stream::ClientStream;
 
 thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
 
 fn set_in_callback(in_callback: bool) {
     IN_CALLBACK.with(|b| {
@@ -44,17 +43,17 @@ fn assert_not_in_callback() {
 
 static mut G_SERVER_FD: Option<RawFd> = None;
 
 #[no_mangle]
 /// Entry point from C code.
 pub unsafe extern "C" fn audioipc_client_init(
     c: *mut *mut ffi::cubeb,
     context_name: *const c_char,
-    server_connection: c_int
+    server_connection: c_int,
 ) -> c_int {
     // TODO: Windows portability (for fd).
     // TODO: Better way to pass extra parameters to Context impl.
     if G_SERVER_FD.is_some() {
         panic!("audioipc client's server connection already initialized.");
     }
     if server_connection >= 0 {
         G_SERVER_FD = Some(server_connection);
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -1,19 +1,19 @@
-use cubeb_core::Error;
-use cubeb_core::ffi;
+use cubeb_backend::Error;
+use std::os::raw::c_int;
 
 #[doc(hidden)]
 pub fn _err<E>(e: E) -> Error
 where
-    E: Into<Option<ffi::cubeb_error_code>>
+    E: Into<Option<c_int>>,
 {
     match e.into() {
         Some(e) => unsafe { Error::from_raw(e) },
-        None => Error::new()
+        None => Error::error(),
     }
 }
 
 #[macro_export]
 macro_rules! send_recv {
     ($rpc:expr, $smsg:ident => $rmsg:ident) => {{
         let resp = send_recv!(__send $rpc, $smsg);
         send_recv!(__recv resp, $rmsg)
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -5,116 +5,135 @@
 
 use {assert_not_in_callback, set_in_callback};
 use ClientContext;
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::frame::{framed, Framed};
 use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
 use audioipc::rpc;
 use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
-use cubeb_backend::Stream;
-use cubeb_core::{ffi, Result};
+use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
 use futures::Future;
 use futures_cpupool::{CpuFuture, CpuPool};
 use std::ffi::CString;
 use std::fs::File;
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
 use std::os::unix::net;
 use std::ptr;
 use std::sync::mpsc;
 use tokio_uds::UnixStream;
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
+pub struct Device(ffi::cubeb_device);
+
+impl Drop for Device {
+    fn drop(&mut self) {
+        unsafe {
+            if !self.0.input_name.is_null() {
+                let _ = CString::from_raw(self.0.input_name as *mut _);
+            }
+            if !self.0.output_name.is_null() {
+                let _ = CString::from_raw(self.0.output_name as *mut _);
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
 pub struct ClientStream<'ctx> {
-    // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
+    // This must be a reference to Context for cubeb, cubeb accesses
+    // stream methods via stream->context->ops
     context: &'ctx ClientContext,
-    token: usize
+    token: usize,
 }
 
 struct CallbackServer {
     input_shm: SharedMemSlice,
     output_shm: SharedMemMutSlice,
     data_cb: ffi::cubeb_data_callback,
     state_cb: ffi::cubeb_state_callback,
     user_ptr: usize,
-    cpu_pool: CpuPool
+    cpu_pool: CpuPool,
 }
 
 impl rpc::Server for CallbackServer {
     type Request = CallbackReq;
     type Response = CallbackResp;
     type Future = CpuFuture<Self::Response, ()>;
     type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
 
     fn process(&mut self, req: Self::Request) -> Self::Future {
         match req {
             CallbackReq::Data(nframes, frame_size) => {
-                info!(
+                debug!(
                     "stream_thread: Data Callback: nframes={} frame_size={}",
-                    nframes,
-                    frame_size
+                    nframes, frame_size
                 );
 
                 // Clone values that need to be moved into the cpu pool thread.
                 let input_shm = unsafe { self.input_shm.clone_view() };
                 let mut output_shm = unsafe { self.output_shm.clone_view() };
                 let user_ptr = self.user_ptr;
-                let cb = self.data_cb;
+                let cb = self.data_cb.unwrap();
 
                 self.cpu_pool.spawn_fn(move || {
                     // TODO: This is proof-of-concept. Make it better.
                     let input_ptr: *const u8 = input_shm
                         .get_slice(nframes as usize * frame_size)
                         .unwrap()
                         .as_ptr();
                     let output_ptr: *mut u8 = output_shm
                         .get_mut_slice(nframes as usize * frame_size)
                         .unwrap()
                         .as_mut_ptr();
 
                     set_in_callback(true);
-                    let nframes = cb(
-                        ptr::null_mut(),
-                        user_ptr as *mut c_void,
-                        input_ptr as *const _,
-                        output_ptr as *mut _,
-                        nframes as _
-                    );
+                    let nframes = unsafe {
+                        cb(
+                            ptr::null_mut(),
+                            user_ptr as *mut c_void,
+                            input_ptr as *const _,
+                            output_ptr as *mut _,
+                            nframes as _,
+                        )
+                    };
                     set_in_callback(false);
 
                     Ok(CallbackResp::Data(nframes as isize))
                 })
-            },
+            }
             CallbackReq::State(state) => {
-                info!("stream_thread: State Callback: {:?}", state);
+                debug!("stream_thread: State Callback: {:?}", state);
                 let user_ptr = self.user_ptr;
-                let cb = self.state_cb;
+                let cb = self.state_cb.unwrap();
                 self.cpu_pool.spawn_fn(move || {
                     set_in_callback(true);
-                    cb(ptr::null_mut(), user_ptr as *mut _, state);
+                    unsafe {
+                        cb(ptr::null_mut(), user_ptr as *mut _, state);
+                    }
                     set_in_callback(false);
 
                     Ok(CallbackResp::State)
                 })
             }
         }
     }
 }
 
 impl<'ctx> ClientStream<'ctx> {
     fn init(
         ctx: &'ctx ClientContext,
         init_params: messages::StreamInitParams,
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
-        user_ptr: *mut c_void
-    ) -> Result<*mut ffi::cubeb_stream> {
+        user_ptr: *mut c_void,
+    ) -> Result<Stream> {
         assert_not_in_callback();
 
         let rpc = ctx.rpc();
         let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
 
         trace!("token = {}, fds = {:?}", data.token, data.fds);
 
         let stm = data.fds[0];
@@ -133,124 +152,121 @@ impl<'ctx> ClientStream<'ctx> {
         let cpu_pool = ctx.cpu_pool();
 
         let server = CallbackServer {
             input_shm: input_shm,
             output_shm: output_shm,
             data_cb: data_callback,
             state_cb: state_callback,
             user_ptr: user_data,
-            cpu_pool: cpu_pool
+            cpu_pool: cpu_pool,
         };
 
         let (wait_tx, wait_rx) = mpsc::channel();
         ctx.remote().spawn(move |handle| {
             let stream = UnixStream::from_stream(stream, handle).unwrap();
             let transport = framed(stream, Default::default());
             rpc::bind_server(transport, server, handle);
             wait_tx.send(()).unwrap();
             Ok(())
         });
         wait_rx.recv().unwrap();
 
-        Ok(Box::into_raw(Box::new(ClientStream {
+        let stream = Box::into_raw(Box::new(ClientStream {
             context: ctx,
-            token: data.token
-        })) as _)
+            token: data.token,
+        }));
+        Ok(unsafe { Stream::from_ptr(stream as *mut _) })
     }
 }
 
 impl<'ctx> Drop for ClientStream<'ctx> {
     fn drop(&mut self) {
         trace!("ClientStream drop...");
         let rpc = self.context.rpc();
         let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
     }
 }
 
-impl<'ctx> Stream for ClientStream<'ctx> {
-    fn start(&self) -> Result<()> {
+impl<'ctx> StreamOps for ClientStream<'ctx> {
+    fn start(&mut self) -> Result<()> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamStart(self.token) => StreamStarted)
     }
 
-    fn stop(&self) -> Result<()> {
+    fn stop(&mut self) -> Result<()> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamStop(self.token) => StreamStopped)
     }
 
-    fn reset_default_device(&self) -> Result<()> {
+    fn reset_default_device(&mut self) -> Result<()> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
     }
 
-    fn position(&self) -> Result<u64> {
+    fn position(&mut self) -> Result<u64> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
     }
 
-    fn latency(&self) -> Result<u32> {
+    fn latency(&mut self) -> Result<u32> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
     }
 
-    fn set_volume(&self, volume: f32) -> Result<()> {
+    fn set_volume(&mut self, volume: f32) -> Result<()> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
     }
 
-    fn set_panning(&self, panning: f32) -> Result<()> {
+    fn set_panning(&mut self, panning: f32) -> Result<()> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet)
     }
 
-    fn current_device(&self) -> Result<*const ffi::cubeb_device> {
+    fn current_device(&mut self) -> Result<&DeviceRef> {
         assert_not_in_callback();
         let rpc = self.context.rpc();
         match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
-            Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
-            Err(e) => Err(e)
+            Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
+            Err(e) => Err(e),
         }
     }
 
-    fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+    fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
         assert_not_in_callback();
         // It's all unsafe...
-        if !device.is_null() {
+        if device.as_ptr().is_null() {
+            Err(Error::error())
+        } else {
             unsafe {
-                if !(*device).output_name.is_null() {
-                    let _ = CString::from_raw((*device).output_name as *mut _);
-                }
-                if !(*device).input_name.is_null() {
-                    let _ = CString::from_raw((*device).input_name as *mut _);
-                }
-                let _: Box<ffi::cubeb_device> = Box::from_raw(device as *mut _);
+                let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
             }
+            Ok(())
         }
-        Ok(())
     }
 
     // TODO: How do we call this back? On what thread?
     fn register_device_changed_callback(
-        &self,
-        _device_changed_callback: ffi::cubeb_device_changed_callback
+        &mut self,
+        _device_changed_callback: ffi::cubeb_device_changed_callback,
     ) -> Result<()> {
         assert_not_in_callback();
         Ok(())
     }
 }
 
 pub fn init(
     ctx: &ClientContext,
     init_params: messages::StreamInitParams,
     data_callback: ffi::cubeb_data_callback,
     state_callback: ffi::cubeb_state_callback,
-    user_ptr: *mut c_void
-) -> Result<*mut ffi::cubeb_stream> {
+    user_ptr: *mut c_void,
+) -> Result<Stream> {
     ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
 }
--- a/media/audioipc/gecko.patch
+++ b/media/audioipc/gecko.patch
@@ -7,54 +7,9 @@ Subject: gecko: Change paths to vendored
 diff --git a/media/audioipc/Cargo.toml b/media/audioipc/Cargo.toml
 index ede6064..d0a1979 100644
 --- a/media/audioipc/Cargo.toml
 +++ b/media/audioipc/Cargo.toml
 @@ -1,2 +1,2 @@
  [workspace]
 -members = ["audioipc", "client", "server", "ipctest"]
 +members = ["audioipc", "client", "server"]
-diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml
-index 669c6ff..308cb5c 100644
---- a/media/audioipc/audioipc/Cargo.toml
-+++ b/media/audioipc/audioipc/Cargo.toml
-@@ -8,7 +8,7 @@ authors = [
- description = "Remote Cubeb IPC"
- 
- [dependencies]
--cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
- bincode = "0.8"
- bytes = "0.4"
- # rayon-core in Gecko uses futures 0.1.13
-diff --git a/client/Cargo.toml b/client/Cargo.toml
-index c81b19a..9e3f8a5 100644
---- a/media/audioipc/client/Cargo.toml
-+++ b/media/audioipc/client/Cargo.toml
-@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
- 
- [dependencies]
- audioipc = { path="../audioipc" }
--cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
--cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
-+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
- # rayon-core in Gecko uses futures 0.1.13
- futures = { version="=0.1.13", default-features=false, features=["use_std"] }
- # futures-cpupool 0.1.5 matches futures 0.1.13
-diff --git a/server/Cargo.toml b/server/Cargo.toml
-index 5b79b83..01463be 100644
---- a/media/audioipc/server/Cargo.toml
-+++ b/media/audioipc/server/Cargo.toml
-@@ -9,8 +9,8 @@ description = "Remote cubeb server"
- 
- [dependencies]
- audioipc = { path = "../audioipc" }
--cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
--cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.2" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-+cubeb = { path = "../../cubeb-rs/cubeb-api" }
- bytes = "0.4"
- lazycell = "^0.4"
- libc = "0.2"
--- 
-2.10.2
 
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -4,18 +4,17 @@ version = "0.2.1"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote cubeb server"
 
 [dependencies]
 audioipc = { path = "../audioipc" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-cubeb = { path = "../../cubeb-rs/cubeb-api" }
+cubeb = "0.4"
 bytes = "0.4"
 lazycell = "^0.4"
 libc = "0.2"
 log = "^0.3.6"
 slab = "0.3.0"
 # rayon-core in Gecko uses futures 0.1.13
 futures = "=0.1.13"
 tokio-core = "0.1"
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -2,34 +2,31 @@
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 extern crate audioipc;
 extern crate bytes;
 extern crate cubeb;
-extern crate cubeb_core;
 extern crate futures;
 extern crate lazycell;
 extern crate libc;
 extern crate slab;
 extern crate tokio_core;
 extern crate tokio_uds;
 
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::core;
 use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
 use audioipc::frame::{framed, Framed};
-use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage,
-                         StreamCreate, StreamInitParams, StreamParams};
+use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo,
+                         ServerMessage, StreamCreate, StreamInitParams, StreamParams};
 use audioipc::rpc;
 use audioipc::shm::{SharedMemReader, SharedMemWriter};
-use cubeb_core::binding::Binding;
-use cubeb_core::ffi;
 use futures::Future;
 use futures::future::{self, FutureResult};
 use futures::sync::oneshot;
 use std::{ptr, slice};
 use std::cell::RefCell;
 use std::convert::From;
 use std::error::Error;
 use std::os::raw::c_void;
@@ -39,35 +36,36 @@ use tokio_core::reactor::Remote;
 use tokio_uds::UnixStream;
 
 pub mod errors {
     error_chain! {
         links {
             AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
         }
         foreign_links {
-            Cubeb(::cubeb_core::Error);
+            Cubeb(::cubeb::Error);
             Io(::std::io::Error);
             Canceled(::futures::sync::oneshot::Canceled);
         }
     }
 }
 
 use errors::*;
 
-thread_local!(static CONTEXT_KEY: RefCell<Option<cubeb::Result<cubeb::Context>>> = RefCell::new(None));
+type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
+thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
 
 fn with_local_context<T, F>(f: F) -> T
 where
-    F: FnOnce(&cubeb::Result<cubeb::Context>) -> T
+    F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
 {
     CONTEXT_KEY.with(|k| {
         let mut context = k.borrow_mut();
         if context.is_none() {
-            *context = Some(cubeb::Context::init("AudioIPC Server", None));
+            *context = Some(cubeb::init("AudioIPC Server"));
         }
         f(context.as_ref().unwrap())
     })
 }
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
@@ -77,130 +75,56 @@ const STREAM_CONN_CHUNK_SIZE: usize = 64
 struct CallbackClient;
 
 impl rpc::Client for CallbackClient {
     type Request = CallbackReq;
     type Response = CallbackResp;
     type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
 
-// TODO: this should forward to the client.
-struct Callback {
-    /// Size of input frame in bytes
-    input_frame_size: u16,
-    /// Size of output frame in bytes
-    output_frame_size: u16,
-    input_shm: SharedMemWriter,
-    output_shm: SharedMemReader,
-    rpc: rpc::ClientProxy<CallbackReq, CallbackResp>
-}
-
-impl cubeb::StreamCallback for Callback {
-    type Frame = u8;
-
-    fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
-        trace!("Stream data callback: {} {}", input.len(), output.len());
-
-        // len is of input and output is frame len. Turn these into the real lengths.
-        let real_input = unsafe {
-            let size_bytes = input.len() * self.input_frame_size as usize;
-            slice::from_raw_parts(input.as_ptr(), size_bytes)
-        };
-        let real_output = unsafe {
-            let size_bytes = output.len() * self.output_frame_size as usize;
-            trace!("Resize output to {}", size_bytes);
-            slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
-        };
-
-        self.input_shm.write(real_input).unwrap();
-
-        let r = self.rpc
-            .call(CallbackReq::Data(
-                output.len() as isize,
-                self.output_frame_size as usize
-            ))
-            .wait();
-
-        match r {
-            Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 {
-                let len = cb_result as usize * self.output_frame_size as usize;
-                self.output_shm.read(&mut real_output[..len]).unwrap();
-                cb_result
-            } else {
-                cb_result
-            },
-            _ => {
-                debug!("Unexpected message {:?} during data_callback", r);
-                -1
-            }
-        }
-    }
-
-    fn state_callback(&mut self, state: cubeb::State) {
-        info!("Stream state callback: {:?}", state);
-        // TODO: Share this conversion with the same code in cubeb-rs?
-        let state = match state {
-            cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
-            cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
-            cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
-            cubeb::State::Error => ffi::CUBEB_STATE_ERROR
-        };
-
-        let r = self.rpc.call(CallbackReq::State(state)).wait();
-
-        match r {
-            Ok(CallbackResp::State) => {},
-            _ => {
-                debug!("Unexpected message {:?} during callback", r);
-            }
-        };
-    }
-}
-
-type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
+type StreamSlab = slab::Slab<cubeb::Stream<u8>, usize>;
 
 pub struct CubebServer {
     cb_remote: Remote,
-    streams: StreamSlab
+    streams: StreamSlab,
 }
 
 impl rpc::Server for CubebServer {
     type Request = ServerMessage;
     type Response = ClientMessage;
     type Future = FutureResult<Self::Response, ()>;
-    type Transport =
-        FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
 
     fn process(&mut self, req: Self::Request) -> Self::Future {
         let resp = with_local_context(|context| match *context {
-            Err(_) => error(cubeb::Error::new()),
-            Ok(ref context) => self.process_msg(context, &req)
+            Err(_) => error(cubeb::Error::error()),
+            Ok(ref context) => self.process_msg(context, &req),
         });
         future::ok(resp)
     }
 }
 
 impl CubebServer {
     pub fn new(cb_remote: Remote) -> Self {
         CubebServer {
             cb_remote: cb_remote,
-            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE)
+            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
         }
     }
 
     // Process a request coming from the client.
     fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
         let resp: ClientMessage = match *msg {
             ServerMessage::ClientConnect => panic!("already connected"),
 
             ServerMessage::ClientDisconnect => {
                 // TODO:
                 //self.connection.client_disconnect();
                 ClientMessage::ClientDisconnected
-            },
+            }
 
             ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
 
             ServerMessage::ContextGetMaxChannelCount => context
                 .max_channel_count()
                 .map(ClientMessage::ContextMaxChannelCount)
                 .unwrap_or_else(error),
 
@@ -214,58 +138,58 @@ impl CubebServer {
                     .channels(u32::from(params.channels))
                     .layout(layout)
                     .take();
 
                 context
                     .min_latency(&params)
                     .map(ClientMessage::ContextMinLatency)
                     .unwrap_or_else(error)
-            },
+            }
 
             ServerMessage::ContextGetPreferredSampleRate => context
                 .preferred_sample_rate()
                 .map(ClientMessage::ContextPreferredSampleRate)
                 .unwrap_or_else(error),
 
             ServerMessage::ContextGetPreferredChannelLayout => context
                 .preferred_channel_layout()
                 .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
                 .unwrap_or_else(error),
 
             ServerMessage::ContextGetDeviceEnumeration(device_type) => context
                 .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
                 .map(|devices| {
-                    let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+                    let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
                     ClientMessage::ContextEnumeratedDevices(v)
                 })
                 .unwrap_or_else(error),
 
             ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
-                .unwrap_or_else(|_| error(cubeb::Error::new())),
+                .unwrap_or_else(|_| error(cubeb::Error::error())),
 
             ServerMessage::StreamDestroy(stm_tok) => {
                 self.streams.remove(stm_tok);
                 ClientMessage::StreamDestroyed
-            },
+            }
 
             ServerMessage::StreamStart(stm_tok) => {
                 let _ = self.streams[stm_tok].start();
                 ClientMessage::StreamStarted
-            },
+            }
 
             ServerMessage::StreamStop(stm_tok) => {
                 let _ = self.streams[stm_tok].stop();
                 ClientMessage::StreamStopped
-            },
+            }
 
             ServerMessage::StreamResetDefaultDevice(stm_tok) => {
                 let _ = self.streams[stm_tok].reset_default_device();
                 ClientMessage::StreamDefaultDeviceReset
-            },
+            }
 
             ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
                 .position()
                 .map(ClientMessage::StreamPosition)
                 .unwrap_or_else(error),
 
             ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
                 .latency()
@@ -279,86 +203,60 @@ impl CubebServer {
 
             ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
                 .set_panning(panning)
                 .map(|_| ClientMessage::StreamPanningSet)
                 .unwrap_or_else(error),
 
             ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
                 .current_device()
-                .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
-                .unwrap_or_else(error)
+                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
+                .unwrap_or_else(error),
         };
 
         debug!("process_msg: req={:?}, resp={:?}", msg, resp);
 
         resp
     }
 
     // Stream init is special, so it's been separated from process_msg.
     fn process_stream_init(
         &mut self,
         context: &cubeb::Context,
-        params: &StreamInitParams
+        params: &StreamInitParams,
     ) -> Result<ClientMessage> {
-        fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
-            params.and_then(|p| {
-                let raw = ffi::cubeb_stream_params::from(p);
-                Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
-            })
-        }
-
-        fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+        fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
             params
                 .map(|p| {
-                    let sample_size = match p.format() {
+                    let format = p.format.into();
+                    let sample_size = match format {
                         cubeb::SampleFormat::S16LE
                         | cubeb::SampleFormat::S16BE
                         | cubeb::SampleFormat::S16NE => 2,
                         cubeb::SampleFormat::Float32LE
                         | cubeb::SampleFormat::Float32BE
-                        | cubeb::SampleFormat::Float32NE => 4
+                        | cubeb::SampleFormat::Float32NE => 4,
                     };
-                    let channel_count = p.channels() as u16;
+                    let channel_count = p.channels as u16;
                     sample_size * channel_count
                 })
                 .unwrap_or(0u16)
         }
 
         // TODO: Yuck!
-        let input_device =
-            unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
-        let output_device =
-            unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+        let input_device = params.input_device as *const _;
+        let output_device = params.output_device as *const _;
         let latency = params.latency_frames;
-        let mut builder = cubeb::StreamInitOptionsBuilder::new();
-        builder
-            .input_device(input_device)
-            .output_device(output_device)
-            .latency(latency);
 
-        if let Some(ref stream_name) = params.stream_name {
-            builder.stream_name(stream_name);
-        }
-        let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
-        if let Some(ref isp) = input_stream_params {
-            builder.input_stream_param(isp);
-        }
-        let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
-        if let Some(ref osp) = output_stream_params {
-            builder.output_stream_param(osp);
-        }
-        let params = builder.take();
-
-        let input_frame_size = frame_size_in_bytes(input_stream_params);
-        let output_frame_size = frame_size_in_bytes(output_stream_params);
+        let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
+        let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
 
         let (stm1, stm2) = net::UnixStream::pair()?;
-        info!("Created callback pair: {:?}-{:?}", stm1, stm2);
-        let (input_shm, input_file) =
+        debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
+        let (mut input_shm, input_file) =
             SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
         let (output_shm, output_file) =
             SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
 
         // This code is currently running on the Client/Server RPC
         // handling thread.  We need to move the registration of the
         // bind_client to the callback RPC handling thread.  This is
         // done by spawning a future on cb_remote.
@@ -372,69 +270,128 @@ impl CubebServer {
             assert_ne!(id, handle.id());
             let stream = UnixStream::from_stream(stm2, handle).unwrap();
             let transport = framed(stream, Default::default());
             let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
             drop(tx.send(rpc));
             Ok(())
         });
 
-        let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
+        let rpc_data: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
             Ok(rpc) => rpc,
-            Err(_) => bail!("Failed to create callback rpc.")
+            Err(_) => bail!("Failed to create callback rpc."),
         };
+        let rpc_state = rpc_data.clone();
+
+        let mut builder = cubeb::StreamBuilder::new();
+
+        if let Some(ref stream_name) = params.stream_name {
+            builder.name(stream_name.clone());
+        }
+
+        if let Some(ref isp) = params.input_stream_params {
+            let input_stream_params =
+                unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) };
+            builder.input(input_device, input_stream_params);
+        }
+
+        if let Some(ref osp) = params.output_stream_params {
+            let output_stream_params =
+                unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) };
+            builder.output(output_device, output_stream_params);
+        }
+
+        builder
+            .latency(latency)
+            .data_callback(move |input, output| {
+                trace!("Stream data callback: {} {}", input.len(), output.len());
+
+                // len is of input and output is frame len. Turn these into the real lengths.
+                let real_input = unsafe {
+                    let nbytes = input.len() * input_frame_size as usize;
+                    slice::from_raw_parts(input.as_ptr(), nbytes)
+                };
+
+                input_shm.write(real_input).unwrap();
 
-        context
-            .stream_init(
-                &params,
-                Callback {
-                    input_frame_size: input_frame_size,
-                    output_frame_size: output_frame_size,
-                    input_shm: input_shm,
-                    output_shm: output_shm,
-                    rpc: rpc
+                let r = rpc_data
+                    .call(CallbackReq::Data(
+                        output.len() as isize,
+                        output_frame_size as usize,
+                    ))
+                    .wait();
+
+                match r {
+                    Ok(CallbackResp::Data(frames)) => {
+                        if frames >= 0 {
+                            let nbytes = frames as usize * output_frame_size as usize;
+                            let real_output = unsafe {
+                                trace!("Resize output to {}", nbytes);
+                                slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
+                            };
+                            output_shm.read(&mut real_output[..nbytes]).unwrap();
+                        }
+                        frames
+                    }
+                    _ => {
+                        debug!("Unexpected message {:?} during data_callback", r);
+                        -1
+                    }
                 }
-            )
+            })
+            .state_callback(move |state| {
+                trace!("Stream state callback: {:?}", state);
+                let r = rpc_state.call(CallbackReq::State(state.into())).wait();
+                match r {
+                    Ok(CallbackResp::State) => {}
+                    _ => {
+                        debug!("Unexpected message {:?} during callback", r);
+                    }
+                }
+            });
+
+        builder
+            .init(context)
             .and_then(|stream| {
                 if !self.streams.has_available() {
                     trace!(
                         "server connection ran out of stream slots. reserving {} more.",
                         STREAM_CONN_CHUNK_SIZE
                     );
                     self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
                 }
 
                 let stm_tok = match self.streams.vacant_entry() {
                     Some(entry) => {
                         debug!("Registering stream {:?}", entry.index(),);
 
                         entry.insert(stream).index()
-                    },
+                    }
                     None => {
                         // TODO: Turn into error
                         panic!("Failed to insert stream into slab. No entries")
                     }
                 };
 
                 Ok(ClientMessage::StreamCreated(StreamCreate {
                     token: stm_tok,
                     fds: [
                         stm1.into_raw_fd(),
                         input_file.into_raw_fd(),
                         output_file.into_raw_fd(),
-                    ]
+                    ],
                 }))
             })
             .map_err(|e| e.into())
     }
 }
 
 struct ServerWrapper {
     core_thread: core::CoreThread,
-    callback_thread: core::CoreThread
+    callback_thread: core::CoreThread,
 }
 
 fn run() -> Result<ServerWrapper> {
     trace!("Starting up cubeb audio server event loop thread...");
 
     let callback_thread = try!(
         core::spawn_thread("AudioIPC Callback RPC", || {
             trace!("Starting up cubeb audio callback event loop thread...");
@@ -455,25 +412,25 @@ fn run() -> Result<ServerWrapper> {
                 e.description()
             );
             Err(e)
         })
     );
 
     Ok(ServerWrapper {
         core_thread: core_thread,
-        callback_thread: callback_thread
+        callback_thread: callback_thread,
     })
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_start() -> *mut c_void {
     match run() {
         Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
-        Err(_) => ptr::null_mut() as *mut _
+        Err(_) => ptr::null_mut() as *mut _,
     }
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
     let (wait_tx, wait_rx) = oneshot::channel();
     let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };