Bug 1403048 - Update media/audioipc to b5559d28. r?kamidphish draft
authorMatthew Gregan <kinetik@flim.org>
Tue, 26 Sep 2017 15:49:26 +1300
changeset 670219 9fd56fc02256166fa79e5d12bbce13e1bb7237df
parent 670209 9db1c3303473492a4a239804d5e8d8b80fa4b788
child 670220 45dda7d4f98e8ce4dcf6c63d29329aad2d24e70d
push id81563
push userbmo:kinetik@flim.org
push dateTue, 26 Sep 2017 03:06:34 +0000
reviewerskamidphish
bugs1403048
milestone58.0a1
Bug 1403048 - Update media/audioipc to b5559d28. r?kamidphish
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/codec.rs
media/audioipc/audioipc/src/connection.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/server/Cargo.toml
media/audioipc/server/src/channel.rs
media/audioipc/server/src/lib.rs
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -3,18 +3,19 @@ name = "audioipc"
 version = "0.1.0"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote Cubeb IPC"
 
 [dependencies]
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+bincode = "0.8"
+bytes = "0.4"
 error-chain = "0.10.0"
+libc = "0.2"
 log = "^0.3.6"
+memmap = "0.5.2"
+mio = "0.6.7"
+mio-uds = "0.6.4"
 serde = "1.*.*"
 serde_derive = "1.*.*"
