new file mode 100644
--- /dev/null
+++ b/media/audioipc/Cargo.toml
@@ -0,0 +1,2 @@
+[workspace]
+members = ["audioipc", "client", "server"]
new file mode 100644
--- /dev/null
+++ b/media/audioipc/README.md
@@ -0,0 +1,1 @@
+# Cubeb Audio Remoting Prototype
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "audioipc"
+version = "0.1.0"
+authors = [
+ "Matthew Gregan <kinetik@flim.org>",
+ "Dan Glastonbury <dan.glastonbury@gmail.com>"
+ ]
+description = "Remote Cubeb IPC"
+
+[dependencies]
+error-chain = "0.10.0"
+log = "^0.3.6"
+serde = "1.*.*"
+serde_derive = "1.*.*"
+bincode = "0.8"
+libc = "0.2"
+mio = "0.6.7"
+cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+byteorder = "1"
+memmap = "0.5.2"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/connection.rs
@@ -0,0 +1,199 @@
+use bincode::{self, deserialize, serialize};
+use errors::*;
+use msg;
+use mio::{Poll, PollOpt, Ready, Token};
+use mio::event::Evented;
+use mio::unix::EventedFd;
+use serde::de::DeserializeOwned;
+use serde::ser::Serialize;
+use std::fmt::Debug;
+use std::io::{self, Read};
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net;
+use std::os::unix::prelude::*;
+use libc;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+
+pub trait RecvFd {
+ fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+}
+
+pub trait SendFd {
+ fn send_fd<FD: IntoRawFd>(&mut self, bytes: &[u8], fd: Option<FD>) -> io::Result<(usize)>;
+}
+
+// Because of the trait implementation rules in Rust, this needs to be
+// a wrapper class to allow implementation of a trait from another
+// crate on a struct from yet another crate.
+//
+// This class is effectively net::UnixStream.
+
+#[derive(Debug)]
+pub struct Connection {
+ stream: net::UnixStream
+}
+
+impl Connection {
+ pub fn new(stream: net::UnixStream) -> Connection {
+ info!("Create new connection");
+ Connection {
+ stream: stream
+ }
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// Returns two `Connection`s which are connected to each other.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use audioipc::Connection;
+ ///
+ /// let (conn1, conn2) = match Connection::pair() {
+ /// Ok((conn1, conn2)) => (conn1, conn2),
+ /// Err(e) => {
+ /// println!("Couldn't create a pair of connections: {:?}", e);
+ /// return
+ /// }
+ /// };
+ /// ```
+ pub fn pair() -> io::Result<(Connection, Connection)> {
+ let (s1, s2) = net::UnixStream::pair()?;
+ Ok((
+ Connection {
+ stream: s1
+ },
+ Connection {
+ stream: s2
+ }
+ ))
+ }
+
+ pub fn receive<RT>(&mut self) -> Result<RT>
+ where
+ RT: DeserializeOwned + Debug,
+ {
+ match self.receive_with_fd() {
+ Ok((r, None)) => Ok(r),
+ Ok((_, Some(_))) => panic!("unexpected fd received"),
+ Err(e) => Err(e),
+ }
+ }
+
+ pub fn receive_with_fd<RT>(&mut self) -> Result<(RT, Option<RawFd>)>
+ where
+ RT: DeserializeOwned + Debug,
+ {
+ // TODO: Check deserialize_from and serialize_into.
+ let mut encoded = vec![0; 32 * 1024]; // TODO: Get max size from bincode, or at least assert.
+ // TODO: Read until block, EOF, or error.
+ // TODO: Switch back to recv_fd.
+ match self.stream.recv_fd(&mut encoded) {
+ Ok((0, _)) => Err(ErrorKind::Disconnected.into()),
+ // TODO: Handle partial read?
+ Ok((n, fd)) => {
+ let r = deserialize(&encoded[..n]);
+ debug!("receive {:?}", r);
+ match r {
+ Ok(r) => Ok((r, fd)),
+ Err(e) => Err(e).chain_err(|| "Failed to deserialize message"),
+ }
+ },
+ // TODO: Handle dropped message.
+ // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
+ _ => bail!("socket write"),
+ }
+ }
+
+ pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
+ where
+ ST: Serialize + Debug,
+ {
+ self.send_with_fd::<ST, Connection>(msg, None)
+ }
+
+ pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
+ where
+ ST: Serialize + Debug,
+ FD: IntoRawFd + Debug,
+ {
+ let encoded: Vec<u8> = serialize(&msg, bincode::Infinite)?;
+ info!("send_with_fd {:?}, {:?}", msg, fd_to_send);
+ self.stream.send_fd(&encoded, fd_to_send).chain_err(
+ || "Failed to send message with fd"
+ )
+ }
+}
+
+impl Evented for Connection {
+ fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.stream.as_raw_fd()).reregister(poll, token, events, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.stream.as_raw_fd()).deregister(poll)
+ }
+}
+
+impl Read for Connection {
+ fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+ self.stream.read(bytes)
+ }
+}
+
+// TODO: Is this required?
+impl<'a> Read for &'a Connection {
+ fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+ (&self.stream).read(bytes)
+ }
+}
+
+impl RecvFd for net::UnixStream {
+ fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+ let length = self.read_u32::<LittleEndian>()?;
+
+ msg::recvmsg(self.as_raw_fd(), &mut buf_to_recv[..length as usize])
+ }
+}
+
+impl RecvFd for Connection {
+ fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+ self.stream.recv_fd(buf_to_recv)
+ }
+}
+
+impl FromRawFd for Connection {
+ unsafe fn from_raw_fd(fd: RawFd) -> Connection {
+ Connection {
+ stream: net::UnixStream::from_raw_fd(fd)
+ }
+ }
+}
+
+impl IntoRawFd for Connection {
+ fn into_raw_fd(self) -> RawFd {
+ self.stream.into_raw_fd()
+ }
+}
+
+impl SendFd for net::UnixStream {
+ fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+ self.write_u32::<LittleEndian>(buf_to_send.len() as u32)?;
+
+ let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
+ let r = msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send);
+ fd_to_send.map(|fd| unsafe { libc::close(fd) });
+ r
+ }
+}
+
+impl SendFd for Connection {
+ fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+ self.stream.send_fd(buf_to_send, fd_to_send)
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/errors.rs
@@ -0,0 +1,17 @@
+use bincode;
+use cubeb_core;
+use std;
+
+error_chain! {
+ // Maybe replace with chain_err to improve the error info.
+ foreign_links {
+ Bincode(bincode::Error);
+ Io(std::io::Error);
+ Cubeb(cubeb_core::Error);
+ }
+
+ // Replace bail!(str) with explicit errors.
+ errors {
+ Disconnected
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -0,0 +1,51 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+#![allow(dead_code)] // TODO: Remove.
+
+#![recursion_limit = "1024"]
+#[macro_use]
+extern crate error_chain;
+
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+extern crate serde_derive;
+extern crate serde;
+extern crate bincode;
+
+extern crate mio;
+
+extern crate cubeb_core;
+
+extern crate libc;
+extern crate byteorder;
+
+extern crate memmap;
+
+mod connection;
+pub mod errors;
+pub mod messages;
+mod msg;
+pub mod shm;
+
+pub use connection::*;
+pub use messages::{ClientMessage, ServerMessage};
+use std::env::temp_dir;
+use std::path::PathBuf;
+
+fn get_temp_path(name: &str) -> PathBuf {
+ let mut path = temp_dir();
+ path.push(name);
+ path
+}
+
+pub fn get_uds_path() -> PathBuf {
+ get_temp_path("cubeb-sock")
+}
+
+pub fn get_shm_path(dir: &str) -> PathBuf {
+ get_temp_path(&format!("cubeb-shm-{}", dir))
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -0,0 +1,249 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use cubeb_core::{self, ffi};
+use std::ffi::{CStr, CString};
+use std::os::raw::c_char;
+use std::ptr;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Device {
+ pub output_name: Option<Vec<u8>>,
+ pub input_name: Option<Vec<u8>>
+}
+
+impl<'a> From<cubeb_core::Device<'a>> for Device {
+ fn from(info: cubeb_core::Device) -> Self {
+ Self {
+ output_name: info.output_name_bytes().map(|s| s.to_vec()),
+ input_name: info.input_name_bytes().map(|s| s.to_vec())
+ }
+ }
+}
+
+impl From<ffi::cubeb_device> for Device {
+ fn from(info: ffi::cubeb_device) -> Self {
+ Self {
+ output_name: dup_str(info.output_name),
+ input_name: dup_str(info.input_name)
+ }
+ }
+}
+
+impl From<Device> for ffi::cubeb_device {
+ fn from(info: Device) -> Self {
+ Self {
+ output_name: opt_str(info.output_name),
+ input_name: opt_str(info.input_name)
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeviceInfo {
+ pub devid: usize,
+ pub device_id: Option<Vec<u8>>,
+ pub friendly_name: Option<Vec<u8>>,
+ pub group_id: Option<Vec<u8>>,
+ pub vendor_name: Option<Vec<u8>>,
+
+ pub device_type: ffi::cubeb_device_type,
+ pub state: ffi::cubeb_device_state,
+ pub preferred: ffi::cubeb_device_pref,
+
+ pub format: ffi::cubeb_device_fmt,
+ pub default_format: ffi::cubeb_device_fmt,
+ pub max_channels: u32,
+ pub default_rate: u32,
+ pub max_rate: u32,
+ pub min_rate: u32,
+
+ pub latency_lo: u32,
+ pub latency_hi: u32
+}
+
+impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
+ fn from(info: &'a ffi::cubeb_device_info) -> Self {
+ DeviceInfo {
+ devid: info.devid as _,
+ device_id: dup_str(info.device_id),
+ friendly_name: dup_str(info.friendly_name),
+ group_id: dup_str(info.group_id),
+ vendor_name: dup_str(info.vendor_name),
+
+ device_type: info.device_type,
+ state: info.state,
+ preferred: info.preferred,
+
+ format: info.format,
+ default_format: info.default_format,
+ max_channels: info.max_channels,
+ default_rate: info.default_rate,
+ max_rate: info.max_rate,
+ min_rate: info.min_rate,
+
+ latency_lo: info.latency_lo,
+ latency_hi: info.latency_hi
+ }
+ }
+}
+
+impl From<DeviceInfo> for ffi::cubeb_device_info {
+ fn from(info: DeviceInfo) -> Self {
+ ffi::cubeb_device_info {
+ devid: info.devid as _,
+ device_id: opt_str(info.device_id),
+ friendly_name: opt_str(info.friendly_name),
+ group_id: opt_str(info.group_id),
+ vendor_name: opt_str(info.vendor_name),
+
+ device_type: info.device_type,
+ state: info.state,
+ preferred: info.preferred,
+
+ format: info.format,
+ default_format: info.default_format,
+ max_channels: info.max_channels,
+ default_rate: info.default_rate,
+ max_rate: info.max_rate,
+ min_rate: info.min_rate,
+
+ latency_lo: info.latency_lo,
+ latency_hi: info.latency_hi
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamParams {
+ pub format: u32,
+ pub rate: u16,
+ pub channels: u8,
+ pub layout: i32
+}
+
+impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
+ fn from(params: &'a ffi::cubeb_stream_params) -> Self {
+ assert!(params.channels <= u8::max_value() as u32);
+
+ StreamParams {
+ format: params.format,
+ rate: params.rate as u16,
+ channels: params.channels as u8,
+ layout: params.layout
+ }
+ }
+}
+
+impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
+ fn from(params: &StreamParams) -> Self {
+ ffi::cubeb_stream_params {
+ format: params.format,
+ rate: params.rate as u32,
+ channels: params.channels as u32,
+ layout: params.layout
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamInitParams {
+ pub stream_name: Option<Vec<u8>>,
+ pub input_device: usize,
+ pub input_stream_params: Option<StreamParams>,
+ pub output_device: usize,
+ pub output_stream_params: Option<StreamParams>,
+ pub latency_frames: u32
+}
+
+fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
+ if s.is_null() {
+ None
+ } else {
+ let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
+ Some(vec)
+ }
+}
+
+fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
+ match v {
+ Some(v) => {
+ match CString::new(v) {
+ Ok(s) => s.into_raw(),
+ Err(_) => {
+ debug!("Failed to convert bytes to CString");
+ ptr::null()
+ },
+ }
+ },
+ None => ptr::null(),
+ }
+}
+
+// Client -> Server messages.
+// TODO: Callbacks should be different messages types so
+// ServerConn::process_msg doesn't have a catch-all case.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ServerMessage {
+ ClientConnect,
+ ClientDisconnect,
+
+ ContextGetBackendId,
+ ContextGetMaxChannelCount,
+ ContextGetMinLatency(StreamParams),
+ ContextGetPreferredSampleRate,
+ ContextGetPreferredChannelLayout,
+ ContextGetDeviceEnumeration(ffi::cubeb_device_type),
+
+ StreamInit(StreamInitParams),
+ StreamDestroy(usize),
+
+ StreamStart(usize),
+ StreamStop(usize),
+ StreamResetDefaultDevice(usize),
+ StreamGetPosition(usize),
+ StreamGetLatency(usize),
+ StreamSetVolume(usize, f32),
+ StreamSetPanning(usize, f32),
+ StreamGetCurrentDevice(usize),
+
+ StreamDataCallback(isize)
+}
+
+// Server -> Client messages.
+// TODO: Streams need id.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ClientMessage {
+ ClientConnected,
+ ClientDisconnected,
+
+ ContextBackendId(),
+ ContextMaxChannelCount(u32),
+ ContextMinLatency(u32),
+ ContextPreferredSampleRate(u32),
+ ContextPreferredChannelLayout(ffi::cubeb_channel_layout),
+ ContextEnumeratedDevices(Vec<DeviceInfo>),
+
+ StreamCreated(usize), /*(RawFd)*/
+ StreamCreatedInputShm, /*(RawFd)*/
+ StreamCreatedOutputShm, /*(RawFd)*/
+ StreamDestroyed,
+
+ StreamStarted,
+ StreamStopped,
+ StreamDefaultDeviceReset,
+ StreamPosition(u64),
+ StreamLatency(u32),
+ StreamVolumeSet,
+ StreamPanningSet,
+ StreamCurrentDevice(Device),
+
+ StreamDataCallback(isize, usize),
+ StreamStateCallback(ffi::cubeb_state),
+
+ ContextError(ffi::cubeb_error_code),
+ StreamError, /*(Error)*/
+ ClientError /*(Error)*/
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -0,0 +1,105 @@
+use libc;
+use std::io;
+use std::mem;
+use std::ptr;
+use std::os::unix::io::RawFd;
+use std;
+
+// Note: The following fields must be laid out together, the OS expects them
+// to be part of a single allocation.
+#[repr(C)]
+struct CmsgSpace {
+ cmsghdr: libc::cmsghdr,
+ data: libc::c_int,
+}
+
+unsafe fn sendmsg_retry(fd: libc::c_int, msg: *const libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
+ loop {
+ let r = libc::sendmsg(fd, msg, flags);
+ if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
+ std::thread::yield_now();
+ continue;
+ }
+ return r;
+ }
+}
+
+pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ let mut iovec: libc::iovec = unsafe { mem::zeroed() };
+ let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
+
+ msghdr.msg_iov = &mut iovec as *mut _;
+ msghdr.msg_iovlen = 1;
+ if fd_to_send.is_some() {
+ msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
+ msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+ }
+
+ iovec.iov_base = if to_send.is_empty() {
+ // Empty Vecs have a non-null pointer.
+ ptr::null_mut()
+ } else {
+ to_send.as_ptr() as *const _ as *mut _
+ };
+ iovec.iov_len = to_send.len();
+
+ cmsg.cmsghdr.cmsg_len = msghdr.msg_controllen;
+ cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
+ cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
+
+ cmsg.data = fd_to_send.unwrap_or(-1);
+
+ let result = unsafe { sendmsg_retry(fd, &msghdr, 0) };
+ if result >= 0 {
+ Ok(result as usize)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+}
+
+unsafe fn recvmsg_retry(fd: libc::c_int, msg: *mut libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
+ loop {
+ let r = libc::recvmsg(fd, msg, flags);
+ if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
+ std::thread::yield_now();
+ continue;
+ }
+ return r;
+ }
+}
+
+pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ let mut iovec: libc::iovec = unsafe { mem::zeroed() };
+ let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
+
+ msghdr.msg_iov = &mut iovec as *mut _;
+ msghdr.msg_iovlen = 1;
+ msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
+ msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+
+ iovec.iov_base = if to_recv.is_empty() {
+ // Empty Vecs have a non-null pointer.
+ ptr::null_mut()
+ } else {
+ to_recv.as_ptr() as *const _ as *mut _
+ };
+ iovec.iov_len = to_recv.len();
+
+ let result = unsafe { recvmsg_retry(fd, &mut msghdr, 0) };
+ if result >= 0 {
+ let fd = if msghdr.msg_controllen == mem::size_of::<CmsgSpace>() as _ &&
+ cmsg.cmsghdr.cmsg_len == mem::size_of::<CmsgSpace>() as _ &&
+ cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
+ cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
+ Some(cmsg.data)
+ } else {
+ None
+ };
+
+ Ok((result as usize, fd))
+ } else {
+ Err(io::Error::last_os_error())
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -0,0 +1,133 @@
+use std::path::Path;
+use std::fs::{remove_file, OpenOptions, File};
+use std::sync::atomic;
+use memmap::{Mmap, Protection};
+use errors::*;
+
+pub struct SharedMemReader {
+ mmap: Mmap,
+}
+
+impl SharedMemReader {
+ pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
+ let file = OpenOptions::new().read(true)
+ .write(true)
+ .create_new(true)
+ .open(path)?;
+ let _ = remove_file(path);
+ file.set_len(size as u64)?;
+ let mmap = Mmap::open(&file, Protection::Read)?;
+ assert_eq!(mmap.len(), size);
+ Ok((SharedMemReader {
+ mmap
+ }, file))
+ }
+
+ pub fn read(&self, buf: &mut [u8]) -> Result<()> {
+ if buf.is_empty() {
+ return Ok(());
+ }
+ // TODO: Track how much is in the shm area.
+ if buf.len() <= self.mmap.len() {
+ atomic::fence(atomic::Ordering::Acquire);
+ unsafe {
+ let len = buf.len();
+ buf.copy_from_slice(&self.mmap.as_slice()[..len]);
+ }
+ Ok(())
+ } else {
+ bail!("mmap size");
+ }
+ }
+}
+
+pub struct SharedMemSlice {
+ mmap: Mmap,
+}
+
+impl SharedMemSlice {
+ pub fn from(file: File, size: usize) -> Result<SharedMemSlice> {
+ let mmap = Mmap::open(&file, Protection::Read)?;
+ assert_eq!(mmap.len(), size);
+ Ok(SharedMemSlice {
+ mmap
+ })
+ }
+
+ pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
+ if size == 0 {
+ return Ok(&[]);
+ }
+ // TODO: Track how much is in the shm area.
+ if size <= self.mmap.len() {
+ atomic::fence(atomic::Ordering::Acquire);
+ let buf = unsafe { &self.mmap.as_slice()[..size] };
+ Ok(buf)
+ } else {
+ bail!("mmap size");
+ }
+ }
+}
+
+pub struct SharedMemWriter {
+ mmap: Mmap,
+}
+
+impl SharedMemWriter {
+ pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
+ let file = OpenOptions::new().read(true)
+ .write(true)
+ .create_new(true)
+ .open(path)?;
+ let _ = remove_file(path);
+ file.set_len(size as u64)?;
+ let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+ Ok((SharedMemWriter {
+ mmap
+ }, file))
+ }
+
+ pub fn write(&mut self, buf: &[u8]) -> Result<()> {
+ if buf.is_empty() {
+ return Ok(());
+ }
+ // TODO: Track how much is in the shm area.
+ if buf.len() <= self.mmap.len() {
+ unsafe {
+ self.mmap.as_mut_slice()[..buf.len()].copy_from_slice(buf);
+ }
+ atomic::fence(atomic::Ordering::Release);
+ Ok(())
+ } else {
+ bail!("mmap size");
+ }
+ }
+}
+
+pub struct SharedMemMutSlice {
+ mmap: Mmap,
+}
+
+impl SharedMemMutSlice {
+ pub fn from(file: File, size: usize) -> Result<SharedMemMutSlice> {
+ let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+ assert_eq!(mmap.len(), size);
+ Ok(SharedMemMutSlice {
+ mmap
+ })
+ }
+
+ pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
+ if size == 0 {
+ return Ok(&mut []);
+ }
+ // TODO: Track how much is in the shm area.
+ if size <= self.mmap.len() {
+ let buf = unsafe { &mut self.mmap.as_mut_slice()[..size] };
+ atomic::fence(atomic::Ordering::Release);
+ Ok(buf)
+ } else {
+ bail!("mmap size");
+ }
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "audioipc-client"
+version = "0.1.0"
+authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+description = "Cubeb Backend for talking to remote cubeb server."
+
+[dependencies]
+audioipc = { path="../audioipc" }
+cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
+cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
+log = "^0.3.6"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/context.rs
@@ -0,0 +1,170 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use ClientStream;
+use audioipc::{self, ClientMessage, Connection, ServerMessage, messages};
+use cubeb_backend::{Context, Ops};
+use cubeb_core::{DeviceId, DeviceType, Error, Result, StreamParams, ffi};
+use cubeb_core::binding::Binding;
+use std::ffi::{CStr, CString};
+use std::mem;
+use std::os::raw::c_void;
+use std::os::unix::net::UnixStream;
+use std::sync::{Mutex, MutexGuard};
+use stream;
+
+#[derive(Debug)]
+pub struct ClientContext {
+ _ops: *const Ops,
+ connection: Mutex<Connection>
+}
+
+macro_rules! t(
+ ($e:expr) => (
+ match $e {
+ Ok(e) => e,
+ Err(_) => return Err(Error::default())
+ }
+ ));
+
+pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
+
+impl ClientContext {
+ #[doc(hidden)]
+ pub fn conn(&self) -> MutexGuard<Connection> {
+ self.connection.lock().unwrap()
+ }
+}
+
+impl Context for ClientContext {
+ fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+ // TODO: encapsulate connect, etc inside audioipc.
+ let stream = t!(UnixStream::connect(audioipc::get_uds_path()));
+ let ctx = Box::new(ClientContext {
+ _ops: &CLIENT_OPS as *const _,
+ connection: Mutex::new(Connection::new(stream))
+ });
+ Ok(Box::into_raw(ctx) as *mut _)
+ }
+
+ fn backend_id(&self) -> &'static CStr {
+ unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
+ }
+
+ fn max_channel_count(&self) -> Result<u32> {
+ send_recv!(self.conn(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+ }
+
+ fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+ let params = messages::StreamParams::from(unsafe { &*params.raw() });
+ send_recv!(self.conn(), ContextGetMinLatency(params) => ContextMinLatency())
+ }
+
+ fn preferred_sample_rate(&self) -> Result<u32> {
+ send_recv!(self.conn(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+ }
+
+ fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
+ send_recv!(self.conn(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+ }
+
+ fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+ let v: Vec<ffi::cubeb_device_info> =
+ match send_recv!(self.conn(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
+ Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
+ Err(e) => return Err(e),
+ };
+ let vs = v.into_boxed_slice();
+ let coll = ffi::cubeb_device_collection {
+ count: vs.len(),
+ device: vs.as_ptr()
+ };
+ // Giving away the memory owned by vs. Don't free it!
+ // Reclaimed in `device_collection_destroy`.
+ mem::forget(vs);
+ Ok(coll)
+ }
+
+ fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+ unsafe {
+ let coll = &*collection;
+ let mut devices = Vec::from_raw_parts(
+ coll.device as *mut ffi::cubeb_device_info,
+ coll.count,
+ coll.count
+ );
+ for dev in devices.iter_mut() {
+ if !dev.device_id.is_null() {
+ let _ = CString::from_raw(dev.device_id as *mut _);
+ }
+ if !dev.group_id.is_null() {
+ let _ = CString::from_raw(dev.group_id as *mut _);
+ }
+ if !dev.vendor_name.is_null() {
+ let _ = CString::from_raw(dev.vendor_name as *mut _);
+ }
+ if !dev.friendly_name.is_null() {
+ let _ = CString::from_raw(dev.friendly_name as *mut _);
+ }
+ }
+ }
+ }
+
+ fn stream_init(
+ &self,
+ stream_name: Option<&CStr>,
+ input_device: DeviceId,
+ input_stream_params: Option<&ffi::cubeb_stream_params>,
+ output_device: DeviceId,
+ output_stream_params: Option<&ffi::cubeb_stream_params>,
+ latency_frame: u32,
+ // These params aren't sent to the server
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<*mut ffi::cubeb_stream> {
+
+ fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
+ match p {
+ Some(raw) => Some(messages::StreamParams::from(raw)),
+ None => None,
+ }
+ }
+
+ let stream_name = match stream_name {
+ Some(s) => Some(s.to_bytes().to_vec()),
+ None => None,
+ };
+
+ let input_stream_params = opt_stream_params(input_stream_params);
+ let output_stream_params = opt_stream_params(output_stream_params);
+
+ let init_params = messages::StreamInitParams {
+ stream_name: stream_name,
+ input_device: input_device.raw() as _,
+ input_stream_params: input_stream_params,
+ output_device: output_device.raw() as _,
+ output_stream_params: output_stream_params,
+ latency_frames: latency_frame
+ };
+ stream::init(&self, init_params, data_callback, state_callback, user_ptr)
+ }
+
+ fn register_device_collection_changed(
+ &self,
+ _dev_type: DeviceType,
+ _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
+ _user_ptr: *mut c_void,
+ ) -> Result<()> {
+ Ok(())
+ }
+}
+
+impl Drop for ClientContext {
+ fn drop(&mut self) {
+ info!("ClientContext drop...");
+ let _: Result<()> = send_recv!(self.conn(), ClientDisconnect => ClientDisconnected);
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/lib.rs
@@ -0,0 +1,28 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+extern crate audioipc;
+extern crate cubeb_core;
+#[macro_use]
+extern crate cubeb_backend;
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+mod send_recv;
+mod context;
+mod stream;
+
+use context::ClientContext;
+use cubeb_backend::capi;
+use cubeb_core::ffi;
+use std::os::raw::{c_char, c_int};
+use stream::ClientStream;
+
+#[no_mangle]
+/// Entry point from C code.
+pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int {
+ capi::capi_init::<ClientContext>(c, context_name)
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/send_recv.rs
@@ -0,0 +1,42 @@
+#[macro_export]
+macro_rules! send_recv {
+ ($conn:expr, $smsg:ident => $rmsg:ident) => {{
+ send_recv!(__send $conn, $smsg);
+ send_recv!(__recv $conn, $rmsg)
+ }};
+ ($conn:expr, $smsg:ident => $rmsg:ident()) => {{
+ send_recv!(__send $conn, $smsg);
+ send_recv!(__recv $conn, $rmsg __result)
+ }};
+ ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
+ send_recv!(__send $conn, $smsg, $($a),*);
+ send_recv!(__recv $conn, $rmsg)
+ }};
+ ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
+ send_recv!(__send $conn, $smsg, $($a),*);
+ send_recv!(__recv $conn, $rmsg __result)
+ }};
+ //
+ (__send $conn:expr, $smsg:ident) => (
+ $conn.send(ServerMessage::$smsg)
+ .unwrap();
+ );
+ (__send $conn:expr, $smsg:ident, $($a:expr),*) => (
+ $conn.send(ServerMessage::$smsg($($a),*))
+ .unwrap();
+ );
+ (__recv $conn:expr, $rmsg:ident) => (
+ if let ClientMessage::$rmsg = $conn.receive().unwrap() {
+ Ok(())
+ } else {
+ panic!("wrong message received");
+ }
+ );
+ (__recv $conn:expr, $rmsg:ident __result) => (
+ if let ClientMessage::$rmsg(v) = $conn.receive().unwrap() {
+ Ok(v)
+ } else {
+ panic!("wrong message received");
+ }
+ )
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/client/src/stream.rs
@@ -0,0 +1,239 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use ClientContext;
+use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::shm::{SharedMemSlice, SharedMemMutSlice};
+use cubeb_backend::Stream;
+use cubeb_core::{ErrorCode, Result, ffi};
+use std::ffi::CString;
+use std::os::raw::c_void;
+use std::os::unix::io::FromRawFd;
+use std::fs::File;
+use std::ptr;
+use std::thread;
+
+// TODO: Remove and let caller allocate based on cubeb backend requirements.
+const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+
+pub struct ClientStream<'ctx> {
+ // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
+ context: &'ctx ClientContext,
+ token: usize,
+ join_handle: Option<thread::JoinHandle<()>>
+}
+
+fn stream_thread(
+ mut conn: Connection,
+ input_shm: SharedMemSlice,
+ mut output_shm: SharedMemMutSlice,
+ data_cb: ffi::cubeb_data_callback,
+ state_cb: ffi::cubeb_state_callback,
+ user_ptr: usize,
+) {
+ loop {
+ let r = match conn.receive::<ClientMessage>() {
+ Ok(r) => r,
+ Err(e) => {
+ debug!("stream_thread: Failed to receive message: {:?}", e);
+ continue;
+ },
+ };
+
+ match r {
+ ClientMessage::StreamDestroyed => {
+ info!("stream_thread: Shutdown callback thread.");
+ return;
+ },
+ ClientMessage::StreamDataCallback(nframes, frame_size) => {
+ info!(
+ "stream_thread: Data Callback: nframes={} frame_size={}",
+ nframes,
+ frame_size
+ );
+ // TODO: This is proof-of-concept. Make it better.
+ let input_ptr: *const u8 = input_shm.get_slice(nframes as usize * frame_size).unwrap().as_ptr();
+ let output_ptr: *mut u8 = output_shm.get_mut_slice(nframes as usize * frame_size).unwrap().as_mut_ptr();
+ let nframes = data_cb(
+ ptr::null_mut(),
+ user_ptr as *mut c_void,
+ input_ptr as *const _,
+ output_ptr as *mut _,
+ nframes as _
+ );
+ conn.send(ServerMessage::StreamDataCallback(nframes as isize)).unwrap();
+ },
+ ClientMessage::StreamStateCallback(state) => {
+ info!("stream_thread: State Callback: {:?}", state);
+ state_cb(ptr::null_mut(), user_ptr as *mut _, state);
+ },
+ m => {
+ info!("Unexpected ClientMessage: {:?}", m);
+ },
+ }
+ }
+}
+
+impl<'ctx> ClientStream<'ctx> {
+ fn init(
+ ctx: &'ctx ClientContext,
+ init_params: messages::StreamInitParams,
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<*mut ffi::cubeb_stream> {
+
+ ctx.conn()
+ .send(ServerMessage::StreamInit(init_params))
+ .unwrap();
+
+ let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+ Ok(r) => r,
+ Err(_) => return Err(ErrorCode::Error.into()),
+ };
+
+ let (token, conn) = match r {
+ (ClientMessage::StreamCreated(tok), Some(fd)) => (tok, unsafe {
+ Connection::from_raw_fd(fd)
+ }),
+ (ClientMessage::StreamCreated(_), None) => {
+ debug!("Missing fd!");
+ return Err(ErrorCode::Error.into());
+ },
+ (m, _) => {
+ debug!("Unexpected message: {:?}", m);
+ return Err(ErrorCode::Error.into());
+ },
+ };
+
+ // TODO: It'd be nicer to receive these two fds as part of
+ // StreamCreated, but that requires changing sendmsg/recvmsg to
+ // support multiple fds.
+ let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+ Ok(r) => r,
+ Err(_) => return Err(ErrorCode::Error.into()),
+ };
+
+ let input_file = match r {
+ (ClientMessage::StreamCreatedInputShm, Some(fd)) => unsafe {
+ File::from_raw_fd(fd)
+ },
+ (m, _) => {
+ debug!("Unexpected message: {:?}", m);
+ return Err(ErrorCode::Error.into());
+ },
+ };
+
+ let input_shm = SharedMemSlice::from(input_file,
+ SHM_AREA_SIZE).unwrap();
+
+ let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+ Ok(r) => r,
+ Err(_) => return Err(ErrorCode::Error.into()),
+ };
+
+ let output_file = match r {
+ (ClientMessage::StreamCreatedOutputShm, Some(fd)) => unsafe {
+ File::from_raw_fd(fd)
+ },
+ (m, _) => {
+ debug!("Unexpected message: {:?}", m);
+ return Err(ErrorCode::Error.into());
+ },
+ };
+
+ let output_shm = SharedMemMutSlice::from(output_file,
+ SHM_AREA_SIZE).unwrap();
+
+ let user_data = user_ptr as usize;
+ let join_handle = thread::spawn(move || {
+ stream_thread(conn, input_shm, output_shm, data_callback, state_callback, user_data)
+ });
+
+ Ok(Box::into_raw(Box::new(ClientStream {
+ context: ctx,
+ token: token,
+ join_handle: Some(join_handle)
+ })) as _)
+ }
+}
+
+impl<'ctx> Drop for ClientStream<'ctx> {
+ fn drop(&mut self) {
+ let _: Result<()> = send_recv!(self.context.conn(), StreamDestroy(self.token) => StreamDestroyed);
+ self.join_handle.take().unwrap().join().unwrap();
+ }
+}
+
+impl<'ctx> Stream for ClientStream<'ctx> {
+ fn start(&self) -> Result<()> {
+ send_recv!(self.context.conn(), StreamStart(self.token) => StreamStarted)
+ }
+
+ fn stop(&self) -> Result<()> {
+ send_recv!(self.context.conn(), StreamStop(self.token) => StreamStopped)
+ }
+
+ fn reset_default_device(&self) -> Result<()> {
+ send_recv!(self.context.conn(), StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+ }
+
+ fn position(&self) -> Result<u64> {
+ send_recv!(self.context.conn(), StreamGetPosition(self.token) => StreamPosition())
+ }
+
+ fn latency(&self) -> Result<u32> {
+ send_recv!(self.context.conn(), StreamGetLatency(self.token) => StreamLatency())
+ }
+
+ fn set_volume(&self, volume: f32) -> Result<()> {
+ send_recv!(self.context.conn(), StreamSetVolume(self.token, volume) => StreamVolumeSet)
+ }
+
+ fn set_panning(&self, panning: f32) -> Result<()> {
+ send_recv!(self.context.conn(), StreamSetPanning(self.token, panning) => StreamPanningSet)
+ }
+
+ fn current_device(&self) -> Result<*const ffi::cubeb_device> {
+ match send_recv!(self.context.conn(), StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+ Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
+ Err(e) => Err(e),
+ }
+ }
+
+ fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+ // It's all unsafe...
+ if !device.is_null() {
+ unsafe {
+ if !(*device).output_name.is_null() {
+ let _ = CString::from_raw((*device).output_name as *mut _);
+ }
+ if !(*device).input_name.is_null() {
+ let _ = CString::from_raw((*device).input_name as *mut _);
+ }
+ let _: Box<ffi::cubeb_device> = Box::from_raw(device as *mut _);
+ }
+ }
+ Ok(())
+ }
+
+ // TODO: How do we call this back? On what thread?
+ fn register_device_changed_callback(
+ &self,
+ _device_changed_callback: ffi::cubeb_device_changed_callback,
+ ) -> Result<()> {
+ Ok(())
+ }
+}
+
+pub fn init(
+ ctx: &ClientContext,
+ init_params: messages::StreamInitParams,
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+) -> Result<*mut ffi::cubeb_stream> {
+ ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/rustfmt.toml
@@ -0,0 +1,10 @@
+ideal_width = 80
+match_block_trailing_comma = true
+max_width = 120
+newline_style = "Unix"
+normalize_comments = false
+struct_lit_multiline_style = "ForceMulti"
+where_trailing_comma = true
+reorder_imports = true
+reorder_imported_names = true
+trailing_comma = "Never"
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "audioipc-server"
+version = "0.1.0"
+authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+description = "Remote cubeb server"
+
+[dependencies]
+audioipc = { path = "../audioipc" }
+cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3" }
+cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+error-chain = "0.10.0"
+lazycell = "^0.4"
+log = "^0.3.6"
+mio = "0.6.7"
+mio-uds = "0.6.4"
+slab = "0.3.0"
+
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/src/channel.rs
@@ -0,0 +1,251 @@
+//! Thread safe communication channel implementing `Evented`
+
+use lazycell::{AtomicLazyCell, LazyCell};
+use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use std::{fmt, io};
+use std::any::Any;
+use std::error;
+use std::sync::{Arc, mpsc};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
+ let inner = Arc::new(Inner {
+ pending: AtomicUsize::new(0),
+ senders: AtomicUsize::new(1),
+ set_readiness: AtomicLazyCell::new()
+ });
+
+ let tx = SenderCtl {
+ inner: inner.clone()
+ };
+
+ let rx = ReceiverCtl {
+ registration: LazyCell::new(),
+ inner: inner
+ };
+
+ (tx, rx)
+}
+
+/// Tracks messages sent on a channel in order to update readiness.
+pub struct SenderCtl {
+ inner: Arc<Inner>
+}
+
+/// Tracks messages received on a channel in order to track readiness.
+pub struct ReceiverCtl {
+ registration: LazyCell<Registration>,
+ inner: Arc<Inner>
+}
+
+pub enum SendError<T> {
+ Io(io::Error),
+ Disconnected(T)
+}
+
+pub enum TrySendError<T> {
+ Io(io::Error),
+ Full(T),
+ Disconnected(T)
+}
+
+struct Inner {
+ // The number of outstanding messages for the receiver to read
+ pending: AtomicUsize,
+ // The number of sender handles
+ senders: AtomicUsize,
+ // The set readiness handle
+ set_readiness: AtomicLazyCell<SetReadiness>
+}
+
+/*
+ *
+ * ===== SenderCtl / ReceiverCtl =====
+ *
+ */
+
+impl SenderCtl {
+ /// Call to track that a message has been sent
+ pub fn inc(&self) -> io::Result<()> {
+ let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
+
+ if 0 == cnt {
+ // Toggle readiness to readable
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ try!(set_readiness.set_readiness(Ready::readable()));
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Clone for SenderCtl {
+ fn clone(&self) -> SenderCtl {
+ self.inner.senders.fetch_add(1, Ordering::Relaxed);
+ SenderCtl {
+ inner: self.inner.clone()
+ }
+ }
+}
+
+impl Drop for SenderCtl {
+ fn drop(&mut self) {
+ if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
+ let _ = self.inc();
+ }
+ }
+}
+
+impl Evented for ReceiverCtl {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ if self.registration.borrow().is_some() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver already registered"
+ ));
+ }
+
+ let (registration, set_readiness) = Registration::new2();
+ try!(registration.register(poll, token, interest, opts));
+
+ if self.inner.pending.load(Ordering::Relaxed) > 0 {
+ // TODO: Don't drop readiness
+ let _ = set_readiness.set_readiness(Ready::readable());
+ }
+
+ self.registration.fill(registration).ok().expect(
+ "unexpected state encountered"
+ );
+ self.inner.set_readiness.fill(set_readiness).ok().expect(
+ "unexpected state encountered"
+ );
+
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => registration.reregister(poll, token, interest, opts),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered"
+ )),
+ }
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => <Registration as Evented>::deregister(®istration, poll),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered"
+ )),
+ }
+ }
+}
+
+/*
+ *
+ * ===== Error conversions =====
+ *
+ */
+
+impl<T> From<mpsc::SendError<T>> for SendError<T> {
+ fn from(src: mpsc::SendError<T>) -> SendError<T> {
+ SendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for SendError<T> {
+ fn from(src: io::Error) -> SendError<T> {
+ SendError::Io(src)
+ }
+}
+
+impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
+ match src {
+ mpsc::TrySendError::Full(v) => TrySendError::Full(v),
+ mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
+ }
+ }
+}
+
+impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
+ TrySendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for TrySendError<T> {
+ fn from(src: io::Error) -> TrySendError<T> {
+ TrySendError::Io(src)
+ }
+}
+
+/*
+ *
+ * ===== Implement Error, Debug and Display for Errors =====
+ *
+ */
+
+impl<T: Any> error::Error for SendError<T> {
+ fn description(&self) -> &str {
+ match self {
+ &SendError::Io(ref io_err) => io_err.description(),
+ &SendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T: Any> error::Error for TrySendError<T> {
+ fn description(&self) -> &str {
+ match self {
+ &TrySendError::Io(ref io_err) => io_err.description(),
+ &TrySendError::Full(..) => "Full",
+ &TrySendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+#[inline]
+fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match e {
+ &SendError::Io(ref io_err) => write!(f, "{}", io_err),
+ &SendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}
+
+#[inline]
+fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match e {
+ &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+ &TrySendError::Full(..) => write!(f, "Full"),
+ &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/src/lib.rs
@@ -0,0 +1,636 @@
+#[macro_use]
+extern crate error_chain;
+
+#[macro_use]
+extern crate log;
+
+extern crate audioipc;
+extern crate cubeb;
+extern crate cubeb_core;
+extern crate lazycell;
+extern crate mio;
+extern crate mio_uds;
+extern crate slab;
+
+use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamParams};
+use audioipc::shm::{SharedMemReader, SharedMemWriter};
+use cubeb_core::binding::Binding;
+use cubeb_core::ffi;
+use mio::Token;
+use mio_uds::UnixListener;
+use std::{slice, thread};
+use std::convert::From;
+use std::os::raw::c_void;
+use std::os::unix::prelude::*;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+mod channel;
+
+pub mod errors {
+ error_chain! {
+ links {
+ AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
+ }
+ foreign_links {
+ Cubeb(::cubeb_core::Error);
+ Io(::std::io::Error);
+ }
+ }
+}
+
+use errors::*;
+
+// TODO: Remove and let caller allocate based on cubeb backend requirements.
+const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+
+// TODO: this should forward to the client.
+struct Callback {
+ /// Size of input frame in bytes
+ input_frame_size: u16,
+ /// Size of output frame in bytes
+ output_frame_size: u16,
+ connection: audioipc::Connection,
+ input_shm: SharedMemWriter,
+ output_shm: SharedMemReader,
+}
+
+impl cubeb::StreamCallback for Callback {
+ type Frame = u8;
+
+ fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
+ info!("Stream data callback: {} {}", input.len(), output.len());
+
+ // len is of input and output is frame len. Turn these into the real lengths.
+ let real_input = unsafe {
+ let size_bytes = input.len() * self.input_frame_size as usize;
+ slice::from_raw_parts(input.as_ptr(), size_bytes)
+ };
+ let real_output = unsafe {
+ let size_bytes = output.len() * self.output_frame_size as usize;
+ info!("Resize output to {}", size_bytes);
+ slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
+ };
+
+ self.input_shm.write(&real_input).unwrap();
+
+ self.connection
+ .send(ClientMessage::StreamDataCallback(
+ output.len() as isize,
+ self.output_frame_size as usize
+ ))
+ .unwrap();
+
+ let r = self.connection.receive();
+ match r {
+ Ok(ServerMessage::StreamDataCallback(cb_result)) => {
+ if cb_result >= 0 {
+ let len = cb_result as usize * self.output_frame_size as usize;
+ self.output_shm.read(&mut real_output[..len]).unwrap();
+ cb_result
+ } else {
+ cb_result
+ }
+ },
+ _ => {
+ debug!("Unexpected message {:?} during callback", r);
+ -1
+ },
+ }
+ }
+
+ fn state_callback(&mut self, state: cubeb::State) {
+ info!("Stream state callback: {:?}", state);
+ }
+}
+
+impl Drop for Callback {
+ fn drop(&mut self) {
+ self.connection
+ .send(ClientMessage::StreamDestroyed)
+ .unwrap();
+ }
+}
+
+type Slab<T> = slab::Slab<T, Token>;
+type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
+
+// TODO: Server token must be outside range used by server.connections slab.
+// usize::MAX is already used internally in mio.
+const QUIT: Token = Token(std::usize::MAX - 2);
+const SERVER: Token = Token(std::usize::MAX - 1);
+
+struct ServerConn {
+ connection: audioipc::Connection,
+ token: Option<Token>,
+ streams: StreamSlab
+}
+
+impl ServerConn {
+ fn new<FD>(fd: FD) -> ServerConn
+ where
+ FD: IntoRawFd,
+ {
+ ServerConn {
+ connection: unsafe { audioipc::Connection::from_raw_fd(fd.into_raw_fd()) },
+ token: None,
+ // TODO: Handle increasing slab size. Pick a good default size.
+ streams: StreamSlab::with_capacity(64)
+ }
+ }
+
+ fn process(&mut self, poll: &mut mio::Poll, context: &Result<Option<cubeb::Context>>) -> Result<()> {
+ let r = self.connection.receive();
+ info!("ServerConn::process: got {:?}", r);
+
+ if let &Ok(Some(ref ctx)) = context {
+ // TODO: Might need a simple state machine to deal with
+ // create/use/destroy ordering, etc.
+ // TODO: receive() and all this handling should be moved out
+ // of this event loop code.
+ let msg = try!(r);
+ let _ = try!(self.process_msg(&msg, ctx));
+ } else {
+ self.send_error(cubeb::Error::new());
+ }
+
+ poll.reregister(
+ &self.connection,
+ self.token.unwrap(),
+ mio::Ready::readable(),
+ mio::PollOpt::edge() | mio::PollOpt::oneshot()
+ ).unwrap();
+
+ Ok(())
+ }
+
+ fn process_msg(&mut self, msg: &ServerMessage, context: &cubeb::Context) -> Result<()> {
+ match msg {
+ &ServerMessage::ClientConnect => {
+ panic!("already connected");
+ },
+ &ServerMessage::ClientDisconnect => {
+ // TODO:
+ //self.connection.client_disconnect();
+ self.connection
+ .send(ClientMessage::ClientDisconnected)
+ .unwrap();
+ },
+
+ &ServerMessage::ContextGetBackendId => {},
+
+ &ServerMessage::ContextGetMaxChannelCount => {
+ match context.max_channel_count() {
+ Ok(channel_count) => {
+ self.connection
+ .send(ClientMessage::ContextMaxChannelCount(channel_count))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::ContextGetMinLatency(ref params) => {
+
+ let format = cubeb::SampleFormat::from(params.format);
+ let layout = cubeb::ChannelLayout::from(params.layout);
+
+ let params = cubeb::StreamParamsBuilder::new()
+ .format(format)
+ .rate(params.rate as _)
+ .channels(params.channels as _)
+ .layout(layout)
+ .take();
+
+ match context.min_latency(¶ms) {
+ Ok(latency) => {
+ self.connection
+ .send(ClientMessage::ContextMinLatency(latency))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::ContextGetPreferredSampleRate => {
+ match context.preferred_sample_rate() {
+ Ok(rate) => {
+ self.connection
+ .send(ClientMessage::ContextPreferredSampleRate(rate))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::ContextGetPreferredChannelLayout => {
+ match context.preferred_channel_layout() {
+ Ok(layout) => {
+ self.connection
+ .send(ClientMessage::ContextPreferredChannelLayout(layout as _))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::ContextGetDeviceEnumeration(device_type) => {
+ match context.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) {
+ Ok(devices) => {
+ let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+ self.connection
+ .send(ClientMessage::ContextEnumeratedDevices(v))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::StreamInit(ref params) => {
+ fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
+ match params {
+ Some(p) => {
+ let raw = ffi::cubeb_stream_params::from(p);
+ Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
+ },
+ None => None,
+ }
+ }
+
+ fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+ match params.as_ref() {
+ Some(p) => {
+ let sample_size = match p.format() {
+ cubeb::SampleFormat::S16LE |
+ cubeb::SampleFormat::S16BE |
+ cubeb::SampleFormat::S16NE => 2,
+ cubeb::SampleFormat::Float32LE |
+ cubeb::SampleFormat::Float32BE |
+ cubeb::SampleFormat::Float32NE => 4,
+ };
+ let channel_count = p.channels() as u16;
+ sample_size * channel_count
+ },
+ None => 0,
+ }
+ }
+
+ // TODO: Yuck!
+ let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+ let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+ let latency = params.latency_frames;
+ let mut builder = cubeb::StreamInitOptionsBuilder::new();
+ builder
+ .input_device(input_device)
+ .output_device(output_device)
+ .latency(latency);
+
+ if let Some(ref stream_name) = params.stream_name {
+ builder.stream_name(stream_name);
+ }
+ let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
+ if let Some(ref isp) = input_stream_params {
+ builder.input_stream_param(isp);
+ }
+ let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
+ if let Some(ref osp) = output_stream_params {
+ builder.output_stream_param(osp);
+ }
+ let params = builder.take();
+
+ let input_frame_size = frame_size_in_bytes(input_stream_params);
+ let output_frame_size = frame_size_in_bytes(output_stream_params);
+
+ let (conn1, conn2) = audioipc::Connection::pair()?;
+ info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+
+ let (input_shm, input_file) =
+ SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+ let (output_shm, output_file) =
+ SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+
+ match context.stream_init(
+ ¶ms,
+ Callback {
+ input_frame_size: input_frame_size,
+ output_frame_size: output_frame_size,
+ connection: conn2,
+ input_shm: input_shm,
+ output_shm: output_shm,
+ }
+ ) {
+ Ok(stream) => {
+ let stm_tok = match self.streams.vacant_entry() {
+ Some(entry) => {
+ debug!(
+ "Registering stream {:?}",
+ entry.index(),
+ );
+
+ entry.insert(stream).index()
+ },
+ None => {
+ // TODO: Turn into error
+ panic!("Failed to insert stream into slab. No entries");
+ },
+ };
+
+ self.connection
+ .send_with_fd(ClientMessage::StreamCreated(stm_tok), Some(conn1))
+ .unwrap();
+ // TODO: It'd be nicer to send these as part of
+ // StreamCreated, but that requires changing
+ // sendmsg/recvmsg to support multiple fds.
+ self.connection
+ .send_with_fd(ClientMessage::StreamCreatedInputShm, Some(input_file))
+ .unwrap();
+ self.connection
+ .send_with_fd(ClientMessage::StreamCreatedOutputShm, Some(output_file))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+
+ &ServerMessage::StreamDestroy(stm_tok) => {
+ self.streams.remove(stm_tok);
+ self.connection
+ .send(ClientMessage::StreamDestroyed)
+ .unwrap();
+ },
+
+ &ServerMessage::StreamStart(stm_tok) => {
+ let _ = self.streams[stm_tok].start();
+ self.connection.send(ClientMessage::StreamStarted).unwrap();
+ },
+ &ServerMessage::StreamStop(stm_tok) => {
+ let _ = self.streams[stm_tok].stop();
+ self.connection.send(ClientMessage::StreamStopped).unwrap();
+ },
+ &ServerMessage::StreamGetPosition(stm_tok) => {
+ match self.streams[stm_tok].position() {
+ Ok(position) => {
+ self.connection
+ .send(ClientMessage::StreamPosition(position))
+ .unwrap();
+ },
+ Err(e) => {
+ self.send_error(e);
+ },
+ }
+ },
+ &ServerMessage::StreamGetLatency(stm_tok) => {
+ match self.streams[stm_tok].latency() {
+ Ok(latency) => {
+ self.connection
+ .send(ClientMessage::StreamLatency(latency))
+ .unwrap();
+ },
+ Err(e) => self.send_error(e),
+ }
+ },
+ &ServerMessage::StreamSetVolume(stm_tok, volume) => {
+ let _ = self.streams[stm_tok].set_volume(volume);
+ self.connection
+ .send(ClientMessage::StreamVolumeSet)
+ .unwrap();
+ },
+ &ServerMessage::StreamSetPanning(stm_tok, panning) => {
+ let _ = self.streams[stm_tok].set_panning(panning);
+ self.connection
+ .send(ClientMessage::StreamPanningSet)
+ .unwrap();
+ },
+ &ServerMessage::StreamGetCurrentDevice(stm_tok) => {
+ let err = match self.streams[stm_tok].current_device() {
+ Ok(device) => {
+ // TODO: Yuck!
+ self.connection
+ .send(ClientMessage::StreamCurrentDevice(device.into()))
+ .unwrap();
+ None
+ },
+ Err(e) => Some(e),
+ };
+ if let Some(e) = err {
+ self.send_error(e);
+ }
+ },
+ _ => {
+ bail!("Unexpected Message");
+ },
+ }
+ Ok(())
+ }
+
+ fn send_error(&mut self, error: cubeb::Error) {
+ self.connection
+ .send(ClientMessage::ContextError(error.raw_code()))
+ .unwrap();
+ }
+}
+
+pub struct Server {
+ socket: UnixListener,
+ // Ok(None) - Server hasn't tried to create cubeb::Context.
+ // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
+ // Err(_) - Server has tried and failed to create cubeb::Context.
+ // Don't try again.
+ context: Result<Option<cubeb::Context>>,
+ conns: Slab<ServerConn>
+}
+
+impl Server {
+ pub fn new(socket: UnixListener) -> Server {
+ Server {
+ socket: socket,
+ context: Ok(None),
+ conns: Slab::with_capacity(16)
+ }
+ }
+
+ fn accept(&mut self, poll: &mut mio::Poll) -> Result<()> {
+ debug!("Server accepting connection");
+
+ let client_socket = match self.socket.accept() {
+ Err(e) => {
+ error!("server accept error: {}", e);
+ return Err(e.into());
+ },
+ Ok(None) => panic!("accept returned EAGAIN unexpectedly"),
+ Ok(Some((socket, _))) => socket,
+ };
+ let token = match self.conns.vacant_entry() {
+ Some(entry) => {
+ debug!("registering {:?}", entry.index());
+ let cxn = ServerConn::new(client_socket);
+ entry.insert(cxn).index()
+ },
+ None => {
+ panic!("failed to insert connection");
+ },
+ };
+
+ // Register the connection
+ self.conns[token].token = Some(token);
+ poll.register(
+ &self.conns[token].connection,
+ token,
+ mio::Ready::readable(),
+ mio::PollOpt::edge() | mio::PollOpt::oneshot()
+ ).unwrap();
+ /*
+ let r = self.conns[token].receive();
+ debug!("received {:?}", r);
+ let r = self.conns[token].send(ClientMessage::ClientConnected);
+ debug!("sent {:?} (ClientConnected)", r);
+ */
+
+ // Since we have a connection try creating a cubeb context. If
+ // it fails, mark the failure with Err.
+ if let Ok(None) = self.context {
+ self.context = cubeb::Context::init("AudioIPC Server", None)
+ .and_then(|ctx| Ok(Some(ctx)))
+ .or_else(|err| Err(err.into()));
+ }
+
+ Ok(())
+ }
+
+ pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
+ let mut events = mio::Events::with_capacity(16);
+
+ match poll.poll(&mut events, None) {
+ Ok(_) => {},
+ Err(e) => error!("server poll error: {}", e),
+ }
+
+ for event in events.iter() {
+ match event.token() {
+ SERVER => {
+ match self.accept(poll) {
+ Err(e) => {
+ error!("server accept error: {}", e);
+ },
+ _ => {},
+ };
+ },
+ QUIT => {
+ info!("Quitting Audio Server loop");
+ bail!("quit");
+ },
+ token => {
+ debug!("token {:?} ready", token);
+
+ let r = self.conns[token].process(poll, &self.context);
+
+ debug!("got {:?}", r);
+
+ // TODO: Handle disconnection etc.
+ // TODO: Should be handled at a higher level by a
+ // disconnect message.
+ if let Err(e) = r {
+ debug!("dropped client {:?} due to error {:?}", token, e);
+ self.conns.remove(token);
+ continue;
+ }
+
+ // poll.reregister(
+ // &self.conn(token).connection,
+ // token,
+ // mio::Ready::readable(),
+ // mio::PollOpt::edge() | mio::PollOpt::oneshot()
+ // ).unwrap();
+ },
+ }
+ }
+
+ Ok(())
+ }
+}
+
+
+// TODO: This should take an "Evented" instead of opening the UDS path
+// directly (and let caller set up the Evented), but need a way to describe
+// it as an Evented that we can send/recv file descriptors (or HANDLEs on
+// Windows) over.
+pub fn run(running: Arc<AtomicBool>) -> Result<()> {
+
+ // Ignore result.
+ let _ = std::fs::remove_file(audioipc::get_uds_path());
+
+ // TODO: Use a SEQPACKET, wrap it in UnixStream?
+ let mut poll = mio::Poll::new()?;
+ let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path())?);
+
+ poll.register(
+ &server.socket,
+ SERVER,
+ mio::Ready::readable(),
+ mio::PollOpt::edge()
+ ).unwrap();
+
+ loop {
+ if !running.load(Ordering::SeqCst) {
+ bail!("server quit due to ctrl-c");
+ }
+
+ let _ = try!(server.poll(&mut poll));
+ }
+
+ //poll.deregister(&server.socket).unwrap();
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_start() -> *mut c_void {
+
+ let (tx, rx) = channel::ctl_pair();
+
+ thread::spawn(move || {
+ // Ignore result.
+ let _ = std::fs::remove_file(audioipc::get_uds_path());
+
+ // TODO: Use a SEQPACKET, wrap it in UnixStream?
+ let mut poll = mio::Poll::new().unwrap();
+ let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path()).unwrap());
+
+ poll.register(
+ &server.socket,
+ SERVER,
+ mio::Ready::readable(),
+ mio::PollOpt::edge()
+ ).unwrap();
+
+ poll.register(&rx, QUIT, mio::Ready::readable(), mio::PollOpt::edge())
+ .unwrap();
+
+ loop {
+ match server.poll(&mut poll) {
+ Err(_) => {
+ return;
+ },
+ _ => (),
+ }
+ }
+ });
+
+ Box::into_raw(Box::new(tx)) as *mut _
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
+ // Dropping SenderCtl here will notify the other end.
+ let _ = unsafe { Box::<channel::SenderCtl>::from_raw(p as *mut _) };
+}