Bug 1391523 - P3: Import audioipc crates. r=kinetik draft
authorDan Glastonbury <dan.glastonbury@gmail.com>
Thu, 10 Aug 2017 10:28:26 +1000
changeset 652726 dc4b466664b65fbd485eefd2647ff651a5923b20
parent 652725 03eb89b3a61c45d8d62934d3cf137e6687c66792
child 652727 0c18bcec9246429a62209faa6433b9d9b11023af
push id76138
push userbmo:dglastonbury@mozilla.com
push dateFri, 25 Aug 2017 06:15:19 +0000
reviewerskinetik
bugs1391523
milestone57.0a1
Bug 1391523 - P3: Import audioipc crates. r=kinetik MozReview-Commit-ID: Ci2xbenAd8i
media/audioipc/Cargo.toml
media/audioipc/README.md
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/connection.rs
media/audioipc/audioipc/src/errors.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/audioipc/src/shm.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/rustfmt.toml
media/audioipc/server/Cargo.toml
media/audioipc/server/src/channel.rs
media/audioipc/server/src/lib.rs
new file mode 100644
--- /dev/null
+++ b/media/audioipc/Cargo.toml
@@ -0,0 +1,2 @@
+[workspace]
+members = ["audioipc", "client", "server"]
new file mode 100644
--- /dev/null
+++ b/media/audioipc/README.md
@@ -0,0 +1,1 @@
+# Cubeb Audio Remoting Prototype
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "audioipc"
+version = "0.1.0"
+authors = [
+        "Matthew Gregan <kinetik@flim.org>",
+        "Dan Glastonbury <dan.glastonbury@gmail.com>"
+        ]
+description = "Remote Cubeb IPC"
+
+[dependencies]
+error-chain = "0.10.0"
+log = "^0.3.6"
+serde = "1.*.*"
+serde_derive = "1.*.*"
+bincode = "0.8"
+libc = "0.2"
+mio = "0.6.7"
+cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+byteorder = "1"
+memmap = "0.5.2"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/connection.rs
@@ -0,0 +1,199 @@
+use bincode::{self, deserialize, serialize};
+use errors::*;
+use msg;
+use mio::{Poll, PollOpt, Ready, Token};
+use mio::event::Evented;
+use mio::unix::EventedFd;
+use serde::de::DeserializeOwned;
+use serde::ser::Serialize;
+use std::fmt::Debug;
+use std::io::{self, Read};
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net;
+use std::os::unix::prelude::*;
+use libc;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+
+pub trait RecvFd {
+    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+}
+
+pub trait SendFd {
+    fn send_fd<FD: IntoRawFd>(&mut self, bytes: &[u8], fd: Option<FD>) -> io::Result<(usize)>;
+}
+
+// Because of the trait implementation rules in Rust, this needs to be
+// a wrapper class to allow implementation of a trait from another
+// crate on a struct from yet another crate.
+//
+// This class is effectively net::UnixStream.
+
+#[derive(Debug)]
+pub struct Connection {
+    stream: net::UnixStream
+}
+
+impl Connection {
+    pub fn new(stream: net::UnixStream) -> Connection {
+        info!("Create new connection");
+        Connection {
+            stream: stream
+        }
+    }
+
+    /// Creates an unnamed pair of connected sockets.
+    ///
+    /// Returns two `Connection`s which are connected to each other.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use audioipc::Connection;
+    ///
+    /// let (conn1, conn2) = match Connection::pair() {
+    ///     Ok((conn1, conn2)) => (conn1, conn2),
+    ///     Err(e) => {
+    ///         println!("Couldn't create a pair of connections: {:?}", e);
+    ///         return
+    ///     }
+    /// };
+    /// ```
+    pub fn pair() -> io::Result<(Connection, Connection)> {
+        let (s1, s2) = net::UnixStream::pair()?;
+        Ok((
+            Connection {
+                stream: s1
+            },
+            Connection {
+                stream: s2
+            }
+        ))
+    }
+
+    pub fn receive<RT>(&mut self) -> Result<RT>
+    where
+        RT: DeserializeOwned + Debug,
+    {
+        match self.receive_with_fd() {
+            Ok((r, None)) => Ok(r),
+            Ok((_, Some(_))) => panic!("unexpected fd received"),
+            Err(e) => Err(e),
+        }
+    }
+
+    pub fn receive_with_fd<RT>(&mut self) -> Result<(RT, Option<RawFd>)>
+    where
+        RT: DeserializeOwned + Debug,
+    {
+        // TODO: Check deserialize_from and serialize_into.
+        let mut encoded = vec![0; 32 * 1024]; // TODO: Get max size from bincode, or at least assert.
+        // TODO: Read until block, EOF, or error.
+        // TODO: Switch back to recv_fd.
+        match self.stream.recv_fd(&mut encoded) {
+            Ok((0, _)) => Err(ErrorKind::Disconnected.into()),
+            // TODO: Handle partial read?
+            Ok((n, fd)) => {
+                let r = deserialize(&encoded[..n]);
+                debug!("receive {:?}", r);
+                match r {
+                    Ok(r) => Ok((r, fd)),
+                    Err(e) => Err(e).chain_err(|| "Failed to deserialize message"),
+                }
+            },
+            // TODO: Handle dropped message.
+            // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
+            _ => bail!("socket write"),
+        }
+    }
+
+    pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
+    where
+        ST: Serialize + Debug,
+    {
+        self.send_with_fd::<ST, Connection>(msg, None)
+    }
+
+    pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
+    where
+        ST: Serialize + Debug,
+        FD: IntoRawFd + Debug,
+    {
+        let encoded: Vec<u8> = serialize(&msg, bincode::Infinite)?;
+        info!("send_with_fd {:?}, {:?}", msg, fd_to_send);
+        self.stream.send_fd(&encoded, fd_to_send).chain_err(
+            || "Failed to send message with fd"
+        )
+    }
+}
+
+impl Evented for Connection {
+    fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
+        EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
+    }
+
+    fn reregister(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
+        EventedFd(&self.stream.as_raw_fd()).reregister(poll, token, events, opts)
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        EventedFd(&self.stream.as_raw_fd()).deregister(poll)
+    }
+}
+
+impl Read for Connection {
+    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+        self.stream.read(bytes)
+    }
+}
+
+// TODO: Is this required?
+impl<'a> Read for &'a Connection {
+    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+        (&self.stream).read(bytes)
+    }
+}
+
+impl RecvFd for net::UnixStream {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        let length = self.read_u32::<LittleEndian>()?;
+
+        msg::recvmsg(self.as_raw_fd(), &mut buf_to_recv[..length as usize])
+    }
+}
+
+impl RecvFd for Connection {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        self.stream.recv_fd(buf_to_recv)
+    }
+}
+
+impl FromRawFd for Connection {
+    unsafe fn from_raw_fd(fd: RawFd) -> Connection {
+        Connection {
+            stream: net::UnixStream::from_raw_fd(fd)
+        }
+    }
+}
+
+impl IntoRawFd for Connection {
+    fn into_raw_fd(self) -> RawFd {
+        self.stream.into_raw_fd()
+    }
+}
+
+impl SendFd for net::UnixStream {
+    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+        self.write_u32::<LittleEndian>(buf_to_send.len() as u32)?;
+
+        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
+        let r = msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send);
+        fd_to_send.map(|fd| unsafe { libc::close(fd) });
+        r
+    }
+}
+
+impl SendFd for Connection {
+    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+        self.stream.send_fd(buf_to_send, fd_to_send)
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/errors.rs
@@ -0,0 +1,17 @@
+use bincode;
+use cubeb_core;
+use std;
+
+error_chain! {
+    // Maybe replace with chain_err to improve the error info.
+    foreign_links {
+        Bincode(bincode::Error);
+        Io(std::io::Error);
+        Cubeb(cubeb_core::Error);
+    }
+
+    // Replace bail!(str) with explicit errors.
+    errors {
+        Disconnected
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -0,0 +1,51 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+#![allow(dead_code)] // TODO: Remove.
+
+#![recursion_limit = "1024"]
+#[macro_use]
+extern crate error_chain;
+
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+extern crate serde_derive;
+extern crate serde;
+extern crate bincode;
+
+extern crate mio;
+
+extern crate cubeb_core;
+
+extern crate libc;
+extern crate byteorder;
+
+extern crate memmap;
+
+mod connection;
+pub mod errors;
+pub mod messages;
+mod msg;
+pub mod shm;
+
+pub use connection::*;
+pub use messages::{ClientMessage, ServerMessage};
+use std::env::temp_dir;
+use std::path::PathBuf;
+
+fn get_temp_path(name: &str) -> PathBuf {
+    let mut path = temp_dir();
+    path.push(name);
+    path
+}
+
+pub fn get_uds_path() -> PathBuf {
+    get_temp_path("cubeb-sock")
+}
+
+pub fn get_shm_path(dir: &str) -> PathBuf {
+    get_temp_path(&format!("cubeb-shm-{}", dir))
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -0,0 +1,249 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use cubeb_core::{self, ffi};
+use std::ffi::{CStr, CString};
+use std::os::raw::c_char;
+use std::ptr;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Device {
+    pub output_name: Option<Vec<u8>>,
+    pub input_name: Option<Vec<u8>>
+}
+
+impl<'a> From<cubeb_core::Device<'a>> for Device {
+    fn from(info: cubeb_core::Device) -> Self {
+        Self {
+            output_name: info.output_name_bytes().map(|s| s.to_vec()),
+            input_name: info.input_name_bytes().map(|s| s.to_vec())
+        }
+    }
+}
+
+impl From<ffi::cubeb_device> for Device {
+    fn from(info: ffi::cubeb_device) -> Self {
+        Self {
+            output_name: dup_str(info.output_name),
+            input_name: dup_str(info.input_name)
+        }
+    }
+}
+
+impl From<Device> for ffi::cubeb_device {
+    fn from(info: Device) -> Self {
+        Self {
+            output_name: opt_str(info.output_name),
+            input_name: opt_str(info.input_name)
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeviceInfo {
+    pub devid: usize,
+    pub device_id: Option<Vec<u8>>,
+    pub friendly_name: Option<Vec<u8>>,
+    pub group_id: Option<Vec<u8>>,
+    pub vendor_name: Option<Vec<u8>>,
+
+    pub device_type: ffi::cubeb_device_type,
+    pub state: ffi::cubeb_device_state,
+    pub preferred: ffi::cubeb_device_pref,
+
+    pub format: ffi::cubeb_device_fmt,
+    pub default_format: ffi::cubeb_device_fmt,
+    pub max_channels: u32,
+    pub default_rate: u32,
+    pub max_rate: u32,
+    pub min_rate: u32,
+
+    pub latency_lo: u32,
+    pub latency_hi: u32
+}
+
+impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
+    fn from(info: &'a ffi::cubeb_device_info) -> Self {
+        DeviceInfo {
+            devid: info.devid as _,
+            device_id: dup_str(info.device_id),
+            friendly_name: dup_str(info.friendly_name),
+            group_id: dup_str(info.group_id),
+            vendor_name: dup_str(info.vendor_name),
+
+            device_type: info.device_type,
+            state: info.state,
+            preferred: info.preferred,
+
+            format: info.format,
+            default_format: info.default_format,
+            max_channels: info.max_channels,
+            default_rate: info.default_rate,
+            max_rate: info.max_rate,
+            min_rate: info.min_rate,
+
+            latency_lo: info.latency_lo,
+            latency_hi: info.latency_hi
+        }
+    }
+}
+
+impl From<DeviceInfo> for ffi::cubeb_device_info {
+    fn from(info: DeviceInfo) -> Self {
+        ffi::cubeb_device_info {
+            devid: info.devid as _,
+            device_id: opt_str(info.device_id),
+            friendly_name: opt_str(info.friendly_name),
+            group_id: opt_str(info.group_id),
+            vendor_name: opt_str(info.vendor_name),
+
+            device_type: info.device_type,
+            state: info.state,
+            preferred: info.preferred,
+
+            format: info.format,
+            default_format: info.default_format,
+            max_channels: info.max_channels,
+            default_rate: info.default_rate,
+            max_rate: info.max_rate,
+            min_rate: info.min_rate,
+
+            latency_lo: info.latency_lo,
+            latency_hi: info.latency_hi
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamParams {
+    pub format: u32,
+    pub rate: u16,
+    pub channels: u8,
+    pub layout: i32
+}
+
+impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
+    fn from(params: &'a ffi::cubeb_stream_params) -> Self {
+        assert!(params.channels <= u8::max_value() as u32);
+
+        StreamParams {
+            format: params.format,
+            rate: params.rate as u16,
+            channels: params.channels as u8,
+            layout: params.layout
+        }
+    }
+}
+
+impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
+    fn from(params: &StreamParams) -> Self {
+        ffi::cubeb_stream_params {
+            format: params.format,
+            rate: params.rate as u32,
+            channels: params.channels as u32,
+            layout: params.layout
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamInitParams {
+    pub stream_name: Option<Vec<u8>>,
+    pub input_device: usize,
+    pub input_stream_params: Option<StreamParams>,
+    pub output_device: usize,
+    pub output_stream_params: Option<StreamParams>,
+    pub latency_frames: u32
+}
+
+fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
+    if s.is_null() {
+        None
+    } else {
+        let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
+        Some(vec)
+    }
+}
+
+fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
+    match v {
+        Some(v) => {
+            match CString::new(v) {
+                Ok(s) => s.into_raw(),
+                Err(_) => {
+                    debug!("Failed to convert bytes to CString");
+                    ptr::null()
+                },
+            }
+        },
+        None => ptr::null(),
+    }
+}
+
+// Client -> Server messages.
+// TODO: Callbacks should be different messages types so
+// ServerConn::process_msg doesn't have a catch-all case.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ServerMessage {
+    ClientConnect,
+    ClientDisconnect,
+
+    ContextGetBackendId,
+    ContextGetMaxChannelCount,
+    ContextGetMinLatency(StreamParams),
+    ContextGetPreferredSampleRate,
+    ContextGetPreferredChannelLayout,
+    ContextGetDeviceEnumeration(ffi::cubeb_device_type),
+
+    StreamInit(StreamInitParams),
+    StreamDestroy(usize),
+
+    StreamStart(usize),
+    StreamStop(usize),
+    StreamResetDefaultDevice(usize),
+    StreamGetPosition(usize),
+    StreamGetLatency(usize),
+    StreamSetVolume(usize, f32),
+    StreamSetPanning(usize, f32),
+    StreamGetCurrentDevice(usize),
+
+    StreamDataCallback(isize)
+}
+
+// Server -> Client messages.
+// TODO: Streams need id.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ClientMessage {
+    ClientConnected,
+    ClientDisconnected,
+
+    ContextBackendId(),
+    ContextMaxChannelCount(u32),
+    ContextMinLatency(u32),
+    ContextPreferredSampleRate(u32),
+    ContextPreferredChannelLayout(ffi::cubeb_channel_layout),
+    ContextEnumeratedDevices(Vec<DeviceInfo>),
+
+    StreamCreated(usize), /*(RawFd)*/
+    StreamCreatedInputShm, /*(RawFd)*/
+    StreamCreatedOutputShm, /*(RawFd)*/
+    StreamDestroyed,
+
+    StreamStarted,
+    StreamStopped,
+    StreamDefaultDeviceReset,
+    StreamPosition(u64),
+    StreamLatency(u32),
+    StreamVolumeSet,
+    StreamPanningSet,
+    StreamCurrentDevice(Device),
+
+    StreamDataCallback(isize, usize),
+    StreamStateCallback(ffi::cubeb_state),
+
+    ContextError(ffi::cubeb_error_code),
+    StreamError, /*(Error)*/
+    ClientError /*(Error)*/
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -0,0 +1,105 @@
+use libc;
+use std::io;
+use std::mem;
+use std::ptr;
+use std::os::unix::io::RawFd;
+use std;
+
+// Note: The following fields must be laid out together, the OS expects them
+// to be part of a single allocation.
+#[repr(C)]
+struct CmsgSpace {
+    cmsghdr: libc::cmsghdr,
+    data: libc::c_int,
+}
+
+unsafe fn sendmsg_retry(fd: libc::c_int, msg: *const libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
+    loop {
+        let r = libc::sendmsg(fd, msg, flags);
+        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
+            std::thread::yield_now();
+            continue;
+        }
+        return r;
+    }
+}
+
+pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+    let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
+    let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
+
+    msghdr.msg_iov = &mut iovec as *mut _;
+    msghdr.msg_iovlen = 1;
+    if fd_to_send.is_some() {
+        msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
+        msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+    }
+
+    iovec.iov_base = if to_send.is_empty() {
+        // Empty Vecs have a non-null pointer.
+        ptr::null_mut()
+    } else {
+        to_send.as_ptr() as *const _ as *mut _
+    };
+    iovec.iov_len = to_send.len();
+
+    cmsg.cmsghdr.cmsg_len = msghdr.msg_controllen;
+    cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
+    cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
+
+    cmsg.data = fd_to_send.unwrap_or(-1);
+
+    let result = unsafe { sendmsg_retry(fd, &msghdr, 0) };
+    if result >= 0 {
+        Ok(result as usize)
+    } else {
+        Err(io::Error::last_os_error())
+    }
+}
+
+unsafe fn recvmsg_retry(fd: libc::c_int, msg: *mut libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
+    loop {
+        let r = libc::recvmsg(fd, msg, flags);
+        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
+            std::thread::yield_now();
+            continue;
+        }
+        return r;
+    }
+}
+
+pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+    let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+    let mut iovec: libc::iovec = unsafe { mem::zeroed() };
+    let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
+
+    msghdr.msg_iov = &mut iovec as *mut _;
+    msghdr.msg_iovlen = 1;
+    msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
+    msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+
+    iovec.iov_base = if to_recv.is_empty() {
+        // Empty Vecs have a non-null pointer.
+        ptr::null_mut()
+    } else {
+        to_recv.as_ptr() as *const _ as *mut _
+    };
+    iovec.iov_len = to_recv.len();
+
+    let result = unsafe { recvmsg_retry(fd, &mut msghdr, 0) };
+    if result >= 0 {
+        let fd = if msghdr.msg_controllen == mem::size_of::<CmsgSpace>() as _ &&
+            cmsg.cmsghdr.cmsg_len == mem::size_of::<CmsgSpace>() as _ &&
+            cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
+            cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
+                Some(cmsg.data)
+            } else {
+                None
+            };
+
+        Ok((result as usize, fd))
+    } else {
+        Err(io::Error::last_os_error())
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -0,0 +1,133 @@
+use std::path::Path;
+use std::fs::{remove_file, OpenOptions, File};
+use std::sync::atomic;
+use memmap::{Mmap, Protection};
+use errors::*;
+
+pub struct SharedMemReader {
+    mmap: Mmap,
+}
+
+impl SharedMemReader {
+    pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
+        let file = OpenOptions::new().read(true)
+                                     .write(true)
+                                     .create_new(true)
+                                     .open(path)?;
+        let _ = remove_file(path);
+        file.set_len(size as u64)?;
+        let mmap = Mmap::open(&file, Protection::Read)?;
+        assert_eq!(mmap.len(), size);
+        Ok((SharedMemReader {
+            mmap
+        }, file))
+    }
+
+    pub fn read(&self, buf: &mut [u8]) -> Result<()> {
+        if buf.is_empty() {
+            return Ok(());
+        }
+        // TODO: Track how much is in the shm area.
+        if buf.len() <= self.mmap.len() {
+            atomic::fence(atomic::Ordering::Acquire);
+            unsafe {
+                let len = buf.len();
+                buf.copy_from_slice(&self.mmap.as_slice()[..len]);
+            }
+            Ok(())
+        } else {
+            bail!("mmap size");
+        }
+    }
+}
+
+pub struct SharedMemSlice {
+    mmap: Mmap,
+}
+
+impl SharedMemSlice {
+    pub fn from(file: File, size: usize) -> Result<SharedMemSlice> {
+        let mmap = Mmap::open(&file, Protection::Read)?;
+        assert_eq!(mmap.len(), size);
+        Ok(SharedMemSlice {
+            mmap
+        })
+    }
+
+    pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
+        if size == 0 {
+            return Ok(&[]);
+        }
+        // TODO: Track how much is in the shm area.
+        if size <= self.mmap.len() {
+            atomic::fence(atomic::Ordering::Acquire);
+            let buf = unsafe { &self.mmap.as_slice()[..size] };
+            Ok(buf)
+        } else {
+            bail!("mmap size");
+        }
+    }
+}
+
+pub struct SharedMemWriter {
+    mmap: Mmap,
+}
+
+impl SharedMemWriter {
+    pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
+        let file = OpenOptions::new().read(true)
+                                     .write(true)
+                                     .create_new(true)
+                                     .open(path)?;
+        let _ = remove_file(path);
+        file.set_len(size as u64)?;
+        let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+        Ok((SharedMemWriter {
+            mmap
+        }, file))
+    }
+
+    pub fn write(&mut self, buf: &[u8]) -> Result<()> {
+        if buf.is_empty() {
+            return Ok(());
+        }
+        // TODO: Track how much is in the shm area.
+        if buf.len() <= self.mmap.len() {
+            unsafe {
+                self.mmap.as_mut_slice()[..buf.len()].copy_from_slice(buf);
+            }
+            atomic::fence(atomic::Ordering::Release);
+            Ok(())
+        } else {
+            bail!("mmap size");
+        }
+    }
+}
+
+pub struct SharedMemMutSlice {
+    mmap: Mmap,
+}
+
+impl SharedMemMutSlice {
+    pub fn from(file: File, size: usize) -> Result<SharedMemMutSlice> {
+        let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+        assert_eq!(mmap.len(), size);
+        Ok(SharedMemMutSlice {
+            mmap
+        })
+    }
+
+    pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
+        if size == 0 {
+            return Ok(&mut []);
+        }
+        // TODO: Track how much is in the shm area.
+        if size <= self.mmap.len() {
+            let buf = unsafe { &mut self.mmap.as_mut_slice()[..size] };
+            atomic::fence(atomic::Ordering::Release);
+            Ok(buf)
+        } else {
+            bail!("mmap size");
+        }
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "audioipc-client"
+version = "0.1.0"
+authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+description = "Cubeb Backend for talking to remote cubeb server."
+
+[dependencies]
+audioipc = { path="../audioipc" }
+cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
+cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
+log = "^0.3.6"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/context.rs
@@ -0,0 +1,170 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use ClientStream;
+use audioipc::{self, ClientMessage, Connection, ServerMessage, messages};
+use cubeb_backend::{Context, Ops};
+use cubeb_core::{DeviceId, DeviceType, Error, Result, StreamParams, ffi};
+use cubeb_core::binding::Binding;
+use std::ffi::{CStr, CString};
+use std::mem;
+use std::os::raw::c_void;
+use std::os::unix::net::UnixStream;
+use std::sync::{Mutex, MutexGuard};
+use stream;
+
+#[derive(Debug)]
+pub struct ClientContext {
+    _ops: *const Ops,
+    connection: Mutex<Connection>
+}
+
+macro_rules! t(
+    ($e:expr) => (
+        match $e {
+            Ok(e) => e,
+            Err(_) => return Err(Error::default())
+        }
+    ));
+
+pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
+
+impl ClientContext {
+    #[doc(hidden)]
+    pub fn conn(&self) -> MutexGuard<Connection> {
+        self.connection.lock().unwrap()
+    }
+}
+
+impl Context for ClientContext {
+    fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+        // TODO: encapsulate connect, etc inside audioipc.
+        let stream = t!(UnixStream::connect(audioipc::get_uds_path()));
+        let ctx = Box::new(ClientContext {
+            _ops: &CLIENT_OPS as *const _,
+            connection: Mutex::new(Connection::new(stream))
+        });
+        Ok(Box::into_raw(ctx) as *mut _)
+    }
+
+    fn backend_id(&self) -> &'static CStr {
+        unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
+    }
+
+    fn max_channel_count(&self) -> Result<u32> {
+        send_recv!(self.conn(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+    }
+
+    fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+        let params = messages::StreamParams::from(unsafe { &*params.raw() });
+        send_recv!(self.conn(), ContextGetMinLatency(params) => ContextMinLatency())
+    }
+
+    fn preferred_sample_rate(&self) -> Result<u32> {
+        send_recv!(self.conn(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+    }
+
+    fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
+        send_recv!(self.conn(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+    }
+
+    fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+        let v: Vec<ffi::cubeb_device_info> =
+            match send_recv!(self.conn(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
+                Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
+                Err(e) => return Err(e),
+            };
+        let vs = v.into_boxed_slice();
+        let coll = ffi::cubeb_device_collection {
+            count: vs.len(),
+            device: vs.as_ptr()
+        };
+        // Giving away the memory owned by vs.  Don't free it!
+        // Reclaimed in `device_collection_destroy`.
+        mem::forget(vs);
+        Ok(coll)
+    }
+
+    fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+        unsafe {
+            let coll = &*collection;
+            let mut devices = Vec::from_raw_parts(
+                coll.device as *mut ffi::cubeb_device_info,
+                coll.count,
+                coll.count
+            );
+            for dev in devices.iter_mut() {
+                if !dev.device_id.is_null() {
+                    let _ = CString::from_raw(dev.device_id as *mut _);
+                }
+                if !dev.group_id.is_null() {
+                    let _ = CString::from_raw(dev.group_id as *mut _);
+                }
+                if !dev.vendor_name.is_null() {
+                    let _ = CString::from_raw(dev.vendor_name as *mut _);
+                }
+                if !dev.friendly_name.is_null() {
+                    let _ = CString::from_raw(dev.friendly_name as *mut _);
+                }
+            }
+        }
+    }
+
+    fn stream_init(
+        &self,
+        stream_name: Option<&CStr>,
+        input_device: DeviceId,
+        input_stream_params: Option<&ffi::cubeb_stream_params>,
+        output_device: DeviceId,
+        output_stream_params: Option<&ffi::cubeb_stream_params>,
+        latency_frame: u32,
+        // These params aren't sent to the server
+        data_callback: ffi::cubeb_data_callback,
+        state_callback: ffi::cubeb_state_callback,
+        user_ptr: *mut c_void,
+    ) -> Result<*mut ffi::cubeb_stream> {
+
+        fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
+            match p {
+                Some(raw) => Some(messages::StreamParams::from(raw)),
+                None => None,
+            }
+        }
+
+        let stream_name = match stream_name {
+            Some(s) => Some(s.to_bytes().to_vec()),
+            None => None,
+        };
+
+        let input_stream_params = opt_stream_params(input_stream_params);
+        let output_stream_params = opt_stream_params(output_stream_params);
+
+        let init_params = messages::StreamInitParams {
+            stream_name: stream_name,
+            input_device: input_device.raw() as _,
+            input_stream_params: input_stream_params,
+            output_device: output_device.raw() as _,
+            output_stream_params: output_stream_params,
+            latency_frames: latency_frame
+        };
+        stream::init(&self, init_params, data_callback, state_callback, user_ptr)
+    }
+
+    fn register_device_collection_changed(
+        &self,
+        _dev_type: DeviceType,
+        _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
+        _user_ptr: *mut c_void,
+    ) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl Drop for ClientContext {
+    fn drop(&mut self) {
+        info!("ClientContext drop...");
+        let _: Result<()> = send_recv!(self.conn(), ClientDisconnect => ClientDisconnected);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/lib.rs
@@ -0,0 +1,28 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
+extern crate audioipc;
+extern crate cubeb_core;
+#[macro_use]
+extern crate cubeb_backend;
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+mod send_recv;
+mod context;
+mod stream;
+
+use context::ClientContext;
+use cubeb_backend::capi;
+use cubeb_core::ffi;
+use std::os::raw::{c_char, c_int};
+use stream::ClientStream;
+
+#[no_mangle]
+/// Entry point from C code.
+pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int {
+    capi::capi_init::<ClientContext>(c, context_name)
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/send_recv.rs
@@ -0,0 +1,42 @@
+#[macro_export]
+macro_rules! send_recv {
+    ($conn:expr, $smsg:ident => $rmsg:ident) => {{
+        send_recv!(__send $conn, $smsg);
+        send_recv!(__recv $conn, $rmsg)
+    }};
+    ($conn:expr, $smsg:ident => $rmsg:ident()) => {{
+        send_recv!(__send $conn, $smsg);
+        send_recv!(__recv $conn, $rmsg __result)
+    }};
+    ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
+        send_recv!(__send $conn, $smsg, $($a),*);
+        send_recv!(__recv $conn, $rmsg)
+    }};
+    ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
+        send_recv!(__send $conn, $smsg, $($a),*);
+        send_recv!(__recv $conn, $rmsg __result)
+    }};
+    //
+    (__send $conn:expr, $smsg:ident) => (
+        $conn.send(ServerMessage::$smsg)
+            .unwrap();
+    );
+    (__send $conn:expr, $smsg:ident, $($a:expr),*) => (
+        $conn.send(ServerMessage::$smsg($($a),*))
+            .unwrap();
+    );
+    (__recv $conn:expr, $rmsg:ident) => (
+        if let ClientMessage::$rmsg = $conn.receive().unwrap() {
+            Ok(())
+        } else {
+            panic!("wrong message received");
+        }
+    );
+    (__recv $conn:expr, $rmsg:ident __result) => (
+        if let ClientMessage::$rmsg(v) = $conn.receive().unwrap() {
+            Ok(v)
+        } else {
+            panic!("wrong message received");
+        }
+    )
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/stream.rs
@@ -0,0 +1,239 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use ClientContext;
+use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::shm::{SharedMemSlice, SharedMemMutSlice};
+use cubeb_backend::Stream;
+use cubeb_core::{ErrorCode, Result, ffi};
+use std::ffi::CString;
+use std::os::raw::c_void;
+use std::os::unix::io::FromRawFd;
+use std::fs::File;
+use std::ptr;
+use std::thread;
+
+// TODO: Remove and let caller allocate based on cubeb backend requirements.
+const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+
+pub struct ClientStream<'ctx> {
+    // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
+    context: &'ctx ClientContext,
+    token: usize,
+    join_handle: Option<thread::JoinHandle<()>>
+}
+
+fn stream_thread(
+    mut conn: Connection,
+    input_shm: SharedMemSlice,
+    mut output_shm: SharedMemMutSlice,
+    data_cb: ffi::cubeb_data_callback,
+    state_cb: ffi::cubeb_state_callback,
+    user_ptr: usize,
+) {
+    loop {
+        let r = match conn.receive::<ClientMessage>() {
+            Ok(r) => r,
+            Err(e) => {
+                debug!("stream_thread: Failed to receive message: {:?}", e);
+                continue;
+            },
+        };
+
+        match r {
+            ClientMessage::StreamDestroyed => {
+                info!("stream_thread: Shutdown callback thread.");
+                return;
+            },
+            ClientMessage::StreamDataCallback(nframes, frame_size) => {
+                info!(
+                    "stream_thread: Data Callback: nframes={} frame_size={}",
+                    nframes,
+                    frame_size
+                );
+                // TODO: This is proof-of-concept. Make it better.
+                let input_ptr: *const u8 = input_shm.get_slice(nframes as usize * frame_size).unwrap().as_ptr();
+                let output_ptr: *mut u8 = output_shm.get_mut_slice(nframes as usize * frame_size).unwrap().as_mut_ptr();
+                let nframes = data_cb(
+                    ptr::null_mut(),
+                    user_ptr as *mut c_void,
+                    input_ptr as *const _,
+                    output_ptr as *mut _,
+                    nframes as _
+                );
+                conn.send(ServerMessage::StreamDataCallback(nframes as isize)).unwrap();
+            },
+            ClientMessage::StreamStateCallback(state) => {
+                info!("stream_thread: State Callback: {:?}", state);
+                state_cb(ptr::null_mut(), user_ptr as *mut _, state);
+            },
+            m => {
+                info!("Unexpected ClientMessage: {:?}", m);
+            },
+        }
+    }
+}
+
+impl<'ctx> ClientStream<'ctx> {
+    fn init(
+        ctx: &'ctx ClientContext,
+        init_params: messages::StreamInitParams,
+        data_callback: ffi::cubeb_data_callback,
+        state_callback: ffi::cubeb_state_callback,
+        user_ptr: *mut c_void,
+    ) -> Result<*mut ffi::cubeb_stream> {
+
+        ctx.conn()
+            .send(ServerMessage::StreamInit(init_params))
+            .unwrap();
+
+        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+            Ok(r) => r,
+            Err(_) => return Err(ErrorCode::Error.into()),
+        };
+
+        let (token, conn) = match r {
+            (ClientMessage::StreamCreated(tok), Some(fd)) => (tok, unsafe {
+                Connection::from_raw_fd(fd)
+            }),
+            (ClientMessage::StreamCreated(_), None) => {
+                debug!("Missing fd!");
+                return Err(ErrorCode::Error.into());
+            },
+            (m, _) => {
+                debug!("Unexpected message: {:?}", m);
+                return Err(ErrorCode::Error.into());
+            },
+        };
+
+        // TODO: It'd be nicer to receive these two fds as part of
+        // StreamCreated, but that requires changing sendmsg/recvmsg to
+        // support multiple fds.
+        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+            Ok(r) => r,
+            Err(_) => return Err(ErrorCode::Error.into()),
+        };
+
+        let input_file = match r {
+            (ClientMessage::StreamCreatedInputShm, Some(fd)) => unsafe {
+                File::from_raw_fd(fd)
+            },
+            (m, _) => {
+                debug!("Unexpected message: {:?}", m);
+                return Err(ErrorCode::Error.into());
+            },
+        };
+
+        let input_shm = SharedMemSlice::from(input_file,
+                                             SHM_AREA_SIZE).unwrap();
+
+        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+            Ok(r) => r,
+            Err(_) => return Err(ErrorCode::Error.into()),
+        };
+
+        let output_file = match r {
+            (ClientMessage::StreamCreatedOutputShm, Some(fd)) => unsafe {
+                File::from_raw_fd(fd)
+            },
+            (m, _) => {
+                debug!("Unexpected message: {:?}", m);
+                return Err(ErrorCode::Error.into());
+            },
+        };
+
+        let output_shm = SharedMemMutSlice::from(output_file,
+                                                 SHM_AREA_SIZE).unwrap();
+
+        let user_data = user_ptr as usize;
+        let join_handle = thread::spawn(move || {
+            stream_thread(conn, input_shm, output_shm, data_callback, state_callback, user_data)
+        });
+
+        Ok(Box::into_raw(Box::new(ClientStream {
+            context: ctx,
+            token: token,
+            join_handle: Some(join_handle)
+        })) as _)
+    }
+}
+
+impl<'ctx> Drop for ClientStream<'ctx> {
+    fn drop(&mut self) {
+        let _: Result<()> = send_recv!(self.context.conn(), StreamDestroy(self.token) => StreamDestroyed);
+        self.join_handle.take().unwrap().join().unwrap();
+    }
+}
+
+impl<'ctx> Stream for ClientStream<'ctx> {
+    fn start(&self) -> Result<()> {
+        send_recv!(self.context.conn(), StreamStart(self.token) => StreamStarted)
+    }
+
+    fn stop(&self) -> Result<()> {
+        send_recv!(self.context.conn(), StreamStop(self.token) => StreamStopped)
+    }
+
+    fn reset_default_device(&self) -> Result<()> {
+        send_recv!(self.context.conn(), StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+    }
+
+    fn position(&self) -> Result<u64> {
+        send_recv!(self.context.conn(), StreamGetPosition(self.token) => StreamPosition())
+    }
+
+    fn latency(&self) -> Result<u32> {
+        send_recv!(self.context.conn(), StreamGetLatency(self.token) => StreamLatency())
+    }
+
+    fn set_volume(&self, volume: f32) -> Result<()> {
+        send_recv!(self.context.conn(), StreamSetVolume(self.token, volume) => StreamVolumeSet)
+    }
+
+    fn set_panning(&self, panning: f32) -> Result<()> {
+        send_recv!(self.context.conn(), StreamSetPanning(self.token, panning) => StreamPanningSet)
+    }
+
+    fn current_device(&self) -> Result<*const ffi::cubeb_device> {
+        match send_recv!(self.context.conn(), StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+            Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
+            Err(e) => Err(e),
+        }
+    }
+
+    fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+        // It's all unsafe...
+        if !device.is_null() {
+            unsafe {
+                if !(*device).output_name.is_null() {
+                    let _ = CString::from_raw((*device).output_name as *mut _);
+                }
+                if !(*device).input_name.is_null() {
+                    let _ = CString::from_raw((*device).input_name as *mut _);
+                }
+                let _: Box<ffi::cubeb_device> = Box::from_raw(device as *mut _);
+            }
+        }
+        Ok(())
+    }
+
+    // TODO: How do we call this back? On what thread?
+    fn register_device_changed_callback(
+        &self,
+        _device_changed_callback: ffi::cubeb_device_changed_callback,
+    ) -> Result<()> {
+        Ok(())
+    }
+}
+
+pub fn init(
+    ctx: &ClientContext,
+    init_params: messages::StreamInitParams,
+    data_callback: ffi::cubeb_data_callback,
+    state_callback: ffi::cubeb_state_callback,
+    user_ptr: *mut c_void,
+) -> Result<*mut ffi::cubeb_stream> {
+    ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/rustfmt.toml
@@ -0,0 +1,10 @@
+ideal_width = 80
+match_block_trailing_comma = true
+max_width = 120
+newline_style = "Unix"
+normalize_comments = false
+struct_lit_multiline_style = "ForceMulti"
+where_trailing_comma = true
+reorder_imports = true
+reorder_imported_names = true
+trailing_comma = "Never"
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "audioipc-server"
+version = "0.1.0"
+authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+description = "Remote cubeb server"
+
+[dependencies]
+audioipc = { path = "../audioipc" }
+cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3" }
+cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+error-chain = "0.10.0"
+lazycell = "^0.4"
+log = "^0.3.6"
+mio = "0.6.7"
+mio-uds = "0.6.4"
+slab = "0.3.0"
+
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/src/channel.rs
@@ -0,0 +1,251 @@
+//! Thread safe communication channel implementing `Evented`
+
+use lazycell::{AtomicLazyCell, LazyCell};
+use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use std::{fmt, io};
+use std::any::Any;
+use std::error;
+use std::sync::{Arc, mpsc};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
+    let inner = Arc::new(Inner {
+        pending: AtomicUsize::new(0),
+        senders: AtomicUsize::new(1),
+        set_readiness: AtomicLazyCell::new()
+    });
+
+    let tx = SenderCtl {
+        inner: inner.clone()
+    };
+
+    let rx = ReceiverCtl {
+        registration: LazyCell::new(),
+        inner: inner
+    };
+
+    (tx, rx)
+}
+
+/// Tracks messages sent on a channel in order to update readiness.
+pub struct SenderCtl {
+    inner: Arc<Inner>
+}
+
+/// Tracks messages received on a channel in order to track readiness.
+pub struct ReceiverCtl {
+    registration: LazyCell<Registration>,
+    inner: Arc<Inner>
+}
+
+pub enum SendError<T> {
+    Io(io::Error),
+    Disconnected(T)
+}
+
+pub enum TrySendError<T> {
+    Io(io::Error),
+    Full(T),
+    Disconnected(T)
+}
+
+struct Inner {
+    // The number of outstanding messages for the receiver to read
+    pending: AtomicUsize,
+    // The number of sender handles
+    senders: AtomicUsize,
+    // The set readiness handle
+    set_readiness: AtomicLazyCell<SetReadiness>
+}
+
+/*
+ *
+ * ===== SenderCtl / ReceiverCtl =====
+ *
+ */
+
+impl SenderCtl {
+    /// Call to track that a message has been sent
+    pub fn inc(&self) -> io::Result<()> {
+        let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
+
+        if 0 == cnt {
+            // Toggle readiness to readable
+            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+                try!(set_readiness.set_readiness(Ready::readable()));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl Clone for SenderCtl {
+    fn clone(&self) -> SenderCtl {
+        self.inner.senders.fetch_add(1, Ordering::Relaxed);
+        SenderCtl {
+            inner: self.inner.clone()
+        }
+    }
+}
+
+impl Drop for SenderCtl {
+    fn drop(&mut self) {
+        if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
+            let _ = self.inc();
+        }
+    }
+}
+
+impl Evented for ReceiverCtl {
+    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+        if self.registration.borrow().is_some() {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "receiver already registered"
+            ));
+        }
+
+        let (registration, set_readiness) = Registration::new2();
+        try!(registration.register(poll, token, interest, opts));
+
+        if self.inner.pending.load(Ordering::Relaxed) > 0 {
+            // TODO: Don't drop readiness
+            let _ = set_readiness.set_readiness(Ready::readable());
+        }
+
+        self.registration.fill(registration).ok().expect(
+            "unexpected state encountered"
+        );
+        self.inner.set_readiness.fill(set_readiness).ok().expect(
+            "unexpected state encountered"
+        );
+
+        Ok(())
+    }
+
+    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+        match self.registration.borrow() {
+            Some(registration) => registration.reregister(poll, token, interest, opts),
+            None => Err(io::Error::new(
+                io::ErrorKind::Other,
+                "receiver not registered"
+            )),
+        }
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        match self.registration.borrow() {
+            Some(registration) => <Registration as Evented>::deregister(&registration, poll),
+            None => Err(io::Error::new(
+                io::ErrorKind::Other,
+                "receiver not registered"
+            )),
+        }
+    }
+}
+
+/*
+ *
+ * ===== Error conversions =====
+ *
+ */
+
+impl<T> From<mpsc::SendError<T>> for SendError<T> {
+    fn from(src: mpsc::SendError<T>) -> SendError<T> {
+        SendError::Disconnected(src.0)
+    }
+}
+
+impl<T> From<io::Error> for SendError<T> {
+    fn from(src: io::Error) -> SendError<T> {
+        SendError::Io(src)
+    }
+}
+
+impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
+    fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
+        match src {
+            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
+            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
+        }
+    }
+}
+
+impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
+    fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
+        TrySendError::Disconnected(src.0)
+    }
+}
+
+impl<T> From<io::Error> for TrySendError<T> {
+    fn from(src: io::Error) -> TrySendError<T> {
+        TrySendError::Io(src)
+    }
+}
+
+/*
+ *
+ * ===== Implement Error, Debug and Display for Errors =====
+ *
+ */
+
+impl<T: Any> error::Error for SendError<T> {
+    fn description(&self) -> &str {
+        match self {
+            &SendError::Io(ref io_err) => io_err.description(),
+            &SendError::Disconnected(..) => "Disconnected",
+        }
+    }
+}
+
+impl<T: Any> error::Error for TrySendError<T> {
+    fn description(&self) -> &str {
+        match self {
+            &TrySendError::Io(ref io_err) => io_err.description(),
+            &TrySendError::Full(..) => "Full",
+            &TrySendError::Disconnected(..) => "Disconnected",
+        }
+    }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        format_send_error(self, f)
+    }
+}
+
+impl<T> fmt::Display for SendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        format_send_error(self, f)
+    }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        format_try_send_error(self, f)
+    }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        format_try_send_error(self, f)
+    }
+}
+
+#[inline]
+fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+    match e {
+        &SendError::Io(ref io_err) => write!(f, "{}", io_err),
+        &SendError::Disconnected(..) => write!(f, "Disconnected"),
+    }
+}
+
+#[inline]
+fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+    match e {
+        &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+        &TrySendError::Full(..) => write!(f, "Full"),
+        &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/src/lib.rs
@@ -0,0 +1,636 @@
+#[macro_use]
+extern crate error_chain;
+
+#[macro_use]
+extern crate log;
+
+extern crate audioipc;
+extern crate cubeb;
+extern crate cubeb_core;
+extern crate lazycell;
+extern crate mio;
+extern crate mio_uds;
+extern crate slab;
+
+use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamParams};
+use audioipc::shm::{SharedMemReader, SharedMemWriter};
+use cubeb_core::binding::Binding;
+use cubeb_core::ffi;
+use mio::Token;
+use mio_uds::UnixListener;
+use std::{slice, thread};
+use std::convert::From;
+use std::os::raw::c_void;
+use std::os::unix::prelude::*;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+mod channel;
+
+pub mod errors {
+    error_chain! {
+        links {
+            AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
+        }
+        foreign_links {
+            Cubeb(::cubeb_core::Error);
+            Io(::std::io::Error);
+        }
+    }
+}
+
+use errors::*;
+
+// TODO: Remove and let caller allocate based on cubeb backend requirements.
+const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+
+// TODO: this should forward to the client.
+struct Callback {
+    /// Size of input frame in bytes
+    input_frame_size: u16,
+    /// Size of output frame in bytes
+    output_frame_size: u16,
+    connection: audioipc::Connection,
+    input_shm: SharedMemWriter,
+    output_shm: SharedMemReader,
+}
+
+impl cubeb::StreamCallback for Callback {
+    type Frame = u8;
+
+    fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
+        info!("Stream data callback: {} {}", input.len(), output.len());
+
+        // len is of input and output is frame len. Turn these into the real lengths.
+        let real_input = unsafe {
+            let size_bytes = input.len() * self.input_frame_size as usize;
+            slice::from_raw_parts(input.as_ptr(), size_bytes)
+        };
+        let real_output = unsafe {
+            let size_bytes = output.len() * self.output_frame_size as usize;
+            info!("Resize output to {}", size_bytes);
+            slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
+        };
+
+        self.input_shm.write(&real_input).unwrap();
+
+        self.connection
+            .send(ClientMessage::StreamDataCallback(
+                output.len() as isize,
+                self.output_frame_size as usize
+            ))
+            .unwrap();
+
+        let r = self.connection.receive();
+        match r {
+            Ok(ServerMessage::StreamDataCallback(cb_result)) => {
+                if cb_result >= 0 {
+                    let len = cb_result as usize * self.output_frame_size as usize;
+                    self.output_shm.read(&mut real_output[..len]).unwrap();
+                    cb_result
+                } else {
+                    cb_result
+                }
+            },
+            _ => {
+                debug!("Unexpected message {:?} during callback", r);
+                -1
+            },
+        }
+    }
+
+    fn state_callback(&mut self, state: cubeb::State) {
+        info!("Stream state callback: {:?}", state);
+    }
+}
+
+impl Drop for Callback {
+    fn drop(&mut self) {
+        self.connection
+            .send(ClientMessage::StreamDestroyed)
+            .unwrap();
+    }
+}
+
+type Slab<T> = slab::Slab<T, Token>;
+type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
+
+// TODO: Server token must be outside range used by server.connections slab.
+// usize::MAX is already used internally in mio.
+const QUIT: Token = Token(std::usize::MAX - 2);
+const SERVER: Token = Token(std::usize::MAX - 1);
+
+struct ServerConn {
+    connection: audioipc::Connection,
+    token: Option<Token>,
+    streams: StreamSlab
+}
+
+impl ServerConn {
+    fn new<FD>(fd: FD) -> ServerConn
+    where
+        FD: IntoRawFd,
+    {
+        ServerConn {
+            connection: unsafe { audioipc::Connection::from_raw_fd(fd.into_raw_fd()) },
+            token: None,
+            // TODO: Handle increasing slab size. Pick a good default size.
+            streams: StreamSlab::with_capacity(64)
+        }
+    }
+
+    fn process(&mut self, poll: &mut mio::Poll, context: &Result<Option<cubeb::Context>>) -> Result<()> {
+        let r = self.connection.receive();
+        info!("ServerConn::process: got {:?}", r);
+
+        if let &Ok(Some(ref ctx)) = context {
+            // TODO: Might need a simple state machine to deal with
+            // create/use/destroy ordering, etc.
+            // TODO: receive() and all this handling should be moved out
+            // of this event loop code.
+            let msg = try!(r);
+            let _ = try!(self.process_msg(&msg, ctx));
+        } else {
+            self.send_error(cubeb::Error::new());
+        }
+
+        poll.reregister(
+            &self.connection,
+            self.token.unwrap(),
+            mio::Ready::readable(),
+            mio::PollOpt::edge() | mio::PollOpt::oneshot()
+        ).unwrap();
+
+        Ok(())
+    }
+
+    fn process_msg(&mut self, msg: &ServerMessage, context: &cubeb::Context) -> Result<()> {
+        match msg {
+            &ServerMessage::ClientConnect => {
+                panic!("already connected");
+            },
+            &ServerMessage::ClientDisconnect => {
+                // TODO:
+                //self.connection.client_disconnect();
+                self.connection
+                    .send(ClientMessage::ClientDisconnected)
+                    .unwrap();
+            },
+
+            &ServerMessage::ContextGetBackendId => {},
+
+            &ServerMessage::ContextGetMaxChannelCount => {
+                match context.max_channel_count() {
+                    Ok(channel_count) => {
+                        self.connection
+                            .send(ClientMessage::ContextMaxChannelCount(channel_count))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::ContextGetMinLatency(ref params) => {
+
+                let format = cubeb::SampleFormat::from(params.format);
+                let layout = cubeb::ChannelLayout::from(params.layout);
+
+                let params = cubeb::StreamParamsBuilder::new()
+                    .format(format)
+                    .rate(params.rate as _)
+                    .channels(params.channels as _)
+                    .layout(layout)
+                    .take();
+
+                match context.min_latency(&params) {
+                    Ok(latency) => {
+                        self.connection
+                            .send(ClientMessage::ContextMinLatency(latency))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::ContextGetPreferredSampleRate => {
+                match context.preferred_sample_rate() {
+                    Ok(rate) => {
+                        self.connection
+                            .send(ClientMessage::ContextPreferredSampleRate(rate))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::ContextGetPreferredChannelLayout => {
+                match context.preferred_channel_layout() {
+                    Ok(layout) => {
+                        self.connection
+                            .send(ClientMessage::ContextPreferredChannelLayout(layout as _))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::ContextGetDeviceEnumeration(device_type) => {
+                match context.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) {
+                    Ok(devices) => {
+                        let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+                        self.connection
+                            .send(ClientMessage::ContextEnumeratedDevices(v))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::StreamInit(ref params) => {
+                fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
+                    match params {
+                        Some(p) => {
+                            let raw = ffi::cubeb_stream_params::from(p);
+                            Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
+                        },
+                        None => None,
+                    }
+                }
+
+                fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+                    match params.as_ref() {
+                        Some(p) => {
+                            let sample_size = match p.format() {
+                                cubeb::SampleFormat::S16LE |
+                                cubeb::SampleFormat::S16BE |
+                                cubeb::SampleFormat::S16NE => 2,
+                                cubeb::SampleFormat::Float32LE |
+                                cubeb::SampleFormat::Float32BE |
+                                cubeb::SampleFormat::Float32NE => 4,
+                            };
+                            let channel_count = p.channels() as u16;
+                            sample_size * channel_count
+                        },
+                        None => 0,
+                    }
+                }
+
+                // TODO: Yuck!
+                let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+                let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+                let latency = params.latency_frames;
+                let mut builder = cubeb::StreamInitOptionsBuilder::new();
+                builder
+                    .input_device(input_device)
+                    .output_device(output_device)
+                    .latency(latency);
+
+                if let Some(ref stream_name) = params.stream_name {
+                    builder.stream_name(stream_name);
+                }
+                let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
+                if let Some(ref isp) = input_stream_params {
+                    builder.input_stream_param(isp);
+                }
+                let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
+                if let Some(ref osp) = output_stream_params {
+                    builder.output_stream_param(osp);
+                }
+                let params = builder.take();
+
+                let input_frame_size = frame_size_in_bytes(input_stream_params);
+                let output_frame_size = frame_size_in_bytes(output_stream_params);
+
+                let (conn1, conn2) = audioipc::Connection::pair()?;
+                info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+
+                let (input_shm, input_file) =
+                    SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+                let (output_shm, output_file) =
+                    SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+
+                match context.stream_init(
+                    &params,
+                    Callback {
+                        input_frame_size: input_frame_size,
+                        output_frame_size: output_frame_size,
+                        connection: conn2,
+                        input_shm: input_shm,
+                        output_shm: output_shm,
+                    }
+                ) {
+                    Ok(stream) => {
+                        let stm_tok = match self.streams.vacant_entry() {
+                            Some(entry) => {
+                                debug!(
+                                    "Registering stream {:?}",
+                                    entry.index(),
+                                );
+
+                                entry.insert(stream).index()
+                            },
+                            None => {
+                                // TODO: Turn into error
+                                panic!("Failed to insert stream into slab. No entries");
+                            },
+                        };
+
+                        self.connection
+                            .send_with_fd(ClientMessage::StreamCreated(stm_tok), Some(conn1))
+                            .unwrap();
+                        // TODO: It'd be nicer to send these as part of
+                        // StreamCreated, but that requires changing
+                        // sendmsg/recvmsg to support multiple fds.
+                        self.connection
+                            .send_with_fd(ClientMessage::StreamCreatedInputShm, Some(input_file))
+                            .unwrap();
+                        self.connection
+                            .send_with_fd(ClientMessage::StreamCreatedOutputShm, Some(output_file))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+
+            &ServerMessage::StreamDestroy(stm_tok) => {
+                self.streams.remove(stm_tok);
+                self.connection
+                    .send(ClientMessage::StreamDestroyed)
+                    .unwrap();
+            },
+
+            &ServerMessage::StreamStart(stm_tok) => {
+                let _ = self.streams[stm_tok].start();
+                self.connection.send(ClientMessage::StreamStarted).unwrap();
+            },
+            &ServerMessage::StreamStop(stm_tok) => {
+                let _ = self.streams[stm_tok].stop();
+                self.connection.send(ClientMessage::StreamStopped).unwrap();
+            },
+            &ServerMessage::StreamGetPosition(stm_tok) => {
+                match self.streams[stm_tok].position() {
+                    Ok(position) => {
+                        self.connection
+                            .send(ClientMessage::StreamPosition(position))
+                            .unwrap();
+                    },
+                    Err(e) => {
+                        self.send_error(e);
+                    },
+                }
+            },
+            &ServerMessage::StreamGetLatency(stm_tok) => {
+                match self.streams[stm_tok].latency() {
+                    Ok(latency) => {
+                        self.connection
+                            .send(ClientMessage::StreamLatency(latency))
+                            .unwrap();
+                    },
+                    Err(e) => self.send_error(e),
+                }
+            },
+            &ServerMessage::StreamSetVolume(stm_tok, volume) => {
+                let _ = self.streams[stm_tok].set_volume(volume);
+                self.connection
+                    .send(ClientMessage::StreamVolumeSet)
+                    .unwrap();
+            },
+            &ServerMessage::StreamSetPanning(stm_tok, panning) => {
+                let _ = self.streams[stm_tok].set_panning(panning);
+                self.connection
+                    .send(ClientMessage::StreamPanningSet)
+                    .unwrap();
+            },
+            &ServerMessage::StreamGetCurrentDevice(stm_tok) => {
+                let err = match self.streams[stm_tok].current_device() {
+                    Ok(device) => {
+                        // TODO: Yuck!
+                        self.connection
+                            .send(ClientMessage::StreamCurrentDevice(device.into()))
+                            .unwrap();
+                        None
+                    },
+                    Err(e) => Some(e),
+                };
+                if let Some(e) = err {
+                    self.send_error(e);
+                }
+            },
+            _ => {
+                bail!("Unexpected Message");
+            },
+        }
+        Ok(())
+    }
+
+    fn send_error(&mut self, error: cubeb::Error) {
+        self.connection
+            .send(ClientMessage::ContextError(error.raw_code()))
+            .unwrap();
+    }
+}
+
+pub struct Server {
+    socket: UnixListener,
+    // Ok(None)      - Server hasn't tried to create cubeb::Context.
+    // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
+    // Err(_)        - Server has tried and failed to create cubeb::Context.
+    //                 Don't try again.
+    context: Result<Option<cubeb::Context>>,
+    conns: Slab<ServerConn>
+}
+
+impl Server {
+    pub fn new(socket: UnixListener) -> Server {
+        Server {
+            socket: socket,
+            context: Ok(None),
+            conns: Slab::with_capacity(16)
+        }
+    }
+
+    fn accept(&mut self, poll: &mut mio::Poll) -> Result<()> {
+        debug!("Server accepting connection");
+
+        let client_socket = match self.socket.accept() {
+            Err(e) => {
+                error!("server accept error: {}", e);
+                return Err(e.into());
+            },
+            Ok(None) => panic!("accept returned EAGAIN unexpectedly"),
+            Ok(Some((socket, _))) => socket,
+        };
+        let token = match self.conns.vacant_entry() {
+            Some(entry) => {
+                debug!("registering {:?}", entry.index());
+                let cxn = ServerConn::new(client_socket);
+                entry.insert(cxn).index()
+            },
+            None => {
+                panic!("failed to insert connection");
+            },
+        };
+
+        // Register the connection
+        self.conns[token].token = Some(token);
+        poll.register(
+            &self.conns[token].connection,
+            token,
+            mio::Ready::readable(),
+            mio::PollOpt::edge() | mio::PollOpt::oneshot()
+        ).unwrap();
+        /*
+        let r = self.conns[token].receive();
+        debug!("received {:?}", r);
+        let r = self.conns[token].send(ClientMessage::ClientConnected);
+        debug!("sent {:?} (ClientConnected)", r);
+         */
+
+        // Since we have a connection try creating a cubeb context. If
+        // it fails, mark the failure with Err.
+        if let Ok(None) = self.context {
+            self.context = cubeb::Context::init("AudioIPC Server", None)
+                .and_then(|ctx| Ok(Some(ctx)))
+                .or_else(|err| Err(err.into()));
+        }
+
+        Ok(())
+    }
+
+    pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
+        let mut events = mio::Events::with_capacity(16);
+
+        match poll.poll(&mut events, None) {
+            Ok(_) => {},
+            Err(e) => error!("server poll error: {}", e),
+        }
+
+        for event in events.iter() {
+            match event.token() {
+                SERVER => {
+                    match self.accept(poll) {
+                        Err(e) => {
+                            error!("server accept error: {}", e);
+                        },
+                        _ => {},
+                    };
+                },
+                QUIT => {
+                    info!("Quitting Audio Server loop");
+                    bail!("quit");
+                },
+                token => {
+                    debug!("token {:?} ready", token);
+
+                    let r = self.conns[token].process(poll, &self.context);
+
+                    debug!("got {:?}", r);
+
+                    // TODO: Handle disconnection etc.
+                    // TODO: Should be handled at a higher level by a
+                    // disconnect message.
+                    if let Err(e) = r {
+                        debug!("dropped client {:?} due to error {:?}", token, e);
+                        self.conns.remove(token);
+                        continue;
+                    }
+
+                    // poll.reregister(
+                    //     &self.conn(token).connection,
+                    //     token,
+                    //     mio::Ready::readable(),
+                    //     mio::PollOpt::edge() | mio::PollOpt::oneshot()
+                    // ).unwrap();
+                },
+            }
+        }
+
+        Ok(())
+    }
+}
+
+
+// TODO: This should take an "Evented" instead of opening the UDS path
+// directly (and let caller set up the Evented), but need a way to describe
+// it as an Evented that we can send/recv file descriptors (or HANDLEs on
+// Windows) over.
+pub fn run(running: Arc<AtomicBool>) -> Result<()> {
+
+    // Ignore result.
+    let _ = std::fs::remove_file(audioipc::get_uds_path());
+
+    // TODO: Use a SEQPACKET, wrap it in UnixStream?
+    let mut poll = mio::Poll::new()?;
+    let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path())?);
+
+    poll.register(
+        &server.socket,
+        SERVER,
+        mio::Ready::readable(),
+        mio::PollOpt::edge()
+    ).unwrap();
+
+    loop {
+        if !running.load(Ordering::SeqCst) {
+            bail!("server quit due to ctrl-c");
+        }
+
+        let _ = try!(server.poll(&mut poll));
+    }
+
+    //poll.deregister(&server.socket).unwrap();
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_start() -> *mut c_void {
+
+    let (tx, rx) = channel::ctl_pair();
+
+    thread::spawn(move || {
+        // Ignore result.
+        let _ = std::fs::remove_file(audioipc::get_uds_path());
+
+        // TODO: Use a SEQPACKET, wrap it in UnixStream?
+        let mut poll = mio::Poll::new().unwrap();
+        let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path()).unwrap());
+
+        poll.register(
+            &server.socket,
+            SERVER,
+            mio::Ready::readable(),
+            mio::PollOpt::edge()
+        ).unwrap();
+
+        poll.register(&rx, QUIT, mio::Ready::readable(), mio::PollOpt::edge())
+            .unwrap();
+
+        loop {
+            match server.poll(&mut poll) {
+                Err(_) => {
+                    return;
+                },
+                _ => (),
+            }
+        }
+    });
+
+    Box::into_raw(Box::new(tx)) as *mut _
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
+    // Dropping SenderCtl here will notify the other end.
+    let _ = unsafe { Box::<channel::SenderCtl>::from_raw(p as *mut _) };
+}