-bincode = "0.8"
-libc = "0.2"
-mio = "0.6.7"
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-byteorder = "1"
-memmap = "0.5.2"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/async.rs
@@ -0,0 +1,153 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+//! Various async helpers modelled after futures-rs and tokio-io.
+
+use {RecvFd, SendFd};
+use bytes::{Buf, BufMut};
+use mio_uds;
+use std::io as std_io;
+use std::os::unix::io::RawFd;
+use std::os::unix::net;
+
+/// A convenience macro for working with `io::Result<T>` from the
+/// `std::io::Read` and `std::io::Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Async::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+    ($e:expr) => (match $e {
+        Ok(t) => t,
+        Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+            return Ok(Async::NotReady)
+        }
+        Err(e) => return Err(e.into()),
+    })
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+// Async support - Handle EWOULDBLOCK/EAGAIN from non-blocking I/O operations.
+
+/// Return type for async methods, indicates whether the operation was
+/// ready or not.
+///
+/// * `Ok(Async::Ready(t))` means that the operation has completed successfully.
+/// * `Ok(Async::NotReady)` means that the underlying system is not ready to handle operation.
+/// * `Err(e)` means that the operation has completed with the given error `e`.
+pub type AsyncResult<T, E> = Result<Async<T>, E>;
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum Async<T> {
+    /// Represents that a value is immediately ready.
+    Ready(T),
+    /// Represents that a value is not ready yet, but may be so later.
+    NotReady
+}
+
+impl<T> Async<T> {
+    pub fn is_ready(&self) -> bool {
+        match *self {
+            Async::Ready(_) => true,
+            Async::NotReady => false,
+        }
+    }
+
+    pub fn is_not_ready(&self) -> bool {
+        !self.is_ready()
+    }
+}
+
+/// Return type for an async attempt to send a value.
+///
+/// * `Ok(AsyncSend::Ready)` means that the operation has completed successfully.
+/// * `Ok(AsyncSend::NotReady(t))` means that the underlying system is not ready to handle
+///    send. returns the value that tried to be sent in `t`.
+/// * `Err(e)` means that operation has completed with the given error `e`.
+pub type AsyncSendResult<T, E> = Result<AsyncSend<T>, E>;
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum AsyncSend<T> {
+    Ready,
+    NotReady(T)
+}
+
+pub trait AsyncRecvFd: RecvFd {
+    unsafe fn prepare_uninitialized_buffer(&self, bytes: &mut [u8]) -> bool {
+        for byte in bytes.iter_mut() {
+            *byte = 0;
+        }
+
+        true
+    }
+
+    /// Pull some bytes from this source into the specified `Buf`, returning
+    /// how many bytes were read.
+    ///
+    /// The `buf` provided will have bytes read into it and the internal cursor
+    /// will be advanced if any bytes were read. Note that this method typically
+    /// will not reallocate the buffer provided.
+    fn recv_buf_fd<B>(&mut self, buf: &mut B) -> AsyncResult<(usize, Option<RawFd>), std_io::Error>
+    where
+        Self: Sized,
+        B: BufMut,
+    {
+        if !buf.has_remaining_mut() {
+            return Ok(Async::Ready((0, None)));
+        }
+
+        unsafe {
+            let (n, fd) = {
+                let bytes = buf.bytes_mut();
+                self.prepare_uninitialized_buffer(bytes);
+                try_nb!(self.recv_fd(bytes))
+            };
+
+            buf.advance_mut(n);
+            Ok(Async::Ready((n, fd)))
+        }
+    }
+}
+
+impl AsyncRecvFd for net::UnixStream {}
+impl AsyncRecvFd for mio_uds::UnixStream {}
+
+/// A trait for writable objects which operated in an async fashion.
+///
+/// This trait inherits from `std::io::Write` and indicates that an I/O object is
+/// **nonblocking**, meaning that it will return an error instead of blocking
+/// when bytes cannot currently be written, but hasn't closed. Specifically
+/// this means that the `write` function for types that implement this trait
+/// can have a few return values:
+///
+/// * `Ok(n)` means that `n` bytes of data was immediately written .
+/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
+///   written from the buffer provided. The I/O object is not currently
+///   writable but may become writable in the future.
+/// * `Err(e)` for other errors are standard I/O errors coming from the
+///   underlying object.
+pub trait AsyncSendFd: SendFd {
+    /// Write a `Buf` into this value, returning how many bytes were written.
+    ///
+    /// Note that this method will advance the `buf` provided automatically by
+    /// the number of bytes written.
+    fn send_buf_fd<B>(&mut self, buf: &mut B, fd: Option<RawFd>) -> AsyncResult<usize, std_io::Error>
+    where
+        Self: Sized,
+        B: Buf,
+    {
+        if !buf.has_remaining() {
+            return Ok(Async::Ready(0));
+        }
+
+        let n = try_nb!(self.send_fd(buf.bytes(), fd));
+        buf.advance(n);
+        Ok(Async::Ready(n))
+    }
+}
+
+impl AsyncSendFd for net::UnixStream {}
+impl AsyncSendFd for mio_uds::UnixStream {}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -0,0 +1,141 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
+
+use bincode::{self, Bounded, deserialize, serialize_into, serialized_size};
+use bytes::{Buf, BufMut, BytesMut, LittleEndian};
+use serde::de::DeserializeOwned;
+use serde::ser::Serialize;
+use std::io as std_io;
+use std::io::Cursor;
+use std::mem;
+
+////////////////////////////////////////////////////////////////////////////////
+// Split buffer into size delimited frames - This appears more complicated than
+// might be necessary due to handling the possibility of messages being split
+// across reads.
+
+#[derive(Debug)]
+enum FrameState {
+    Head,
+    Data(usize)
+}
+
+#[derive(Debug)]
+pub struct Decoder {
+    state: FrameState
+}
+
+impl Decoder {
+    pub fn new() -> Self {
+        Decoder {
+            state: FrameState::Head
+        }
+    }
+
+    fn decode_head(&mut self, src: &mut BytesMut) -> std_io::Result<Option<usize>> {
+        let head_size = mem::size_of::<u16>();
+        if src.len() < head_size {
+            // Not enough data
+            return Ok(None);
+        }
+
+        let n = {
+            let mut src = Cursor::new(&mut *src);
+
+            // match endianess
+            let n = src.get_uint::<LittleEndian>(head_size);
+
+            if n > u64::from(u16::max_value()) {
+                return Err(std_io::Error::new(
+                    std_io::ErrorKind::InvalidData,
+                    "frame size too big"
+                ));
+            }
+
+            // The check above ensures there is no overflow
+            n as usize
+        };
+
+        // Consume the length field
+        let _ = src.split_to(head_size);
+
+        Ok(Some(n))
+    }
+
+    fn decode_data(&self, n: usize, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+        // At this point, the buffer has already had the required capacity
+        // reserved. All there is to do is read.
+        if src.len() < n {
+            return Ok(None);
+        }
+
+        Ok(Some(src.split_to(n)))
+    }
+
+    pub fn split_frame(&mut self, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+        let n = match self.state {
+            FrameState::Head => {
+                match try!(self.decode_head(src)) {
+                    Some(n) => {
+                        self.state = FrameState::Data(n);
+
+                        // Ensure that the buffer has enough space to read the
+                        // incoming payload
+                        src.reserve(n);
+
+                        n
+                    },
+                    None => return Ok(None),
+                }
+            },
+            FrameState::Data(n) => n,
+        };
+
+        match try!(self.decode_data(n, src)) {
+            Some(data) => {
+                // Update the decode state
+                self.state = FrameState::Head;
+
+                // Make sure the buffer has enough space to read the next head
+                src.reserve(mem::size_of::<u16>());
+
+                Ok(Some(data))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn decode<ITEM: DeserializeOwned>(&mut self, src: &mut BytesMut) -> Result<Option<ITEM>, bincode::Error> {
+        match try!(self.split_frame(src)) {
+            Some(buf) => deserialize::<ITEM>(buf.as_ref()).and_then(|t| Ok(Some(t))),
+            None => Ok(None),
+        }
+    }
+}
+
+impl Default for Decoder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub fn encode<ITEM: Serialize>(dst: &mut BytesMut, item: &ITEM) -> Result<(), bincode::Error> {
+    let head_len = mem::size_of::<u16>() as u64;
+    let item_len = serialized_size(item);
+
+    if head_len + item_len > u64::from(u16::max_value()) {
+        return Err(Box::new(bincode::ErrorKind::IoError(std_io::Error::new(
+            std_io::ErrorKind::InvalidInput,
+            "frame too big"
+        ))));
+    }
+
+    let n = (head_len + item_len) as usize;
+    dst.reserve(n);
+    dst.put_u16::<LittleEndian>(item_len as u16);
+    serialize_into(&mut dst.writer(), item, Bounded(item_len))
+}
--- a/media/audioipc/audioipc/src/connection.rs
+++ b/media/audioipc/audioipc/src/connection.rs
@@ -1,48 +1,50 @@
-use bincode::{self, deserialize, serialize};
+use {AutoCloseFd, RecvFd, SendFd};
+use async::{Async, AsyncRecvFd};
+use bytes::{BufMut, BytesMut};
+use codec::{Decoder, encode};
 use errors::*;
-use msg;
 use mio::{Poll, PollOpt, Ready, Token};
 use mio::event::Evented;
 use mio::unix::EventedFd;
 use serde::de::DeserializeOwned;
 use serde::ser::Serialize;
+use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::io::{self, Read};
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::os::unix::net;
 use std::os::unix::prelude::*;
-use libc;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-
-pub trait RecvFd {
-    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
-}
-
-pub trait SendFd {
-    fn send_fd<FD: IntoRawFd>(&mut self, bytes: &[u8], fd: Option<FD>) -> io::Result<(usize)>;
-}
 
 // Because of the trait implementation rules in Rust, this needs to be
 // a wrapper class to allow implementation of a trait from another
 // crate on a struct from yet another crate.
 //
 // This class is effectively net::UnixStream.
 
 #[derive(Debug)]
 pub struct Connection {
-    stream: net::UnixStream
+    stream: net::UnixStream,
+    recv_buffer: BytesMut,
+    recv_fd: VecDeque<AutoCloseFd>,
+    send_buffer: BytesMut,
+    decoder: Decoder
 }
 
 impl Connection {
     pub fn new(stream: net::UnixStream) -> Connection {
         info!("Create new connection");
+        stream.set_nonblocking(false).unwrap();
         Connection {
-            stream: stream
+            stream: stream,
+            recv_buffer: BytesMut::with_capacity(1024),
+            recv_fd: VecDeque::new(),
+            send_buffer: BytesMut::with_capacity(1024),
+            decoder: Decoder::new()
         }
     }
 
     /// Creates an unnamed pair of connected sockets.
     ///
     /// Returns two `Connection`s which are connected to each other.
     ///
     /// # Examples
@@ -55,77 +57,102 @@ impl Connection {
     ///     Err(e) => {
     ///         println!("Couldn't create a pair of connections: {:?}", e);
     ///         return
     ///     }
     /// };
     /// ```
     pub fn pair() -> io::Result<(Connection, Connection)> {
         let (s1, s2) = net::UnixStream::pair()?;
-        Ok((
-            Connection {
-                stream: s1
-            },
-            Connection {
-                stream: s2
-            }
-        ))
+        Ok((Connection::new(s1), Connection::new(s2)))
+    }
+
+    pub fn take_fd(&mut self) -> Option<RawFd> {
+        self.recv_fd.pop_front().map(|fd| fd.into_raw_fd())
     }
 
     pub fn receive<RT>(&mut self) -> Result<RT>
     where
         RT: DeserializeOwned + Debug,
     {
-        match self.receive_with_fd() {
-            Ok((r, None)) => Ok(r),
-            Ok((_, Some(_))) => panic!("unexpected fd received"),
-            Err(e) => Err(e),
-        }
+        self.receive_with_fd()
     }
 
-    pub fn receive_with_fd<RT>(&mut self) -> Result<(RT, Option<RawFd>)>
+    pub fn receive_with_fd<RT>(&mut self) -> Result<RT>
     where
         RT: DeserializeOwned + Debug,
     {
-        // TODO: Check deserialize_from and serialize_into.
-        let mut encoded = vec![0; 32 * 1024]; // TODO: Get max size from bincode, or at least assert.
-        // TODO: Read until block, EOF, or error.
-        // TODO: Switch back to recv_fd.
-        match self.stream.recv_fd(&mut encoded) {
-            Ok((0, _)) => Err(ErrorKind::Disconnected.into()),
-            // TODO: Handle partial read?
-            Ok((n, fd)) => {
-                let r = deserialize(&encoded[..n]);
-                debug!("receive {:?}", r);
+        trace!("received_with_fd...");
+        loop {
+            trace!("   recv_buffer = {:?}", self.recv_buffer);
+            if !self.recv_buffer.is_empty() {
+                let r = self.decoder.decode(&mut self.recv_buffer);
+                trace!("receive {:?}", r);
                 match r {
-                    Ok(r) => Ok((r, fd)),
-                    Err(e) => Err(e).chain_err(|| "Failed to deserialize message"),
+                    Ok(Some(r)) => return Ok(r),
+                    Ok(None) => {
+                        /* Buffer doesn't contain enough data for a complete
+                         * message, so need to enter recv_buf_fd to get more. */
+                    },
+                    Err(e) => return Err(e).chain_err(|| "Failed to deserialize message"),
                 }
-            },
-            // TODO: Handle dropped message.
-            // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
-            _ => bail!("socket write"),
+            }
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+
+            // The decoder.decode should have reserved an amount for
+            // the next bit it needs to read.  Check that we reserved
+            // enough space for, at least the 2 byte size prefix.
+            assert!(self.recv_buffer.remaining_mut() > 2);
+
+            // TODO: Read until block, EOF, or error.
+            // TODO: Switch back to recv_fd.
+            match self.stream.recv_buf_fd(&mut self.recv_buffer) {
+                Ok(Async::Ready((0, _))) => return Err(ErrorKind::Disconnected.into()),
+                // TODO: Handle partial read?
+                Ok(Async::Ready((_, fd))) => {
+                    trace!(
+                        "   recv_buf_fd: recv_buffer: {:?}, recv_fd: {:?}, fd: {:?}",
+                        self.recv_buffer,
+                        self.recv_fd,
+                        fd
+                    );
+                    if let Some(fd) = fd {
+                        self.recv_fd.push_back(
+                            unsafe { AutoCloseFd::from_raw_fd(fd) }
+                        );
+                    }
+                },
+                Ok(Async::NotReady) => bail!("Socket should be blocking."),
+                // TODO: Handle dropped message.
+                // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
+                _ => bail!("socket write"),
+            }
         }
     }
 
     pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
     where
         ST: Serialize + Debug,
     {
         self.send_with_fd::<ST, Connection>(msg, None)
     }
 
     pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
     where
         ST: Serialize + Debug,
         FD: IntoRawFd + Debug,
     {
-        let encoded: Vec<u8> = serialize(&msg, bincode::Infinite)?;
-        info!("send_with_fd {:?}, {:?}", msg, fd_to_send);
-        self.stream.send_fd(&encoded, fd_to_send).chain_err(
+        trace!("send_with_fd {:?}, {:?}", msg, fd_to_send);
+        try!(encode(&mut self.send_buffer, &msg));
+        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
+        let send = self.send_buffer.take().freeze();
+        self.stream.send_fd(send.as_ref(), fd_to_send).chain_err(
             || "Failed to send message with fd"
         )
     }
 }
 
 impl Evented for Connection {
     fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
         EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
@@ -148,52 +175,31 @@ impl Read for Connection {
 
 // TODO: Is this required?
 impl<'a> Read for &'a Connection {
     fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
         (&self.stream).read(bytes)
     }
 }
 
-impl RecvFd for net::UnixStream {
-    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-        let length = self.read_u32::<LittleEndian>()?;
-
-        msg::recvmsg(self.as_raw_fd(), &mut buf_to_recv[..length as usize])
-    }
-}
-
 impl RecvFd for Connection {
     fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
         self.stream.recv_fd(buf_to_recv)
     }
 }
 
 impl FromRawFd for Connection {
     unsafe fn from_raw_fd(fd: RawFd) -> Connection {
-        Connection {
-            stream: net::UnixStream::from_raw_fd(fd)
-        }
+        Connection::new(net::UnixStream::from_raw_fd(fd))
     }
 }
 
 impl IntoRawFd for Connection {
     fn into_raw_fd(self) -> RawFd {
         self.stream.into_raw_fd()
     }
 }
 
-impl SendFd for net::UnixStream {
-    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
-        self.write_u32::<LittleEndian>(buf_to_send.len() as u32)?;
-
-        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
-        let r = msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send);
-        fd_to_send.map(|fd| unsafe { libc::close(fd) });
-        r
-    }
-}
-
 impl SendFd for Connection {
-    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
         self.stream.send_fd(buf_to_send, fd_to_send)
     }
 }
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -8,39 +8,119 @@
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 #[macro_use]
 extern crate serde_derive;
-extern crate serde;
+
 extern crate bincode;
-
-extern crate mio;
-
+extern crate bytes;
 extern crate cubeb_core;
+extern crate libc;
+extern crate memmap;
+extern crate mio;
+extern crate mio_uds;
+extern crate serde;
 
-extern crate libc;
-extern crate byteorder;
-
-extern crate memmap;
-
+pub mod async;
+pub mod codec;
 mod connection;
 pub mod errors;
 pub mod messages;
 mod msg;
 pub mod shm;
 
 pub use connection::*;
 pub use messages::{ClientMessage, ServerMessage};
+
 use std::env::temp_dir;
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::net;
 use std::path::PathBuf;
 
+// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
+// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
+// UnixStream type.
+pub trait RecvFd {
+    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+}
+
+pub trait SendFd {
+    fn send_fd(&mut self, bytes: &[u8], fd: Option<RawFd>) -> io::Result<(usize)>;
+}
+
+impl RecvFd for net::UnixStream {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+    }
+}
+
+impl RecvFd for mio_uds::UnixStream {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+    }
+}
+
+impl SendFd for net::UnixStream {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+    }
+}
+
+impl SendFd for mio_uds::UnixStream {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug)]
+pub struct AutoCloseFd(RawFd);
+
+impl Drop for AutoCloseFd {
+    fn drop(&mut self) {
+        unsafe {
+            libc::close(self.0);
+        }
+    }
+}
+
+impl FromRawFd for AutoCloseFd {
+    unsafe fn from_raw_fd(fd: RawFd) -> Self {
+        AutoCloseFd(fd)
+    }
+}
+
+impl IntoRawFd for AutoCloseFd {
+    fn into_raw_fd(self) -> RawFd {
+        let fd = self.0;
+        ::std::mem::forget(self);
+        fd
+    }
+}
+
+impl AsRawFd for AutoCloseFd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+impl<'a> AsRawFd for &'a AutoCloseFd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 fn get_temp_path(name: &str) -> PathBuf {
     let mut path = temp_dir();
     path.push(name);
     path
 }
 
 pub fn get_uds_path() -> PathBuf {
     get_temp_path("cubeb-sock")
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -121,33 +121,33 @@ pub struct StreamParams {
     pub format: u32,
     pub rate: u16,
     pub channels: u8,
     pub layout: i32
 }
 
 impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
     fn from(params: &'a ffi::cubeb_stream_params) -> Self {
-        assert!(params.channels <= u8::max_value() as u32);
+        assert!(params.channels <= u32::from(u8::max_value()));
 
         StreamParams {
             format: params.format,
             rate: params.rate as u16,
             channels: params.channels as u8,
             layout: params.layout
         }
     }
 }
 
 impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
     fn from(params: &StreamParams) -> Self {
         ffi::cubeb_stream_params {
             format: params.format,
-            rate: params.rate as u32,
-            channels: params.channels as u32,
+            rate: u32::from(params.rate),
+            channels: u32::from(params.channels),
             layout: params.layout
         }
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StreamInitParams {
     pub stream_name: Option<Vec<u8>>,
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -1,105 +1,161 @@
 use libc;
 use std::io;
 use std::mem;
+use std::os::unix::io::RawFd;
 use std::ptr;
-use std::os::unix::io::RawFd;
-use std;
 
 // Note: The following fields must be laid out together, the OS expects them
 // to be part of a single allocation.
 #[repr(C)]
 struct CmsgSpace {
     cmsghdr: libc::cmsghdr,
+    #[cfg(not(target_os = "macos"))]
+    __padding: [usize; 0],
     data: libc::c_int,
 }
 
-unsafe fn sendmsg_retry(fd: libc::c_int, msg: *const libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
-    loop {
-        let r = libc::sendmsg(fd, msg, flags);
-        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
-            std::thread::yield_now();
-            continue;
-        }
-        return r;
-    }
+#[cfg(not(target_os = "macos"))]
+fn cmsg_align(len: usize) -> usize {
+    let align_bytes = mem::size_of::<usize>() - 1;
+    (len + align_bytes) & !align_bytes
+}
+
+#[cfg(target_os = "macos")]
+fn cmsg_align(len: usize) -> usize {
+    len
+}
+
+fn cmsg_space() -> usize {
+    mem::size_of::<CmsgSpace>()
+}
+
+fn cmsg_len() -> usize {
+    cmsg_align(mem::size_of::<libc::cmsghdr>()) + mem::size_of::<libc::c_int>()
 }
 
 pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
     let mut iovec: libc::iovec = unsafe { mem::zeroed() };
     let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
 
     msghdr.msg_iov = &mut iovec as *mut _;
     msghdr.msg_iovlen = 1;
     if fd_to_send.is_some() {
         msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-        msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+        msghdr.msg_controllen = cmsg_space() as _;
     }
 
     iovec.iov_base = if to_send.is_empty() {
         // Empty Vecs have a non-null pointer.
         ptr::null_mut()
     } else {
         to_send.as_ptr() as *const _ as *mut _
     };
     iovec.iov_len = to_send.len();
 
-    cmsg.cmsghdr.cmsg_len = msghdr.msg_controllen;
+    cmsg.cmsghdr.cmsg_len = cmsg_len() as _;
     cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
     cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
 
     cmsg.data = fd_to_send.unwrap_or(-1);
 
-    let result = unsafe { sendmsg_retry(fd, &msghdr, 0) };
+    let result = unsafe { libc::sendmsg(fd, &msghdr, 0) };
     if result >= 0 {
         Ok(result as usize)
     } else {
         Err(io::Error::last_os_error())
     }
 }
 
-unsafe fn recvmsg_retry(fd: libc::c_int, msg: *mut libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
-    loop {
-        let r = libc::recvmsg(fd, msg, flags);
-        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
-            std::thread::yield_now();
-            continue;
-        }
-        return r;
-    }
-}
-
 pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
     let mut iovec: libc::iovec = unsafe { mem::zeroed() };
     let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
 
     msghdr.msg_iov = &mut iovec as *mut _;
     msghdr.msg_iovlen = 1;
     msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-    msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+    msghdr.msg_controllen = cmsg_space() as _;
 
     iovec.iov_base = if to_recv.is_empty() {
         // Empty Vecs have a non-null pointer.
         ptr::null_mut()
     } else {
         to_recv.as_ptr() as *const _ as *mut _
     };
     iovec.iov_len = to_recv.len();
 
