--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
The source from this directory was copied from the audioipc-2
git repository using the update.sh script. The only changes
made were those applied by update.sh and the addition of
Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
-The git commit ID used was 864332fde124761da15c444e58889faf598a219f (2017-10-30 10:55:41 +1300)
+The git commit ID used was 71cc67f44b803e0288997247c060375196f1cf9b (2018-01-19 17:16:33 +1000)
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -1,24 +1,29 @@
[package]
name = "audioipc"
-version = "0.1.0"
+version = "0.2.0"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
]
description = "Remote Cubeb IPC"
[dependencies]
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
bincode = "0.8"
bytes = "0.4"
+# rayon-core in Gecko uses futures 0.1.13
+futures = "=0.1.13"
+iovec = "0.1"
libc = "0.2"
log = "^0.3.6"
memmap = "0.5.2"
-mio = "0.6.7"
-mio-uds = "0.6.4"
+scoped-tls = "0.1"
serde = "1.*.*"
serde_derive = "1.*.*"
+tokio-core = "0.1"
+tokio-io = "0.1"
+tokio-uds = "0.1.7"
[dependencies.error-chain]
version = "0.11.0"
-default-features = false
+default-features = false
\ No newline at end of file
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -1,153 +1,174 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
//! Various async helpers modelled after futures-rs and tokio-io.
-use {RecvFd, SendFd};
+use {RecvMsg, SendMsg};
use bytes::{Buf, BufMut};
-use mio_uds;
-use std::io as std_io;
-use std::os::unix::io::RawFd;
-use std::os::unix::net;
-
-/// A convenience macro for working with `io::Result<T>` from the
-/// `std::io::Read` and `std::io::Write` traits.
-///
-/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
-/// the input type is of the `Err` variant, then `Async::NotReady` is returned if
-/// it indicates `WouldBlock` or otherwise `Err` is returned.
-#[macro_export]
-macro_rules! try_nb {
- ($e:expr) => (match $e {
- Ok(t) => t,
- Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
- return Ok(Async::NotReady)
- }
- Err(e) => return Err(e.into()),
- })
-}
-
-/////////////////////////////////////////////////////////////////////////////////////////
-// Async support - Handle EWOULDBLOCK/EAGAIN from non-blocking I/O operations.
-
-/// Return type for async methods, indicates whether the operation was
-/// ready or not.
-///
-/// * `Ok(Async::Ready(t))` means that the operation has completed successfully.
-/// * `Ok(Async::NotReady)` means that the underlying system is not ready to handle operation.
-/// * `Err(e)` means that the operation has completed with the given error `e`.
-pub type AsyncResult<T, E> = Result<Async<T>, E>;
+use futures::{Async, Poll};
+use iovec::IoVec;
+use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_uds::UnixStream;
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub enum Async<T> {
- /// Represents that a value is immediately ready.
- Ready(T),
- /// Represents that a value is not ready yet, but may be so later.
- NotReady
-}
-
-impl<T> Async<T> {
- pub fn is_ready(&self) -> bool {
- match *self {
- Async::Ready(_) => true,
- Async::NotReady => false,
- }
- }
-
- pub fn is_not_ready(&self) -> bool {
- !self.is_ready()
- }
-}
-
-/// Return type for an async attempt to send a value.
-///
-/// * `Ok(AsyncSend::Ready)` means that the operation has completed successfully.
-/// * `Ok(AsyncSend::NotReady(t))` means that the underlying system is not ready to handle
-/// send. returns the value that tried to be sent in `t`.
-/// * `Err(e)` means that operation has completed with the given error `e`.
-pub type AsyncSendResult<T, E> = Result<AsyncSend<T>, E>;
-
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub enum AsyncSend<T> {
- Ready,
- NotReady(T)
-}
-
-pub trait AsyncRecvFd: RecvFd {
- unsafe fn prepare_uninitialized_buffer(&self, bytes: &mut [u8]) -> bool {
- for byte in bytes.iter_mut() {
- *byte = 0;
- }
-
- true
- }
-
+pub trait AsyncRecvMsg: AsyncRead {
/// Pull some bytes from this source into the specified `Buf`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
- fn recv_buf_fd<B>(&mut self, buf: &mut B) -> AsyncResult<(usize, Option<RawFd>), std_io::Error>
+ fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
where
Self: Sized,
- B: BufMut,
- {
- if !buf.has_remaining_mut() {
- return Ok(Async::Ready((0, None)));
- }
-
- unsafe {
- let (n, fd) = {
- let bytes = buf.bytes_mut();
- self.prepare_uninitialized_buffer(bytes);
- try_nb!(self.recv_fd(bytes))
- };
-
- buf.advance_mut(n);
- Ok(Async::Ready((n, fd)))
- }
- }
+ B: BufMut;
}
-impl AsyncRecvFd for net::UnixStream {}
-impl AsyncRecvFd for mio_uds::UnixStream {}
-
/// A trait for writable objects which operated in an async fashion.
///
/// This trait inherits from `std::io::Write` and indicates that an I/O object is
/// **nonblocking**, meaning that it will return an error instead of blocking
/// when bytes cannot currently be written, but hasn't closed. Specifically
/// this means that the `write` function for types that implement this trait
/// can have a few return values:
///
/// * `Ok(n)` means that `n` bytes of data was immediately written .
/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
/// written from the buffer provided. The I/O object is not currently
/// writable but may become writable in the future.
/// * `Err(e)` for other errors are standard I/O errors coming from the
/// underlying object.
-pub trait AsyncSendFd: SendFd {
+pub trait AsyncSendMsg: AsyncWrite {
/// Write a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
- fn send_buf_fd<B>(&mut self, buf: &mut B, fd: Option<RawFd>) -> AsyncResult<usize, std_io::Error>
+ fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
where
Self: Sized,
B: Buf,
+ C: Buf;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+impl AsyncRecvMsg for UnixStream {
+ fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
+ where
+ B: BufMut,
{
- if !buf.has_remaining() {
- return Ok(Async::Ready(0));
+ if let Async::NotReady = <UnixStream>::poll_read(self) {
+ return Ok(Async::NotReady);
}
+ let r = unsafe {
+ // The `IoVec` type can't have a 0-length size, so we create a bunch
+ // of dummy versions on the stack with 1 length which we'll quickly
+ // overwrite.
+ let b1: &mut [u8] = &mut [0];
+ let b2: &mut [u8] = &mut [0];
+ let b3: &mut [u8] = &mut [0];
+ let b4: &mut [u8] = &mut [0];
+ let b5: &mut [u8] = &mut [0];
+ let b6: &mut [u8] = &mut [0];
+ let b7: &mut [u8] = &mut [0];
+ let b8: &mut [u8] = &mut [0];
+ let b9: &mut [u8] = &mut [0];
+ let b10: &mut [u8] = &mut [0];
+ let b11: &mut [u8] = &mut [0];
+ let b12: &mut [u8] = &mut [0];
+ let b13: &mut [u8] = &mut [0];
+ let b14: &mut [u8] = &mut [0];
+ let b15: &mut [u8] = &mut [0];
+ let b16: &mut [u8] = &mut [0];
+ let mut bufs: [&mut IoVec; 16] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ b6.into(),
+ b7.into(),
+ b8.into(),
+ b9.into(),
+ b10.into(),
+ b11.into(),
+ b12.into(),
+ b13.into(),
+ b14.into(),
+ b15.into(),
+ b16.into(),
+ ];
+ let n = buf.bytes_vec_mut(&mut bufs);
+ self.recv_msg(&mut bufs[..n], cmsg.bytes_mut())
+ };
- let n = try_nb!(self.send_fd(buf.bytes(), fd));
- buf.advance(n);
- Ok(Async::Ready(n))
+ match r {
+ Ok((n, cmsg_len, flags)) => {
+ unsafe {
+ buf.advance_mut(n);
+ }
+ unsafe {
+ cmsg.advance_mut(cmsg_len);
+ }
+ Ok((n, flags).into())
+ },
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.need_read();
+ Ok(Async::NotReady)
+ },
+ Err(e) => Err(e),
+ }
}
}
-impl AsyncSendFd for net::UnixStream {}
-impl AsyncSendFd for mio_uds::UnixStream {}
+impl AsyncSendMsg for UnixStream {
+ fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
+ where
+ B: Buf,
+ C: Buf,
+ {
+ if let Async::NotReady = <UnixStream>::poll_write(self) {
+ return Ok(Async::NotReady);
+ }
+ let r = {
+ // The `IoVec` type can't have a zero-length size, so create a dummy
+ // version from a 1-length slice which we'll overwrite with the
+ // `bytes_vec` method.
+ static DUMMY: &[u8] = &[0];
+ let nom = <&IoVec>::from(DUMMY);
+ let mut bufs = [
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ nom,
+ ];
+ let n = buf.bytes_vec(&mut bufs);
+ self.send_msg(&bufs[..n], cmsg.bytes())
+ };
+ match r {
+ Ok(n) => {
+ buf.advance(n);
+ Ok(Async::Ready(n))
+ },
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.need_write();
+ Ok(Async::NotReady)
+ },
+ Err(e) => Err(e),
+ }
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/cmsg.rs
@@ -0,0 +1,187 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use bytes::{BufMut, Bytes, BytesMut};
+use libc::{self, cmsghdr};
+use std::{convert, mem, ops, slice};
+use std::os::unix::io::RawFd;
+
+#[derive(Clone, Debug)]
+pub struct Fds {
+ fds: Bytes
+}
+
+impl convert::AsRef<[RawFd]> for Fds {
+ fn as_ref(&self) -> &[RawFd] {
+ let n = self.fds.len() / mem::size_of::<RawFd>();
+ unsafe { slice::from_raw_parts(self.fds.as_ptr() as *const _, n) }
+ }
+}
+
+impl ops::Deref for Fds {
+ type Target = [RawFd];
+
+ #[inline]
+ fn deref(&self) -> &[RawFd] {
+ self.as_ref()
+ }
+}
+
+pub struct ControlMsgIter {
+ control: Bytes
+}
+
+pub fn iterator(c: Bytes) -> ControlMsgIter {
+ ControlMsgIter {
+ control: c
+ }
+}
+
+impl Iterator for ControlMsgIter {
+ type Item = Fds;
+
+ // This follows the logic in __cmsg_nxthdr from glibc
+ // /usr/include/bits/socket.h
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ let control = self.control.clone();
+ let cmsghdr_len = align(mem::size_of::<cmsghdr>());
+
+ if control.len() < cmsghdr_len {
+ // No more entries---not enough data in `control` for a
+ // complete message.
+ return None;
+ }
+
+ let cmsg: &cmsghdr = unsafe { &*(control.as_ptr() as *const _) };
+ // The offset to the next cmsghdr in control. This must be
+ // aligned to a boundary that matches the type used to
+ // represent the length of the message.
+ let cmsg_len = cmsg.cmsg_len;
+ let next_cmsghdr = align(cmsg_len as _);
+ self.control = if next_cmsghdr > control.len() {
+ // No more entries---not enough data in `control` for a
+ // complete message.
+ Bytes::new()
+ } else {
+ control.slice_from(next_cmsghdr)
+ };
+
+ match (cmsg.cmsg_level, cmsg.cmsg_type) {
+ (libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
+ trace!("Found SCM_RIGHTS...");
+ return Some(Fds {
+ fds: control.slice(cmsghdr_len, cmsg_len as _)
+ });
+ },
+ (level, kind) => {
+ trace!("Skipping cmsg level, {}, type={}...", level, kind);
+ },
+ }
+ }
+ }
+}
+
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Error {
+ /// Not enough space in storage to insert control mesage.
+ NoSpace
+}
+
+#[must_use]
+pub struct ControlMsgBuilder {
+ result: Result<BytesMut, Error>
+}
+
+pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
+ let buf = aligned(buf);
+ ControlMsgBuilder {
+ result: Ok(buf)
+ }
+}
+
+impl ControlMsgBuilder {
+ fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
+ self.result = self.result.and_then(align_buf).and_then(|mut cmsg| {
+ let cmsg_len = len(msg.len());
+ if cmsg.remaining_mut() < cmsg_len {
+ return Err(Error::NoSpace);
+ }
+
+ let cmsghdr = cmsghdr {
+ cmsg_len: cmsg_len as _,
+ cmsg_level: level,
+ cmsg_type: kind
+ };
+
+ let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) };
+ cmsg.put_slice(cmsghdr);
+ let mut cmsg = try!(align_buf(cmsg));
+ cmsg.put_slice(msg);
+
+ Ok(cmsg)
+ });
+
+ self
+ }
+
+ pub fn rights(self, fds: &[RawFd]) -> Self {
+ self.msg(libc::SOL_SOCKET, libc::SCM_RIGHTS, fds.as_bytes())
+ }
+
+ pub fn finish(self) -> Result<Bytes, Error> {
+ self.result.map(|mut cmsg| cmsg.take().freeze())
+ }
+}
+
+pub trait AsBytes {
+ fn as_bytes(&self) -> &[u8];
+}
+
+impl<'a, T: Sized> AsBytes for &'a [T] {
+ fn as_bytes(&self) -> &[u8] {
+ // TODO: This should account for the alignment of T
+ let byte_count = self.len() * mem::size_of::<T>();
+ unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) }
+ }
+}
+
+fn aligned(buf: &BytesMut) -> BytesMut {
+ let mut aligned_buf = buf.clone();
+ aligned_buf.reserve(buf.remaining_mut());
+ let cmsghdr_align = mem::align_of::<cmsghdr>();
+ let n = unsafe { aligned_buf.bytes_mut().as_ptr() } as usize & (cmsghdr_align - 1);
+ if n != 0 {
+ unsafe { aligned_buf.advance_mut(n) };
+ drop(aligned_buf.take());
+ }
+ aligned_buf
+}
+
+fn align_buf(mut cmsg: BytesMut) -> Result<BytesMut, Error> {
+ let offset = unsafe { cmsg.bytes_mut().as_ptr() } as usize;
+ let adjust = align(offset) - offset;
+ if cmsg.remaining_mut() < adjust {
+ return Err(Error::NoSpace);
+ }
+
+ for _ in 0..adjust {
+ cmsg.put_u8(0);
+ }
+ Ok(cmsg)
+}
+
+fn align(len: usize) -> usize {
+ let cmsghdr_align = mem::align_of::<cmsghdr>();
+ (len + cmsghdr_align - 1) & !(cmsghdr_align - 1)
+}
+
+pub fn len(len: usize) -> usize {
+ align(mem::size_of::<cmsghdr>()) + len
+}
+
+pub fn space(len: usize) -> usize {
+ align(mem::size_of::<cmsghdr>()) + align(len)
+}
--- a/media/audioipc/audioipc/src/codec.rs
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -1,141 +1,180 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
-use bincode::{self, Bounded, deserialize, serialize_into, serialized_size};
-use bytes::{Buf, BufMut, BytesMut, LittleEndian};
+use bincode::{self, deserialize, serialize_into, serialized_size, Bounded};
+use bytes::{BufMut, ByteOrder, BytesMut, LittleEndian};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
-use std::io as std_io;
-use std::io::Cursor;
-use std::mem;
+use std::fmt::Debug;
+use std::io;
+use std::marker::PhantomData;
////////////////////////////////////////////////////////////////////////////////
// Split buffer into size delimited frames - This appears more complicated than
// might be necessary due to handling the possibility of messages being split
// across reads.
-#[derive(Debug)]
-enum FrameState {
- Head,
- Data(usize)
-}
+pub trait Codec {
+ /// The type of items to be encoded into byte buffer
+ type In;
+
+ /// The type of items to be returned by decoding from byte buffer
+ type Out;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>;
-#[derive(Debug)]
-pub struct Decoder {
- state: FrameState
-}
-
-impl Decoder {
- pub fn new() -> Self {
- Decoder {
- state: FrameState::Head
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the I/O.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
+ match try!(self.decode(buf)) {
+ Some(frame) => Ok(frame),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "bytes remaining on stream"
+ ))
}
}
- fn decode_head(&mut self, src: &mut BytesMut) -> std_io::Result<Option<usize>> {
- let head_size = mem::size_of::<u16>();
- if src.len() < head_size {
+ /// Encodes a frame intox the buffer provided.
+ fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
+}
+
+/// Codec based upon bincode serialization
+///
+/// Messages that have been serialized using bincode are prefixed with
+/// the length of the message to aid in deserialization, so that it's
+/// known if enough data has been received to decode a complete
+/// message.
+pub struct LengthDelimitedCodec<In, Out> {
+ state: State,
+ __in: PhantomData<In>,
+ __out: PhantomData<Out>
+}
+
+enum State {
+ Length,
+ Data(u16)
+}
+
+impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
+ fn default() -> Self {
+ LengthDelimitedCodec {
+ state: State::Length,
+ __in: PhantomData,
+ __out: PhantomData
+ }
+ }
+}
+
+impl<In, Out> LengthDelimitedCodec<In, Out> {
+ // Lengths are encoded as little endian u16
+ fn decode_length(&mut self, buf: &mut BytesMut) -> io::Result<Option<u16>> {
+ if buf.len() < 2 {
// Not enough data
return Ok(None);
}
- let n = {
- let mut src = Cursor::new(&mut *src);
-
- // match endianess
- let n = src.get_uint::<LittleEndian>(head_size);
-
- if n > u64::from(u16::max_value()) {
- return Err(std_io::Error::new(
- std_io::ErrorKind::InvalidData,
- "frame size too big"
- ));
- }
-
- // The check above ensures there is no overflow
- n as usize
- };
+ let n = LittleEndian::read_u16(buf.as_ref());
// Consume the length field
- let _ = src.split_to(head_size);
+ let _ = buf.split_to(2);
Ok(Some(n))
}
- fn decode_data(&self, n: usize, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+ fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result<Option<Out>>
+ where
+ Out: DeserializeOwned + Debug
+ {
// At this point, the buffer has already had the required capacity
// reserved. All there is to do is read.
- if src.len() < n {
+ let n = n as usize;
+ if buf.len() < n {
return Ok(None);
}
- Ok(Some(src.split_to(n)))
+ let buf = buf.split_to(n).freeze();
+
+ trace!("Attempting to decode");
+ let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
+ bincode::ErrorKind::IoError(e) => e,
+ _ => io::Error::new(io::ErrorKind::Other, *e)
+ }));
+
+ trace!("... Decoded {:?}", msg);
+ Ok(Some(msg))
}
+}
- pub fn split_frame(&mut self, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
+where
+ In: Serialize + Debug,
+ Out: DeserializeOwned + Debug
+{
+ type In = In;
+ type Out = Out;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
let n = match self.state {
- FrameState::Head => {
- match try!(self.decode_head(src)) {
+ State::Length => {
+ match try!(self.decode_length(buf)) {
Some(n) => {
- self.state = FrameState::Data(n);
+ self.state = State::Data(n);
// Ensure that the buffer has enough space to read the
// incoming payload
- src.reserve(n);
+ buf.reserve(n as usize);
n
},
- None => return Ok(None),
+ None => return Ok(None)
}
},
- FrameState::Data(n) => n,
+ State::Data(n) => n
};
- match try!(self.decode_data(n, src)) {
+ match try!(self.decode_data(buf, n)) {
Some(data) => {
// Update the decode state
- self.state = FrameState::Head;
+ self.state = State::Length;
// Make sure the buffer has enough space to read the next head
- src.reserve(mem::size_of::<u16>());
+ buf.reserve(2);
Ok(Some(data))
},
- None => Ok(None),
+ None => Ok(None)
}
}
- pub fn decode<ITEM: DeserializeOwned>(&mut self, src: &mut BytesMut) -> Result<Option<ITEM>, bincode::Error> {
- match try!(self.split_frame(src)) {
- Some(buf) => deserialize::<ITEM>(buf.as_ref()).and_then(|t| Ok(Some(t))),
- None => Ok(None),
+ fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
+ trace!("Attempting to encode");
+ let encoded_len = serialized_size(&item);
+ if encoded_len > 8 * 1024 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "encoded message too big"
+ ));
}
- }
-}
+
+ buf.reserve((encoded_len + 2) as usize);
+
+ buf.put_u16::<LittleEndian>(encoded_len as u16);
-impl Default for Decoder {
- fn default() -> Self {
- Self::new()
+ if let Err(e) =
+ serialize_into::<_, Self::In, _>(&mut buf.writer(), &item, Bounded(encoded_len))
+ {
+ match *e {
+ bincode::ErrorKind::IoError(e) => return Err(e),
+ _ => return Err(io::Error::new(io::ErrorKind::Other, *e))
+ }
+ }
+
+ Ok(())
}
}
-
-pub fn encode<ITEM: Serialize>(dst: &mut BytesMut, item: &ITEM) -> Result<(), bincode::Error> {
- let head_len = mem::size_of::<u16>() as u64;
- let item_len = serialized_size(item);
-
- if head_len + item_len > u64::from(u16::max_value()) {
- return Err(Box::new(bincode::ErrorKind::IoError(std_io::Error::new(
- std_io::ErrorKind::InvalidInput,
- "frame too big"
- ))));
- }
-
- let n = (head_len + item_len) as usize;
- dst.reserve(n);
- dst.put_u16::<LittleEndian>(item_len as u16);
- serialize_into(&mut dst.writer(), item, Bounded(item_len))
-}
deleted file mode 100644
--- a/media/audioipc/audioipc/src/connection.rs
+++ /dev/null
@@ -1,206 +0,0 @@
-use {AutoCloseFd, RecvFd, SendFd};
-use async::{Async, AsyncRecvFd};
-use bytes::{BufMut, BytesMut};
-use codec::{Decoder, encode};
-use errors::*;
-use mio::{Poll, PollOpt, Ready, Token};
-use mio::event::Evented;
-use mio::unix::EventedFd;
-use serde::de::DeserializeOwned;
-use serde::ser::Serialize;
-use std::collections::VecDeque;
-use std::error::Error;
-use std::fmt::Debug;
-use std::io::{self, Read};
-use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net;
-use std::os::unix::prelude::*;
-
-// Because of the trait implementation rules in Rust, this needs to be
-// a wrapper class to allow implementation of a trait from another
-// crate on a struct from yet another crate.
-//
-// This class is effectively net::UnixStream.
-
-#[derive(Debug)]
-pub struct Connection {
- stream: net::UnixStream,
- recv_buffer: BytesMut,
- recv_fd: VecDeque<AutoCloseFd>,
- send_buffer: BytesMut,
- decoder: Decoder
-}
-
-impl Connection {
- pub fn new(stream: net::UnixStream) -> Connection {
- info!("Create new connection");
- stream.set_nonblocking(false).unwrap();
- Connection {
- stream: stream,
- recv_buffer: BytesMut::with_capacity(1024),
- recv_fd: VecDeque::new(),
- send_buffer: BytesMut::with_capacity(1024),
- decoder: Decoder::new()
- }
- }
-
- /// Creates an unnamed pair of connected sockets.
- ///
- /// Returns two `Connection`s which are connected to each other.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use audioipc::Connection;
- ///
- /// let (conn1, conn2) = match Connection::pair() {
- /// Ok((conn1, conn2)) => (conn1, conn2),
- /// Err(e) => {
- /// println!("Couldn't create a pair of connections: {:?}", e);
- /// return
- /// }
- /// };
- /// ```
- pub fn pair() -> io::Result<(Connection, Connection)> {
- let (s1, s2) = net::UnixStream::pair()?;
- Ok((Connection::new(s1), Connection::new(s2)))
- }
-
- pub fn take_fd(&mut self) -> Option<RawFd> {
- self.recv_fd.pop_front().map(|fd| fd.into_raw_fd())
- }
-
- pub fn receive<RT>(&mut self) -> Result<RT>
- where
- RT: DeserializeOwned + Debug,
- {
- self.receive_with_fd()
- }
-
- pub fn receive_with_fd<RT>(&mut self) -> Result<RT>
- where
- RT: DeserializeOwned + Debug,
- {
- trace!("received_with_fd...");
- loop {
- trace!(" recv_buffer = {:?}", self.recv_buffer);
- if !self.recv_buffer.is_empty() {
- let r = self.decoder.decode(&mut self.recv_buffer);
- trace!("receive {:?}", r);
- match r {
- Ok(Some(r)) => return Ok(r),
- Ok(None) => {
- /* Buffer doesn't contain enough data for a complete
- * message, so need to enter recv_buf_fd to get more. */
- },
- Err(e) => return Err(e).chain_err(|| "Failed to deserialize message"),
- }
- }
-
- // Otherwise, try to read more data and try again. Make sure we've
- // got room for at least one byte to read to ensure that we don't
- // get a spurious 0 that looks like EOF
-
- // The decoder.decode should have reserved an amount for
- // the next bit it needs to read. Check that we reserved
- // enough space for, at least the 2 byte size prefix.
- assert!(self.recv_buffer.remaining_mut() > 2);
-
- // TODO: Read until block, EOF, or error.
- // TODO: Switch back to recv_fd.
- match self.stream.recv_buf_fd(&mut self.recv_buffer) {
- Ok(Async::Ready((0, _))) => return Err(ErrorKind::Disconnected.into()),
- // TODO: Handle partial read?
- Ok(Async::Ready((_, fd))) => {
- trace!(
- " recv_buf_fd: recv_buffer: {:?}, recv_fd: {:?}, fd: {:?}",
- self.recv_buffer,
- self.recv_fd,
- fd
- );
- if let Some(fd) = fd {
- self.recv_fd.push_back(
- unsafe { AutoCloseFd::from_raw_fd(fd) }
- );
- }
- },
- Ok(Async::NotReady) => bail!("Socket should be blocking."),
- // TODO: Handle dropped message.
- // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
- Err(e) => bail!("stream recv_buf_fd returned: {}", e.description()),
- }
- }
- }
-
- pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
- where
- ST: Serialize + Debug,
- {
- self.send_with_fd::<ST, Connection>(msg, None)
- }
-
- pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
- where
- ST: Serialize + Debug,
- FD: IntoRawFd + Debug,
- {
- trace!("send_with_fd {:?}, {:?}", msg, fd_to_send);
- try!(encode(&mut self.send_buffer, &msg));
- let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
- let send = self.send_buffer.take().freeze();
- self.stream.send_fd(send.as_ref(), fd_to_send).chain_err(
- || "Failed to send message with fd"
- )
- }
-}
-
-impl Evented for Connection {
- fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
- EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
- }
-
- fn reregister(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
- EventedFd(&self.stream.as_raw_fd()).reregister(poll, token, events, opts)
- }
-
- fn deregister(&self, poll: &Poll) -> io::Result<()> {
- EventedFd(&self.stream.as_raw_fd()).deregister(poll)
- }
-}
-
-impl Read for Connection {
- fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
- self.stream.read(bytes)
- }
-}
-
-// TODO: Is this required?
-impl<'a> Read for &'a Connection {
- fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
- (&self.stream).read(bytes)
- }
-}
-
-impl RecvFd for Connection {
- fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
- self.stream.recv_fd(buf_to_recv)
- }
-}
-
-impl FromRawFd for Connection {
- unsafe fn from_raw_fd(fd: RawFd) -> Connection {
- Connection::new(net::UnixStream::from_raw_fd(fd))
- }
-}
-
-impl IntoRawFd for Connection {
- fn into_raw_fd(self) -> RawFd {
- self.stream.into_raw_fd()
- }
-}
-
-impl SendFd for Connection {
- fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
- self.stream.send_fd(buf_to_send, fd_to_send)
- }
-}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/core.rs
@@ -0,0 +1,102 @@
+// Ease accessing reactor::Core handles.
+
+use futures::{Future, IntoFuture};
+use futures::sync::oneshot;
+use std::{fmt, io, thread};
+use std::sync::mpsc;
+use tokio_core::reactor::{Core, Handle, Remote};
+
+scoped_thread_local! {
+ static HANDLE: Handle
+}
+
+pub fn handle() -> Handle {
+ HANDLE.with(|handle| handle.clone())
+}
+
+pub fn spawn<F>(f: F)
+where
+ F: Future<Item = (), Error = ()> + 'static
+{
+ HANDLE.with(|handle| handle.spawn(f))
+}
+
+pub fn spawn_fn<F, R>(f: F)
+where
+ F: FnOnce() -> R + 'static,
+ R: IntoFuture<Item = (), Error = ()> + 'static
+{
+ HANDLE.with(|handle| handle.spawn_fn(f))
+}
+
+struct Inner {
+ join: thread::JoinHandle<()>,
+ shutdown: oneshot::Sender<()>
+}
+
+pub struct CoreThread {
+ inner: Option<Inner>,
+ remote: Remote
+}
+
+impl CoreThread {
+ pub fn remote(&self) -> Remote {
+ self.remote.clone()
+ }
+}
+
+impl Drop for CoreThread {
+ fn drop(&mut self) {
+ trace!("Shutting down {:?}", self);
+ if let Some(inner) = self.inner.take() {
+ let _ = inner.shutdown.send(());
+ drop(inner.join.join());
+ }
+ }
+}
+
+impl fmt::Debug for CoreThread {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ // f.debug_tuple("CoreThread").field(&"...").finish()
+ f.debug_tuple("CoreThread").field(&self.remote).finish()
+ }
+}
+
+pub fn spawn_thread<S, F>(name: S, f: F) -> io::Result<CoreThread>
+where
+ S: Into<String>,
+ F: FnOnce() -> io::Result<()> + Send + 'static
+{
+ let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+ let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
+
+ let join = try!(thread::Builder::new().name(name.into()).spawn(move || {
+ let mut core = Core::new().expect("Failed to create reactor::Core");
+ let handle = core.handle();
+ let remote = handle.remote().clone();
+ drop(remote_tx.send(remote));
+
+ drop(HANDLE.set(&handle, || {
+ f().and_then(|_| {
+ let _ = core.run(shutdown_rx);
+ Ok(())
+ })
+ }));
+ trace!("thread shutdown...");
+ }));
+
+ let remote = try!(remote_rx.recv().or_else(|_| {
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Failed to receive remote handle from spawned thread"
+ ))
+ }));
+
+ Ok(CoreThread {
+ inner: Some(Inner {
+ join: join,
+ shutdown: shutdown_tx
+ }),
+ remote: remote
+ })
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -0,0 +1,364 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use async::{AsyncRecvMsg, AsyncSendMsg};
+use bytes::{Bytes, BytesMut, IntoBuf};
+use cmsg;
+use codec::Codec;
+use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
+use libc;
+use messages::AssocRawFd;
+use std::{fmt, io, mem};
+use std::collections::VecDeque;
+use std::os::unix::io::RawFd;
+
+const INITIAL_CAPACITY: usize = 1024;
+const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
+const FDS_CAPACITY: usize = 16;
+
+struct IncomingFds {
+ cmsg: BytesMut,
+ recv_fds: Option<cmsg::ControlMsgIter>
+}
+
+impl IncomingFds {
+ pub fn new(c: usize) -> Self {
+ let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>());
+ IncomingFds {
+ cmsg: BytesMut::with_capacity(capacity),
+ recv_fds: None
+ }
+ }
+
+ pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
+ loop {
+ let fds = self.recv_fds
+ .as_mut()
+ .and_then(|recv_fds| recv_fds.next())
+ .and_then(|fds| Some(clone_into_array(&fds)));
+
+ if fds.is_some() {
+ return fds;
+ }
+
+ if self.cmsg.is_empty() {
+ return None;
+ }
+
+ self.recv_fds = Some(cmsg::iterator(self.cmsg.take().freeze()));
+ }
+ }
+
+ pub fn cmsg(&mut self) -> &mut BytesMut {
+ self.cmsg.reserve(cmsg::space(mem::size_of::<[RawFd; 3]>()));
+ &mut self.cmsg
+ }
+}
+
+#[derive(Debug)]
+struct Frame {
+ msgs: Bytes,
+ fds: Option<Bytes>
+}
+
+/// A unified `Stream` and `Sink` interface over an I/O object, using
+/// the `Codec` trait to encode and decode the payload.
+pub struct FramedWithFds<A, C> {
+ io: A,
+ codec: C,
+ // Stream
+ read_buf: BytesMut,
+ incoming_fds: IncomingFds,
+ is_readable: bool,
+ eof: bool,
+ // Sink
+ frames: VecDeque<Frame>,
+ write_buf: BytesMut,
+ outgoing_fds: BytesMut
+}
+
+impl<A, C> FramedWithFds<A, C>
+where
+ A: AsyncSendMsg
+{
+ // If there is a buffered frame, try to write it to `A`
+ fn do_write(&mut self) -> Poll<(), io::Error> {
+ debug!("do_write...");
+ // Create a frame from any pending message in `write_buf`.
+ if !self.write_buf.is_empty() {
+ self.set_frame(None);
+ }
+
+ trace!("pending frames: {:?}", self.frames);
+
+ let mut processed = 0;
+
+ loop {
+ let n = match self.frames.front() {
+ Some(frame) => {
+ trace!("sending msg {:?}, fds {:?}", frame.msgs, frame.fds);
+ let mut msgs = frame.msgs.clone().into_buf();
+ let mut fds = match frame.fds {
+ Some(ref fds) => fds.clone(),
+ None => Bytes::new()
+ }.into_buf();
+ try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
+ },
+ _ => {
+ // No pending frames.
+ return Ok(().into());
+ }
+ };
+
+ match self.frames.pop_front() {
+ Some(mut frame) => {
+ processed += 1;
+
+ // Close any fds that have been sent. The fds are
+ // encoded in cmsg format inside frame.fds. Use
+ // the cmsg iterator to access msg and extract
+ // RawFds.
+ frame.fds.take().and_then(|cmsg| {
+ for fds in cmsg::iterator(cmsg) {
+ close_fds(&*fds)
+ }
+ Some(())
+ });
+
+ if n != frame.msgs.len() {
+ // If only part of the message was sent then
+ // re-queue the remaining message at the head
+ // of the queue. (Don't need to resend the fds
+ // since they've been sent with the first
+ // part.)
+ drop(frame.msgs.split_to(n));
+ self.frames.push_front(frame);
+ break;
+ }
+ },
+ _ => panic!()
+ }
+ }
+ debug!("process {} frames", processed);
+
+ trace!("pending frames: {:?}", self.frames);
+
+ Ok(().into())
+ }
+
+ fn set_frame(&mut self, fds: Option<Bytes>) {
+ if self.write_buf.is_empty() {
+ assert!(fds.is_none());
+ trace!("set_frame: No pending messages...");
+ return;
+ }
+
+ let msgs = self.write_buf.take().freeze();
+ trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
+
+ self.frames.push_back(Frame {
+ msgs,
+ fds
+ });
+ }
+}
+
+impl<A, C> Stream for FramedWithFds<A, C>
+where
+ A: AsyncRecvMsg,
+ C: Codec,
+ C::Out: AssocRawFd
+{
+ type Item = C::Out;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if self.is_readable {
+ if self.eof {
+ let mut item = try!(self.codec.decode_eof(&mut self.read_buf));
+ item.take_fd(|| self.incoming_fds.take_fds());
+ return Ok(Some(item).into());
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) {
+ trace!("frame decoded from buffer");
+ item.take_fd(|| self.incoming_fds.take_fds());
+ return Ok(Some(item).into());
+ }
+
+ self.is_readable = false;
+ }
+
+ assert!(!self.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ let (n, _) = try_ready!(
+ self.io
+ .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())
+ );
+
+ // if flags != 0 {
+ // error!("recv_msg_buf: flags = {:x}", flags)
+ // }
+
+ if n == 0 {
+ self.eof = true;
+ }
+
+ self.is_readable = true;
+ }
+ }
+}
+
+impl<A, C> Sink for FramedWithFds<A, C>
+where
+ A: AsyncSendMsg,
+ C: Codec,
+ C::In: AssocRawFd + fmt::Debug
+{
+ type SinkItem = C::In;
+ type SinkError = io::Error;
+
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem
+ ) -> StartSend<Self::SinkItem, Self::SinkError> {
+ trace!("start_send: item={:?}", item);
+
+ // If the buffer is already over BACKPRESSURE_THRESHOLD,
+ // then attempt to flush it. If after flush it's *still*
+ // over BACKPRESSURE_THRESHOLD, then reject the send.
+ if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+ try!(self.poll_complete());
+ if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ }
+
+ let fds = item.fd();
+ try!(self.codec.encode(item, &mut self.write_buf));
+ let fds = fds.and_then(|fds| {
+ cmsg::builder(&mut self.outgoing_fds)
+ .rights(&fds[..])
+ .finish()
+ .ok()
+ });
+
+ trace!("item fds: {:?}", fds);
+
+ if fds.is_some() {
+ // Enforce splitting sends on messages that contain file
+ // descriptors.
+ self.set_frame(fds);
+ }
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ trace!("flushing framed transport");
+
+ try_ready!(self.do_write());
+
+ try_nb!(self.io.flush());
+
+ trace!("framed transport flushed");
+ Ok(().into())
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ try_ready!(self.poll_complete());
+ self.io.shutdown()
+ }
+}
+
+pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
+ FramedWithFds {
+ io: io,
+ codec: codec,
+ read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+ incoming_fds: IncomingFds::new(FDS_CAPACITY),
+ is_readable: false,
+ eof: false,
+ frames: VecDeque::new(),
+ write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+ outgoing_fds: BytesMut::with_capacity(
+ FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>())
+ )
+ }
+}
+
+fn write_zero() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
+}
+
+fn clone_into_array<A, T>(slice: &[T]) -> A
+where
+ A: Sized + Default + AsMut<[T]>,
+ T: Clone
+{
+ let mut a = Default::default();
+ <A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
+ a
+}
+
+fn close_fds(fds: &[RawFd]) {
+ for fd in fds {
+ unsafe {
+ libc::close(*fd);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::BufMut;
+
+ const CMSG_BYTES: &[u8] =
+ b"\x1c\0\0\0\0\0\0\0\x01\0\0\0\x01\0\0\02\0\0\0[\0\0\0\\\0\0\0\xe5\xe5\xe5\xe5";
+
+ #[test]
+ fn single_cmsg() {
+ let mut incoming = super::IncomingFds::new(16);
+
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ assert!(incoming.take_fds().is_some());
+ assert!(incoming.take_fds().is_none());
+ }
+
+ #[test]
+ fn multiple_cmsg_1() {
+ let mut incoming = super::IncomingFds::new(16);
+
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ assert!(incoming.take_fds().is_some());
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ assert!(incoming.take_fds().is_some());
+ assert!(incoming.take_fds().is_none());
+ }
+
+ #[test]
+ fn multiple_cmsg_2() {
+ let mut incoming = super::IncomingFds::new(16);
+
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ assert!(incoming.take_fds().is_some());
+ incoming.cmsg().put_slice(CMSG_BYTES);
+ assert!(incoming.take_fds().is_some());
+ assert!(incoming.take_fds().is_some());
+ assert!(incoming.take_fds().is_none());
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/frame.rs
@@ -0,0 +1,164 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use bytes::{Buf, Bytes, BytesMut, IntoBuf};
+use codec::Codec;
+use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
+use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+const INITIAL_CAPACITY: usize = 1024;
+const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
+
+/// A unified `Stream` and `Sink` interface over an I/O object, using
+/// the `Codec` trait to encode and decode the payload.
+pub struct Framed<A, C> {
+ io: A,
+ codec: C,
+ read_buf: BytesMut,
+ write_buf: BytesMut,
+ frame: Option<<Bytes as IntoBuf>::Buf>,
+ is_readable: bool,
+ eof: bool
+}
+
+impl<A, C> Framed<A, C>
+where
+ A: AsyncWrite,
+{
+ // If there is a buffered frame, try to write it to `A`
+ fn do_write(&mut self) -> Poll<(), io::Error> {
+ loop {
+ if self.frame.is_none() {
+ self.set_frame();
+ }
+
+ if self.frame.is_none() {
+ return Ok(().into());
+ }
+
+ let done = {
+ let frame = self.frame.as_mut().unwrap();
+ try_ready!(self.io.write_buf(frame));
+ !frame.has_remaining()
+ };
+
+ if done {
+ self.frame = None;
+ }
+ }
+ }
+
+ fn set_frame(&mut self) {
+ if self.write_buf.is_empty() {
+ return;
+ }
+
+ debug_assert!(self.frame.is_none());
+
+ self.frame = Some(self.write_buf.take().freeze().into_buf());
+ }
+}
+
+impl<A, C> Stream for Framed<A, C>
+where
+ A: AsyncRead,
+ C: Codec,
+{
+ type Item = C::Out;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if self.is_readable {
+ if self.eof {
+ let frame = try!(self.codec.decode_eof(&mut self.read_buf));
+ return Ok(Some(frame).into());
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) {
+ trace!("frame decoded from buffer");
+ return Ok(Some(frame).into());
+ }
+
+ self.is_readable = false;
+ }
+
+ assert!(!self.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ if try_ready!(self.io.read_buf(&mut self.read_buf)) == 0 {
+ self.eof = true;
+ }
+
+ self.is_readable = true;
+ }
+ }
+}
+
+impl<A, C> Sink for Framed<A, C>
+where
+ A: AsyncWrite,
+ C: Codec,
+{
+ type SinkItem = C::In;
+ type SinkError = io::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ // If the buffer is already over BACKPRESSURE_THRESHOLD,
+ // then attempt to flush it. If after flush it's *still*
+ // over BACKPRESSURE_THRESHOLD, then reject the send.
+ if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+ try!(self.poll_complete());
+ if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ }
+
+ try!(self.codec.encode(item, &mut self.write_buf));
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ trace!("flushing framed transport");
+
+ try_ready!(self.do_write());
+
+ try_nb!(self.io.flush());
+
+ trace!("framed transport flushed");
+ Ok(().into())
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ try_ready!(self.poll_complete());
+ self.io.shutdown()
+ }
+}
+
+fn write_zero() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
+}
+
+pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
+ Framed {
+ io: io,
+ codec: codec,
+ read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+ write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+ frame: None,
+ is_readable: false,
+ eof: false
+ }
+}
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -12,71 +12,74 @@ extern crate error_chain;
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate bincode;
extern crate bytes;
extern crate cubeb_core;
+#[macro_use]
+extern crate futures;
+extern crate iovec;
extern crate libc;
extern crate memmap;
-extern crate mio;
-extern crate mio_uds;
extern crate serde;
+#[macro_use]
+extern crate scoped_tls;
+extern crate tokio_core;
+#[macro_use]
+extern crate tokio_io;
+extern crate tokio_uds;
pub mod async;
+pub mod cmsg;
pub mod codec;
-mod connection;
pub mod errors;
+pub mod fd_passing;
+pub mod frame;
+pub mod rpc;
+pub mod core;
pub mod messages;
mod msg;
pub mod shm;
-pub use connection::*;
+use iovec::IoVec;
+
+#[cfg(target_os = "linux")]
+use libc::MSG_CMSG_CLOEXEC;
pub use messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::os::unix::net;
use std::path::PathBuf;
+#[cfg(not(target_os = "linux"))]
+const MSG_CMSG_CLOEXEC: libc::c_int = 0;
// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
-pub trait RecvFd {
- fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+pub trait RecvMsg {
+ fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>;
}
-pub trait SendFd {
- fn send_fd(&mut self, bytes: &[u8], fd: Option<RawFd>) -> io::Result<(usize)>;
+pub trait SendMsg {
+ fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
}
-impl RecvFd for net::UnixStream {
- fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
- msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+impl<T: AsRawFd> RecvMsg for T {
+ fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> {
+ msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
}
}
-impl RecvFd for mio_uds::UnixStream {
- fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
- msg::recvmsg(self.as_raw_fd(), buf_to_recv)
- }
-}
-
-impl SendFd for net::UnixStream {
- fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
- msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
- }
-}
-
-impl SendFd for mio_uds::UnixStream {
- fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
- msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+impl<T: AsRawFd> SendMsg for T {
+ fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
+ msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
}
}
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct AutoCloseFd(RawFd);
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -1,16 +1,17 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use cubeb_core::{self, ffi};
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
+use std::os::unix::io::RawFd;
use std::ptr;
#[derive(Debug, Serialize, Deserialize)]
pub struct Device {
pub output_name: Option<Vec<u8>>,
pub input_name: Option<Vec<u8>>
}
@@ -164,29 +165,33 @@ fn dup_str(s: *const c_char) -> Option<V
} else {
let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
Some(vec)
}
}
fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
match v {
- Some(v) => {
- match CString::new(v) {
- Ok(s) => s.into_raw(),
- Err(_) => {
- debug!("Failed to convert bytes to CString");
- ptr::null()
- },
+ Some(v) => match CString::new(v) {
+ Ok(s) => s.into_raw(),
+ Err(_) => {
+ debug!("Failed to convert bytes to CString");
+ ptr::null()
}
},
- None => ptr::null(),
+ None => ptr::null()
}
}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamCreate {
+ pub token: usize,
+ pub fds: [RawFd; 3]
+}
+
// Client -> Server messages.
// TODO: Callbacks should be different messages types so
// ServerConn::process_msg doesn't have a catch-all case.
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerMessage {
ClientConnect,
ClientDisconnect,
@@ -202,47 +207,81 @@ pub enum ServerMessage {
StreamStart(usize),
StreamStop(usize),
StreamResetDefaultDevice(usize),
StreamGetPosition(usize),
StreamGetLatency(usize),
StreamSetVolume(usize, f32),
StreamSetPanning(usize, f32),
- StreamGetCurrentDevice(usize),
-
- StreamDataCallback(isize),
- StreamStateCallback
+ StreamGetCurrentDevice(usize)
}
// Server -> Client messages.
// TODO: Streams need id.
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
ClientConnected,
ClientDisconnected,
ContextBackendId(),
ContextMaxChannelCount(u32),
ContextMinLatency(u32),
ContextPreferredSampleRate(u32),
ContextPreferredChannelLayout(ffi::cubeb_channel_layout),
ContextEnumeratedDevices(Vec<DeviceInfo>),
- StreamCreated(usize), /*(RawFd)*/
- StreamCreatedInputShm, /*(RawFd)*/
- StreamCreatedOutputShm, /*(RawFd)*/
+ StreamCreated(StreamCreate),
StreamDestroyed,
StreamStarted,
StreamStopped,
StreamDefaultDeviceReset,
StreamPosition(u64),
StreamLatency(u32),
StreamVolumeSet,
StreamPanningSet,
StreamCurrentDevice(Device),
- StreamDataCallback(isize, usize),
- StreamStateCallback(ffi::cubeb_state),
-
Error(ffi::cubeb_error_code)
}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackReq {
+ Data(isize, usize),
+ State(ffi::cubeb_state)
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackResp {
+ Data(isize),
+ State
+}
+
+pub trait AssocRawFd {
+ fn fd(&self) -> Option<[RawFd; 3]> {
+ None
+ }
+ fn take_fd<F>(&mut self, _: F)
+ where
+ F: FnOnce() -> Option<[RawFd; 3]>
+ {
+ }
+}
+
+impl AssocRawFd for ServerMessage {}
+impl AssocRawFd for ClientMessage {
+ fn fd(&self) -> Option<[RawFd; 3]> {
+ match *self {
+ ClientMessage::StreamCreated(ref data) => Some(data.fds),
+ _ => None
+ }
+ }
+
+ fn take_fd<F>(&mut self, f: F)
+ where
+ F: FnOnce() -> Option<[RawFd; 3]>
+ {
+ if let ClientMessage::StreamCreated(ref mut data) = *self {
+ data.fds = f().unwrap();
+ }
+ }
+}
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -1,170 +1,80 @@
+use iovec::IoVec;
+use iovec::unix as iovec;
use libc;
-use std::io;
-use std::mem;
+use std::{cmp, io, mem, ptr};
use std::os::unix::io::RawFd;
-use std::ptr;
fn cvt(r: libc::ssize_t) -> io::Result<usize> {
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok(r as usize)
}
}
// Convert return of -1 into error message, handling retry on EINTR
fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
loop {
match cvt(f()) {
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
- other => return other,
+ other => return other
}
}
}
-// Note: The following fields must be laid out together, the OS expects them
-// to be part of a single allocation.
-#[repr(C)]
-struct CmsgSpace {
- cmsghdr: libc::cmsghdr,
- #[cfg(not(target_os = "macos"))]
- __padding: [usize; 0],
- data: libc::c_int,
-}
-
-#[cfg(not(target_os = "macos"))]
-fn cmsg_align(len: usize) -> usize {
- let align_bytes = mem::size_of::<usize>() - 1;
- (len + align_bytes) & !align_bytes
-}
-
-#[cfg(target_os = "macos")]
-fn cmsg_align(len: usize) -> usize {
- len
-}
-
-fn cmsg_space() -> usize {
- mem::size_of::<CmsgSpace>()
-}
+pub fn recv_msg_with_flags(
+ socket: RawFd,
+ bufs: &mut [&mut IoVec],
+ cmsg: &mut [u8],
+ flags: libc::c_int
+) -> io::Result<(usize, usize, libc::c_int)> {
+ let slice = iovec::as_os_slice_mut(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.is_empty() {
+ (ptr::null_mut(), 0)
+ } else {
+ (cmsg.as_ptr() as *mut _, cmsg.len())
+ };
-fn cmsg_len() -> usize {
- cmsg_align(mem::size_of::<libc::cmsghdr>()) + mem::size_of::<libc::c_int>()
-}
-
-pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
- let mut iovec: libc::iovec = unsafe { mem::zeroed() };
- let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
-
- msghdr.msg_iov = &mut iovec as *mut _;
- msghdr.msg_iovlen = 1;
- if fd_to_send.is_some() {
- msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
- msghdr.msg_controllen = cmsg_space() as _;
- }
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_mut_ptr();
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
- iovec.iov_base = if to_send.is_empty() {
- // Empty Vecs have a non-null pointer.
- ptr::null_mut()
- } else {
- to_send.as_ptr() as *const _ as *mut _
- };
- iovec.iov_len = to_send.len();
+ let n = try!(cvt_r(|| unsafe {
+ libc::recvmsg(socket, &mut msghdr as *mut _, flags)
+ }));
- cmsg.cmsghdr.cmsg_len = cmsg_len() as _;
- cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
- cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
-
- cmsg.data = fd_to_send.unwrap_or(-1);
-
- cvt_r(|| unsafe { libc::sendmsg(fd, &msghdr, 0) })
+ let controllen = msghdr.msg_controllen as usize;
+ Ok((n, controllen, msghdr.msg_flags))
}
-pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
- let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
- let mut iovec: libc::iovec = unsafe { mem::zeroed() };
- let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
-
- msghdr.msg_iov = &mut iovec as *mut _;
- msghdr.msg_iovlen = 1;
- msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
- msghdr.msg_controllen = cmsg_space() as _;
-
- iovec.iov_base = if to_recv.is_empty() {
- // Empty Vecs have a non-null pointer.
- ptr::null_mut()
+pub fn send_msg_with_flags(
+ socket: RawFd,
+ bufs: &[&IoVec],
+ cmsg: &[u8],
+ flags: libc::c_int
+) -> io::Result<usize> {
+ let slice = iovec::as_os_slice(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.is_empty() {
+ (ptr::null_mut(), 0)
} else {
- to_recv.as_ptr() as *const _ as *mut _
- };
- iovec.iov_len = to_recv.len();
-
- let result = try!(cvt_r(|| unsafe { libc::recvmsg(fd, &mut msghdr, 0) }));
- let fd = if msghdr.msg_controllen == cmsg_space() as _ &&
- cmsg.cmsghdr.cmsg_len == cmsg_len() as _ &&
- cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
- cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
- Some(cmsg.data)
- } else {
- None
+ (cmsg.as_ptr() as *mut _, cmsg.len())
};
- Ok((result as usize, fd))
-}
-
-#[cfg(test)]
-mod tests {
- use libc;
- use std::mem;
- use std::os::unix::net::UnixStream;
- use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
- use std::io::{Read, Write};
- use super::{cmsg_len, cmsg_space, sendmsg, recvmsg};
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_ptr() as *mut _;
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
- #[test]
- fn portable_sizes() {
- if cfg!(all(target_os = "linux", target_pointer_width = "64")) {
- assert_eq!(mem::size_of::<libc::cmsghdr>(), 16);
- assert_eq!(cmsg_len(), 20);
- assert_eq!(cmsg_space(), 24);
- } else if cfg!(all(target_os = "linux", target_pointer_width = "32")) {
- assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
- assert_eq!(cmsg_len(), 16);
- assert_eq!(cmsg_space(), 16);
- } else if cfg!(target_os = "macos") {
- assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
- assert_eq!(cmsg_len(), 16);
- assert_eq!(cmsg_space(), 16);
- } else if cfg!(target_pointer_width = "64") {
- assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
- assert_eq!(cmsg_len(), 20);
- assert_eq!(cmsg_space(), 24);
- } else {
- assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
- assert_eq!(cmsg_len(), 16);
- assert_eq!(cmsg_space(), 16);
- }
- }
-
- #[test]
- fn fd_passing() {
- let (tx, rx) = UnixStream::pair().unwrap();
-
- let (send_tx, mut send_rx) = UnixStream::pair().unwrap();
-
- let fd = send_tx.into_raw_fd();
- assert_eq!(sendmsg(tx.as_raw_fd(), b"a", Some(fd)).unwrap(), 1);
- unsafe { libc::close(fd) };
-
- let mut buf = [0u8];
- let (got, fd) = recvmsg(rx.as_raw_fd(), &mut buf).unwrap();
- assert_eq!(got, 1);
- assert_eq!(&buf, b"a");
-
- let mut send_tx = unsafe { UnixStream::from_raw_fd(fd.unwrap()) };
- assert_eq!(send_tx.write(b"b").unwrap(), 1);
-
- let mut buf = [0u8];
- assert_eq!(send_rx.read(&mut buf).unwrap(), 1);
- assert_eq!(&buf, b"b");
- }
+ cvt_r(|| unsafe {
+ libc::sendmsg(socket, &msghdr as *const _, flags)
+ })
}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/client/mod.rs
@@ -0,0 +1,158 @@
+// This is a derived version of simple/pipeline/client.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+// contain simple request/response messages:
+// * Remove `Error` types,
+// * Remove `bind_transport` fn & `BindTransport` type,
+// * Remove all "Lift"ing functionality.
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll, Sink, Stream};
+use futures::sync::oneshot;
+use rpc::Handler;
+use rpc::driver::Driver;
+use std::collections::VecDeque;
+use std::io;
+use tokio_core::reactor::Handle;
+
+mod proxy;
+
+pub use self::proxy::{ClientProxy, Response};
+
+pub fn bind_client<C>(
+ transport: C::Transport,
+ handle: &Handle
+) -> proxy::ClientProxy<C::Request, C::Response>
+where
+ C: Client
+{
+ let (tx, rx) = proxy::channel();
+
+ let fut = {
+ let handler = ClientHandler::<C> {
+ transport: transport,
+ requests: rx,
+ in_flight: VecDeque::with_capacity(32)
+ };
+ Driver::new(handler)
+ };
+
+ // Spawn the RPC driver into task
+ handle.spawn(Box::new(fut.map_err(|_| ())));
+
+ tx
+}
+
+pub trait Client: 'static {
+ /// Request
+ type Request: 'static;
+
+ /// Response
+ type Response: 'static;
+
+ /// The message transport, which works with async I/O objects of type `A`
+ type Transport: 'static
+ + Stream<Item = Self::Response, Error = io::Error>
+ + Sink<SinkItem = Self::Request, SinkError = io::Error>;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ClientHandler<C>
+where
+ C: Client
+{
+ transport: C::Transport,
+ requests: proxy::Receiver<C::Request, C::Response>,
+ in_flight: VecDeque<oneshot::Sender<C::Response>>
+}
+
+impl<C> Handler for ClientHandler<C>
+where
+ C: Client
+{
+ type In = C::Response;
+ type Out = C::Request;
+ type Transport = C::Transport;
+
+ fn transport(&mut self) -> &mut Self::Transport {
+ &mut self.transport
+ }
+
+ fn consume(&mut self, response: Self::In) -> io::Result<()> {
+ trace!("ClientHandler::consume");
+ if let Some(complete) = self.in_flight.pop_front() {
+ drop(complete.send(response));
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "request / response mismatch"
+ ));
+ }
+
+ Ok(())
+ }
+
+ /// Produce a message
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+ trace!("ClientHandler::produce");
+
+ // Try to get a new request
+ match self.requests.poll() {
+ Ok(Async::Ready(Some((request, complete)))) => {
+ trace!(" --> received request");
+
+ // Track complete handle
+ self.in_flight.push_back(complete);
+
+ Ok(Some(request).into())
+ },
+ Ok(Async::Ready(None)) => {
+ trace!(" --> client dropped");
+ Ok(None.into())
+ },
+ Ok(Async::NotReady) => {
+ trace!(" --> not ready");
+ Ok(Async::NotReady)
+ },
+ Err(_) => unreachable!()
+ }
+ }
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool {
+ !self.in_flight.is_empty()
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs
@@ -0,0 +1,140 @@
+// This is a derived version of client_proxy.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client_proxy.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+// * Remove `RefCell` from `ClientProxy` since cubeb is called from
+// multiple threads. `mpsc::UnboundedSender` is thread safe.
+// * Simplify the interface to just request (`R`) and response (`Q`)
+// removing error (`E`).
+// * Remove the `Envelope` type.
+// * Renamed `pair` to `channel` to represent that an `rpc::channel`
+// is being created.
+//
+// Original License:
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll};
+use futures::sync::{mpsc, oneshot};
+use std::fmt;
+use std::io;
+
+/// Message used to dispatch requests to the task managing the
+/// client connection.
+pub type Request<R, Q> = (R, oneshot::Sender<Q>);
+
+/// Receive requests submitted to the client
+pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
+
+/// Response future returned from a client
+pub struct Response<Q> {
+ inner: oneshot::Receiver<Q>
+}
+
+pub struct ClientProxy<R, Q> {
+ tx: mpsc::UnboundedSender<Request<R, Q>>
+}
+
+impl<R, Q> Clone for ClientProxy<R, Q> {
+ fn clone(&self) -> Self {
+ ClientProxy {
+ tx: self.tx.clone()
+ }
+ }
+}
+
+pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
+ // Create a channel to send the requests to client-side of rpc.
+ let (tx, rx) = mpsc::unbounded();
+
+ // Wrap the `tx` part in ClientProxy so the rpc call interface
+ // can be implemented.
+ let client = ClientProxy {
+ tx
+ };
+
+ (client, rx)
+}
+
+impl<R, Q> ClientProxy<R, Q> {
+ pub fn call(&self, request: R) -> Response<Q> {
+ // The response to returned from the rpc client task over a
+ // oneshot channel.
+ let (tx, rx) = oneshot::channel();
+
+ // If send returns an Err, its because the other side has been dropped.
+ // By ignoring it, we are just dropping the `tx`, which will mean the
+ // rx will return Canceled when polled. In turn, that is translated
+ // into a BrokenPipe, which conveys the proper error.
+ let _ = self.tx.send((request, tx));
+
+ Response {
+ inner: rx
+ }
+ }
+}
+
+impl<R, Q> fmt::Debug for ClientProxy<R, Q>
+where
+ R: fmt::Debug,
+ Q: fmt::Debug
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "ClientProxy {{ ... }}")
+ }
+}
+
+impl<Q> Future for Response<Q> {
+ type Item = Q;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Q, io::Error> {
+ match self.inner.poll() {
+ Ok(Async::Ready(res)) => Ok(Async::Ready(res)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ // Convert oneshot::Canceled into io::Error
+ Err(_) => {
+ let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
+ Err(e)
+ }
+ }
+ }
+}
+
+impl<Q> fmt::Debug for Response<Q>
+where
+ Q: fmt::Debug
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Response {{ ... }}")
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/driver.rs
@@ -0,0 +1,171 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
+use rpc::Handler;
+use std::fmt;
+use std::io;
+
+pub struct Driver<T>
+where
+ T: Handler
+{
+ // Glue
+ handler: T,
+
+ // True as long as the connection has more request frames to read.
+ run: bool,
+
+ // True when the transport is fully flushed
+ is_flushed: bool
+}
+
+impl<T> Driver<T>
+where
+ T: Handler
+{
+ /// Create a new rpc driver with the given service and transport.
+ pub fn new(handler: T) -> Driver<T> {
+ Driver {
+ handler: handler,
+ run: true,
+ is_flushed: true
+ }
+ }
+
+ /// Returns true if the driver has nothing left to do
+ fn is_done(&self) -> bool {
+ !self.run && self.is_flushed && !self.has_in_flight()
+ }
+
+ /// Process incoming messages off the transport.
+ fn receive_incoming(&mut self) -> io::Result<()> {
+ while self.run {
+ if let Async::Ready(req) = try!(self.handler.transport().poll()) {
+ try!(self.process_incoming(req));
+ } else {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ /// Process an incoming message
+ fn process_incoming(&mut self, req: Option<T::In>) -> io::Result<()> {
+ trace!("process_incoming");
+ // At this point, the service & transport are ready to process the
+ // request, no matter what it is.
+ match req {
+ Some(message) => {
+ trace!("received message");
+
+ if let Err(e) = self.handler.consume(message) {
+ // TODO: Should handler be infalliable?
+ panic!("unimplemented error handling: {:?}", e);
+ }
+ },
+ None => {
+ trace!("received None");
+ // At this point, we just return. This works
+ // because poll with be called again and go
+ // through the receive-cycle again.
+ self.run = false;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send outgoing messages to the transport.
+ fn send_outgoing(&mut self) -> io::Result<()> {
+ trace!("send_responses");
+ loop {
+ match try!(self.handler.produce()) {
+ Async::Ready(Some(message)) => {
+ trace!(" --> got message");
+ try!(self.process_outgoing(message));
+ },
+ Async::Ready(None) => {
+ trace!(" --> got None");
+ // The service is done with the connection.
+ break;
+ },
+ // Nothing to dispatch
+ Async::NotReady => break
+ }
+ }
+
+ Ok(())
+ }
+
+ fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
+ trace!("process_outgoing");
+ try!(assert_send(&mut self.handler.transport(), message));
+
+ Ok(())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.is_flushed = try!(self.handler.transport().poll_complete()).is_ready();
+
+ // TODO:
+ Ok(())
+ }
+
+ fn has_in_flight(&self) -> bool {
+ self.handler.has_in_flight()
+ }
+}
+
+impl<T> Future for Driver<T>
+where
+ T: Handler
+{
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), Self::Error> {
+ trace!("rpc::Driver::tick");
+
+ // First read off data from the socket
+ try!(self.receive_incoming());
+
+ // Handle completed responses
+ try!(self.send_outgoing());
+
+ // Try flushing buffered writes
+ try!(self.flush());
+
+ if self.is_done() {
+ return Ok(().into());
+ }
+
+ // Tick again later
+ Ok(Async::NotReady)
+ }
+}
+
+fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
+ match try!(s.start_send(item)) {
+ AsyncSink::Ready => Ok(()),
+ AsyncSink::NotReady(_) => panic!(
+ "sink reported itself as ready after `poll_ready` but was \
+ then unable to accept a message"
+ )
+ }
+}
+
+impl<T> fmt::Debug for Driver<T>
+where
+ T: Handler + fmt::Debug
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("rpc::Handler")
+ .field("handler", &self.handler)
+ .field("run", &self.run)
+ .field("is_flushed", &self.is_flushed)
+ .finish()
+ }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/mod.rs
@@ -0,0 +1,36 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use futures::{Poll, Sink, Stream};
+use std::io;
+
+mod driver;
+mod client;
+mod server;
+
+pub use self::client::{bind_client, Client, ClientProxy, Response};
+pub use self::server::{bind_server, Server};
+
+pub trait Handler {
+ /// Message type read from transport
+ type In;
+ /// Message type written to transport
+ type Out;
+ type Transport: 'static
+ + Stream<Item = Self::In, Error = io::Error>
+ + Sink<SinkItem = Self::Out, SinkError = io::Error>;
+
+ /// Mutable reference to the transport
+ fn transport(&mut self) -> &mut Self::Transport;
+
+ /// Consume a request
+ fn consume(&mut self, message: Self::In) -> io::Result<()>;
+
+ /// Produce a response
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error>;
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool;
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/rpc/server.rs
@@ -0,0 +1,178 @@
+// This is a derived version of simple/pipeline/server.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of server.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+// contain simple request/response messages:
+// * Remove `Error` types,
+// * Remove `bind_transport` fn & `BindTransport` type,
+// * Remove all "Lift"ing functionality.
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::{Async, Future, Poll, Sink, Stream};
+use rpc::Handler;
+use rpc::driver::Driver;
+use std::collections::VecDeque;
+use std::io;
+use tokio_core::reactor::Handle;
+
+/// Bind an async I/O object `io` to the `server`.
+pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
+where
+ S: Server
+{
+ let fut = {
+ let handler = ServerHandler {
+ server: server,
+ transport: transport,
+ in_flight: VecDeque::with_capacity(32)
+ };
+ Driver::new(handler)
+ };
+
+ // Spawn the RPC driver into task
+ handle.spawn(Box::new(fut.map_err(|_| ())))
+}
+
+pub trait Server: 'static {
+ /// Request
+ type Request: 'static;
+
+ /// Response
+ type Response: 'static;
+
+ /// Future
+ type Future: Future<Item = Self::Response, Error = ()>;
+
+ /// The message transport, which works with async I/O objects of
+ /// type `A`.
+ type Transport: 'static
+ + Stream<Item = Self::Request, Error = io::Error>
+ + Sink<SinkItem = Self::Response, SinkError = io::Error>;
+
+ /// Process the request and return the response asynchronously.
+ fn process(&mut self, req: Self::Request) -> Self::Future;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ServerHandler<S>
+where
+ S: Server
+{
+ // The service handling the connection
+ server: S,
+ // The transport responsible for sending/receving messages over the wire
+ transport: S::Transport,
+ // FIFO of "in flight" responses to requests.
+ in_flight: VecDeque<InFlight<S::Future>>
+}
+
+impl<S> Handler for ServerHandler<S>
+where
+ S: Server
+{
+ type In = S::Request;
+ type Out = S::Response;
+ type Transport = S::Transport;
+
+ /// Mutable reference to the transport
+ fn transport(&mut self) -> &mut Self::Transport {
+ &mut self.transport
+ }
+
+ /// Consume a message
+ fn consume(&mut self, request: Self::In) -> io::Result<()> {
+ trace!("ServerHandler::consume");
+ let response = self.server.process(request);
+ self.in_flight.push_back(InFlight::Active(response));
+
+ // TODO: Should the error be handled differently?
+ Ok(())
+ }
+
+ /// Produce a message
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+ trace!("ServerHandler::produce");
+
+ // Make progress on pending responses
+ for pending in &mut self.in_flight {
+ pending.poll();
+ }
+
+ // Is the head of the queue ready?
+ match self.in_flight.front() {
+ Some(&InFlight::Done(_)) => {},
+ _ => {
+ trace!(" --> not ready");
+ return Ok(Async::NotReady);
+ }
+ }
+
+ // Return the ready response
+ match self.in_flight.pop_front() {
+ Some(InFlight::Done(res)) => {
+ trace!(" --> received response");
+ Ok(Async::Ready(Some(res)))
+ },
+ _ => panic!()
+ }
+ }
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool {
+ !self.in_flight.is_empty()
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+enum InFlight<F: Future<Error = ()>> {
+ Active(F),
+ Done(F::Item)
+}
+
+impl<F: Future<Error = ()>> InFlight<F> {
+ fn poll(&mut self) {
+ let res = match *self {
+ InFlight::Active(ref mut f) => match f.poll() {
+ Ok(Async::Ready(e)) => e,
+ Err(_) => unreachable!(),
+ Ok(Async::NotReady) => return
+ },
+ _ => return
+ };
+ *self = InFlight::Done(res);
+ }
+}
--- a/media/audioipc/audioipc/src/shm.rs
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -1,31 +1,35 @@
+use errors::*;
+use memmap::{Mmap, MmapViewSync, Protection};
+use std::fs::{remove_file, File, OpenOptions};
use std::path::Path;
-use std::fs::{remove_file, OpenOptions, File};
use std::sync::atomic;
-use memmap::{Mmap, Protection};
-use errors::*;
pub struct SharedMemReader {
- mmap: Mmap,
+ mmap: Mmap
}
impl SharedMemReader {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
- let file = OpenOptions::new().read(true)
- .write(true)
- .create_new(true)
- .open(path)?;
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::Read)?;
assert_eq!(mmap.len(), size);
- Ok((SharedMemReader {
- mmap
- }, file))
+ Ok((
+ SharedMemReader {
+ mmap
+ },
+ file
+ ))
}
pub fn read(&self, buf: &mut [u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}
// TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() {
@@ -37,59 +41,74 @@ impl SharedMemReader {
Ok(())
} else {
bail!("mmap size");
}
}
}
pub struct SharedMemSlice {
- mmap: Mmap,
+ view: MmapViewSync
}
impl SharedMemSlice {
- pub fn from(file: File, size: usize) -> Result<SharedMemSlice> {
- let mmap = Mmap::open(&file, Protection::Read)?;
+ pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> {
+ let mmap = Mmap::open(file, Protection::Read)?;
assert_eq!(mmap.len(), size);
+ let view = mmap.into_view_sync();
Ok(SharedMemSlice {
- mmap
+ view
})
}
pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
if size == 0 {
return Ok(&[]);
}
// TODO: Track how much is in the shm area.
- if size <= self.mmap.len() {
+ if size <= self.view.len() {
atomic::fence(atomic::Ordering::Acquire);
- let buf = unsafe { &self.mmap.as_slice()[..size] };
+ let buf = unsafe { &self.view.as_slice()[..size] };
Ok(buf)
} else {
bail!("mmap size");
}
}
+
+ /// Clones the view of the memory map.
+ ///
+ /// The underlying memory map is shared, and thus the caller must ensure that the memory
+ /// underlying the view is not illegally aliased.
+ pub unsafe fn clone_view(&self) -> Self {
+ SharedMemSlice {
+ view: self.view.clone()
+ }
+ }
}
pub struct SharedMemWriter {
- mmap: Mmap,
+ mmap: Mmap
}
impl SharedMemWriter {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
- let file = OpenOptions::new().read(true)
- .write(true)
- .create_new(true)
- .open(path)?;
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::ReadWrite)?;
- Ok((SharedMemWriter {
- mmap
- }, file))
+ Ok((
+ SharedMemWriter {
+ mmap
+ },
+ file
+ ))
}
pub fn write(&mut self, buf: &[u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}
// TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() {
@@ -100,34 +119,46 @@ impl SharedMemWriter {
Ok(())
} else {
bail!("mmap size");
}
}
}
pub struct SharedMemMutSlice {
- mmap: Mmap,
+ view: MmapViewSync
}
impl SharedMemMutSlice {
- pub fn from(file: File, size: usize) -> Result<SharedMemMutSlice> {
- let mmap = Mmap::open(&file, Protection::ReadWrite)?;
+ pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> {
+ let mmap = Mmap::open(file, Protection::ReadWrite)?;
assert_eq!(mmap.len(), size);
+ let view = mmap.into_view_sync();
Ok(SharedMemMutSlice {
- mmap
+ view
})
}
pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
if size == 0 {
return Ok(&mut []);
}
// TODO: Track how much is in the shm area.
- if size <= self.mmap.len() {
- let buf = unsafe { &mut self.mmap.as_mut_slice()[..size] };
+ if size <= self.view.len() {
+ let buf = unsafe { &mut self.view.as_mut_slice()[..size] };
atomic::fence(atomic::Ordering::Release);
Ok(buf)
} else {
bail!("mmap size");
}
}
+
+ /// Clones the view of the memory map.
+ ///
+ /// The underlying memory map is shared, and thus the caller must
+ /// ensure that the memory underlying the view is not illegally
+ /// aliased.
+ pub unsafe fn clone_view(&self) -> Self {
+ SharedMemMutSlice {
+ view: self.view.clone()
+ }
+ }
}
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,12 +1,21 @@
[package]
name = "audioipc-client"
-version = "0.1.0"
-authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+version = "0.2.0"
+authors = [
+ "Matthew Gregan <kinetik@flim.org>",
+ "Dan Glastonbury <dan.glastonbury@gmail.com>"
+ ]
description = "Cubeb Backend for talking to remote cubeb server."
[dependencies]
audioipc = { path="../audioipc" }
-cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
-cubeb-core = { path="../../cubeb-rs/cubeb-core" }
+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+# rayon-core in Gecko uses futures 0.1.13
+futures = { version="=0.1.13", default-features=false, features=["use_std"] }
+# futures-cpupool 0.1.5 matches futures 0.1.13
+futures-cpupool = { version="=0.1.5", default-features=false }
libc = "0.2"
log = "^0.3.6"
+tokio-core = "0.1"
+tokio-uds = "0.1.7"
\ No newline at end of file
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,102 +1,171 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use ClientStream;
use assert_not_in_callback;
-use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::{messages, ClientMessage, ServerMessage};
+use audioipc::{core, rpc};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use cubeb_backend::{Context, Ops};
-use cubeb_core::{DeviceId, DeviceType, Error, ErrorCode, Result, StreamParams, ffi};
+use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams};
use cubeb_core::binding::Binding;
+use futures::Future;
+use futures_cpupool::{self, CpuPool};
+use libc;
+use std::{fmt, io, mem};
use std::ffi::{CStr, CString};
-use std::mem;
use std::os::raw::c_void;
-use std::os::unix::net::UnixStream;
use std::os::unix::io::FromRawFd;
-use std::sync::{Mutex, MutexGuard};
+use std::os::unix::net;
+use std::sync::mpsc;
use stream;
-use libc;
+use tokio_core::reactor::{Handle, Remote};
+use tokio_uds::UnixStream;
+
+struct CubebClient;
-#[derive(Debug)]
-pub struct ClientContext {
- _ops: *const Ops,
- connection: Mutex<Connection>
+impl rpc::Client for CubebClient {
+ type Request = ServerMessage;
+ type Response = ClientMessage;
+ type Transport =
+ FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
+macro_rules! t(
+ ($e:expr) => (
+ match $e {
+ Ok(e) => e,
+ Err(_) => return Err(Error::default())
+ }
+ ));
+
pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
+pub struct ClientContext {
+ _ops: *const Ops,
+ rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
+ core: core::CoreThread,
+ cpu_pool: CpuPool
+}
+
impl ClientContext {
#[doc(hidden)]
- pub fn connection(&self) -> MutexGuard<Connection> {
- self.connection.lock().unwrap()
+ pub fn remote(&self) -> Remote {
+ self.core.remote()
+ }
+
+ #[doc(hidden)]
+ pub fn rpc(&self) -> rpc::ClientProxy<ServerMessage, ClientMessage> {
+ self.rpc.clone()
+ }
+
+ #[doc(hidden)]
+ pub fn cpu_pool(&self) -> CpuPool {
+ self.cpu_pool.clone()
}
}
// TODO: encapsulate connect, etc inside audioipc.
-fn open_server_stream() -> Result<UnixStream> {
+fn open_server_stream() -> Result<net::UnixStream> {
unsafe {
if let Some(fd) = super::G_SERVER_FD {
- return Ok(UnixStream::from_raw_fd(fd));
+ return Ok(net::UnixStream::from_raw_fd(fd));
}
Err(Error::default())
}
}
impl Context for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+ fn bind_and_send_client(
+ stream: UnixStream,
+ handle: &Handle,
+ tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>
+ ) -> Option<()> {
+ let transport = framed_with_fds(stream, Default::default());
+ let rpc = rpc::bind_client::<CubebClient>(transport, handle);
+ // If send fails then the rx end has closed
+ // which is unlikely here.
+ let _ = tx_rpc.send(rpc);
+ Some(())
+ }
+
assert_not_in_callback();
+
+ let (tx_rpc, rx_rpc) = mpsc::channel();
+
+ let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
+ let handle = core::handle();
+
+ open_server_stream().ok()
+ .and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
+ .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
+ .ok_or_else(|| io::Error::new(
+ io::ErrorKind::Other,
+ "Failed to open stream and create rpc."
+ ))
+ }));
+
+ let rpc = t!(rx_rpc.recv());
+
+ let cpupool = futures_cpupool::Builder::new()
+ .name_prefix("AudioIPC")
+ .create();
+
let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,
- connection: Mutex::new(Connection::new(open_server_stream()?))
+ rpc: rpc,
+ core: core,
+ cpu_pool: cpupool
});
Ok(Box::into_raw(ctx) as *mut _)
}
fn backend_id(&self) -> &'static CStr {
assert_not_in_callback();
unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
}
fn max_channel_count(&self) -> Result<u32> {
assert_not_in_callback();
- let mut conn = self.connection();
- send_recv!(conn, ContextGetMaxChannelCount => ContextMaxChannelCount())
+ send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
}
fn min_latency(&self, params: &StreamParams) -> Result<u32> {
assert_not_in_callback();
let params = messages::StreamParams::from(unsafe { &*params.raw() });
- let mut conn = self.connection();
- send_recv!(conn, ContextGetMinLatency(params) => ContextMinLatency())
+ send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
}
fn preferred_sample_rate(&self) -> Result<u32> {
assert_not_in_callback();
- let mut conn = self.connection();
- send_recv!(conn, ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+ send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
}
fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
assert_not_in_callback();
- let mut conn = self.connection();
- send_recv!(conn, ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+ send_recv!(self.rpc(),
+ ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
}
fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
assert_not_in_callback();
- let mut conn = self.connection();
- let v: Vec<ffi::cubeb_device_info> =
- match send_recv!(conn, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
- Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
- Err(e) => return Err(e),
- };
+ let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
+ ContextGetDeviceEnumeration(devtype.bits()) =>
+ ContextEnumeratedDevices())
+ {
+ Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
+ Err(e) => return Err(e)
+ };
let vs = v.into_boxed_slice();
let coll = ffi::cubeb_device_collection {
count: vs.len(),
device: vs.as_ptr()
};
// Giving away the memory owned by vs. Don't free it!
// Reclaimed in `device_collection_destroy`.
mem::forget(vs);
@@ -135,30 +204,32 @@ impl Context for ClientContext {
input_device: DeviceId,
input_stream_params: Option<&ffi::cubeb_stream_params>,
output_device: DeviceId,
output_stream_params: Option<&ffi::cubeb_stream_params>,
latency_frame: u32,
// These params aren't sent to the server
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void,
+ user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
assert_not_in_callback();
- fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
+ fn opt_stream_params(
+ p: Option<&ffi::cubeb_stream_params>
+ ) -> Option<messages::StreamParams> {
match p {
Some(raw) => Some(messages::StreamParams::from(raw)),
- None => None,
+ None => None
}
}
let stream_name = match stream_name {
Some(s) => Some(s.to_bytes().to_vec()),
- None => None,
+ None => None
};
let input_stream_params = opt_stream_params(input_stream_params);
let output_stream_params = opt_stream_params(output_stream_params);
let init_params = messages::StreamInitParams {
stream_name: stream_name,
input_device: input_device.raw() as _,
@@ -169,36 +240,37 @@ impl Context for ClientContext {
};
stream::init(self, init_params, data_callback, state_callback, user_ptr)
}
fn register_device_collection_changed(
&self,
_dev_type: DeviceType,
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
- _user_ptr: *mut c_void,
+ _user_ptr: *mut c_void
) -> Result<()> {
assert_not_in_callback();
Ok(())
}
}
impl Drop for ClientContext {
fn drop(&mut self) {
- let mut conn = self.connection();
info!("ClientContext drop...");
- let r = conn.send(ServerMessage::ClientDisconnect);
- if r.is_err() {
- debug!("ClientContext::Drop send error={:?}", r);
- } else {
- let r = conn.receive();
- if let Ok(ClientMessage::ClientDisconnected) = r {
- } else {
- debug!("ClientContext::Drop receive error={:?}", r);
- }
- }
+ let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {
if super::G_SERVER_FD.is_some() {
libc::close(super::G_SERVER_FD.take().unwrap());
}
}
}
}
+
+impl fmt::Debug for ClientContext {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("ClientContext")
+ .field("_ops", &self._ops)
+ .field("rpc", &self.rpc)
+ .field("core", &self.core)
+ .field("cpu_pool", &"...")
+ .finish()
+ }
+}
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -1,20 +1,24 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
extern crate audioipc;
-extern crate cubeb_core;
#[macro_use]
extern crate cubeb_backend;
+extern crate cubeb_core;
+extern crate futures;
+extern crate futures_cpupool;
extern crate libc;
#[macro_use]
extern crate log;
+extern crate tokio_core;
+extern crate tokio_uds;
#[macro_use]
mod send_recv;
mod context;
mod stream;
use context::ClientContext;
use cubeb_backend::capi;
@@ -37,17 +41,21 @@ fn assert_not_in_callback() {
assert_eq!(*b.borrow(), false);
});
}
static mut G_SERVER_FD: Option<RawFd> = None;
#[no_mangle]
/// Entry point from C code.
-pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char, server_connection: c_int) -> c_int {
+pub unsafe extern "C" fn audioipc_client_init(
+ c: *mut *mut ffi::cubeb,
+ context_name: *const c_char,
+ server_connection: c_int
+) -> c_int {
// TODO: Windows portability (for fd).
// TODO: Better way to pass extra parameters to Context impl.
if G_SERVER_FD.is_some() {
panic!("audioipc client's server connection already initialized.");
}
if server_connection >= 0 {
G_SERVER_FD = Some(server_connection);
}
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -1,66 +1,68 @@
+use cubeb_core::Error;
+use cubeb_core::ffi;
+
+#[doc(hidden)]
+pub fn _err<E>(e: E) -> Error
+where
+ E: Into<Option<ffi::cubeb_error_code>>
+{
+ match e.into() {
+ Some(e) => unsafe { Error::from_raw(e) },
+ None => Error::new()
+ }
+}
+
#[macro_export]
macro_rules! send_recv {
- ($conn:expr, $smsg:ident => $rmsg:ident) => {{
- send_recv!(__send $conn, $smsg);
- send_recv!(__recv $conn, $rmsg)
+ ($rpc:expr, $smsg:ident => $rmsg:ident) => {{
+ let resp = send_recv!(__send $rpc, $smsg);
+ send_recv!(__recv resp, $rmsg)
}};
- ($conn:expr, $smsg:ident => $rmsg:ident()) => {{
- send_recv!(__send $conn, $smsg);
- send_recv!(__recv $conn, $rmsg __result)
+ ($rpc:expr, $smsg:ident => $rmsg:ident()) => {{
+ let resp = send_recv!(__send $rpc, $smsg);
+ send_recv!(__recv resp, $rmsg __result)
}};
- ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
- send_recv!(__send $conn, $smsg, $($a),*);
- send_recv!(__recv $conn, $rmsg)
+ ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
+ let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+ send_recv!(__recv resp, $rmsg)
}};
- ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
- send_recv!(__send $conn, $smsg, $($a),*);
- send_recv!(__recv $conn, $rmsg __result)
+ ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
+ let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+ send_recv!(__recv resp, $rmsg __result)
}};
//
- (__send $conn:expr, $smsg:ident) => ({
- let r = $conn.send(ServerMessage::$smsg);
- if r.is_err() {
- debug!("send error - got={:?}", r);
- return Err(ErrorCode::Error.into());
- }
+ (__send $rpc:expr, $smsg:ident) => ({
+ $rpc.call(ServerMessage::$smsg)
+ });
+ (__send $rpc:expr, $smsg:ident, $($a:expr),*) => ({
+ $rpc.call(ServerMessage::$smsg($($a),*))
});
- (__send $conn:expr, $smsg:ident, $($a:expr),*) => ({
- let r = $conn.send(ServerMessage::$smsg($($a),*));
- if r.is_err() {
- debug!("send error - got={:?}", r);
- return Err(ErrorCode::Error.into());
- }
- });
- (__recv $conn:expr, $rmsg:ident) => ({
- match $conn.receive() {
+ (__recv $resp:expr, $rmsg:ident) => ({
+ match $resp.wait() {
Ok(ClientMessage::$rmsg) => Ok(()),
- // Macro can see ErrorCode but not Error? I don't understand.
- // ::cubeb_core::Error is fragile but this isn't general purpose code.
- Ok(ClientMessage::Error(e)) => Err(unsafe { ::cubeb_core::Error::from_raw(e) }),
+ Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
Ok(m) => {
- debug!("receive unexpected message - got={:?}", m);
- Err(ErrorCode::Error.into())
+ debug!("received wrong message - got={:?}", m);
+ Err($crate::send_recv::_err(None))
},
Err(e) => {
- debug!("receive error - got={:?}", e);
- Err(ErrorCode::Error.into())
+ debug!("received error from rpc - got={:?}", e);
+ Err($crate::send_recv::_err(None))
},
}
});
- (__recv $conn:expr, $rmsg:ident __result) => ({
- match $conn.receive() {
+ (__recv $resp:expr, $rmsg:ident __result) => ({
+ match $resp.wait() {
Ok(ClientMessage::$rmsg(v)) => Ok(v),
- // Macro can see ErrorCode but not Error? I don't understand.
- // ::cubeb_core::Error is fragile but this isn't general purpose code.
- Ok(ClientMessage::Error(e)) => Err(unsafe { ::cubeb_core::Error::from_raw(e) }),
+ Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
Ok(m) => {
- debug!("receive unexpected message - got={:?}", m);
- Err(ErrorCode::Error.into())
+ debug!("received wrong message - got={:?}", m);
+ Err($crate::send_recv::_err(None))
},
Err(e) => {
- debug!("receive error - got={:?}", e);
- Err(ErrorCode::Error.into())
+ debug!("received error - got={:?}", e);
+ Err($crate::send_recv::_err(None))
},
}
})
}
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -1,277 +1,225 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
+use {assert_not_in_callback, set_in_callback};
use ClientContext;
-use {set_in_callback, assert_not_in_callback};
-use audioipc::{ClientMessage, Connection, ServerMessage, messages};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
+use audioipc::rpc;
use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
use cubeb_backend::Stream;
-use cubeb_core::{ErrorCode, Result, ffi};
+use cubeb_core::{ffi, Result};
+use futures::Future;
+use futures_cpupool::{CpuFuture, CpuPool};
use std::ffi::CString;
use std::fs::File;
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
+use std::os::unix::net;
use std::ptr;
-use std::thread;
+use std::sync::mpsc;
+use tokio_uds::UnixStream;
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
pub struct ClientStream<'ctx> {
// This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
context: &'ctx ClientContext,
- token: usize,
- join_handle: Option<thread::JoinHandle<()>>
+ token: usize
}
-fn stream_thread(
- mut conn: Connection,
- input_shm: &SharedMemSlice,
- mut output_shm: SharedMemMutSlice,
+struct CallbackServer {
+ input_shm: SharedMemSlice,
+ output_shm: SharedMemMutSlice,
data_cb: ffi::cubeb_data_callback,
state_cb: ffi::cubeb_state_callback,
user_ptr: usize,
-) {
- loop {
- let r = match conn.receive::<ClientMessage>() {
- Ok(r) => r,
- Err(e) => {
- debug!("stream_thread: Failed to receive message: {:?}", e);
- return;
- },
- };
+ cpu_pool: CpuPool
+}
- match r {
- ClientMessage::StreamDestroyed => {
- info!("stream_thread: Shutdown callback thread.");
- return;
- },
- ClientMessage::StreamDataCallback(nframes, frame_size) => {
- trace!(
+impl rpc::Server for CallbackServer {
+ type Request = CallbackReq;
+ type Response = CallbackResp;
+ type Future = CpuFuture<Self::Response, ()>;
+ type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+ fn process(&mut self, req: Self::Request) -> Self::Future {
+ match req {
+ CallbackReq::Data(nframes, frame_size) => {
+ info!(
"stream_thread: Data Callback: nframes={} frame_size={}",
nframes,
frame_size
);
- // TODO: This is proof-of-concept. Make it better.
- let input_ptr: *const u8 = input_shm
- .get_slice(nframes as usize * frame_size)
- .unwrap()
- .as_ptr();
- let output_ptr: *mut u8 = output_shm
- .get_mut_slice(nframes as usize * frame_size)
- .unwrap()
- .as_mut_ptr();
- set_in_callback(true);
- let nframes = data_cb(
- ptr::null_mut(),
- user_ptr as *mut c_void,
- input_ptr as *const _,
- output_ptr as *mut _,
- nframes as _
- );
- set_in_callback(false);
- let r = conn.send(ServerMessage::StreamDataCallback(nframes as isize));
- if r.is_err() {
- debug!("stream_thread: Failed to send StreamDataCallback: {:?}", r);
- return;
- }
+
+ // Clone values that need to be moved into the cpu pool thread.
+ let input_shm = unsafe { self.input_shm.clone_view() };
+ let mut output_shm = unsafe { self.output_shm.clone_view() };
+ let user_ptr = self.user_ptr;
+ let cb = self.data_cb;
+
+ self.cpu_pool.spawn_fn(move || {
+ // TODO: This is proof-of-concept. Make it better.
+ let input_ptr: *const u8 = input_shm
+ .get_slice(nframes as usize * frame_size)
+ .unwrap()
+ .as_ptr();
+ let output_ptr: *mut u8 = output_shm
+ .get_mut_slice(nframes as usize * frame_size)
+ .unwrap()
+ .as_mut_ptr();
+
+ set_in_callback(true);
+ let nframes = cb(
+ ptr::null_mut(),
+ user_ptr as *mut c_void,
+ input_ptr as *const _,
+ output_ptr as *mut _,
+ nframes as _
+ );
+ set_in_callback(false);
+
+ Ok(CallbackResp::Data(nframes as isize))
+ })
},
- ClientMessage::StreamStateCallback(state) => {
+ CallbackReq::State(state) => {
info!("stream_thread: State Callback: {:?}", state);
- set_in_callback(true);
- state_cb(ptr::null_mut(), user_ptr as *mut _, state);
- set_in_callback(false);
- let r = conn.send(ServerMessage::StreamStateCallback);
- if r.is_err() {
- debug!("stream_thread: Failed to send StreamStateCallback: {:?}", r);
- return;
- }
- },
- m => {
- info!("Unexpected ClientMessage: {:?}", m);
- },
+ let user_ptr = self.user_ptr;
+ let cb = self.state_cb;
+ self.cpu_pool.spawn_fn(move || {
+ set_in_callback(true);
+ cb(ptr::null_mut(), user_ptr as *mut _, state);
+ set_in_callback(false);
+
+ Ok(CallbackResp::State)
+ })
+ }
}
}
}
impl<'ctx> ClientStream<'ctx> {
fn init(
ctx: &'ctx ClientContext,
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void,
+ user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
assert_not_in_callback();
- let mut conn = ctx.connection();
+
+ let rpc = ctx.rpc();
+ let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
+
+ trace!("token = {}, fds = {:?}", data.token, data.fds);
- let r = conn.send(ServerMessage::StreamInit(init_params));
- if r.is_err() {
- debug!("ClientStream::init: Failed to send StreamInit: {:?}", r);
- return Err(ErrorCode::Error.into());
- }
+ let stm = data.fds[0];
+ let stream = unsafe { net::UnixStream::from_raw_fd(stm) };
- let r = match conn.receive_with_fd::<ClientMessage>() {
- Ok(r) => r,
- Err(_) => return Err(ErrorCode::Error.into()),
- };
+ let input = data.fds[1];
+ let input_file = unsafe { File::from_raw_fd(input) };
+ let input_shm = SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap();
- let (token, conn2) = match r {
- ClientMessage::StreamCreated(tok) => {
- let fd = conn.take_fd();
- if fd.is_none() {
- debug!("Missing fd!");
- return Err(ErrorCode::Error.into());
- }
- (tok, unsafe { Connection::from_raw_fd(fd.unwrap()) })
- },
- m => {
- debug!("Unexpected message: {:?}", m);
- return Err(ErrorCode::Error.into());
- },
- };
+ let output = data.fds[2];
+ let output_file = unsafe { File::from_raw_fd(output) };
+ let output_shm = SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap();
+
+ let user_data = user_ptr as usize;
+
+ let cpu_pool = ctx.cpu_pool();
- // TODO: It'd be nicer to receive these two fds as part of
- // StreamCreated, but that requires changing sendmsg/recvmsg to
- // support multiple fds.
- let r = match conn.receive_with_fd::<ClientMessage>() {
- Ok(r) => r,
- Err(_) => return Err(ErrorCode::Error.into()),
+ let server = CallbackServer {
+ input_shm: input_shm,
+ output_shm: output_shm,
+ data_cb: data_callback,
+ state_cb: state_callback,
+ user_ptr: user_data,
+ cpu_pool: cpu_pool
};
- let input_file = match r {
- ClientMessage::StreamCreatedInputShm => {
- let fd = conn.take_fd();
- if fd.is_none() {
- debug!("Missing fd!");
- return Err(ErrorCode::Error.into());
- }
- unsafe { File::from_raw_fd(fd.unwrap()) }
- },
- m => {
- debug!("Unexpected message: {:?}", m);
- return Err(ErrorCode::Error.into());
- },
- };
-
- let input_shm = SharedMemSlice::from(input_file, SHM_AREA_SIZE).unwrap();
-
- let r = match conn.receive_with_fd::<ClientMessage>() {
- Ok(r) => r,
- Err(_) => return Err(ErrorCode::Error.into()),
- };
-
- let output_file = match r {
- ClientMessage::StreamCreatedOutputShm => {
- let fd = conn.take_fd();
- if fd.is_none() {
- debug!("Missing fd!");
- return Err(ErrorCode::Error.into());
- }
- unsafe { File::from_raw_fd(fd.unwrap()) }
- },
- m => {
- debug!("Unexpected message: {:?}", m);
- return Err(ErrorCode::Error.into());
- },
- };
-
- let output_shm = SharedMemMutSlice::from(output_file, SHM_AREA_SIZE).unwrap();
-
- let user_data = user_ptr as usize;
- let join_handle = thread::spawn(move || {
- stream_thread(
- conn2,
- &input_shm,
- output_shm,
- data_callback,
- state_callback,
- user_data
- )
+ let (wait_tx, wait_rx) = mpsc::channel();
+ ctx.remote().spawn(move |handle| {
+ let stream = UnixStream::from_stream(stream, handle).unwrap();
+ let transport = framed(stream, Default::default());
+ rpc::bind_server(transport, server, handle);
+ wait_tx.send(()).unwrap();
+ Ok(())
});
+ wait_rx.recv().unwrap();
Ok(Box::into_raw(Box::new(ClientStream {
context: ctx,
- token: token,
- join_handle: Some(join_handle)
+ token: data.token
})) as _)
}
}
impl<'ctx> Drop for ClientStream<'ctx> {
fn drop(&mut self) {
- let mut conn = self.context.connection();
- let r = conn.send(ServerMessage::StreamDestroy(self.token));
- if r.is_err() {
- debug!("ClientStream::Drop send error={:?}", r);
- } else {
- let r = conn.receive();
- if let Ok(ClientMessage::StreamDestroyed) = r {
- } else {
- debug!("ClientStream::Drop receive error={:?}", r);
- }
- }
- // XXX: This is guaranteed to wait forever if the send failed.
- self.join_handle.take().unwrap().join().unwrap();
+ trace!("ClientStream drop...");
+ let rpc = self.context.rpc();
+ let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
}
}
impl<'ctx> Stream for ClientStream<'ctx> {
fn start(&self) -> Result<()> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamStart(self.token) => StreamStarted)
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamStart(self.token) => StreamStarted)
}
fn stop(&self) -> Result<()> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamStop(self.token) => StreamStopped)
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamStop(self.token) => StreamStopped)
}
fn reset_default_device(&self) -> Result<()> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
}
fn position(&self) -> Result<u64> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamGetPosition(self.token) => StreamPosition())
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
}
fn latency(&self) -> Result<u32> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamGetLatency(self.token) => StreamLatency())
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
}
fn set_volume(&self, volume: f32) -> Result<()> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamSetVolume(self.token, volume) => StreamVolumeSet)
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
}
fn set_panning(&self, panning: f32) -> Result<()> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- send_recv!(conn, StreamSetPanning(self.token, panning) => StreamPanningSet)
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet)
}
fn current_device(&self) -> Result<*const ffi::cubeb_device> {
assert_not_in_callback();
- let mut conn = self.context.connection();
- match send_recv!(conn, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+ let rpc = self.context.rpc();
+ match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
- Err(e) => Err(e),
+ Err(e) => Err(e)
}
}
fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
assert_not_in_callback();
// It's all unsafe...
if !device.is_null() {
unsafe {
@@ -285,24 +233,24 @@ impl<'ctx> Stream for ClientStream<'ctx>
}
}
Ok(())
}
// TODO: How do we call this back? On what thread?
fn register_device_changed_callback(
&self,
- _device_changed_callback: ffi::cubeb_device_changed_callback,
+ _device_changed_callback: ffi::cubeb_device_changed_callback
) -> Result<()> {
assert_not_in_callback();
Ok(())
}
}
pub fn init(
ctx: &ClientContext,
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void,
+ user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/gecko.patch
@@ -0,0 +1,60 @@
+From 2c1d5189aedc7f90b603807a36a57254dc24471c Mon Sep 17 00:00:00 2001
+From: Dan Glastonbury <dglastonbury@mozilla.com>
+Date: Fri, 19 Jan 2018 11:56:58 +1000
+Subject: gecko: Change paths to vendored crates.
+
+
+diff --git a/media/audioipc/Cargo.toml b/media/audioipc/Cargo.toml
+index ede6064..d0a1979 100644
+--- a/media/audioipc/Cargo.toml
++++ b/media/audioipc/Cargo.toml
+@@ -1,2 +1,2 @@
+ [workspace]
+-members = ["audioipc", "client", "server", "ipctest"]
++members = ["audioipc", "client", "server"]
+diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml
+index 669c6ff..308cb5c 100644
+--- a/media/audioipc/audioipc/Cargo.toml
++++ b/media/audioipc/audioipc/Cargo.toml
+@@ -8,7 +8,7 @@ authors = [
+ description = "Remote Cubeb IPC"
+
+ [dependencies]
+-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+ bincode = "0.8"
+ bytes = "0.4"
+ # rayon-core in Gecko uses futures 0.1.13
+diff --git a/client/Cargo.toml b/client/Cargo.toml
+index c81b19a..9e3f8a5 100644
+--- a/media/audioipc/client/Cargo.toml
++++ b/media/audioipc/client/Cargo.toml
+@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
+
+ [dependencies]
+ audioipc = { path="../audioipc" }
+-cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
+-cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
++cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+ # rayon-core in Gecko uses futures 0.1.13
+ futures = { version="=0.1.13", default-features=false, features=["use_std"] }
+ # futures-cpupool 0.1.5 matches futures 0.1.13
+diff --git a/server/Cargo.toml b/server/Cargo.toml
+index 5b79b83..01463be 100644
+--- a/media/audioipc/server/Cargo.toml
++++ b/media/audioipc/server/Cargo.toml
+@@ -9,8 +9,8 @@ description = "Remote cubeb server"
+
+ [dependencies]
+ audioipc = { path = "../audioipc" }
+-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+-cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.1" }
++cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
++cubeb = { path = "../../cubeb-rs/cubeb-api" }
+ bytes = "0.4"
+ lazycell = "^0.4"
+ libc = "0.2"
+--
+2.10.2
+
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -1,21 +1,26 @@
[package]
name = "audioipc-server"
-version = "0.1.0"
-authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
+version = "0.2.0"
+authors = [
+ "Matthew Gregan <kinetik@flim.org>",
+ "Dan Glastonbury <dan.glastonbury@gmail.com>"
+ ]
description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
cubeb = { path = "../../cubeb-rs/cubeb-api" }
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"
log = "^0.3.6"
-mio = "0.6.7"
-mio-uds = "0.6.4"
slab = "0.3.0"
+# rayon-core in Gecko uses futures 0.1.13
+futures = "=0.1.13"
+tokio-core = "0.1"
+tokio-uds = "0.1.7"
[dependencies.error-chain]
version = "0.11.0"
-default-features = false
+default-features = false
\ No newline at end of file
deleted file mode 100644
--- a/media/audioipc/server/src/channel.rs
+++ /dev/null
@@ -1,394 +0,0 @@
-/* NOTE: Imported from https://github.com/carllerche/mio with minimal
- modifications to build locally. License and copyright per
- https://raw.githubusercontent.com/carllerche/mio/master/LICENSE */
-
-//! Thread safe communication channel implementing `Evented`
-
-#![allow(unused_imports, dead_code, deprecated, missing_debug_implementations)]
-
-use mio::{Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
-use lazycell::{LazyCell, AtomicLazyCell};
-use std::any::Any;
-use std::fmt;
-use std::io;
-use std::error;
-use std::sync::{mpsc, Arc};
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-/// Creates a new asynchronous channel, where the `Receiver` can be registered
-/// with `Poll`.
-pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
- let (tx_ctl, rx_ctl) = ctl_pair();
- let (tx, rx) = mpsc::channel();
-
- let tx = Sender {
- tx: tx,
- ctl: tx_ctl,
- };
-
- let rx = Receiver {
- rx: rx,
- ctl: rx_ctl,
- };
-
- (tx, rx)
-}
-
-/// Creates a new synchronous, bounded channel where the `Receiver` can be
-/// registered with `Poll`.
-pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
- let (tx_ctl, rx_ctl) = ctl_pair();
- let (tx, rx) = mpsc::sync_channel(bound);
-
- let tx = SyncSender {
- tx: tx,
- ctl: tx_ctl,
- };
-
- let rx = Receiver {
- rx: rx,
- ctl: rx_ctl,
- };
-
- (tx, rx)
-}
-
-pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
- let inner = Arc::new(Inner {
- pending: AtomicUsize::new(0),
- senders: AtomicUsize::new(1),
- set_readiness: AtomicLazyCell::new(),
- });
-
- let tx = SenderCtl {
- inner: inner.clone(),
- };
-
- let rx = ReceiverCtl {
- registration: LazyCell::new(),
- inner: inner,
- };
-
- (tx, rx)
-}
-
-/// Tracks messages sent on a channel in order to update readiness.
-pub struct SenderCtl {
- inner: Arc<Inner>,
-}
-
-/// Tracks messages received on a channel in order to track readiness.
-pub struct ReceiverCtl {
- registration: LazyCell<Registration>,
- inner: Arc<Inner>,
-}
-
-pub struct Sender<T> {
- tx: mpsc::Sender<T>,
- ctl: SenderCtl,
-}
-
-pub struct SyncSender<T> {
- tx: mpsc::SyncSender<T>,
- ctl: SenderCtl,
-}
-
-pub struct Receiver<T> {
- rx: mpsc::Receiver<T>,
- ctl: ReceiverCtl,
-}
-
-pub enum SendError<T> {
- Io(io::Error),
- Disconnected(T),
-}
-
-pub enum TrySendError<T> {
- Io(io::Error),
- Full(T),
- Disconnected(T),
-}
-
-struct Inner {
- // The number of outstanding messages for the receiver to read
- pending: AtomicUsize,
- // The number of sender handles
- senders: AtomicUsize,
- // The set readiness handle
- set_readiness: AtomicLazyCell<SetReadiness>,
-}
-
-impl<T> Sender<T> {
- pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- self.tx.send(t)
- .map_err(SendError::from)
- .and_then(|_| {
- self.ctl.inc()?;
- Ok(())
- })
- }
-}
-
-impl<T> Clone for Sender<T> {
- fn clone(&self) -> Sender<T> {
- Sender {
- tx: self.tx.clone(),
- ctl: self.ctl.clone(),
- }
- }
-}
-
-impl<T> SyncSender<T> {
- pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- self.tx.send(t)
- .map_err(From::from)
- .and_then(|_| {
- self.ctl.inc()?;
- Ok(())
- })
- }
-
- pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
- self.tx.try_send(t)
- .map_err(From::from)
- .and_then(|_| {
- self.ctl.inc()?;
- Ok(())
- })
- }
-}
-
-impl<T> Clone for SyncSender<T> {
- fn clone(&self) -> SyncSender<T> {
- SyncSender {
- tx: self.tx.clone(),
- ctl: self.ctl.clone(),
- }
- }
-}
-
-impl<T> Receiver<T> {
- pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
- self.rx.try_recv().and_then(|res| {
- let _ = self.ctl.dec();
- Ok(res)
- })
- }
-}
-
-impl<T> Evented for Receiver<T> {
- fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
- self.ctl.register(poll, token, interest, opts)
- }
-
- fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
- self.ctl.reregister(poll, token, interest, opts)
- }
-
- fn deregister(&self, poll: &Poll) -> io::Result<()> {
- self.ctl.deregister(poll)
- }
-}
-
-/*
- *
- * ===== SenderCtl / ReceiverCtl =====
- *
- */
-
-impl SenderCtl {
- /// Call to track that a message has been sent
- pub fn inc(&self) -> io::Result<()> {
- let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
-
- if 0 == cnt {
- // Toggle readiness to readable
- if let Some(set_readiness) = self.inner.set_readiness.borrow() {
- set_readiness.set_readiness(Ready::readable())?;
- }
- }
-
- Ok(())
- }
-}
-
-impl Clone for SenderCtl {
- fn clone(&self) -> SenderCtl {
- self.inner.senders.fetch_add(1, Ordering::Relaxed);
- SenderCtl { inner: self.inner.clone() }
- }
-}
-
-impl Drop for SenderCtl {
- fn drop(&mut self) {
- if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
- let _ = self.inc();
- }
- }
-}
-
-impl ReceiverCtl {
- pub fn dec(&self) -> io::Result<()> {
- let first = self.inner.pending.load(Ordering::Acquire);
-
- if first == 1 {
- // Unset readiness
- if let Some(set_readiness) = self.inner.set_readiness.borrow() {
- set_readiness.set_readiness(Ready::empty())?;
- }
- }
-
- // Decrement
- let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
-
- if first == 1 && second > 1 {
- // There are still pending messages. Since readiness was
- // previously unset, it must be reset here
- if let Some(set_readiness) = self.inner.set_readiness.borrow() {
- set_readiness.set_readiness(Ready::readable())?;
- }
- }
-
- Ok(())
- }
-}
-
-impl Evented for ReceiverCtl {
- fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
- if self.registration.borrow().is_some() {
- return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
- }
-
- let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
-
-
- if self.inner.pending.load(Ordering::Relaxed) > 0 {
- // TODO: Don't drop readiness
- let _ = set_readiness.set_readiness(Ready::readable());
- }
-
- self.registration.fill(registration).ok().expect("unexpected state encountered");
- self.inner.set_readiness.fill(set_readiness).ok().expect("unexpected state encountered");
-
- Ok(())
- }
-
- fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
- match self.registration.borrow() {
- Some(registration) => registration.update(poll, token, interest, opts),
- None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
- }
- }
-
- fn deregister(&self, poll: &Poll) -> io::Result<()> {
- match self.registration.borrow() {
- Some(registration) => registration.deregister(poll),
- None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
- }
- }
-}
-
-/*
- *
- * ===== Error conversions =====
- *
- */
-
-impl<T> From<mpsc::SendError<T>> for SendError<T> {
- fn from(src: mpsc::SendError<T>) -> SendError<T> {
- SendError::Disconnected(src.0)
- }
-}
-
-impl<T> From<io::Error> for SendError<T> {
- fn from(src: io::Error) -> SendError<T> {
- SendError::Io(src)
- }
-}
-
-impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
- fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
- match src {
- mpsc::TrySendError::Full(v) => TrySendError::Full(v),
- mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
- }
- }
-}
-
-impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
- fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
- TrySendError::Disconnected(src.0)
- }
-}
-
-impl<T> From<io::Error> for TrySendError<T> {
- fn from(src: io::Error) -> TrySendError<T> {
- TrySendError::Io(src)
- }
-}
-
-/*
- *
- * ===== Implement Error, Debug and Display for Errors =====
- *
- */
-
-impl<T: Any> error::Error for SendError<T> {
- fn description(&self) -> &str {
- match self {
- &SendError::Io(ref io_err) => io_err.description(),
- &SendError::Disconnected(..) => "Disconnected",
- }
- }
-}
-
-impl<T: Any> error::Error for TrySendError<T> {
- fn description(&self) -> &str {
- match self {
- &TrySendError::Io(ref io_err) => io_err.description(),
- &TrySendError::Full(..) => "Full",
- &TrySendError::Disconnected(..) => "Disconnected",
- }
- }
-}
-
-impl<T> fmt::Debug for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- format_send_error(self, f)
- }
-}
-
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- format_send_error(self, f)
- }
-}
-
-impl<T> fmt::Debug for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- format_try_send_error(self, f)
- }
-}
-
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- format_try_send_error(self, f)
- }
-}
-
-#[inline]
-fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
- match e {
- &SendError::Io(ref io_err) => write!(f, "{}", io_err),
- &SendError::Disconnected(..) => write!(f, "Disconnected"),
- }
-}
-
-#[inline]
-fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
- match e {
- &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
- &TrySendError::Full(..) => write!(f, "Full"),
- &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
- }
-}
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -3,81 +3,99 @@ extern crate error_chain;
#[macro_use]
extern crate log;
extern crate audioipc;
extern crate bytes;
extern crate cubeb;
extern crate cubeb_core;
+extern crate futures;
extern crate lazycell;
extern crate libc;
-extern crate mio;
-extern crate mio_uds;
extern crate slab;
+extern crate tokio_core;
+extern crate tokio_uds;
-use audioipc::AutoCloseFd;
-use audioipc::async::{Async, AsyncRecvFd, AsyncSendFd};
-use audioipc::codec::{Decoder, encode};
-use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamInitParams, StreamParams};
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::core;
+use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage,
+ StreamCreate, StreamInitParams, StreamParams};
+use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
-use bytes::{Bytes, BytesMut};
use cubeb_core::binding::Binding;
use cubeb_core::ffi;
-use mio::{Ready, Token};
-use mio_uds::UnixStream;
-use std::{slice, thread};
-use std::collections::{HashSet, VecDeque};
+use futures::Future;
+use futures::future::{self, FutureResult};
+use futures::sync::oneshot;
+use std::{ptr, slice};
+use std::cell::RefCell;
use std::convert::From;
-use std::io::Cursor;
+use std::error::Error;
use std::os::raw::c_void;
+use std::os::unix::net;
use std::os::unix::prelude::*;
-use std::sync::{Mutex, MutexGuard};
-
-mod channel;
+use tokio_core::reactor::Remote;
+use tokio_uds::UnixStream;
pub mod errors {
error_chain! {
links {
AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
}
foreign_links {
Cubeb(::cubeb_core::Error);
Io(::std::io::Error);
+ Canceled(::futures::sync::oneshot::Canceled);
}
}
}
use errors::*;
+thread_local!(static CONTEXT_KEY: RefCell<Option<cubeb::Result<cubeb::Context>>> = RefCell::new(None));
+
+fn with_local_context<T, F>(f: F) -> T
+where
+ F: FnOnce(&cubeb::Result<cubeb::Context>) -> T
+{
+ CONTEXT_KEY.with(|k| {
+ let mut context = k.borrow_mut();
+ if context.is_none() {
+ *context = Some(cubeb::Context::init("AudioIPC Server", None));
+ }
+ f(context.as_ref().unwrap())
+ })
+}
+
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
-// The size in which the server conns slab is grown.
-const SERVER_CONN_CHUNK_SIZE: usize = 16;
-
// The size in which the stream slab is grown.
const STREAM_CONN_CHUNK_SIZE: usize = 64;
+struct CallbackClient;
+
+impl rpc::Client for CallbackClient {
+ type Request = CallbackReq;
+ type Response = CallbackResp;
+ type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+}
+
// TODO: this should forward to the client.
struct Callback {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
- connection: Mutex<audioipc::Connection>,
input_shm: SharedMemWriter,
- output_shm: SharedMemReader
-}
-
-impl Callback {
- #[doc(hidden)]
- fn connection(&self) -> MutexGuard<audioipc::Connection> {
- self.connection.lock().unwrap()
- }
+ output_shm: SharedMemReader,
+ rpc: rpc::ClientProxy<CallbackReq, CallbackResp>
}
impl cubeb::StreamCallback for Callback {
type Frame = u8;
fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
trace!("Stream data callback: {} {}", input.len(), output.len());
@@ -89,340 +107,232 @@ impl cubeb::StreamCallback for Callback
let real_output = unsafe {
let size_bytes = output.len() * self.output_frame_size as usize;
trace!("Resize output to {}", size_bytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
};
self.input_shm.write(real_input).unwrap();
- let mut conn = self.connection();
- let r = conn.send(ClientMessage::StreamDataCallback(
- output.len() as isize,
- self.output_frame_size as usize
- ));
- if r.is_err() {
- debug!("data_callback: Failed to send to client - got={:?}", r);
- return -1;
- }
+ let r = self.rpc
+ .call(CallbackReq::Data(
+ output.len() as isize,
+ self.output_frame_size as usize
+ ))
+ .wait();
- let r = conn.receive();
match r {
- Ok(ServerMessage::StreamDataCallback(cb_result)) => {
- if cb_result >= 0 {
- let len = cb_result as usize * self.output_frame_size as usize;
- self.output_shm.read(&mut real_output[..len]).unwrap();
- cb_result
- } else {
- cb_result
- }
+ Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 {
+ let len = cb_result as usize * self.output_frame_size as usize;
+ self.output_shm.read(&mut real_output[..len]).unwrap();
+ cb_result
+ } else {
+ cb_result
},
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
- },
+ }
}
}
fn state_callback(&mut self, state: cubeb::State) {
info!("Stream state callback: {:?}", state);
// TODO: Share this conversion with the same code in cubeb-rs?
let state = match state {
cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
- cubeb::State::Error => ffi::CUBEB_STATE_ERROR,
+ cubeb::State::Error => ffi::CUBEB_STATE_ERROR
};
- let mut conn = self.connection();
- let r = conn.send(ClientMessage::StreamStateCallback(state));
- if r.is_err() {
- debug!("state_callback: Failed to send to client - got={:?}", r);
- }
- // Bug 1414623 - We need to block on an ACK from the client
- // side to make state_callback synchronous. If not, then there
- // exists a race on cubeb_stream_stop()/cubeb_stream_destroy()
- // in Gecko that results in a UAF.
- let r = conn.receive();
+ let r = self.rpc.call(CallbackReq::State(state)).wait();
+
match r {
- Ok(ServerMessage::StreamStateCallback) => {},
+ Ok(CallbackResp::State) => {},
_ => {
- debug!("Unexpected message {:?} during state_callback", r);
- },
- }
- }
-}
-
-impl Drop for Callback {
- fn drop(&mut self) {
- let mut conn = self.connection();
- let r = conn.send(ClientMessage::StreamDestroyed);
- if r.is_err() {
- debug!("Callback::drop failed to send StreamDestroyed = {:?}", r);
- }
+ debug!("Unexpected message {:?} during callback", r);
+ }
+ };
}
}
-type Slab<T> = slab::Slab<T, Token>;
type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
-// TODO: Server command token must be outside range used by server.connections slab.
-// usize::MAX is already used internally in mio.
-const CMD: Token = Token(std::usize::MAX - 1);
-
-struct ServerConn {
- //connection: audioipc::Connection,
- io: UnixStream,
- token: Option<Token>,
- streams: StreamSlab,
- decoder: Decoder,
- recv_buffer: BytesMut,
- send_buffer: BytesMut,
- pending_send: VecDeque<(Bytes, Option<AutoCloseFd>)>,
- device_ids: HashSet<usize>
+pub struct CubebServer {
+ cb_remote: Remote,
+ streams: StreamSlab
}
-impl ServerConn {
- fn new(io: UnixStream) -> ServerConn {
- let mut sc = ServerConn {
- io: io,
- token: None,
- // TODO: Handle increasing slab size. Pick a good default size.
- streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
- decoder: Decoder::new(),
- recv_buffer: BytesMut::with_capacity(4096),
- send_buffer: BytesMut::with_capacity(4096),
- pending_send: VecDeque::new(),
- device_ids: HashSet::new()
- };
- sc.device_ids.insert(0); // nullptr is always a valid (default) device id.
- sc
- }
-
- fn process_read(&mut self, context: &Result<cubeb::Context>) -> Result<Ready> {
- // According to *something*, processing non-blocking stream
- // should attempt to read until EWOULDBLOCK is returned.
- while let Async::Ready((n, fd)) = try!(self.io.recv_buf_fd(&mut self.recv_buffer)) {
- trace!("Received {} bytes and fd {:?}", n, fd);
+impl rpc::Server for CubebServer {
+ type Request = ServerMessage;
+ type Response = ClientMessage;
+ type Future = FutureResult<Self::Response, ()>;
+ type Transport =
+ FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
- // Reading 0 signifies EOF
- if n == 0 {
- return Err(
- ::errors::ErrorKind::AudioIPC(::audioipc::errors::ErrorKind::Disconnected).into()
- );
- }
-
- if let Some(fd) = fd {
- trace!("Unexpectedly received an fd from client.");
- let _ = unsafe { AutoCloseFd::from_raw_fd(fd) };
- }
+ fn process(&mut self, req: Self::Request) -> Self::Future {
+ let resp = with_local_context(|context| match *context {
+ Err(_) => error(cubeb::Error::new()),
+ Ok(ref context) => self.process_msg(context, &req)
+ });
+ future::ok(resp)
+ }
+}
- // Process all the complete messages contained in
- // send.recv_buffer. It's possible that a read might not
- // return a complete message, so self.decoder.decode
- // returns Ok(None).
- loop {
- match self.decoder.decode::<ServerMessage>(&mut self.recv_buffer) {
- Ok(Some(msg)) => {
- info!("ServerConn::process: got {:?}", msg);
- try!(self.process_msg(&msg, context));
- },
- Ok(None) => {
- break;
- },
- Err(e) => {
- return Err(e).chain_err(|| "Failed to decoder ServerMessage");
- },
- }
- }
+impl CubebServer {
+ pub fn new(cb_remote: Remote) -> Self {
+ CubebServer {
+ cb_remote: cb_remote,
+ streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE)
}
-
- // Send any pending responses to client.
- self.flush_pending_send()
}
// Process a request coming from the client.
- fn process_msg(&mut self, msg: &ServerMessage, context: &Result<cubeb::Context>) -> Result<()> {
- let resp: ClientMessage = if let Ok(ref context) = *context {
- if let ServerMessage::StreamInit(ref params) = *msg {
- return self.process_stream_init(context, params);
- };
+ fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
+ let resp: ClientMessage = match *msg {
+ ServerMessage::ClientConnect => panic!("already connected"),
- match *msg {
- ServerMessage::ClientConnect => {
- panic!("already connected");
- },
+ ServerMessage::ClientDisconnect => {
+ // TODO:
+ //self.connection.client_disconnect();
+ ClientMessage::ClientDisconnected
+ },
- ServerMessage::ClientDisconnect => {
- // TODO:
- //self.connection.client_disconnect();
- ClientMessage::ClientDisconnected
- },
+ ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
- ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
+ ServerMessage::ContextGetMaxChannelCount => context
+ .max_channel_count()
+ .map(ClientMessage::ContextMaxChannelCount)
+ .unwrap_or_else(error),
- ServerMessage::ContextGetMaxChannelCount => {
- context
- .max_channel_count()
- .map(ClientMessage::ContextMaxChannelCount)
- .unwrap_or_else(error)
- },
+ ServerMessage::ContextGetMinLatency(ref params) => {
+ let format = cubeb::SampleFormat::from(params.format);
+ let layout = cubeb::ChannelLayout::from(params.layout);
- ServerMessage::ContextGetMinLatency(ref params) => {
- let format = cubeb::SampleFormat::from(params.format);
- let layout = cubeb::ChannelLayout::from(params.layout);
-
- let params = cubeb::StreamParamsBuilder::new()
- .format(format)
- .rate(u32::from(params.rate))
- .channels(u32::from(params.channels))
- .layout(layout)
- .take();
+ let params = cubeb::StreamParamsBuilder::new()
+ .format(format)
+ .rate(u32::from(params.rate))
+ .channels(u32::from(params.channels))
+ .layout(layout)
+ .take();
- context
- .min_latency(¶ms)
- .map(ClientMessage::ContextMinLatency)
- .unwrap_or_else(error)
- },
+ context
+ .min_latency(¶ms)
+ .map(ClientMessage::ContextMinLatency)
+ .unwrap_or_else(error)
+ },
- ServerMessage::ContextGetPreferredSampleRate => {
- context
- .preferred_sample_rate()
- .map(ClientMessage::ContextPreferredSampleRate)
- .unwrap_or_else(error)
- },
+ ServerMessage::ContextGetPreferredSampleRate => context
+ .preferred_sample_rate()
+ .map(ClientMessage::ContextPreferredSampleRate)
+ .unwrap_or_else(error),
- ServerMessage::ContextGetPreferredChannelLayout => {
- context
- .preferred_channel_layout()
- .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
- .unwrap_or_else(error)
- },
+ ServerMessage::ContextGetPreferredChannelLayout => context
+ .preferred_channel_layout()
+ .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
+ .unwrap_or_else(error),
- ServerMessage::ContextGetDeviceEnumeration(device_type) => {
- context
- .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
- .map(|devices| {
- let v: Vec<DeviceInfo> = devices.iter()
- .map(|i| i.raw().into())
- .collect();
- for i in &v {
- self.device_ids.insert(i.devid);
- }
- ClientMessage::ContextEnumeratedDevices(v)
- })
- .unwrap_or_else(error)
- },
+ ServerMessage::ContextGetDeviceEnumeration(device_type) => context
+ .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+ .map(|devices| {
+ let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+ ClientMessage::ContextEnumeratedDevices(v)
+ })
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
+ .unwrap_or_else(|_| error(cubeb::Error::new())),
- ServerMessage::StreamInit(_) => {
- panic!("StreamInit should have already been handled.");
- },
-
- ServerMessage::StreamDestroy(stm_tok) => {
- self.streams.remove(stm_tok);
- ClientMessage::StreamDestroyed
- },
+ ServerMessage::StreamDestroy(stm_tok) => {
+ self.streams.remove(stm_tok);
+ ClientMessage::StreamDestroyed
+ },
- ServerMessage::StreamStart(stm_tok) => {
- let _ = self.streams[stm_tok].start();
- ClientMessage::StreamStarted
- },
+ ServerMessage::StreamStart(stm_tok) => {
+ let _ = self.streams[stm_tok].start();
+ ClientMessage::StreamStarted
+ },
- ServerMessage::StreamStop(stm_tok) => {
- let _ = self.streams[stm_tok].stop();
- ClientMessage::StreamStopped
- },
+ ServerMessage::StreamStop(stm_tok) => {
+ let _ = self.streams[stm_tok].stop();
+ ClientMessage::StreamStopped
+ },
- ServerMessage::StreamGetPosition(stm_tok) => {
- self.streams[stm_tok]
- .position()
- .map(ClientMessage::StreamPosition)
- .unwrap_or_else(error)
- },
+ ServerMessage::StreamResetDefaultDevice(stm_tok) => {
+ let _ = self.streams[stm_tok].reset_default_device();
+ ClientMessage::StreamDefaultDeviceReset
+ },
- ServerMessage::StreamGetLatency(stm_tok) => {
- self.streams[stm_tok]
- .latency()
- .map(ClientMessage::StreamLatency)
- .unwrap_or_else(error)
- },
+ ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
+ .position()
+ .map(ClientMessage::StreamPosition)
+ .unwrap_or_else(error),
- ServerMessage::StreamSetVolume(stm_tok, volume) => {
- self.streams[stm_tok]
- .set_volume(volume)
- .map(|_| ClientMessage::StreamVolumeSet)
- .unwrap_or_else(error)
- },
+ ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
+ .latency()
+ .map(ClientMessage::StreamLatency)
+ .unwrap_or_else(error),
- ServerMessage::StreamSetPanning(stm_tok, panning) => {
- self.streams[stm_tok]
- .set_panning(panning)
- .map(|_| ClientMessage::StreamPanningSet)
- .unwrap_or_else(error)
- },
+ ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
+ .set_volume(volume)
+ .map(|_| ClientMessage::StreamVolumeSet)
+ .unwrap_or_else(error),
- ServerMessage::StreamGetCurrentDevice(stm_tok) => {
- self.streams[stm_tok]
- .current_device()
- .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
- .unwrap_or_else(error)
- },
+ ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
+ .set_panning(panning)
+ .map(|_| ClientMessage::StreamPanningSet)
+ .unwrap_or_else(error),
- _ => {
- bail!("Unexpected Message");
- },
- }
- } else {
- error(cubeb::Error::new())
+ ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
+ .current_device()
+ .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
+ .unwrap_or_else(error)
};
debug!("process_msg: req={:?}, resp={:?}", msg, resp);
- self.queue_message(resp)
+ resp
}
// Stream init is special, so it's been separated from process_msg.
- fn process_stream_init(&mut self, context: &cubeb::Context, params: &StreamInitParams) -> Result<()> {
+ fn process_stream_init(
+ &mut self,
+ context: &cubeb::Context,
+ params: &StreamInitParams
+ ) -> Result<ClientMessage> {
fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
params.and_then(|p| {
let raw = ffi::cubeb_stream_params::from(p);
Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
})
}
fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
params
.map(|p| {
let sample_size = match p.format() {
- cubeb::SampleFormat::S16LE |
- cubeb::SampleFormat::S16BE |
- cubeb::SampleFormat::S16NE => 2,
- cubeb::SampleFormat::Float32LE |
- cubeb::SampleFormat::Float32BE |
- cubeb::SampleFormat::Float32NE => 4,
+ cubeb::SampleFormat::S16LE
+ | cubeb::SampleFormat::S16BE
+ | cubeb::SampleFormat::S16NE => 2,
+ cubeb::SampleFormat::Float32LE
+ | cubeb::SampleFormat::Float32BE
+ | cubeb::SampleFormat::Float32NE => 4
};
let channel_count = p.channels() as u16;
sample_size * channel_count
})
.unwrap_or(0u16)
}
-
- if !self.device_ids.contains(¶ms.input_device) {
- bail!("Invalid input_device passed to stream_init");
- }
// TODO: Yuck!
- let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
-
- if !self.device_ids.contains(¶ms.output_device) {
- bail!("Invalid output_device passed to stream_init");
- }
- // TODO: Yuck!
- let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
-
+ let input_device =
+ unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+ let output_device =
+ unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
let latency = params.latency_frames;
let mut builder = cubeb::StreamInitOptionsBuilder::new();
builder
.input_device(input_device)
.output_device(output_device)
.latency(latency);
if let Some(ref stream_name) = params.stream_name {
@@ -436,405 +346,172 @@ impl ServerConn {
if let Some(ref osp) = output_stream_params {
builder.output_stream_param(osp);
}
let params = builder.take();
let input_frame_size = frame_size_in_bytes(input_stream_params);
let output_frame_size = frame_size_in_bytes(output_stream_params);
- let (conn1, conn2) = audioipc::Connection::pair()?;
- info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+ let (stm1, stm2) = net::UnixStream::pair()?;
+ info!("Created callback pair: {:?}-{:?}", stm1, stm2);
+ let (input_shm, input_file) =
+ SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+ let (output_shm, output_file) =
+ SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
- let (input_shm, input_file) = SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
- let (output_shm, output_file) = SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+ // This code is currently running on the Client/Server RPC
+ // handling thread. We need to move the registration of the
+ // bind_client to the callback RPC handling thread. This is
+ // done by spawning a future on cb_remote.
+
+ let id = core::handle().id();
- let err = match context.stream_init(
- ¶ms,
- Callback {
- input_frame_size: input_frame_size,
- output_frame_size: output_frame_size,
- connection: Mutex::new(conn2),
- input_shm: input_shm,
- output_shm: output_shm
- }
- ) {
- Ok(stream) => {
+ let (tx, rx) = oneshot::channel();
+ self.cb_remote.spawn(move |handle| {
+ // Ensure we're running on a loop different to the one
+ // invoking spawn_fn.
+ assert_ne!(id, handle.id());
+ let stream = UnixStream::from_stream(stm2, handle).unwrap();
+ let transport = framed(stream, Default::default());
+ let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
+ drop(tx.send(rpc));
+ Ok(())
+ });
+
+ let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
+ Ok(rpc) => rpc,
+ Err(_) => bail!("Failed to create callback rpc.")
+ };
+
+ context
+ .stream_init(
+ ¶ms,
+ Callback {
+ input_frame_size: input_frame_size,
+ output_frame_size: output_frame_size,
+ input_shm: input_shm,
+ output_shm: output_shm,
+ rpc: rpc
+ }
+ )
+ .and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
- debug!(
- "Registering stream {:?}",
- entry.index(),
- );
+ debug!("Registering stream {:?}", entry.index(),);
entry.insert(stream).index()
},
None => {
// TODO: Turn into error
- panic!("Failed to insert stream into slab. No entries");
- },
+ panic!("Failed to insert stream into slab. No entries")
+ }
};
- try!(self.queue_init_messages(
- stm_tok,
- conn1,
- input_file,
- output_file
- ));
- None
- },
- Err(e) => Some(error(e)),
- };
-
- if let Some(err) = err {
- try!(self.queue_message(err))
- }
-
- Ok(())
- }
-
- fn queue_init_messages<T, U, V>(&mut self, stm_tok: usize, conn: T, input_file: U, output_file: V) -> Result<()>
- where
- T: IntoRawFd,
- U: IntoRawFd,
- V: IntoRawFd,
- {
- try!(self.queue_message_fd(
- ClientMessage::StreamCreated(stm_tok),
- conn
- ));
- try!(self.queue_message_fd(
- ClientMessage::StreamCreatedInputShm,
- input_file
- ));
- try!(self.queue_message_fd(
- ClientMessage::StreamCreatedOutputShm,
- output_file
- ));
- Ok(())
- }
-
- fn queue_message(&mut self, msg: ClientMessage) -> Result<()> {
- debug!("queue_message: {:?}", msg);
- encode::<ClientMessage>(&mut self.send_buffer, &msg).or_else(|e| {
- Err(e).chain_err(|| "Failed to encode msg into send buffer")
- })
- }
-
- // Since send_fd supports sending one RawFd at a time, queuing a
- // message with a RawFd forces use to take the current send_buffer
- // and move it pending queue.
- fn queue_message_fd<FD: IntoRawFd>(&mut self, msg: ClientMessage, fd: FD) -> Result<()> {
- let fd = fd.into_raw_fd();
- debug!("queue_message_fd: {:?} {:?}", msg, fd);
- try!(self.queue_message(msg));
- self.take_pending_send(Some(fd));
- Ok(())
- }
-
- // Take the current messages in the send_buffer and move them to
- // pending queue.
- fn take_pending_send(&mut self, fd: Option<RawFd>) {
- let pending = self.send_buffer.take().freeze();
- debug!("take_pending_send: ({:?} {:?})", pending, fd);
- self.pending_send.push_back((
- pending,
- fd.map(|fd| unsafe { AutoCloseFd::from_raw_fd(fd) })
- ));
- }
-
- // Process the pending queue and send them to client.
- fn flush_pending_send(&mut self) -> Result<Ready> {
- debug!("flush_pending_send");
- // take any pending messages in the send buffer.
- if !self.send_buffer.is_empty() {
- self.take_pending_send(None);
- }
-
- trace!("pending queue: {:?}", self.pending_send);
-
- let mut result = Ready::readable();
- let mut processed = 0;
-
- for &mut (ref mut buf, ref mut fd) in &mut self.pending_send {
- trace!("sending buf {:?}, fd {:?}", buf, fd);
- let r = {
- let mut src = Cursor::new(buf.as_ref());
- let fd = match *fd {
- Some(ref fd) => Some(fd.as_raw_fd()),
- None => None,
- };
- try!(self.io.send_buf_fd(&mut src, fd))
- };
- match r {
- Async::Ready(n) if n == buf.len() => {
- processed += 1;
- },
- Async::Ready(n) => {
- let _ = buf.split_to(n);
- let _ = fd.take();
- result.insert(Ready::writable());
- break;
- },
- Async::NotReady => {
- result.insert(Ready::writable());
- },
- }
- }
-
- debug!("processed {} buffers", processed);
-
- self.pending_send = self.pending_send.split_off(processed);
-
- trace!("pending queue: {:?}", self.pending_send);
-
- Ok(result)
+ Ok(ClientMessage::StreamCreated(StreamCreate {
+ token: stm_tok,
+ fds: [
+ stm1.into_raw_fd(),
+ input_file.into_raw_fd(),
+ output_file.into_raw_fd(),
+ ]
+ }))
+ })
+ .map_err(|e| e.into())
}
}
-pub struct Server {
- // Ok(None) - Server hasn't tried to create cubeb::Context.
- // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
- // Err(_) - Server has tried and failed to create cubeb::Context.
- // Don't try again.
- context: Option<Result<cubeb::Context>>,
- conns: Slab<ServerConn>,
- rx: channel::Receiver<Command>,
- tx: std::sync::mpsc::Sender<Response>,
+struct ServerWrapper {
+ core_thread: core::CoreThread,
+ callback_thread: core::CoreThread
}
-impl Drop for Server {
- fn drop(&mut self) {
- // ServerConns rely on the cubeb context, so we must free them
- // explicitly before the context is dropped.
- if !self.conns.is_empty() {
- debug!("dropping Server with {} live ServerConns", self.conns.len());
- self.conns.clear();
- }
+fn run() -> Result<ServerWrapper> {
+ trace!("Starting up cubeb audio server event loop thread...");
+
+ let callback_thread = try!(
+ core::spawn_thread("AudioIPC Callback RPC", || {
+ trace!("Starting up cubeb audio callback event loop thread...");
+ Ok(())
+ }).or_else(|e| {
+ debug!(
+ "Failed to start cubeb audio callback event loop thread: {:?}",
+ e.description()
+ );
+ Err(e)
+ })
+ );
+
+ let core_thread = try!(
+ core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| {
+ debug!(
+ "Failed to cubeb audio core event loop thread: {:?}",
+ e.description()
+ );
+ Err(e)
+ })
+ );
+
+ Ok(ServerWrapper {
+ core_thread: core_thread,
+ callback_thread: callback_thread
+ })
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_start() -> *mut c_void {
+ match run() {
+ Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
+ Err(_) => ptr::null_mut() as *mut _
}
}
-impl Server {
- pub fn new(rx: channel::Receiver<Command>,
- tx: std::sync::mpsc::Sender<Response>) -> Server {
- Server {
- context: None,
- conns: slab::Slab::with_capacity(SERVER_CONN_CHUNK_SIZE),
- rx: rx,
- tx: tx,
- }
- }
-
- fn handle_new_connection(&mut self, poll: &mut mio::Poll, client_socket: UnixStream) -> Result<()> {
- if !self.conns.has_available() {
- trace!(
- "server ran out of connection slots. reserving {} more.",
- SERVER_CONN_CHUNK_SIZE
- );
- self.conns.reserve_exact(SERVER_CONN_CHUNK_SIZE);
- }
-
- let token = match self.conns.vacant_entry() {
- Some(entry) => {
- debug!("registering {:?}", entry.index());
- let cxn = ServerConn::new(client_socket);
- entry.insert(cxn).index()
- },
- None => {
- panic!("failed to insert connection");
- },
- };
+#[no_mangle]
+pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
+ let (wait_tx, wait_rx) = oneshot::channel();
+ let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };
- // Register the connection
- self.conns[token].token = Some(token);
- poll.register(
- &self.conns[token].io,
- token,
- mio::Ready::readable(),
- mio::PollOpt::edge() | mio::PollOpt::oneshot()
- ).unwrap();
- /*
- let r = self.conns[token].receive();
- debug!("received {:?}", r);
- let r = self.conns[token].send(ClientMessage::ClientConnected);
- debug!("sent {:?} (ClientConnected)", r);
- */
-
- // Since we have a connection try creating a cubeb context. If
- // it fails, mark the failure with Err.
- if self.context.is_none() {
- self.context = Some(cubeb::Context::init("AudioIPC Server", None).or_else(|e| {
- Err(e).chain_err(|| "Unable to create cubeb context.")
- }))
- }
-
- Ok(())
- }
-
- fn new_connection(&mut self, poll: &mut mio::Poll) -> Result<AutoCloseFd> {
- let (s1, s2) = UnixStream::pair()?;
- self.handle_new_connection(poll, s1)?;
- unsafe {
- Ok(audioipc::AutoCloseFd::from_raw_fd(s2.into_raw_fd()))
- }
- }
-
- pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
- let mut events = mio::Events::with_capacity(16);
+ let cb_remote = wrapper.callback_thread.remote();
- match poll.poll(&mut events, None) {
- Ok(_) => {},
- Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => return Ok(()),
- Err(e) => warn!("server poll error: {}", e),
- }
-
- for event in events.iter() {
- match event.token() {
- CMD => {
- match self.rx.try_recv().unwrap() {
- Command::Quit => {
- info!("Quitting Audio Server loop");
- self.tx.send(Response::Quit).unwrap();
- bail!("quit");
- },
- Command::NewConnection => {
- info!("Creating new connection");
- let fd = self.new_connection(poll)?;
- self.tx.send(Response::Connection(fd)).unwrap();
- }
- }
- },
- token => {
- trace!("token {:?} ready", token);
-
- let context = self.context.as_ref().expect(
- "Shouldn't receive a message before accepting connection."
- );
-
- let mut readiness = Ready::readable();
+ // We create a pair of connected unix domain sockets. One socket is
+ // registered with the reactor core, the other is returned to the
+ // caller.
+ net::UnixStream::pair()
+ .and_then(|(sock1, sock2)| {
+ // Spawn closure to run on same thread as reactor::Core
+ // via remote handle.
+ wrapper.core_thread.remote().spawn(|handle| {
+ trace!("Incoming connection");
+ UnixStream::from_stream(sock2, handle)
+ .and_then(|sock| {
+ let transport = framed_with_fds(sock, Default::default());
+ rpc::bind_server(transport, CubebServer::new(cb_remote), handle);
+ Ok(())
+ })
+ .map_err(|_| ())
+ // Notify waiting thread that sock2 has been registered.
+ .and_then(|_| wait_tx.send(()))
+ });
+ // Wait for notification that sock2 has been registered
+ // with reactor::Core.
+ let _ = wait_rx.wait();
+ Ok(sock1.into_raw_fd())
+ })
+ .unwrap_or(-1)
+}
- if event.readiness().is_readable() {
- let r = self.conns[token].process_read(context);
- trace!("got {:?}", r);
-
- if let Err(e) = r {
- debug!("dropped client {:?} due to error {:?}", token, e);
- self.conns.remove(token);
- continue;
- }
- };
-
- if event.readiness().is_writable() {
- let r = self.conns[token].flush_pending_send();
- trace!("got {:?}", r);
-
- match r {
- Ok(r) => readiness = r,
- Err(e) => {
- debug!("dropped client {:?} due to error {:?}", token, e);
- self.conns.remove(token);
- continue;
- },
- }
- };
-
- poll.reregister(
- &self.conns[token].io,
- token,
- readiness,
- mio::PollOpt::edge() | mio::PollOpt::oneshot()
- ).unwrap();
- },
- }
- }
-
- Ok(())
- }
+#[no_mangle]
+pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
+ let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
+ drop(wrapper);
}
fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code())
}
-
-struct ServerWrapper {
- thread_handle: std::thread::JoinHandle<()>,
- tx: channel::Sender<Command>,
- rx: std::sync::mpsc::Receiver<Response>,
-}
-
-#[derive(Debug)]
-pub enum Command {
- Quit,
- NewConnection,
-}
-
-#[derive(Debug)]
-pub enum Response {
- Quit,
- Connection(AutoCloseFd)
-}
-
-impl ServerWrapper {
- fn shutdown(self) {
- let _ = self.tx.send(Command::Quit);
- let r = self.rx.recv();
- match r {
- Ok(Response::Quit) => info!("server quit response received"),
- e => warn!("unexpected response to server quit: {:?}", e),
- };
- self.thread_handle.join().unwrap();
- }
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_start() -> *mut c_void {
- let (tx, rx) = channel::channel();
- let (tx2, rx2) = std::sync::mpsc::channel();
-
- let handle = thread::spawn(move || {
- let mut poll = mio::Poll::new().unwrap();
- let mut server = Server::new(rx, tx2);
-
- poll.register(&server.rx, CMD, mio::Ready::readable(), mio::PollOpt::edge())
- .unwrap();
-
- loop {
- if server.poll(&mut poll).is_err() {
- return;
- }
- }
- });
-
- let wrapper = ServerWrapper {
- thread_handle: handle,
- tx: tx,
- rx: rx2,
- };
-
- Box::into_raw(Box::new(wrapper)) as *mut _
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
- let wrapper: &mut ServerWrapper = unsafe { &mut *(p as *mut _) };
-
- wrapper.tx.send(Command::NewConnection).unwrap();
-
- if let Response::Connection(fd) = wrapper.rx.recv().unwrap() {
- return fd.into_raw_fd();
- }
-
- -1
-}
-
-#[no_mangle]
-pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
- let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
- wrapper.shutdown();
-}