-    let result = unsafe { recvmsg_retry(fd, &mut msghdr, 0) };
+    let result = unsafe { libc::recvmsg(fd, &mut msghdr, 0) };
     if result >= 0 {
-        let fd = if msghdr.msg_controllen == mem::size_of::<CmsgSpace>() as _ &&
-            cmsg.cmsghdr.cmsg_len == mem::size_of::<CmsgSpace>() as _ &&
+        let fd = if msghdr.msg_controllen == cmsg_space() as _ &&
+            cmsg.cmsghdr.cmsg_len == cmsg_len() as _ &&
             cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
             cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
                 Some(cmsg.data)
             } else {
                 None
             };
 
         Ok((result as usize, fd))
     } else {
         Err(io::Error::last_os_error())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use libc;
+    use std::mem;
+    use std::os::unix::net::UnixStream;
+    use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+    use std::io::{Read, Write};
+    use super::{cmsg_len, cmsg_space, sendmsg, recvmsg};
+
+    #[test]
+    fn portable_sizes() {
+        if cfg!(all(target_os = "linux", target_pointer_width = "64")) {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 16);
+            assert_eq!(cmsg_len(), 20);
+            assert_eq!(cmsg_space(), 24);
+        } else if cfg!(all(target_os = "linux", target_pointer_width = "32")) {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        } else if cfg!(target_os = "macos") {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        } else if cfg!(target_pointer_width = "64") {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 20);
+            assert_eq!(cmsg_space(), 24);
+        } else {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        }
+    }
+
+    #[test]
+    fn fd_passing() {
+        let (tx, rx) = UnixStream::pair().unwrap();
+
+        let (send_tx, mut send_rx) = UnixStream::pair().unwrap();
+
+        let fd = send_tx.into_raw_fd();
+        assert_eq!(sendmsg(tx.as_raw_fd(), b"a", Some(fd)).unwrap(), 1);
+        unsafe { libc::close(fd) };
+
+        let mut buf = [0u8];
+        let (got, fd) = recvmsg(rx.as_raw_fd(), &mut buf).unwrap();
+        assert_eq!(got, 1);
+        assert_eq!(&buf, b"a");
+
+        let mut send_tx = unsafe { UnixStream::from_raw_fd(fd.unwrap()) };
+        assert_eq!(send_tx.write(b"b").unwrap(), 1);
+
+        let mut buf = [0u8];
+        assert_eq!(send_rx.read(&mut buf).unwrap(), 1);
+        assert_eq!(&buf, b"b");
+    }
+}
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,11 +1,11 @@
 [package]
 name = "audioipc-client"
 version = "0.1.0"
 authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
 description = "Cubeb Backend for talking to remote cubeb server."
 
 [dependencies]
 audioipc = { path="../audioipc" }
+cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
 cubeb-core = { path="../../cubeb-rs/cubeb-core" }
-cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
 log = "^0.3.6"
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,17 +1,18 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use ClientStream;
+use assert_not_in_callback;
 use audioipc::{self, ClientMessage, Connection, ServerMessage, messages};
 use cubeb_backend::{Context, Ops};
-use cubeb_core::{DeviceId, DeviceType, Error, Result, StreamParams, ffi};
+use cubeb_core::{DeviceId, DeviceType, Error, ErrorCode, Result, StreamParams, ffi};
 use cubeb_core::binding::Binding;
 use std::ffi::{CStr, CString};
 use std::mem;
 use std::os::raw::c_void;
 use std::os::unix::net::UnixStream;
 use std::sync::{Mutex, MutexGuard};
 use stream;
 
@@ -28,79 +29,96 @@ macro_rules! t(
             Err(_) => return Err(Error::default())
         }
     ));
 
 pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
 
 impl ClientContext {
     #[doc(hidden)]
-    pub fn conn(&self) -> MutexGuard<Connection> {
+    pub fn connection(&self) -> MutexGuard<Connection> {
         self.connection.lock().unwrap()
     }
 }
 
 impl Context for ClientContext {
     fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+        assert_not_in_callback();
         // TODO: encapsulate connect, etc inside audioipc.
         let stream = t!(UnixStream::connect(audioipc::get_uds_path()));
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
             connection: Mutex::new(Connection::new(stream))
         });
         Ok(Box::into_raw(ctx) as *mut _)
     }
 
     fn backend_id(&self) -> &'static CStr {
+        // HACK: This is called reentrantly from Gecko's AudioStream::DataCallback.
+        //assert_not_in_callback();
         unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
     }
 
     fn max_channel_count(&self) -> Result<u32> {
-        send_recv!(self.conn(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+        // HACK: This needs to be reentrant as MSG calls it from within data_callback.
+        //assert_not_in_callback();
+        //let mut conn = self.connection();
+        //send_recv!(conn, ContextGetMaxChannelCount => ContextMaxChannelCount())
+        warn!("Context::max_channel_count lying about result until reentrancy issues resolved.");
+        Ok(2)
     }
 
     fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+        assert_not_in_callback();
         let params = messages::StreamParams::from(unsafe { &*params.raw() });
-        send_recv!(self.conn(), ContextGetMinLatency(params) => ContextMinLatency())
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetMinLatency(params) => ContextMinLatency())
     }
 
     fn preferred_sample_rate(&self) -> Result<u32> {
-        send_recv!(self.conn(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+        assert_not_in_callback();
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetPreferredSampleRate => ContextPreferredSampleRate())
     }
 
     fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
-        send_recv!(self.conn(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+        assert_not_in_callback();
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
     }
 
     fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+        assert_not_in_callback();
+        let mut conn = self.connection();
         let v: Vec<ffi::cubeb_device_info> =
-            match send_recv!(self.conn(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
+            match send_recv!(conn, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
                 Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
                 Err(e) => return Err(e),
             };
         let vs = v.into_boxed_slice();
         let coll = ffi::cubeb_device_collection {
             count: vs.len(),
             device: vs.as_ptr()
         };
         // Giving away the memory owned by vs.  Don't free it!
         // Reclaimed in `device_collection_destroy`.
         mem::forget(vs);
         Ok(coll)
     }
 
     fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+        assert_not_in_callback();
         unsafe {
             let coll = &*collection;
             let mut devices = Vec::from_raw_parts(
                 coll.device as *mut ffi::cubeb_device_info,
                 coll.count,
                 coll.count
             );
-            for dev in devices.iter_mut() {
+            for dev in &mut devices {
                 if !dev.device_id.is_null() {
                     let _ = CString::from_raw(dev.device_id as *mut _);
                 }
                 if !dev.group_id.is_null() {
                     let _ = CString::from_raw(dev.group_id as *mut _);
                 }
                 if !dev.vendor_name.is_null() {
                     let _ = CString::from_raw(dev.vendor_name as *mut _);
@@ -120,16 +138,17 @@ impl Context for ClientContext {
         output_device: DeviceId,
         output_stream_params: Option<&ffi::cubeb_stream_params>,
         latency_frame: u32,
         // These params aren't sent to the server
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
         user_ptr: *mut c_void,
     ) -> Result<*mut ffi::cubeb_stream> {
+        assert_not_in_callback();
 
         fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
             match p {
                 Some(raw) => Some(messages::StreamParams::from(raw)),
                 None => None,
             }
         }
 
@@ -144,27 +163,38 @@ impl Context for ClientContext {
         let init_params = messages::StreamInitParams {
             stream_name: stream_name,
             input_device: input_device.raw() as _,
             input_stream_params: input_stream_params,
             output_device: output_device.raw() as _,
             output_stream_params: output_stream_params,
             latency_frames: latency_frame
         };
-        stream::init(&self, init_params, data_callback, state_callback, user_ptr)
+        stream::init(self, init_params, data_callback, state_callback, user_ptr)
     }
 
     fn register_device_collection_changed(
         &self,
         _dev_type: DeviceType,
         _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
         _user_ptr: *mut c_void,
     ) -> Result<()> {
+        assert_not_in_callback();
         Ok(())
     }
 }
 
 impl Drop for ClientContext {
     fn drop(&mut self) {
+        let mut conn = self.connection();
         info!("ClientContext drop...");
-        let _: Result<()> = send_recv!(self.conn(), ClientDisconnect => ClientDisconnected);
+        let r = conn.send(ServerMessage::ClientDisconnect);
+        if r.is_err() {
+            debug!("ClientContext::Drop send error={:?}", r);
+        } else {
+            let r = conn.receive();
+            if let Ok(ClientMessage::ClientDisconnected) = r {
+            } else {
+                debug!("ClientContext::Drop receive error={:?}", r);
+            }
+        }
     }
 }
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -16,13 +16,28 @@ mod context;
 mod stream;
 
 use context::ClientContext;
 use cubeb_backend::capi;
 use cubeb_core::ffi;
 use std::os::raw::{c_char, c_int};
 use stream::ClientStream;
 
+thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
+
+fn set_in_callback(in_callback: bool) {
+    IN_CALLBACK.with(|b| {
+        assert_eq!(*b.borrow(), !in_callback);
+        *b.borrow_mut() = in_callback;
+    });
+}
+
+fn assert_not_in_callback() {
+    IN_CALLBACK.with(|b| {
+        assert_eq!(*b.borrow(), false);
+    });
+}
+
 #[no_mangle]
 /// Entry point from C code.
 pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int {
     capi::capi_init::<ClientContext>(c, context_name)
 }
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -12,31 +12,41 @@ macro_rules! send_recv {
         send_recv!(__send $conn, $smsg, $($a),*);
         send_recv!(__recv $conn, $rmsg)
     }};
     ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
         send_recv!(__send $conn, $smsg, $($a),*);
         send_recv!(__recv $conn, $rmsg __result)
     }};
     //
-    (__send $conn:expr, $smsg:ident) => (
-        $conn.send(ServerMessage::$smsg)
-            .unwrap();
-    );
-    (__send $conn:expr, $smsg:ident, $($a:expr),*) => (
-        $conn.send(ServerMessage::$smsg($($a),*))
-            .unwrap();
-    );
-    (__recv $conn:expr, $rmsg:ident) => (
-        if let ClientMessage::$rmsg = $conn.receive().unwrap() {
+    (__send $conn:expr, $smsg:ident) => ({
+        let r = $conn.send(ServerMessage::$smsg);
+        if r.is_err() {
+            debug!("send error - got={:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
+    });
+    (__send $conn:expr, $smsg:ident, $($a:expr),*) => ({
+        let r = $conn.send(ServerMessage::$smsg($($a),*));
+        if r.is_err() {
+            debug!("send error - got={:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
+    });
+    (__recv $conn:expr, $rmsg:ident) => ({
+        let r = $conn.receive().unwrap();
+        if let ClientMessage::$rmsg = r {
             Ok(())
         } else {
-            panic!("wrong message received");
+            debug!("receive error - got={:?}", r);
+            Err(ErrorCode::Error.into())
         }
-    );
-    (__recv $conn:expr, $rmsg:ident __result) => (
-        if let ClientMessage::$rmsg(v) = $conn.receive().unwrap() {
+    });
+    (__recv $conn:expr, $rmsg:ident __result) => ({
+        let r = $conn.receive().unwrap();
+        if let ClientMessage::$rmsg(v) = r {
             Ok(v)
         } else {
-            panic!("wrong message received");
+            debug!("receive error - got={:?}", r);
+            Err(ErrorCode::Error.into())
         }
-    )
+    })
 }
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -1,214 +1,277 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use ClientContext;
+use {set_in_callback, assert_not_in_callback};
 use audioipc::{ClientMessage, Connection, ServerMessage, messages};
-use audioipc::shm::{SharedMemSlice, SharedMemMutSlice};
+use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
 use cubeb_backend::Stream;
 use cubeb_core::{ErrorCode, Result, ffi};
 use std::ffi::CString;
+use std::fs::File;
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
-use std::fs::File;
 use std::ptr;
 use std::thread;
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
 pub struct ClientStream<'ctx> {
     // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
     context: &'ctx ClientContext,
     token: usize,
     join_handle: Option<thread::JoinHandle<()>>
 }
 
 fn stream_thread(
     mut conn: Connection,
-    input_shm: SharedMemSlice,
+    input_shm: &SharedMemSlice,
     mut output_shm: SharedMemMutSlice,
     data_cb: ffi::cubeb_data_callback,
     state_cb: ffi::cubeb_state_callback,
     user_ptr: usize,
 ) {
     loop {
         let r = match conn.receive::<ClientMessage>() {
             Ok(r) => r,
             Err(e) => {
                 debug!("stream_thread: Failed to receive message: {:?}", e);
-                continue;
+                return;
             },
         };
 
         match r {
             ClientMessage::StreamDestroyed => {
                 info!("stream_thread: Shutdown callback thread.");
                 return;
             },
             ClientMessage::StreamDataCallback(nframes, frame_size) => {
-                info!(
+                trace!(
                     "stream_thread: Data Callback: nframes={} frame_size={}",
                     nframes,
                     frame_size
                 );
                 // TODO: This is proof-of-concept. Make it better.
-                let input_ptr: *const u8 = input_shm.get_slice(nframes as usize * frame_size).unwrap().as_ptr();
-                let output_ptr: *mut u8 = output_shm.get_mut_slice(nframes as usize * frame_size).unwrap().as_mut_ptr();
+                let input_ptr: *const u8 = input_shm
+                    .get_slice(nframes as usize * frame_size)
+                    .unwrap()
+                    .as_ptr();
+                let output_ptr: *mut u8 = output_shm
+                    .get_mut_slice(nframes as usize * frame_size)
+                    .unwrap()
+                    .as_mut_ptr();
+                set_in_callback(true);
                 let nframes = data_cb(
                     ptr::null_mut(),
                     user_ptr as *mut c_void,
                     input_ptr as *const _,
                     output_ptr as *mut _,
                     nframes as _
                 );
-                conn.send(ServerMessage::StreamDataCallback(nframes as isize)).unwrap();
+                set_in_callback(false);
+                let r = conn.send(ServerMessage::StreamDataCallback(nframes as isize));
+                if r.is_err() {
+                    debug!("stream_thread: Failed to send StreamDataCallback: {:?}", r);
+                    return;
+                }
             },
             ClientMessage::StreamStateCallback(state) => {
                 info!("stream_thread: State Callback: {:?}", state);
+                set_in_callback(true);
                 state_cb(ptr::null_mut(), user_ptr as *mut _, state);
+                set_in_callback(false);
             },
             m => {
                 info!("Unexpected ClientMessage: {:?}", m);
             },
         }
     }
 }
 
 impl<'ctx> ClientStream<'ctx> {
     fn init(
         ctx: &'ctx ClientContext,
         init_params: messages::StreamInitParams,
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
         user_ptr: *mut c_void,
     ) -> Result<*mut ffi::cubeb_stream> {
+        assert_not_in_callback();
+        let mut conn = ctx.connection();
 
-        ctx.conn()
-            .send(ServerMessage::StreamInit(init_params))
-            .unwrap();
+        let r = conn.send(ServerMessage::StreamInit(init_params));
+        if r.is_err() {
+            debug!("ClientStream::init: Failed to send StreamInit: {:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
 
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
-        let (token, conn) = match r {
-            (ClientMessage::StreamCreated(tok), Some(fd)) => (tok, unsafe {
-                Connection::from_raw_fd(fd)
-            }),
-            (ClientMessage::StreamCreated(_), None) => {
-                debug!("Missing fd!");
-                return Err(ErrorCode::Error.into());
+        let (token, conn2) = match r {
+            ClientMessage::StreamCreated(tok) => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                (tok, unsafe { Connection::from_raw_fd(fd.unwrap()) })
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
         // TODO: It'd be nicer to receive these two fds as part of
         // StreamCreated, but that requires changing sendmsg/recvmsg to
         // support multiple fds.
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
         let input_file = match r {
-            (ClientMessage::StreamCreatedInputShm, Some(fd)) => unsafe {
-                File::from_raw_fd(fd)
+            ClientMessage::StreamCreatedInputShm => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                unsafe { File::from_raw_fd(fd.unwrap()) }
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
-        let input_shm = SharedMemSlice::from(input_file,
-                                             SHM_AREA_SIZE).unwrap();
+        let input_shm = SharedMemSlice::from(input_file, SHM_AREA_SIZE).unwrap();
 
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
         let output_file = match r {
-            (ClientMessage::StreamCreatedOutputShm, Some(fd)) => unsafe {
-                File::from_raw_fd(fd)
+            ClientMessage::StreamCreatedOutputShm => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                unsafe { File::from_raw_fd(fd.unwrap()) }
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
-        let output_shm = SharedMemMutSlice::from(output_file,
-                                                 SHM_AREA_SIZE).unwrap();
+        let output_shm = SharedMemMutSlice::from(output_file, SHM_AREA_SIZE).unwrap();
 
         let user_data = user_ptr as usize;
         let join_handle = thread::spawn(move || {
-            stream_thread(conn, input_shm, output_shm, data_callback, state_callback, user_data)
+            stream_thread(
+                conn2,
+                &input_shm,
+                output_shm,
+                data_callback,
+                state_callback,
+                user_data
+            )
         });
 
         Ok(Box::into_raw(Box::new(ClientStream {
             context: ctx,
             token: token,
             join_handle: Some(join_handle)
         })) as _)
     }
 }
 
 impl<'ctx> Drop for ClientStream<'ctx> {
     fn drop(&mut self) {
-        let _: Result<()> = send_recv!(self.context.conn(), StreamDestroy(self.token) => StreamDestroyed);
+        let mut conn = self.context.connection();
+        let r = conn.send(ServerMessage::StreamDestroy(self.token));
+        if r.is_err() {
+            debug!("ClientStream::Drop send error={:?}", r);
+        } else {
+            let r = conn.receive();
+            if let Ok(ClientMessage::StreamDestroyed) = r {
+            } else {
+                debug!("ClientStream::Drop receive error={:?}", r);
+            }
+        }
+        // XXX: This is guaranteed to wait forever if the send failed.
         self.join_handle.take().unwrap().join().unwrap();
     }
 }
 
 impl<'ctx> Stream for ClientStream<'ctx> {
     fn start(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamStart(self.token) => StreamStarted)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamStart(self.token) => StreamStarted)
     }
 
     fn stop(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamStop(self.token) => StreamStopped)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamStop(self.token) => StreamStopped)
     }
 
     fn reset_default_device(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
     }
 
     fn position(&self) -> Result<u64> {
-        send_recv!(self.context.conn(), StreamGetPosition(self.token) => StreamPosition())
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamGetPosition(self.token) => StreamPosition())
     }
 
     fn latency(&self) -> Result<u32> {
-        send_recv!(self.context.conn(), StreamGetLatency(self.token) => StreamLatency())
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamGetLatency(self.token) => StreamLatency())
     }
 
     fn set_volume(&self, volume: f32) -> Result<()> {
-        send_recv!(self.context.conn(), StreamSetVolume(self.token, volume) => StreamVolumeSet)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamSetVolume(self.token, volume) => StreamVolumeSet)
     }
 
     fn set_panning(&self, panning: f32) -> Result<()> {
-        send_recv!(self.context.conn(), StreamSetPanning(self.token, panning) => StreamPanningSet)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamSetPanning(self.token, panning) => StreamPanningSet)
     }
 
     fn current_device(&self) -> Result<*const ffi::cubeb_device> {
-        match send_recv!(self.context.conn(), StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        match send_recv!(conn, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
             Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
             Err(e) => Err(e),
         }
     }
 
     fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+        assert_not_in_callback();
         // It's all unsafe...
         if !device.is_null() {
             unsafe {
                 if !(*device).output_name.is_null() {
                     let _ = CString::from_raw((*device).output_name as *mut _);
                 }
                 if !(*device).input_name.is_null() {
                     let _ = CString::from_raw((*device).input_name as *mut _);
@@ -219,16 +282,17 @@ impl<'ctx> Stream for ClientStream<'ctx>
         Ok(())
     }
 
     // TODO: How do we call this back? On what thread?
     fn register_device_changed_callback(
         &self,
         _device_changed_callback: ffi::cubeb_device_changed_callback,
     ) -> Result<()> {
+        assert_not_in_callback();
         Ok(())
     }
 }
 
 pub fn init(
     ctx: &ClientContext,
     init_params: messages::StreamInitParams,
     data_callback: ffi::cubeb_data_callback,
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -1,17 +1,18 @@
 [package]
 name = "audioipc-server"
 version = "0.1.0"
 authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
 description = "Remote cubeb server"
 
 [dependencies]
 audioipc = { path = "../audioipc" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
 cubeb = { path = "../../cubeb-rs/cubeb-api" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+bytes = "0.4"
 error-chain = "0.10.0"
 lazycell = "^0.4"
 log = "^0.3.6"
 mio = "0.6.7"
 mio-uds = "0.6.4"
 slab = "0.3.0"
 
--- a/media/audioipc/server/src/channel.rs
+++ b/media/audioipc/server/src/channel.rs
@@ -109,20 +109,20 @@ impl Evented for ReceiverCtl {
         let (registration, set_readiness) = Registration::new2();
         try!(registration.register(poll, token, interest, opts));
 
         if self.inner.pending.load(Ordering::Relaxed) > 0 {
             // TODO: Don't drop readiness
             let _ = set_readiness.set_readiness(Ready::readable());
         }
 
-        self.registration.fill(registration).ok().expect(
+        self.registration.fill(registration).expect(
             "unexpected state encountered"
         );
-        self.inner.set_readiness.fill(set_readiness).ok().expect(
+        self.inner.set_readiness.fill(set_readiness).expect(
             "unexpected state encountered"
         );
 
         Ok(())
     }
 
     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
         match self.registration.borrow() {
@@ -131,17 +131,17 @@ impl Evented for ReceiverCtl {
                 io::ErrorKind::Other,
                 "receiver not registered"
             )),
         }
     }
 
     fn deregister(&self, poll: &Poll) -> io::Result<()> {
         match self.registration.borrow() {
-            Some(registration) => <Registration as Evented>::deregister(&registration, poll),
+            Some(registration) => <Registration as Evented>::deregister(registration, poll),
             None => Err(io::Error::new(
                 io::ErrorKind::Other,
                 "receiver not registered"
             )),
         }
     }
 }
 
@@ -187,29 +187,29 @@ impl<T> From<io::Error> for TrySendError
 /*
  *
  * ===== Implement Error, Debug and Display for Errors =====
  *
  */
 
 impl<T: Any> error::Error for SendError<T> {
     fn description(&self) -> &str {
-        match self {
-            &SendError::Io(ref io_err) => io_err.description(),
-            &SendError::Disconnected(..) => "Disconnected",
+        match *self {
+            SendError::Io(ref io_err) => io_err.description(),
+            SendError::Disconnected(..) => "Disconnected",
         }
     }
 }
 
 impl<T: Any> error::Error for TrySendError<T> {
     fn description(&self) -> &str {
-        match self {
-            &TrySendError::Io(ref io_err) => io_err.description(),
-            &TrySendError::Full(..) => "Full",
-            &TrySendError::Disconnected(..) => "Disconnected",
+        match *self {
+            TrySendError::Io(ref io_err) => io_err.description(),
+            TrySendError::Full(..) => "Full",
+            TrySendError::Disconnected(..) => "Disconnected",
         }
     }
 }
 
 impl<T> fmt::Debug for SendError<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         format_send_error(self, f)
     }
@@ -230,22 +230,22 @@ impl<T> fmt::Debug for TrySendError<T> {
 impl<T> fmt::Display for TrySendError<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         format_try_send_error(self, f)
     }
 }
 
 #[inline]
 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &SendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &SendError::Disconnected(..) => write!(f, "Disconnected"),
+    match *e {
+        SendError::Io(ref io_err) => write!(f, "{}", io_err),
+        SendError::Disconnected(..) => write!(f, "Disconnected"),
     }
 }
 
 #[inline]
 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &TrySendError::Full(..) => write!(f, "Full"),
-        &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+    match *e {
+        TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+        TrySendError::Full(..) => write!(f, "Full"),
+        TrySendError::Disconnected(..) => write!(f, "Disconnected"),
     }
 }
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -1,30 +1,38 @@
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 extern crate audioipc;
+extern crate bytes;
 extern crate cubeb;
 extern crate cubeb_core;
 extern crate lazycell;
 extern crate mio;
 extern crate mio_uds;
 extern crate slab;
 
-use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamParams};
+use audioipc::AutoCloseFd;
+use audioipc::async::{Async, AsyncRecvFd, AsyncSendFd};
+use audioipc::codec::{Decoder, encode};
+use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamInitParams, StreamParams};
 use audioipc::shm::{SharedMemReader, SharedMemWriter};
+use bytes::{Bytes, BytesMut};
 use cubeb_core::binding::Binding;
 use cubeb_core::ffi;
-use mio::Token;
-use mio_uds::UnixListener;
+use mio::{Ready, Token};
+use mio_uds::{UnixListener, UnixStream};
 use std::{slice, thread};
+use std::collections::VecDeque;
+use std::collections::HashSet;
 use std::convert::From;
+use std::io::Cursor;
 use std::os::raw::c_void;
 use std::os::unix::prelude::*;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 
 mod channel;
 
 pub mod errors {
@@ -47,44 +55,46 @@ const SHM_AREA_SIZE: usize = 2 * 1024 * 
 // TODO: this should forward to the client.
 struct Callback {
     /// Size of input frame in bytes
     input_frame_size: u16,
     /// Size of output frame in bytes
     output_frame_size: u16,
     connection: audioipc::Connection,
     input_shm: SharedMemWriter,
-    output_shm: SharedMemReader,
+    output_shm: SharedMemReader
 }
 
 impl cubeb::StreamCallback for Callback {
     type Frame = u8;
 
     fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
-        info!("Stream data callback: {} {}", input.len(), output.len());
+        trace!("Stream data callback: {} {}", input.len(), output.len());
 
         // len is of input and output is frame len. Turn these into the real lengths.
         let real_input = unsafe {
             let size_bytes = input.len() * self.input_frame_size as usize;
             slice::from_raw_parts(input.as_ptr(), size_bytes)
         };
         let real_output = unsafe {
             let size_bytes = output.len() * self.output_frame_size as usize;
-            info!("Resize output to {}", size_bytes);
+            trace!("Resize output to {}", size_bytes);
             slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
         };
 
-        self.input_shm.write(&real_input).unwrap();
+        self.input_shm.write(real_input).unwrap();
 
-        self.connection
-            .send(ClientMessage::StreamDataCallback(
-                output.len() as isize,
-                self.output_frame_size as usize
-            ))
-            .unwrap();
+        let r = self.connection.send(ClientMessage::StreamDataCallback(
+            output.len() as isize,
+            self.output_frame_size as usize
+        ));
+        if r.is_err() {
+            debug!("data_callback: Failed to send to client - got={:?}", r);
+            return -1;
+        }
 
         let r = self.connection.receive();
         match r {
             Ok(ServerMessage::StreamDataCallback(cb_result)) => {
                 if cb_result >= 0 {
                     let len = cb_result as usize * self.output_frame_size as usize;
                     self.output_shm.read(&mut real_output[..len]).unwrap();
                     cb_result
@@ -96,372 +106,486 @@ impl cubeb::StreamCallback for Callback 
                 debug!("Unexpected message {:?} during callback", r);
                 -1
             },
         }
     }
 
     fn state_callback(&mut self, state: cubeb::State) {
         info!("Stream state callback: {:?}", state);
+        // TODO: Share this conversion with the same code in cubeb-rs?
+        let state = match state {
+            cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
+            cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
+            cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
+            cubeb::State::Error => ffi::CUBEB_STATE_ERROR,
+        };
+        let r = self.connection.send(
+            ClientMessage::StreamStateCallback(state)
+        );
+        if r.is_err() {
+            debug!("state_callback: Failed to send to client - got={:?}", r);
+        }
     }
 }
 
 impl Drop for Callback {
     fn drop(&mut self) {
-        self.connection
-            .send(ClientMessage::StreamDestroyed)
-            .unwrap();
+        let r = self.connection.send(ClientMessage::StreamDestroyed);
+        if r.is_err() {
+            debug!("Callback::drop failed to send StreamDestroyed = {:?}", r);
+        }
     }
 }
 
 type Slab<T> = slab::Slab<T, Token>;
 type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
 
 // TODO: Server token must be outside range used by server.connections slab.
 // usize::MAX is already used internally in mio.
 const QUIT: Token = Token(std::usize::MAX - 2);
 const SERVER: Token = Token(std::usize::MAX - 1);
 
 struct ServerConn {
-    connection: audioipc::Connection,
+    //connection: audioipc::Connection,
+    io: UnixStream,
     token: Option<Token>,
-    streams: StreamSlab
+    streams: StreamSlab,
+    decoder: Decoder,
+    recv_buffer: BytesMut,
+    send_buffer: BytesMut,
+    pending_send: VecDeque<(Bytes, Option<AutoCloseFd>)>,
+    device_ids: HashSet<usize>
 }
 
 impl ServerConn {
-    fn new<FD>(fd: FD) -> ServerConn
-    where
-        FD: IntoRawFd,
-    {
-        ServerConn {
-            connection: unsafe { audioipc::Connection::from_raw_fd(fd.into_raw_fd()) },
+    fn new(io: UnixStream) -> ServerConn {
+        let mut sc = ServerConn {
+            io: io,
             token: None,
             // TODO: Handle increasing slab size. Pick a good default size.
-            streams: StreamSlab::with_capacity(64)
+            streams: StreamSlab::with_capacity(64),
+            decoder: Decoder::new(),
+            recv_buffer: BytesMut::with_capacity(4096),
+            send_buffer: BytesMut::with_capacity(4096),
+            pending_send: VecDeque::new(),
+            device_ids: HashSet::new()
+        };
+        sc.device_ids.insert(0); // nullptr is always a valid (default) device id.
+        sc
+    }
+
+    fn process_read(&mut self, context: &Result<cubeb::Context>) -> Result<Ready> {
+        // According to *something*, processing non-blocking stream
+        // should attempt to read until EWOULDBLOCK is returned.
+        while let Async::Ready((n, fd)) = try!(self.io.recv_buf_fd(&mut self.recv_buffer)) {
+            trace!("Received {} bytes and fd {:?}", n, fd);
+
+            // Reading 0 signifies EOF
+            if n == 0 {
+                return Err(
+                    ::errors::ErrorKind::AudioIPC(::audioipc::errors::ErrorKind::Disconnected).into()
+                );
+            }
+
+            if let Some(fd) = fd {
+                trace!("Unexpectedly received an fd from client.");
+                let _ = unsafe { AutoCloseFd::from_raw_fd(fd) };
+            }
+
+            // Process all the complete messages contained in
+            // send.recv_buffer.  It's possible that a read might not
+            // return a complete message, so self.decoder.decode
+            // returns Ok(None).
+            loop {
+                match self.decoder.decode::<ServerMessage>(&mut self.recv_buffer) {
+                    Ok(Some(msg)) => {
+                        info!("ServerConn::process: got {:?}", msg);
+                        try!(self.process_msg(&msg, context));
+                    },
+                    Ok(None) => {
+                        break;
+                    },
+                    Err(e) => {
+                        return Err(e).chain_err(|| "Failed to decoder ServerMessage");
+                    },
+                }
+            }
         }
+
+        // Send any pending responses to client.
+        self.flush_pending_send()
     }
 
-    fn process(&mut self, poll: &mut mio::Poll, context: &Result<Option<cubeb::Context>>) -> Result<()> {
-        let r = self.connection.receive();
-        info!("ServerConn::process: got {:?}", r);
+    // Process a request coming from the client.
+    fn process_msg(&mut self, msg: &ServerMessage, context: &Result<cubeb::Context>) -> Result<()> {
+        let resp: ClientMessage = if let Ok(ref context) = *context {
+            if let ServerMessage::StreamInit(ref params) = *msg {
+                return self.process_stream_init(context, params);
+            };
+
+            match *msg {
+                ServerMessage::ClientConnect => {
+                    panic!("already connected");
+                },
+
+                ServerMessage::ClientDisconnect => {
+                    // TODO:
+                    //self.connection.client_disconnect();
+                    ClientMessage::ClientDisconnected
+                },
+
+                ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
+
+                ServerMessage::ContextGetMaxChannelCount => {
+                    context
+                        .max_channel_count()
+                        .map(ClientMessage::ContextMaxChannelCount)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetMinLatency(ref params) => {
+                    let format = cubeb::SampleFormat::from(params.format);
+                    let layout = cubeb::ChannelLayout::from(params.layout);
+
+                    let params = cubeb::StreamParamsBuilder::new()
+                        .format(format)
+                        .rate(u32::from(params.rate))
+                        .channels(u32::from(params.channels))
+                        .layout(layout)
+                        .take();
+
+                    context
+                        .min_latency(&params)
+                        .map(ClientMessage::ContextMinLatency)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetPreferredSampleRate => {
+                    context
+                        .preferred_sample_rate()
+                        .map(ClientMessage::ContextPreferredSampleRate)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetPreferredChannelLayout => {
+                    context
+                        .preferred_channel_layout()
+                        .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetDeviceEnumeration(device_type) => {
+                    context
+                        .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+                        .map(|devices| {
+                            let v: Vec<DeviceInfo> = devices.iter()
+                                                            .map(|i| i.raw().into())
+                                                            .collect();
+                            for i in &v {
+                                self.device_ids.insert(i.devid);
+                            }
+                            ClientMessage::ContextEnumeratedDevices(v)
+                        })
+                        .unwrap_or_else(error)
+                },
 
-        if let &Ok(Some(ref ctx)) = context {
-            // TODO: Might need a simple state machine to deal with
-            // create/use/destroy ordering, etc.
-            // TODO: receive() and all this handling should be moved out
-            // of this event loop code.
-            let msg = try!(r);
-            let _ = try!(self.process_msg(&msg, ctx));
+                ServerMessage::StreamInit(_) => {
+                    panic!("StreamInit should have already been handled.");
+                },
+
+                ServerMessage::StreamDestroy(stm_tok) => {
+                    self.streams.remove(stm_tok);
+                    ClientMessage::StreamDestroyed
+                },
+
+                ServerMessage::StreamStart(stm_tok) => {
+                    let _ = self.streams[stm_tok].start();
+                    ClientMessage::StreamStarted
+                },
+
+                ServerMessage::StreamStop(stm_tok) => {
+                    let _ = self.streams[stm_tok].stop();
+                    ClientMessage::StreamStopped
+                },
+
+                ServerMessage::StreamGetPosition(stm_tok) => {
+                    self.streams[stm_tok]
+                        .position()
+                        .map(ClientMessage::StreamPosition)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamGetLatency(stm_tok) => {
+                    self.streams[stm_tok]
+                        .latency()
+                        .map(ClientMessage::StreamLatency)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamSetVolume(stm_tok, volume) => {
+                    self.streams[stm_tok]
+                        .set_volume(volume)
+                        .map(|_| ClientMessage::StreamVolumeSet)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamSetPanning(stm_tok, panning) => {
+                    self.streams[stm_tok]
+                        .set_panning(panning)
+                        .map(|_| ClientMessage::StreamPanningSet)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamGetCurrentDevice(stm_tok) => {
+                    self.streams[stm_tok]
+                        .current_device()
+                        .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
+                        .unwrap_or_else(error)
+                },
+
+                _ => {
+                    bail!("Unexpected Message");
+                },
+            }
         } else {
-            self.send_error(cubeb::Error::new());
+            error(cubeb::Error::new())
+        };
+
+        debug!("process_msg: req={:?}, resp={:?}", msg, resp);
+
+        self.queue_message(resp)
+    }
+
+    // Stream init is special, so it's been separated from process_msg.
+    fn process_stream_init(&mut self, context: &cubeb::Context, params: &StreamInitParams) -> Result<()> {
+        fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
+            params.and_then(|p| {
+                let raw = ffi::cubeb_stream_params::from(p);
+                Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
+            })
         }
 
-        poll.reregister(
-            &self.connection,
-            self.token.unwrap(),
-            mio::Ready::readable(),
-            mio::PollOpt::edge() | mio::PollOpt::oneshot()
-        ).unwrap();
+        fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+            params
+                .map(|p| {
+                    let sample_size = match p.format() {
+                        cubeb::SampleFormat::S16LE |
+                        cubeb::SampleFormat::S16BE |
+                        cubeb::SampleFormat::S16NE => 2,
+                        cubeb::SampleFormat::Float32LE |
+                        cubeb::SampleFormat::Float32BE |
+                        cubeb::SampleFormat::Float32NE => 4,
+                    };
+                    let channel_count = p.channels() as u16;
+                    sample_size * channel_count
+                })
+                .unwrap_or(0u16)
+        }
+
+
+        if !self.device_ids.contains(&params.input_device) {
+            bail!("Invalid input_device passed to stream_init");
+        }
+        // TODO: Yuck!
+        let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+
+        if !self.device_ids.contains(&params.output_device) {
+            bail!("Invalid output_device passed to stream_init");
+        }
+        // TODO: Yuck!
+        let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+
+        let latency = params.latency_frames;
+        let mut builder = cubeb::StreamInitOptionsBuilder::new();
+        builder
+            .input_device(input_device)
+            .output_device(output_device)
+            .latency(latency);
+
+        if let Some(ref stream_name) = params.stream_name {
+            builder.stream_name(stream_name);
+        }
+        let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
+        if let Some(ref isp) = input_stream_params {
+            builder.input_stream_param(isp);
+        }
+        let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
+        if let Some(ref osp) = output_stream_params {
+            builder.output_stream_param(osp);
+        }
+        let params = builder.take();
+
+        let input_frame_size = frame_size_in_bytes(input_stream_params);
+        let output_frame_size = frame_size_in_bytes(output_stream_params);
+
+        let (conn1, conn2) = audioipc::Connection::pair()?;
+        info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+
+        let (input_shm, input_file) = SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+        let (output_shm, output_file) = SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+
+        let err = match context.stream_init(
+            &params,
+            Callback {
+                input_frame_size: input_frame_size,
+                output_frame_size: output_frame_size,
+                connection: conn2,
+                input_shm: input_shm,
+                output_shm: output_shm
+            }
+        ) {
+            Ok(stream) => {
+                let stm_tok = match self.streams.vacant_entry() {
+                    Some(entry) => {
+                        debug!(
+                            "Registering stream {:?}",
+                            entry.index(),
+                        );
+
+                        entry.insert(stream).index()
+                    },
+                    None => {
+                        // TODO: Turn into error
+                        panic!("Failed to insert stream into slab. No entries");
+                    },
+                };
+
+                try!(self.queue_init_messages(
+                    stm_tok,
+                    conn1,
+                    input_file,
+                    output_file
+                ));
+                None
+            },
+            Err(e) => Some(error(e)),
+        };
+
+        if let Some(err) = err {
+            try!(self.queue_message(err))
+        }
 
         Ok(())
     }
 
-    fn process_msg(&mut self, msg: &ServerMessage, context: &cubeb::Context) -> Result<()> {
-        match msg {
-            &ServerMessage::ClientConnect => {
-                panic!("already connected");
-            },
-            &ServerMessage::ClientDisconnect => {
-                // TODO:
-                //self.connection.client_disconnect();
-                self.connection
-                    .send(ClientMessage::ClientDisconnected)
-                    .unwrap();
-            },
-
-            &ServerMessage::ContextGetBackendId => {},
-
-            &ServerMessage::ContextGetMaxChannelCount => {
-                match context.max_channel_count() {
-                    Ok(channel_count) => {
-                        self.connection
-                            .send(ClientMessage::ContextMaxChannelCount(channel_count))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetMinLatency(ref params) => {
-
-                let format = cubeb::SampleFormat::from(params.format);
-                let layout = cubeb::ChannelLayout::from(params.layout);
-
-                let params = cubeb::StreamParamsBuilder::new()
-                    .format(format)
-                    .rate(params.rate as _)
-                    .channels(params.channels as _)
-                    .layout(layout)
-                    .take();
-
-                match context.min_latency(&params) {
-                    Ok(latency) => {
-                        self.connection
-                            .send(ClientMessage::ContextMinLatency(latency))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetPreferredSampleRate => {
-                match context.preferred_sample_rate() {
-                    Ok(rate) => {
-                        self.connection
-                            .send(ClientMessage::ContextPreferredSampleRate(rate))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetPreferredChannelLayout => {
-                match context.preferred_channel_layout() {
-                    Ok(layout) => {
-                        self.connection
-                            .send(ClientMessage::ContextPreferredChannelLayout(layout as _))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetDeviceEnumeration(device_type) => {
-                match context.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) {
-                    Ok(devices) => {
-                        let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
-                        self.connection
-                            .send(ClientMessage::ContextEnumeratedDevices(v))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::StreamInit(ref params) => {
-                fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
-                    match params {
-                        Some(p) => {
-                            let raw = ffi::cubeb_stream_params::from(p);
-                            Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
-                        },
-                        None => None,
-                    }
-                }
-
-                fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
-                    match params.as_ref() {
-                        Some(p) => {
-                            let sample_size = match p.format() {
-                                cubeb::SampleFormat::S16LE |
-                                cubeb::SampleFormat::S16BE |
-                                cubeb::SampleFormat::S16NE => 2,
-                                cubeb::SampleFormat::Float32LE |
-                                cubeb::SampleFormat::Float32BE |
-                                cubeb::SampleFormat::Float32NE => 4,
-                            };
-                            let channel_count = p.channels() as u16;
-                            sample_size * channel_count
-                        },
-                        None => 0,
-                    }
-                }
-
-                // TODO: Yuck!
-                let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
-                let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
-                let latency = params.latency_frames;
-                let mut builder = cubeb::StreamInitOptionsBuilder::new();
-                builder
-                    .input_device(input_device)
-                    .output_device(output_device)
-                    .latency(latency);
+    fn queue_init_messages<T, U, V>(&mut self, stm_tok: usize, conn: T, input_file: U, output_file: V) -> Result<()>
+    where
+        T: IntoRawFd,
+        U: IntoRawFd,
+        V: IntoRawFd,
+    {
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreated(stm_tok),
+            conn
+        ));
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreatedInputShm,
+            input_file
+        ));
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreatedOutputShm,
+            output_file
+        ));
+        Ok(())
+    }
 
-                if let Some(ref stream_name) = params.stream_name {
-                    builder.stream_name(stream_name);
-                }
-                let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
-                if let Some(ref isp) = input_stream_params {
-                    builder.input_stream_param(isp);
-                }
-                let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
-                if let Some(ref osp) = output_stream_params {
-                    builder.output_stream_param(osp);
-                }
-                let params = builder.take();
-
-                let input_frame_size = frame_size_in_bytes(input_stream_params);
-                let output_frame_size = frame_size_in_bytes(output_stream_params);
-
-                let (conn1, conn2) = audioipc::Connection::pair()?;
-                info!("Created connection pair: {:?}-{:?}", conn1, conn2);
-
-                let (input_shm, input_file) =
-                    SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
-                let (output_shm, output_file) =
-                    SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
-
-                match context.stream_init(
-                    &params,
-                    Callback {
-                        input_frame_size: input_frame_size,
-                        output_frame_size: output_frame_size,
-                        connection: conn2,
-                        input_shm: input_shm,
-                        output_shm: output_shm,
-                    }
-                ) {
-                    Ok(stream) => {
-                        let stm_tok = match self.streams.vacant_entry() {
-                            Some(entry) => {
-                                debug!(
-                                    "Registering stream {:?}",
-                                    entry.index(),
-                                );
-
-                                entry.insert(stream).index()
-                            },
-                            None => {
-                                // TODO: Turn into error
-                                panic!("Failed to insert stream into slab. No entries");
-                            },
-                        };
-
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreated(stm_tok), Some(conn1))
-                            .unwrap();
-                        // TODO: It'd be nicer to send these as part of
-                        // StreamCreated, but that requires changing
-                        // sendmsg/recvmsg to support multiple fds.
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreatedInputShm, Some(input_file))
-                            .unwrap();
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreatedOutputShm, Some(output_file))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
+    fn queue_message(&mut self, msg: ClientMessage) -> Result<()> {
+        debug!("queue_message: {:?}", msg);
+        encode::<ClientMessage>(&mut self.send_buffer, &msg).or_else(|e| {
+            Err(e).chain_err(|| "Failed to encode msg into send buffer")
+        })
+    }
 
-            &ServerMessage::StreamDestroy(stm_tok) => {
-                self.streams.remove(stm_tok);
-                self.connection
-                    .send(ClientMessage::StreamDestroyed)
-                    .unwrap();
-            },
-
-            &ServerMessage::StreamStart(stm_tok) => {
-                let _ = self.streams[stm_tok].start();
-                self.connection.send(ClientMessage::StreamStarted).unwrap();
-            },
-            &ServerMessage::StreamStop(stm_tok) => {
-                let _ = self.streams[stm_tok].stop();
-                self.connection.send(ClientMessage::StreamStopped).unwrap();
-            },
-            &ServerMessage::StreamGetPosition(stm_tok) => {
-                match self.streams[stm_tok].position() {
-                    Ok(position) => {
-                        self.connection
-                            .send(ClientMessage::StreamPosition(position))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-            &ServerMessage::StreamGetLatency(stm_tok) => {
-                match self.streams[stm_tok].latency() {
-                    Ok(latency) => {
-                        self.connection
-                            .send(ClientMessage::StreamLatency(latency))
-                            .unwrap();
-                    },
-                    Err(e) => self.send_error(e),
-                }
-            },
-            &ServerMessage::StreamSetVolume(stm_tok, volume) => {
-                let _ = self.streams[stm_tok].set_volume(volume);
-                self.connection
-                    .send(ClientMessage::StreamVolumeSet)
-                    .unwrap();
-            },
-            &ServerMessage::StreamSetPanning(stm_tok, panning) => {
-                let _ = self.streams[stm_tok].set_panning(panning);
-                self.connection
-                    .send(ClientMessage::StreamPanningSet)
-                    .unwrap();
-            },
-            &ServerMessage::StreamGetCurrentDevice(stm_tok) => {
-                let err = match self.streams[stm_tok].current_device() {
-                    Ok(device) => {
-                        // TODO: Yuck!
-                        self.connection
-                            .send(ClientMessage::StreamCurrentDevice(device.into()))
-                            .unwrap();
-                        None
-                    },
-                    Err(e) => Some(e),
-                };
-                if let Some(e) = err {
-                    self.send_error(e);
-                }
-            },
-            _ => {
-                bail!("Unexpected Message");
-            },
-        }
+    // Since send_fd supports sending one RawFd at a time, queuing a
+    // message with a RawFd forces use to take the current send_buffer
+    // and move it pending queue.
+    fn queue_message_fd<FD: IntoRawFd>(&mut self, msg: ClientMessage, fd: FD) -> Result<()> {
+        let fd = fd.into_raw_fd();
+        debug!("queue_message_fd: {:?} {:?}", msg, fd);
+        try!(self.queue_message(msg));
+        self.take_pending_send(Some(fd));
         Ok(())
     }
 
-    fn send_error(&mut self, error: cubeb::Error) {
-        self.connection
-            .send(ClientMessage::ContextError(error.raw_code()))
-            .unwrap();
+    // Take the current messages in the send_buffer and move them to
+    // pending queue.
+    fn take_pending_send(&mut self, fd: Option<RawFd>) {
+        let pending = self.send_buffer.take().freeze();
+        debug!("take_pending_send: ({:?} {:?})", pending, fd);
+        self.pending_send.push_back((
+            pending,
+            fd.map(|fd| unsafe { AutoCloseFd::from_raw_fd(fd) })
+        ));
+    }
+
+    // Process the pending queue and send them to client.
+    fn flush_pending_send(&mut self) -> Result<Ready> {
+        debug!("flush_pending_send");
+        // take any pending messages in the send buffer.
+        if !self.send_buffer.is_empty() {
+            self.take_pending_send(None);
+        }
+
+        trace!("pending queue: {:?}", self.pending_send);
+
+        let mut result = Ready::readable();
+        let mut processed = 0;
+
+        for &mut (ref mut buf, ref mut fd) in &mut self.pending_send {
+            trace!("sending buf {:?}, fd {:?}", buf, fd);
+            let r = {
+                let mut src = Cursor::new(buf.as_ref());
+                let fd = match *fd {
+                    Some(ref fd) => Some(fd.as_raw_fd()),
+                    None => None,
+                };
+                try!(self.io.send_buf_fd(&mut src, fd))
+            };
+            match r {
+                Async::Ready(n) if n == buf.len() => {
+                    processed += 1;
+                },
+                Async::Ready(n) => {
+                    let _ = buf.split_to(n);
+                    let _ = fd.take();
+                    result.insert(Ready::writable());
+                    break;
+                },
+                Async::NotReady => {
+                    result.insert(Ready::writable());
+                },
+            }
+        }
+
+        debug!("processed {} buffers", processed);
+
+        self.pending_send = self.pending_send.split_off(processed);
+
+        trace!("pending queue: {:?}", self.pending_send);
+
+        Ok(result)
     }
 }
 
 pub struct Server {
     socket: UnixListener,
     // Ok(None)      - Server hasn't tried to create cubeb::Context.
     // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
     // Err(_)        - Server has tried and failed to create cubeb::Context.
     //                 Don't try again.
-    context: Result<Option<cubeb::Context>>,
+    context: Option<Result<cubeb::Context>>,
     conns: Slab<ServerConn>
 }
 
 impl Server {
     pub fn new(socket: UnixListener) -> Server {
         Server {
             socket: socket,
-            context: Ok(None),
+            context: None,
             conns: Slab::with_capacity(16)
         }
     }
 
     fn accept(&mut self, poll: &mut mio::Poll) -> Result<()> {
         debug!("Server accepting connection");
 
         let client_socket = match self.socket.accept() {
@@ -481,83 +605,98 @@ impl Server {
             None => {
                 panic!("failed to insert connection");
             },
         };
 
         // Register the connection
         self.conns[token].token = Some(token);
         poll.register(
-            &self.conns[token].connection,
+            &self.conns[token].io,
             token,
             mio::Ready::readable(),
             mio::PollOpt::edge() | mio::PollOpt::oneshot()
         ).unwrap();
         /*
         let r = self.conns[token].receive();
         debug!("received {:?}", r);
         let r = self.conns[token].send(ClientMessage::ClientConnected);
         debug!("sent {:?} (ClientConnected)", r);
          */
 
         // Since we have a connection try creating a cubeb context. If
         // it fails, mark the failure with Err.
-        if let Ok(None) = self.context {
-            self.context = cubeb::Context::init("AudioIPC Server", None)
-                .and_then(|ctx| Ok(Some(ctx)))
-                .or_else(|err| Err(err.into()));
+        if self.context.is_none() {
+            self.context = Some(cubeb::Context::init("AudioIPC Server", None).or_else(|e| {
+                Err(e).chain_err(|| "Unable to create cubeb context.")
+            }))
         }
 
         Ok(())
     }
 
     pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
         let mut events = mio::Events::with_capacity(16);
 
         match poll.poll(&mut events, None) {
             Ok(_) => {},
             Err(e) => error!("server poll error: {}", e),
         }
 
         for event in events.iter() {
             match event.token() {
                 SERVER => {
-                    match self.accept(poll) {
-                        Err(e) => {
-                            error!("server accept error: {}", e);
-                        },
-                        _ => {},
+                    if let Err(e) = self.accept(poll) {
+                        error!("server accept error: {}", e);
                     };
                 },
                 QUIT => {
                     info!("Quitting Audio Server loop");
                     bail!("quit");
                 },
                 token => {
-                    debug!("token {:?} ready", token);
+                    trace!("token {:?} ready", token);
+
+                    let context = self.context.as_ref().expect(
+                        "Shouldn't receive a message before accepting connection."
+                    );
+
+                    let mut readiness = Ready::readable();
 
-                    let r = self.conns[token].process(poll, &self.context);
+                    if event.readiness().is_readable() {
+                        let r = self.conns[token].process_read(context);
+                        trace!("got {:?}", r);
 
-                    debug!("got {:?}", r);
+                        if let Err(e) = r {
+                            debug!("dropped client {:?} due to error {:?}", token, e);
+                            self.conns.remove(token);
+                            continue;
+                        }
+                    };
 
-                    // TODO: Handle disconnection etc.
-                    // TODO: Should be handled at a higher level by a
-                    // disconnect message.
-                    if let Err(e) = r {
-                        debug!("dropped client {:?} due to error {:?}", token, e);
-                        self.conns.remove(token);
-                        continue;
-                    }
+                    if event.readiness().is_writable() {
+                        let r = self.conns[token].flush_pending_send();
+                        trace!("got {:?}", r);
 
-                    // poll.reregister(
-                    //     &self.conn(token).connection,
-                    //     token,
-                    //     mio::Ready::readable(),
-                    //     mio::PollOpt::edge() | mio::PollOpt::oneshot()
-                    // ).unwrap();
+                        match r {
+                            Ok(r) => readiness = r,
+                            Err(e) => {
+                                debug!("dropped client {:?} due to error {:?}", token, e);
+                                self.conns.remove(token);
+                                continue;
+                            },
+                        }
+                    };
+
+                    poll.reregister(
+                        &self.conns[token].io,
+                        token,
+                        readiness,
+                        mio::PollOpt::edge() | mio::PollOpt::oneshot()
+                    ).unwrap();
                 },
             }
         }
 
         Ok(())
     }
 }
 
@@ -582,28 +721,45 @@ pub fn run(running: Arc<AtomicBool>) -> 
         mio::PollOpt::edge()
     ).unwrap();
 
     loop {
         if !running.load(Ordering::SeqCst) {
             bail!("server quit due to ctrl-c");
         }
 
-        let _ = try!(server.poll(&mut poll));
+        try!(server.poll(&mut poll));
     }
 
     //poll.deregister(&server.socket).unwrap();
 }
 
+fn error(error: cubeb::Error) -> ClientMessage {
+    ClientMessage::ContextError(error.raw_code())
+}
+
+struct ServerWrapper {
+    thread_handle: std::thread::JoinHandle<()>,
+    sender_ctl: channel::SenderCtl,
+}
+
+impl ServerWrapper {
+    fn shutdown(self) {
+        // Dropping SenderCtl here will notify the other end.
+        drop(self.sender_ctl);
+        self.thread_handle.join().unwrap();
+    }
+}
+
 #[no_mangle]
 pub extern "C" fn audioipc_server_start() -> *mut c_void {
 
     let (tx, rx) = channel::ctl_pair();
 
-    thread::spawn(move || {
+    let handle = thread::spawn(move || {
         // Ignore result.
         let _ = std::fs::remove_file(audioipc::get_uds_path());
 
         // TODO: Use a SEQPACKET, wrap it in UnixStream?
         let mut poll = mio::Poll::new().unwrap();
         let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path()).unwrap());
 
         poll.register(
@@ -612,25 +768,27 @@ pub extern "C" fn audioipc_server_start(
             mio::Ready::readable(),
             mio::PollOpt::edge()
         ).unwrap();
 
         poll.register(&rx, QUIT, mio::Ready::readable(), mio::PollOpt::edge())
             .unwrap();
 
         loop {
-            match server.poll(&mut poll) {
-                Err(_) => {
-                    return;
-                },
-                _ => (),
+            if server.poll(&mut poll).is_err() {
+                return;
             }
         }
     });
 
-    Box::into_raw(Box::new(tx)) as *mut _
+    let wrapper = ServerWrapper {
+        thread_handle: handle,
+        sender_ctl: tx
+    };
+
+    Box::into_raw(Box::new(wrapper)) as *mut _
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
-    // Dropping SenderCtl here will notify the other end.
-    let _ = unsafe { Box::<channel::SenderCtl>::from_raw(p as *mut _) };
+    let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
+    wrapper.shutdown();
 }