--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
The source from this directory was copied from the audioipc-2
git repository using the update.sh script. The only changes
made were those applied by update.sh and the addition of
Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
-The git commit ID used was d7798606aa590ef402344b7a519a0053725a9805 (2018-01-27 09:07:03 +1000)
+The git commit ID used was 933fb48b252a10569ba8d598541577c6f2dc308f (2018-02-21 17:13:04 +1000)
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -3,17 +3,17 @@ name = "audioipc"
version = "0.2.1"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
]
description = "Remote Cubeb IPC"
[dependencies]
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+cubeb = "0.4"
bincode = "0.8"
bytes = "0.4"
# rayon-core in Gecko uses futures 0.1.13
futures = "=0.1.13"
iovec = "0.1"
libc = "0.2"
log = "^0.3.6"
memmap = "0.5.2"
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -108,21 +108,21 @@ impl AsyncRecvMsg for UnixStream {
Ok((n, cmsg_len, flags)) => {
unsafe {
buf.advance_mut(n);
}
unsafe {
cmsg.advance_mut(cmsg_len);
}
Ok((n, flags).into())
- },
+ }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_read();
Ok(Async::NotReady)
- },
+ }
Err(e) => Err(e),
}
}
}
impl AsyncSendMsg for UnixStream {
fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
where
@@ -134,41 +134,26 @@ impl AsyncSendMsg for UnixStream {
}
let r = {
// The `IoVec` type can't have a zero-length size, so create a dummy
// version from a 1-length slice which we'll overwrite with the
// `bytes_vec` method.
static DUMMY: &[u8] = &[0];
let nom = <&IoVec>::from(DUMMY);
let mut bufs = [
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
- nom,
+ nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom
];
let n = buf.bytes_vec(&mut bufs);
self.send_msg(&bufs[..n], cmsg.bytes())
};
match r {
Ok(n) => {
buf.advance(n);
Ok(Async::Ready(n))
- },
+ }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_write();
Ok(Async::NotReady)
- },
+ }
Err(e) => Err(e),
}
}
}
--- a/media/audioipc/audioipc/src/cmsg.rs
+++ b/media/audioipc/audioipc/src/cmsg.rs
@@ -5,17 +5,17 @@
use bytes::{BufMut, Bytes, BytesMut};
use libc::{self, cmsghdr};
use std::{convert, mem, ops, slice};
use std::os::unix::io::RawFd;
#[derive(Clone, Debug)]
pub struct Fds {
- fds: Bytes
+ fds: Bytes,
}
impl convert::AsRef<[RawFd]> for Fds {
fn as_ref(&self) -> &[RawFd] {
let n = self.fds.len() / mem::size_of::<RawFd>();
unsafe { slice::from_raw_parts(self.fds.as_ptr() as *const _, n) }
}
}
@@ -25,23 +25,21 @@ impl ops::Deref for Fds {
#[inline]
fn deref(&self) -> &[RawFd] {
self.as_ref()
}
}
pub struct ControlMsgIter {
- control: Bytes
+ control: Bytes,
}
pub fn iterator(c: Bytes) -> ControlMsgIter {
- ControlMsgIter {
- control: c
- }
+ ControlMsgIter { control: c }
}
impl Iterator for ControlMsgIter {
type Item = Fds;
// This follows the logic in __cmsg_nxthdr from glibc
// /usr/include/bits/socket.h
fn next(&mut self) -> Option<Self::Item> {
@@ -68,60 +66,60 @@ impl Iterator for ControlMsgIter {
} else {
control.slice_from(next_cmsghdr)
};
match (cmsg.cmsg_level, cmsg.cmsg_type) {
(libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
trace!("Found SCM_RIGHTS...");
return Some(Fds {
- fds: control.slice(cmsghdr_len, cmsg_len as _)
+ fds: control.slice(cmsghdr_len, cmsg_len as _),
});
- },
+ }
(level, kind) => {
trace!("Skipping cmsg level, {}, type={}...", level, kind);
- },
+ }
}
}
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Error {
/// Not enough space in storage to insert control mesage.
- NoSpace
+ NoSpace,
}
#[must_use]
pub struct ControlMsgBuilder {
- result: Result<BytesMut, Error>
+ result: Result<BytesMut, Error>,
}
pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
let buf = aligned(buf);
- ControlMsgBuilder {
- result: Ok(buf)
- }
+ ControlMsgBuilder { result: Ok(buf) }
}
impl ControlMsgBuilder {
fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
self.result = self.result.and_then(align_buf).and_then(|mut cmsg| {
let cmsg_len = len(msg.len());
if cmsg.remaining_mut() < cmsg_len {
return Err(Error::NoSpace);
}
let cmsghdr = cmsghdr {
cmsg_len: cmsg_len as _,
cmsg_level: level,
- cmsg_type: kind
+ cmsg_type: kind,
};
- let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) };
+ let cmsghdr = unsafe {
+ slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
+ };
cmsg.put_slice(cmsghdr);
let mut cmsg = try!(align_buf(cmsg));
cmsg.put_slice(msg);
Ok(cmsg)
});
self
--- a/media/audioipc/audioipc/src/codec.rs
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -30,48 +30,48 @@ pub trait Codec {
/// A default method available to be called when there are no more bytes
/// available to be read from the I/O.
fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
match try!(self.decode(buf)) {
Some(frame) => Ok(frame),
None => Err(io::Error::new(
io::ErrorKind::Other,
- "bytes remaining on stream"
- ))
+ "bytes remaining on stream",
+ )),
}
}
/// Encodes a frame intox the buffer provided.
fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
}
/// Codec based upon bincode serialization
///
/// Messages that have been serialized using bincode are prefixed with
/// the length of the message to aid in deserialization, so that it's
/// known if enough data has been received to decode a complete
/// message.
pub struct LengthDelimitedCodec<In, Out> {
state: State,
__in: PhantomData<In>,
- __out: PhantomData<Out>
+ __out: PhantomData<Out>,
}
enum State {
Length,
- Data(u16)
+ Data(u16),
}
impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
fn default() -> Self {
LengthDelimitedCodec {
state: State::Length,
__in: PhantomData,
- __out: PhantomData
+ __out: PhantomData,
}
}
}
impl<In, Out> LengthDelimitedCodec<In, Out> {
// Lengths are encoded as little endian u16
fn decode_length(&mut self, buf: &mut BytesMut) -> io::Result<Option<u16>> {
if buf.len() < 2 {
@@ -84,97 +84,97 @@ impl<In, Out> LengthDelimitedCodec<In, O
// Consume the length field
let _ = buf.split_to(2);
Ok(Some(n))
}
fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result<Option<Out>>
where
- Out: DeserializeOwned + Debug
+ Out: DeserializeOwned + Debug,
{
// At this point, the buffer has already had the required capacity
// reserved. All there is to do is read.
let n = n as usize;
if buf.len() < n {
return Ok(None);
}
let buf = buf.split_to(n).freeze();
trace!("Attempting to decode");
let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
bincode::ErrorKind::IoError(e) => e,
- _ => io::Error::new(io::ErrorKind::Other, *e)
+ _ => io::Error::new(io::ErrorKind::Other, *e),
}));
trace!("... Decoded {:?}", msg);
Ok(Some(msg))
}
}
impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
where
In: Serialize + Debug,
- Out: DeserializeOwned + Debug
+ Out: DeserializeOwned + Debug,
{
type In = In;
type Out = Out;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
let n = match self.state {
State::Length => {
match try!(self.decode_length(buf)) {
Some(n) => {
self.state = State::Data(n);
// Ensure that the buffer has enough space to read the
// incoming payload
buf.reserve(n as usize);
n
- },
- None => return Ok(None)
+ }
+ None => return Ok(None),
}
- },
- State::Data(n) => n
+ }
+ State::Data(n) => n,
};
match try!(self.decode_data(buf, n)) {
Some(data) => {
// Update the decode state
self.state = State::Length;
// Make sure the buffer has enough space to read the next head
buf.reserve(2);
Ok(Some(data))
- },
- None => Ok(None)
+ }
+ None => Ok(None),
}
}
fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
trace!("Attempting to encode");
let encoded_len = serialized_size(&item);
if encoded_len > 8 * 1024 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
- "encoded message too big"
+ "encoded message too big",
));
}
buf.reserve((encoded_len + 2) as usize);
buf.put_u16::<LittleEndian>(encoded_len as u16);
if let Err(e) =
serialize_into::<_, Self::In, _>(&mut buf.writer(), &item, Bounded(encoded_len))
{
match *e {
bincode::ErrorKind::IoError(e) => return Err(e),
- _ => return Err(io::Error::new(io::ErrorKind::Other, *e))
+ _ => return Err(io::Error::new(io::ErrorKind::Other, *e)),
}
}
Ok(())
}
}
--- a/media/audioipc/audioipc/src/core.rs
+++ b/media/audioipc/audioipc/src/core.rs
@@ -11,37 +11,37 @@ scoped_thread_local! {
}
pub fn handle() -> Handle {
HANDLE.with(|handle| handle.clone())
}
pub fn spawn<F>(f: F)
where
- F: Future<Item = (), Error = ()> + 'static
+ F: Future<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn(f))
}
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
- R: IntoFuture<Item = (), Error = ()> + 'static
+ R: IntoFuture<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn_fn(f))
}
struct Inner {
join: thread::JoinHandle<()>,
- shutdown: oneshot::Sender<()>
+ shutdown: oneshot::Sender<()>,
}
pub struct CoreThread {
inner: Option<Inner>,
- remote: Remote
+ remote: Remote,
}
impl CoreThread {
pub fn remote(&self) -> Remote {
self.remote.clone()
}
}
@@ -60,17 +60,17 @@ impl fmt::Debug for CoreThread {
// f.debug_tuple("CoreThread").field(&"...").finish()
f.debug_tuple("CoreThread").field(&self.remote).finish()
}
}
pub fn spawn_thread<S, F>(name: S, f: F) -> io::Result<CoreThread>
where
S: Into<String>,
- F: FnOnce() -> io::Result<()> + Send + 'static
+ F: FnOnce() -> io::Result<()> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
let join = try!(thread::Builder::new().name(name.into()).spawn(move || {
let mut core = Core::new().expect("Failed to create reactor::Core");
let handle = core.handle();
let remote = handle.remote().clone();
@@ -80,23 +80,21 @@ where
f().and_then(|_| {
let _ = core.run(shutdown_rx);
Ok(())
})
}));
trace!("thread shutdown...");
}));
- let remote = try!(remote_rx.recv().or_else(|_| {
- Err(io::Error::new(
- io::ErrorKind::Other,
- "Failed to receive remote handle from spawned thread"
- ))
- }));
+ let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Failed to receive remote handle from spawned thread"
+ ))));
Ok(CoreThread {
inner: Some(Inner {
join: join,
- shutdown: shutdown_tx
+ shutdown: shutdown_tx,
}),
- remote: remote
+ remote: remote,
})
}
--- a/media/audioipc/audioipc/src/errors.rs
+++ b/media/audioipc/audioipc/src/errors.rs
@@ -1,17 +1,17 @@
use bincode;
-use cubeb_core;
+use cubeb;
use std;
error_chain! {
// Maybe replace with chain_err to improve the error info.
foreign_links {
Bincode(bincode::Error);
Io(std::io::Error);
- Cubeb(cubeb_core::Error);
+ Cubeb(cubeb::Error);
}
// Replace bail!(str) with explicit errors.
errors {
Disconnected
}
}
--- a/media/audioipc/audioipc/src/fd_passing.rs
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -15,25 +15,25 @@ use std::collections::VecDeque;
use std::os::unix::io::RawFd;
const INITIAL_CAPACITY: usize = 1024;
const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
const FDS_CAPACITY: usize = 16;
struct IncomingFds {
cmsg: BytesMut,
- recv_fds: Option<cmsg::ControlMsgIter>
+ recv_fds: Option<cmsg::ControlMsgIter>,
}
impl IncomingFds {
pub fn new(c: usize) -> Self {
let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>());
IncomingFds {
cmsg: BytesMut::with_capacity(capacity),
- recv_fds: None
+ recv_fds: None,
}
}
pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
loop {
let fds = self.recv_fds
.as_mut()
.and_then(|recv_fds| recv_fds.next())
@@ -55,38 +55,38 @@ impl IncomingFds {
self.cmsg.reserve(cmsg::space(mem::size_of::<[RawFd; 3]>()));
&mut self.cmsg
}
}
#[derive(Debug)]
struct Frame {
msgs: Bytes,
- fds: Option<Bytes>
+ fds: Option<Bytes>,
}
/// A unified `Stream` and `Sink` interface over an I/O object, using
/// the `Codec` trait to encode and decode the payload.
pub struct FramedWithFds<A, C> {
io: A,
codec: C,
// Stream
read_buf: BytesMut,
incoming_fds: IncomingFds,
is_readable: bool,
eof: bool,
// Sink
frames: VecDeque<Frame>,
write_buf: BytesMut,
- outgoing_fds: BytesMut
+ outgoing_fds: BytesMut,
}
impl<A, C> FramedWithFds<A, C>
where
- A: AsyncSendMsg
+ A: AsyncSendMsg,
{
// If there is a buffered frame, try to write it to `A`
fn do_write(&mut self) -> Poll<(), io::Error> {
debug!("do_write...");
// Create a frame from any pending message in `write_buf`.
if !self.write_buf.is_empty() {
self.set_frame(None);
}
@@ -97,20 +97,20 @@ where
loop {
let n = match self.frames.front() {
Some(frame) => {
trace!("sending msg {:?}, fds {:?}", frame.msgs, frame.fds);
let mut msgs = frame.msgs.clone().into_buf();
let mut fds = match frame.fds {
Some(ref fds) => fds.clone(),
- None => Bytes::new()
+ None => Bytes::new(),
}.into_buf();
try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
- },
+ }
_ => {
// No pending frames.
return Ok(().into());
}
};
match self.frames.pop_front() {
Some(mut frame) => {
@@ -132,18 +132,18 @@ where
// re-queue the remaining message at the head
// of the queue. (Don't need to resend the fds
// since they've been sent with the first
// part.)
drop(frame.msgs.split_to(n));
self.frames.push_front(frame);
break;
}
- },
- _ => panic!()
+ }
+ _ => panic!(),
}
}
debug!("process {} frames", processed);
trace!("pending frames: {:?}", self.frames);
Ok(().into())
}
@@ -153,28 +153,25 @@ where
assert!(fds.is_none());
trace!("set_frame: No pending messages...");
return;
}
let msgs = self.write_buf.take().freeze();
trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
- self.frames.push_back(Frame {
- msgs,
- fds
- });
+ self.frames.push_back(Frame { msgs, fds });
}
}
impl<A, C> Stream for FramedWithFds<A, C>
where
A: AsyncRecvMsg,
C: Codec,
- C::Out: AssocRawFd
+ C::Out: AssocRawFd,
{
type Item = C::Out;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// Repeatedly call `decode` or `decode_eof` as long as it is
// "readable". Readable is defined as not having returned `None`. If
@@ -221,25 +218,22 @@ where
}
}
}
impl<A, C> Sink for FramedWithFds<A, C>
where
A: AsyncSendMsg,
C: Codec,
- C::In: AssocRawFd + fmt::Debug
+ C::In: AssocRawFd + fmt::Debug,
{
type SinkItem = C::In;
type SinkError = io::Error;
- fn start_send(
- &mut self,
- item: Self::SinkItem
- ) -> StartSend<Self::SinkItem, Self::SinkError> {
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("start_send: item={:?}", item);
// If the buffer is already over BACKPRESSURE_THRESHOLD,
// then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete());
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
@@ -290,29 +284,29 @@ pub fn framed_with_fds<A, C>(io: A, code
codec: codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
incoming_fds: IncomingFds::new(FDS_CAPACITY),
is_readable: false,
eof: false,
frames: VecDeque::new(),
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
outgoing_fds: BytesMut::with_capacity(
- FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>())
- )
+ FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()),
+ ),
}
}
fn write_zero() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
}
fn clone_into_array<A, T>(slice: &[T]) -> A
where
A: Sized + Default + AsMut<[T]>,
- T: Clone
+ T: Clone,
{
let mut a = Default::default();
<A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
a
}
fn close_fds(fds: &[RawFd]) {
for fd in fds {
--- a/media/audioipc/audioipc/src/frame.rs
+++ b/media/audioipc/audioipc/src/frame.rs
@@ -16,17 +16,17 @@ const BACKPRESSURE_THRESHOLD: usize = 4
/// the `Codec` trait to encode and decode the payload.
pub struct Framed<A, C> {
io: A,
codec: C,
read_buf: BytesMut,
write_buf: BytesMut,
frame: Option<<Bytes as IntoBuf>::Buf>,
is_readable: bool,
- eof: bool
+ eof: bool,
}
impl<A, C> Framed<A, C>
where
A: AsyncWrite,
{
// If there is a buffered frame, try to write it to `A`
fn do_write(&mut self) -> Poll<(), io::Error> {
@@ -154,11 +154,11 @@ fn write_zero() -> io::Error {
pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
Framed {
io: io,
codec: codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
frame: None,
is_readable: false,
- eof: false
+ eof: false,
}
}
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -1,35 +1,34 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
#![allow(dead_code)] // TODO: Remove.
-
#![recursion_limit = "1024"]
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate bincode;
extern crate bytes;
-extern crate cubeb_core;
+extern crate cubeb;
#[macro_use]
extern crate futures;
extern crate iovec;
extern crate libc;
extern crate memmap;
-extern crate serde;
#[macro_use]
extern crate scoped_tls;
+extern crate serde;
extern crate tokio_core;
#[macro_use]
extern crate tokio_io;
extern crate tokio_uds;
pub mod async;
pub mod cmsg;
pub mod codec;
@@ -38,41 +37,47 @@ pub mod fd_passing;
pub mod frame;
pub mod rpc;
pub mod core;
pub mod messages;
mod msg;
pub mod shm;
use iovec::IoVec;
-
#[cfg(target_os = "linux")]
use libc::MSG_CMSG_CLOEXEC;
pub use messages::{ClientMessage, ServerMessage};
-
use std::env::temp_dir;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::PathBuf;
#[cfg(not(target_os = "linux"))]
const MSG_CMSG_CLOEXEC: libc::c_int = 0;
// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
pub trait RecvMsg {
- fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>;
+ fn recv_msg(
+ &mut self,
+ iov: &mut [&mut IoVec],
+ cmsg: &mut [u8],
+ ) -> io::Result<(usize, usize, i32)>;
}
pub trait SendMsg {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
}
impl<T: AsRawFd> RecvMsg for T {
- fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> {
+ fn recv_msg(
+ &mut self,
+ iov: &mut [&mut IoVec],
+ cmsg: &mut [u8],
+ ) -> io::Result<(usize, usize, i32)> {
msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
}
}
impl<T: AsRawFd> SendMsg for T {
fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
}
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -1,48 +1,48 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
-use cubeb_core::{self, ffi};
+use cubeb::{self, ffi};
use std::ffi::{CStr, CString};
-use std::os::raw::c_char;
+use std::os::raw::{c_char, c_int, c_uint};
use std::os::unix::io::RawFd;
use std::ptr;
#[derive(Debug, Serialize, Deserialize)]
pub struct Device {
pub output_name: Option<Vec<u8>>,
- pub input_name: Option<Vec<u8>>
+ pub input_name: Option<Vec<u8>>,
}
-impl<'a> From<cubeb_core::Device<'a>> for Device {
- fn from(info: cubeb_core::Device) -> Self {
+impl<'a> From<&'a cubeb::DeviceRef> for Device {
+ fn from(info: &'a cubeb::DeviceRef) -> Self {
Self {
output_name: info.output_name_bytes().map(|s| s.to_vec()),
- input_name: info.input_name_bytes().map(|s| s.to_vec())
+ input_name: info.input_name_bytes().map(|s| s.to_vec()),
}
}
}
impl From<ffi::cubeb_device> for Device {
fn from(info: ffi::cubeb_device) -> Self {
Self {
output_name: dup_str(info.output_name),
- input_name: dup_str(info.input_name)
+ input_name: dup_str(info.input_name),
}
}
}
impl From<Device> for ffi::cubeb_device {
fn from(info: Device) -> Self {
Self {
output_name: opt_str(info.output_name),
- input_name: opt_str(info.input_name)
+ input_name: opt_str(info.input_name),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeviceInfo {
pub devid: usize,
pub device_id: Option<Vec<u8>>,
@@ -57,21 +57,22 @@ pub struct DeviceInfo {
pub format: ffi::cubeb_device_fmt,
pub default_format: ffi::cubeb_device_fmt,
pub max_channels: u32,
pub default_rate: u32,
pub max_rate: u32,
pub min_rate: u32,
pub latency_lo: u32,
- pub latency_hi: u32
+ pub latency_hi: u32,
}
-impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
- fn from(info: &'a ffi::cubeb_device_info) -> Self {
+impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo {
+ fn from(info: &'a cubeb::DeviceInfoRef) -> Self {
+ let info = unsafe { &*info.as_ptr() };
DeviceInfo {
devid: info.devid as _,
device_id: dup_str(info.device_id),
friendly_name: dup_str(info.friendly_name),
group_id: dup_str(info.group_id),
vendor_name: dup_str(info.vendor_name),
device_type: info.device_type,
@@ -81,17 +82,17 @@ impl<'a> From<&'a ffi::cubeb_device_info
format: info.format,
default_format: info.default_format,
max_channels: info.max_channels,
default_rate: info.default_rate,
max_rate: info.max_rate,
min_rate: info.min_rate,
latency_lo: info.latency_lo,
- latency_hi: info.latency_hi
+ latency_hi: info.latency_hi,
}
}
}
impl From<DeviceInfo> for ffi::cubeb_device_info {
fn from(info: DeviceInfo) -> Self {
ffi::cubeb_device_info {
devid: info.devid as _,
@@ -107,92 +108,72 @@ impl From<DeviceInfo> for ffi::cubeb_dev
format: info.format,
default_format: info.default_format,
max_channels: info.max_channels,
default_rate: info.default_rate,
max_rate: info.max_rate,
min_rate: info.min_rate,
latency_lo: info.latency_lo,
- latency_hi: info.latency_hi
+ latency_hi: info.latency_hi,
}
}
}
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct StreamParams {
- pub format: u32,
- pub rate: u16,
- pub channels: u8,
- pub layout: i32,
- pub prefs: i32
+ pub format: ffi::cubeb_sample_format,
+ pub rate: c_uint,
+ pub channels: c_uint,
+ pub layout: ffi::cubeb_channel_layout,
+ pub prefs: ffi::cubeb_stream_prefs,
}
-impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
- fn from(params: &'a ffi::cubeb_stream_params) -> Self {
- assert!(params.channels <= u32::from(u8::max_value()));
-
- StreamParams {
- format: params.format,
- rate: params.rate as u16,
- channels: params.channels as u8,
- layout: params.layout,
- prefs: params.prefs
- }
- }
-}
-
-impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
- fn from(params: &StreamParams) -> Self {
- ffi::cubeb_stream_params {
- format: params.format,
- rate: u32::from(params.rate),
- channels: u32::from(params.channels),
- layout: params.layout,
- prefs: params.prefs
- }
+impl<'a> From<&'a cubeb::StreamParamsRef> for StreamParams {
+ fn from(x: &cubeb::StreamParamsRef) -> StreamParams {
+ unsafe { *(x.as_ptr() as *mut StreamParams) }
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamInitParams {
pub stream_name: Option<Vec<u8>>,
pub input_device: usize,
pub input_stream_params: Option<StreamParams>,
pub output_device: usize,
pub output_stream_params: Option<StreamParams>,
- pub latency_frames: u32
+ pub latency_frames: u32,
}
fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
if s.is_null() {
None
} else {
let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
Some(vec)
}
}
-fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
+fn opt_str(v: Option<Vec<u8>>) -> *mut c_char {
match v {
Some(v) => match CString::new(v) {
Ok(s) => s.into_raw(),
Err(_) => {
debug!("Failed to convert bytes to CString");
- ptr::null()
+ ptr::null_mut()
}
},
- None => ptr::null()
+ None => ptr::null_mut(),
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamCreate {
pub token: usize,
- pub fds: [RawFd; 3]
+ pub fds: [RawFd; 3],
}
// Client -> Server messages.
// TODO: Callbacks should be different messages types so
// ServerConn::process_msg doesn't have a catch-all case.
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerMessage {
ClientConnect,
@@ -210,17 +191,17 @@ pub enum ServerMessage {
StreamStart(usize),
StreamStop(usize),
StreamResetDefaultDevice(usize),
StreamGetPosition(usize),
StreamGetLatency(usize),
StreamSetVolume(usize, f32),
StreamSetPanning(usize, f32),
- StreamGetCurrentDevice(usize)
+ StreamGetCurrentDevice(usize),
}
// Server -> Client messages.
// TODO: Streams need id.
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
ClientConnected,
ClientDisconnected,
@@ -239,52 +220,84 @@ pub enum ClientMessage {
StreamStopped,
StreamDefaultDeviceReset,
StreamPosition(u64),
StreamLatency(u32),
StreamVolumeSet,
StreamPanningSet,
StreamCurrentDevice(Device),
- Error(ffi::cubeb_error_code)
+ Error(c_int),
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CallbackReq {
Data(isize, usize),
- State(ffi::cubeb_state)
+ State(ffi::cubeb_state),
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CallbackResp {
Data(isize),
- State
+ State,
}
pub trait AssocRawFd {
fn fd(&self) -> Option<[RawFd; 3]> {
None
}
fn take_fd<F>(&mut self, _: F)
where
- F: FnOnce() -> Option<[RawFd; 3]>
+ F: FnOnce() -> Option<[RawFd; 3]>,
{
}
}
impl AssocRawFd for ServerMessage {}
impl AssocRawFd for ClientMessage {
fn fd(&self) -> Option<[RawFd; 3]> {
match *self {
ClientMessage::StreamCreated(ref data) => Some(data.fds),
- _ => None
+ _ => None,
}
}
fn take_fd<F>(&mut self, f: F)
where
- F: FnOnce() -> Option<[RawFd; 3]>
+ F: FnOnce() -> Option<[RawFd; 3]>,
{
if let ClientMessage::StreamCreated(ref mut data) = *self {
data.fds = f().unwrap();
}
}
}
+
+#[cfg(test)]
+mod test {
+ use super::StreamParams;
+ use cubeb::ffi;
+ use std::mem;
+
+ #[test]
+ fn stream_params_size_check() {
+ assert_eq!(
+ mem::size_of::<StreamParams>(),
+ mem::size_of::<ffi::cubeb_stream_params>()
+ )
+ }
+
+ #[test]
+ fn stream_params_from() {
+ let mut raw = ffi::cubeb_stream_params::default();
+ raw.format = ffi::CUBEB_SAMPLE_FLOAT32BE;
+ raw.rate = 96_000;
+ raw.channels = 32;
+ raw.layout = ffi::CUBEB_LAYOUT_3F1_LFE;
+ raw.prefs = ffi::CUBEB_STREAM_PREF_LOOPBACK;
+ let wrapped = ::cubeb::StreamParams::from(raw);
+ let params = StreamParams::from(wrapped.as_ref());
+ assert_eq!(params.format, raw.format);
+ assert_eq!(params.rate, raw.rate);
+ assert_eq!(params.channels, raw.channels);
+ assert_eq!(params.layout, raw.layout);
+ assert_eq!(params.prefs, raw.prefs);
+ }
+}
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -11,27 +11,27 @@ fn cvt(r: libc::ssize_t) -> io::Result<u
Ok(r as usize)
}
}
// Convert return of -1 into error message, handling retry on EINTR
fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
loop {
match cvt(f()) {
- Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
- other => return other
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+ other => return other,
}
}
}
pub fn recv_msg_with_flags(
socket: RawFd,
bufs: &mut [&mut IoVec],
cmsg: &mut [u8],
- flags: libc::c_int
+ flags: libc::c_int,
) -> io::Result<(usize, usize, libc::c_int)> {
let slice = iovec::as_os_slice_mut(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
let (control, controllen) = if cmsg.is_empty() {
(ptr::null_mut(), 0)
} else {
(cmsg.as_ptr() as *mut _, cmsg.len())
};
@@ -51,17 +51,17 @@ pub fn recv_msg_with_flags(
let controllen = msghdr.msg_controllen as usize;
Ok((n, controllen, msghdr.msg_flags))
}
pub fn send_msg_with_flags(
socket: RawFd,
bufs: &[&IoVec],
cmsg: &[u8],
- flags: libc::c_int
+ flags: libc::c_int,
) -> io::Result<usize> {
let slice = iovec::as_os_slice(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
let (control, controllen) = if cmsg.is_empty() {
(ptr::null_mut(), 0)
} else {
(cmsg.as_ptr() as *mut _, cmsg.len())
};
@@ -69,12 +69,10 @@ pub fn send_msg_with_flags(
let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
msghdr.msg_name = ptr::null_mut();
msghdr.msg_namelen = 0;
msghdr.msg_iov = slice.as_ptr() as *mut _;
msghdr.msg_iovlen = len as _;
msghdr.msg_control = control;
msghdr.msg_controllen = controllen as _;
- cvt_r(|| unsafe {
- libc::sendmsg(socket, &msghdr as *const _, flags)
- })
+ cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) })
}
--- a/media/audioipc/audioipc/src/rpc/client/mod.rs
+++ b/media/audioipc/audioipc/src/rpc/client/mod.rs
@@ -48,28 +48,28 @@ use std::io;
use tokio_core::reactor::Handle;
mod proxy;
pub use self::proxy::{ClientProxy, Response};
pub fn bind_client<C>(
transport: C::Transport,
- handle: &Handle
+ handle: &Handle,
) -> proxy::ClientProxy<C::Request, C::Response>
where
- C: Client
+ C: Client,
{
let (tx, rx) = proxy::channel();
let fut = {
let handler = ClientHandler::<C> {
transport: transport,
requests: rx,
- in_flight: VecDeque::with_capacity(32)
+ in_flight: VecDeque::with_capacity(32),
};
Driver::new(handler)
};
// Spawn the RPC driver into task
handle.spawn(Box::new(fut.map_err(|_| ())));
tx
@@ -87,43 +87,43 @@ pub trait Client: 'static {
+ Stream<Item = Self::Response, Error = io::Error>
+ Sink<SinkItem = Self::Request, SinkError = io::Error>;
}
////////////////////////////////////////////////////////////////////////////////
struct ClientHandler<C>
where
- C: Client
+ C: Client,
{
transport: C::Transport,
requests: proxy::Receiver<C::Request, C::Response>,
- in_flight: VecDeque<oneshot::Sender<C::Response>>
+ in_flight: VecDeque<oneshot::Sender<C::Response>>,
}
impl<C> Handler for ClientHandler<C>
where
- C: Client
+ C: Client,
{
type In = C::Response;
type Out = C::Request;
type Transport = C::Transport;
fn transport(&mut self) -> &mut Self::Transport {
&mut self.transport
}
fn consume(&mut self, response: Self::In) -> io::Result<()> {
trace!("ClientHandler::consume");
if let Some(complete) = self.in_flight.pop_front() {
drop(complete.send(response));
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
- "request / response mismatch"
+ "request / response mismatch",
));
}
Ok(())
}
/// Produce a message
fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
@@ -133,26 +133,26 @@ where
match self.requests.poll() {
Ok(Async::Ready(Some((request, complete)))) => {
trace!(" --> received request");
// Track complete handle
self.in_flight.push_back(complete);
Ok(Some(request).into())
- },
+ }
Ok(Async::Ready(None)) => {
trace!(" --> client dropped");
Ok(None.into())
- },
+ }
Ok(Async::NotReady) => {
trace!(" --> not ready");
Ok(Async::NotReady)
- },
- Err(_) => unreachable!()
+ }
+ Err(_) => unreachable!(),
}
}
/// RPC currently in flight
fn has_in_flight(&self) -> bool {
!self.in_flight.is_empty()
}
}
--- a/media/audioipc/audioipc/src/rpc/client/proxy.rs
+++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs
@@ -52,66 +52,62 @@ use std::io;
/// client connection.
pub type Request<R, Q> = (R, oneshot::Sender<Q>);
/// Receive requests submitted to the client
pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
/// Response future returned from a client
pub struct Response<Q> {
- inner: oneshot::Receiver<Q>
+ inner: oneshot::Receiver<Q>,
}
pub struct ClientProxy<R, Q> {
- tx: mpsc::UnboundedSender<Request<R, Q>>
+ tx: mpsc::UnboundedSender<Request<R, Q>>,
}
impl<R, Q> Clone for ClientProxy<R, Q> {
fn clone(&self) -> Self {
ClientProxy {
- tx: self.tx.clone()
+ tx: self.tx.clone(),
}
}
}
pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
// Create a channel to send the requests to client-side of rpc.
let (tx, rx) = mpsc::unbounded();
// Wrap the `tx` part in ClientProxy so the rpc call interface
// can be implemented.
- let client = ClientProxy {
- tx
- };
+ let client = ClientProxy { tx };
(client, rx)
}
impl<R, Q> ClientProxy<R, Q> {
pub fn call(&self, request: R) -> Response<Q> {
// The response to returned from the rpc client task over a
// oneshot channel.
let (tx, rx) = oneshot::channel();
// If send returns an Err, its because the other side has been dropped.
// By ignoring it, we are just dropping the `tx`, which will mean the
// rx will return Canceled when polled. In turn, that is translated
// into a BrokenPipe, which conveys the proper error.
let _ = self.tx.send((request, tx));
- Response {
- inner: rx
- }
+ Response { inner: rx }
}
}
impl<R, Q> fmt::Debug for ClientProxy<R, Q>
where
R: fmt::Debug,
- Q: fmt::Debug
+ Q: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ClientProxy {{ ... }}")
}
}
impl<Q> Future for Response<Q> {
type Item = Q;
@@ -127,14 +123,14 @@ impl<Q> Future for Response<Q> {
Err(e)
}
}
}
}
impl<Q> fmt::Debug for Response<Q>
where
- Q: fmt::Debug
+ Q: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Response {{ ... }}")
}
}
--- a/media/audioipc/audioipc/src/rpc/driver.rs
+++ b/media/audioipc/audioipc/src/rpc/driver.rs
@@ -5,38 +5,38 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use rpc::Handler;
use std::fmt;
use std::io;
pub struct Driver<T>
where
- T: Handler
+ T: Handler,
{
// Glue
handler: T,
// True as long as the connection has more request frames to read.
run: bool,
// True when the transport is fully flushed
- is_flushed: bool
+ is_flushed: bool,
}
impl<T> Driver<T>
where
- T: Handler
+ T: Handler,
{
/// Create a new rpc driver with the given service and transport.
pub fn new(handler: T) -> Driver<T> {
Driver {
handler: handler,
run: true,
- is_flushed: true
+ is_flushed: true,
}
}
/// Returns true if the driver has nothing left to do
fn is_done(&self) -> bool {
!self.run && self.is_flushed && !self.has_in_flight()
}
@@ -60,17 +60,17 @@ where
match req {
Some(message) => {
trace!("received message");
if let Err(e) = self.handler.consume(message) {
// TODO: Should handler be infalliable?
panic!("unimplemented error handling: {:?}", e);
}
- },
+ }
None => {
trace!("received None");
// At this point, we just return. This works
// because poll with be called again and go
// through the receive-cycle again.
self.run = false;
}
}
@@ -81,24 +81,24 @@ where
/// Send outgoing messages to the transport.
fn send_outgoing(&mut self) -> io::Result<()> {
trace!("send_responses");
loop {
match try!(self.handler.produce()) {
Async::Ready(Some(message)) => {
trace!(" --> got message");
try!(self.process_outgoing(message));
- },
+ }
Async::Ready(None) => {
trace!(" --> got None");
// The service is done with the connection.
break;
- },
+ }
// Nothing to dispatch
- Async::NotReady => break
+ Async::NotReady => break,
}
}
Ok(())
}
fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
trace!("process_outgoing");
@@ -116,17 +116,17 @@ where
fn has_in_flight(&self) -> bool {
self.handler.has_in_flight()
}
}
impl<T> Future for Driver<T>
where
- T: Handler
+ T: Handler,
{
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
trace!("rpc::Driver::tick");
// First read off data from the socket
@@ -148,23 +148,23 @@ where
}
fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
match try!(s.start_send(item)) {
AsyncSink::Ready => Ok(()),
AsyncSink::NotReady(_) => panic!(
"sink reported itself as ready after `poll_ready` but was \
then unable to accept a message"
- )
+ ),
}
}
impl<T> fmt::Debug for Driver<T>
where
- T: Handler + fmt::Debug
+ T: Handler + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("rpc::Handler")
.field("handler", &self.handler)
.field("run", &self.run)
.field("is_flushed", &self.is_flushed)
.finish()
}
--- a/media/audioipc/audioipc/src/rpc/server.rs
+++ b/media/audioipc/audioipc/src/rpc/server.rs
@@ -44,23 +44,23 @@ use rpc::Handler;
use rpc::driver::Driver;
use std::collections::VecDeque;
use std::io;
use tokio_core::reactor::Handle;
/// Bind an async I/O object `io` to the `server`.
pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
where
- S: Server
+ S: Server,
{
let fut = {
let handler = ServerHandler {
server: server,
transport: transport,
- in_flight: VecDeque::with_capacity(32)
+ in_flight: VecDeque::with_capacity(32),
};
Driver::new(handler)
};
// Spawn the RPC driver into task
handle.spawn(Box::new(fut.map_err(|_| ())))
}
@@ -79,34 +79,33 @@ pub trait Server: 'static {
type Transport: 'static
+ Stream<Item = Self::Request, Error = io::Error>
+ Sink<SinkItem = Self::Response, SinkError = io::Error>;
/// Process the request and return the response asynchronously.
fn process(&mut self, req: Self::Request) -> Self::Future;
}
-
////////////////////////////////////////////////////////////////////////////////
struct ServerHandler<S>
where
- S: Server
+ S: Server,
{
// The service handling the connection
server: S,
// The transport responsible for sending/receving messages over the wire
transport: S::Transport,
// FIFO of "in flight" responses to requests.
- in_flight: VecDeque<InFlight<S::Future>>
+ in_flight: VecDeque<InFlight<S::Future>>,
}
impl<S> Handler for ServerHandler<S>
where
- S: Server
+ S: Server,
{
type In = S::Request;
type Out = S::Response;
type Transport = S::Transport;
/// Mutable reference to the transport
fn transport(&mut self) -> &mut Self::Transport {
&mut self.transport
@@ -128,51 +127,51 @@ where
// Make progress on pending responses
for pending in &mut self.in_flight {
pending.poll();
}
// Is the head of the queue ready?
match self.in_flight.front() {
- Some(&InFlight::Done(_)) => {},
+ Some(&InFlight::Done(_)) => {}
_ => {
trace!(" --> not ready");
return Ok(Async::NotReady);
}
}
// Return the ready response
match self.in_flight.pop_front() {
Some(InFlight::Done(res)) => {
trace!(" --> received response");
Ok(Async::Ready(Some(res)))
- },
- _ => panic!()
+ }
+ _ => panic!(),
}
}
/// RPC currently in flight
fn has_in_flight(&self) -> bool {
!self.in_flight.is_empty()
}
}
////////////////////////////////////////////////////////////////////////////////
enum InFlight<F: Future<Error = ()>> {
Active(F),
- Done(F::Item)
+ Done(F::Item),
}
impl<F: Future<Error = ()>> InFlight<F> {
fn poll(&mut self) {
let res = match *self {
InFlight::Active(ref mut f) => match f.poll() {
Ok(Async::Ready(e)) => e,
Err(_) => unreachable!(),
- Ok(Async::NotReady) => return
+ Ok(Async::NotReady) => return,
},
- _ => return
+ _ => return,
};
*self = InFlight::Done(res);
}
}
--- a/media/audioipc/audioipc/src/shm.rs
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -1,35 +1,30 @@
use errors::*;
use memmap::{Mmap, MmapViewSync, Protection};
use std::fs::{remove_file, File, OpenOptions};
use std::path::Path;
use std::sync::atomic;
pub struct SharedMemReader {
- mmap: Mmap
+ mmap: Mmap,
}
impl SharedMemReader {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemReader, File)> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::Read)?;
assert_eq!(mmap.len(), size);
- Ok((
- SharedMemReader {
- mmap
- },
- file
- ))
+ Ok((SharedMemReader { mmap }, file))
}
pub fn read(&self, buf: &mut [u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}
// TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() {
@@ -41,27 +36,25 @@ impl SharedMemReader {
Ok(())
} else {
bail!("mmap size");
}
}
}
pub struct SharedMemSlice {
- view: MmapViewSync
+ view: MmapViewSync,
}
impl SharedMemSlice {
pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> {
let mmap = Mmap::open(file, Protection::Read)?;
assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync();
- Ok(SharedMemSlice {
- view
- })
+ Ok(SharedMemSlice { view })
}
pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
if size == 0 {
return Ok(&[]);
}
// TODO: Track how much is in the shm area.
if size <= self.view.len() {
@@ -74,41 +67,36 @@ impl SharedMemSlice {
}
/// Clones the view of the memory map.
///
/// The underlying memory map is shared, and thus the caller must ensure that the memory
/// underlying the view is not illegally aliased.
pub unsafe fn clone_view(&self) -> Self {
SharedMemSlice {
- view: self.view.clone()
+ view: self.view.clone(),
}
}
}
pub struct SharedMemWriter {
- mmap: Mmap
+ mmap: Mmap,
}
impl SharedMemWriter {
pub fn new(path: &Path, size: usize) -> Result<(SharedMemWriter, File)> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)?;
let _ = remove_file(path);
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::ReadWrite)?;
- Ok((
- SharedMemWriter {
- mmap
- },
- file
- ))
+ Ok((SharedMemWriter { mmap }, file))
}
pub fn write(&mut self, buf: &[u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}
// TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() {
@@ -119,27 +107,25 @@ impl SharedMemWriter {
Ok(())
} else {
bail!("mmap size");
}
}
}
pub struct SharedMemMutSlice {
- view: MmapViewSync
+ view: MmapViewSync,
}
impl SharedMemMutSlice {
pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> {
let mmap = Mmap::open(file, Protection::ReadWrite)?;
assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync();
- Ok(SharedMemMutSlice {
- view
- })
+ Ok(SharedMemMutSlice { view })
}
pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
if size == 0 {
return Ok(&mut []);
}
// TODO: Track how much is in the shm area.
if size <= self.view.len() {
@@ -153,12 +139,12 @@ impl SharedMemMutSlice {
/// Clones the view of the memory map.
///
/// The underlying memory map is shared, and thus the caller must
/// ensure that the memory underlying the view is not illegally
/// aliased.
pub unsafe fn clone_view(&self) -> Self {
SharedMemMutSlice {
- view: self.view.clone()
+ view: self.view.clone(),
}
}
}
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,21 +1,21 @@
[package]
name = "audioipc-client"
-version = "0.2.0"
+version = "0.3.0"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
]
description = "Cubeb Backend for talking to remote cubeb server."
[dependencies]
audioipc = { path="../audioipc" }
-cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+cubeb-backend = "0.4"
+foreign-types = "0.3"
# rayon-core in Gecko uses futures 0.1.13
futures = { version="=0.1.13", default-features=false, features=["use_std"] }
# futures-cpupool 0.1.5 matches futures 0.1.13
futures-cpupool = { version="=0.1.5", default-features=false }
libc = "0.2"
log = "^0.3.6"
tokio-core = "0.1"
tokio-uds = "0.1.7"
\ No newline at end of file
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -4,56 +4,54 @@
// accompanying file LICENSE for details
use ClientStream;
use assert_not_in_callback;
use audioipc::{messages, ClientMessage, ServerMessage};
use audioipc::{core, rpc};
use audioipc::codec::LengthDelimitedCodec;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
-use cubeb_backend::{Context, Ops};
-use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams};
-use cubeb_core::binding::Binding;
+use cubeb_backend::{ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId,
+ DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use libc;
-use std::{fmt, io, mem};
+use std::{fmt, io, mem, ptr};
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
use std::os::unix::net;
use std::sync::mpsc;
use stream;
use tokio_core::reactor::{Handle, Remote};
use tokio_uds::UnixStream;
struct CubebClient;
impl rpc::Client for CubebClient {
type Request = ServerMessage;
type Response = ClientMessage;
- type Transport =
- FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+ type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
macro_rules! t(
($e:expr) => (
match $e {
Ok(e) => e,
Err(_) => return Err(Error::default())
}
));
pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
pub struct ClientContext {
_ops: *const Ops,
rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
core: core::CoreThread,
- cpu_pool: CpuPool
+ cpu_pool: CpuPool,
}
impl ClientContext {
#[doc(hidden)]
pub fn remote(&self) -> Remote {
self.core.remote()
}
@@ -74,192 +72,202 @@ fn open_server_stream() -> Result<net::U
if let Some(fd) = super::G_SERVER_FD {
return Ok(net::UnixStream::from_raw_fd(fd));
}
Err(Error::default())
}
}
-impl Context for ClientContext {
- fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+impl ContextOps for ClientContext {
+ fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client(
stream: UnixStream,
handle: &Handle,
- tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>
+ tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
) -> Option<()> {
let transport = framed_with_fds(stream, Default::default());
let rpc = rpc::bind_client::<CubebClient>(transport, handle);
// If send fails then the rx end has closed
// which is unlikely here.
let _ = tx_rpc.send(rpc);
Some(())
}
assert_not_in_callback();
let (tx_rpc, rx_rpc) = mpsc::channel();
let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
let handle = core::handle();
- open_server_stream().ok()
+ open_server_stream()
+ .ok()
.and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
.and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
- .ok_or_else(|| io::Error::new(
- io::ErrorKind::Other,
- "Failed to open stream and create rpc."
- ))
+ .ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ "Failed to open stream and create rpc.",
+ )
+ })
}));
let rpc = t!(rx_rpc.recv());
let cpupool = futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.create();
let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,
rpc: rpc,
core: core,
- cpu_pool: cpupool
+ cpu_pool: cpupool,
});
- Ok(Box::into_raw(ctx) as *mut _)
+ Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}
- fn backend_id(&self) -> &'static CStr {
+ fn backend_id(&mut self) -> &'static CStr {
assert_not_in_callback();
unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
}
- fn max_channel_count(&self) -> Result<u32> {
+ fn max_channel_count(&mut self) -> Result<u32> {
assert_not_in_callback();
send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
}
- fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+ fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
assert_not_in_callback();
- let params = messages::StreamParams::from(unsafe { &*params.raw() });
+ let params = messages::StreamParams::from(params.as_ref());
send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
}
- fn preferred_sample_rate(&self) -> Result<u32> {
+ fn preferred_sample_rate(&mut self) -> Result<u32> {
assert_not_in_callback();
send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
}
- fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
+ fn preferred_channel_layout(&mut self) -> Result<ChannelLayout> {
assert_not_in_callback();
send_recv!(self.rpc(),
ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+ .map(|l| {
+ ChannelLayout::from(l)
+ })
}
- fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+ fn enumerate_devices(
+ &mut self,
+ devtype: DeviceType,
+ collection: &DeviceCollectionRef,
+ ) -> Result<()> {
assert_not_in_callback();
let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
ContextGetDeviceEnumeration(devtype.bits()) =>
ContextEnumeratedDevices())
{
Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
- Err(e) => return Err(e)
+ Err(e) => return Err(e),
};
- let vs = v.into_boxed_slice();
- let coll = ffi::cubeb_device_collection {
- count: vs.len(),
- device: vs.as_ptr()
- };
+ let mut vs = v.into_boxed_slice();
+ let coll = unsafe { &mut *collection.as_ptr() };
+ coll.device = vs.as_mut_ptr();
+ coll.count = vs.len();
// Giving away the memory owned by vs. Don't free it!
// Reclaimed in `device_collection_destroy`.
mem::forget(vs);
- Ok(coll)
+ Ok(())
}
- fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+ fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
assert_not_in_callback();
unsafe {
- let coll = &*collection;
+ let coll = &mut *collection.as_ptr();
let mut devices = Vec::from_raw_parts(
coll.device as *mut ffi::cubeb_device_info,
coll.count,
- coll.count
+ coll.count,
);
for dev in &mut devices {
if !dev.device_id.is_null() {
let _ = CString::from_raw(dev.device_id as *mut _);
}
if !dev.group_id.is_null() {
let _ = CString::from_raw(dev.group_id as *mut _);
}
if !dev.vendor_name.is_null() {
let _ = CString::from_raw(dev.vendor_name as *mut _);
}
if !dev.friendly_name.is_null() {
let _ = CString::from_raw(dev.friendly_name as *mut _);
}
}
+ coll.device = ptr::null_mut();
+ coll.count = 0;
+ Ok(())
}
}
fn stream_init(
- &self,
+ &mut self,
stream_name: Option<&CStr>,
input_device: DeviceId,
- input_stream_params: Option<&ffi::cubeb_stream_params>,
+ input_stream_params: Option<&StreamParamsRef>,
output_device: DeviceId,
- output_stream_params: Option<&ffi::cubeb_stream_params>,
+ output_stream_params: Option<&StreamParamsRef>,
latency_frame: u32,
// These params aren't sent to the server
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void
- ) -> Result<*mut ffi::cubeb_stream> {
+ user_ptr: *mut c_void,
+ ) -> Result<Stream> {
assert_not_in_callback();
- fn opt_stream_params(
- p: Option<&ffi::cubeb_stream_params>
- ) -> Option<messages::StreamParams> {
+ fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option<messages::StreamParams> {
match p {
- Some(raw) => Some(messages::StreamParams::from(raw)),
- None => None
+ Some(p) => Some(messages::StreamParams::from(p)),
+ None => None,
}
}
let stream_name = match stream_name {
Some(s) => Some(s.to_bytes().to_vec()),
- None => None
+ None => None,
};
let input_stream_params = opt_stream_params(input_stream_params);
let output_stream_params = opt_stream_params(output_stream_params);
let init_params = messages::StreamInitParams {
stream_name: stream_name,
- input_device: input_device.raw() as _,
+ input_device: input_device as usize,
input_stream_params: input_stream_params,
- output_device: output_device.raw() as _,
+ output_device: output_device as usize,
output_stream_params: output_stream_params,
- latency_frames: latency_frame
+ latency_frames: latency_frame,
};
stream::init(self, init_params, data_callback, state_callback, user_ptr)
}
fn register_device_collection_changed(
- &self,
+ &mut self,
_dev_type: DeviceType,
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
- _user_ptr: *mut c_void
+ _user_ptr: *mut c_void,
) -> Result<()> {
assert_not_in_callback();
Ok(())
}
}
impl Drop for ClientContext {
fn drop(&mut self) {
- info!("ClientContext drop...");
+ debug!("ClientContext drop...");
let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {
if super::G_SERVER_FD.is_some() {
libc::close(super::G_SERVER_FD.take().unwrap());
}
}
}
}
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -1,33 +1,32 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
extern crate audioipc;
#[macro_use]
extern crate cubeb_backend;
-extern crate cubeb_core;
+extern crate foreign_types;
extern crate futures;
extern crate futures_cpupool;
extern crate libc;
#[macro_use]
extern crate log;
extern crate tokio_core;
extern crate tokio_uds;
#[macro_use]
mod send_recv;
mod context;
mod stream;
use context::ClientContext;
-use cubeb_backend::capi;
-use cubeb_core::ffi;
+use cubeb_backend::{capi, ffi};
use std::os::raw::{c_char, c_int};
use std::os::unix::io::RawFd;
use stream::ClientStream;
thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
fn set_in_callback(in_callback: bool) {
IN_CALLBACK.with(|b| {
@@ -44,17 +43,17 @@ fn assert_not_in_callback() {
static mut G_SERVER_FD: Option<RawFd> = None;
#[no_mangle]
/// Entry point from C code.
pub unsafe extern "C" fn audioipc_client_init(
c: *mut *mut ffi::cubeb,
context_name: *const c_char,
- server_connection: c_int
+ server_connection: c_int,
) -> c_int {
// TODO: Windows portability (for fd).
// TODO: Better way to pass extra parameters to Context impl.
if G_SERVER_FD.is_some() {
panic!("audioipc client's server connection already initialized.");
}
if server_connection >= 0 {
G_SERVER_FD = Some(server_connection);
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -1,19 +1,19 @@
-use cubeb_core::Error;
-use cubeb_core::ffi;
+use cubeb_backend::Error;
+use std::os::raw::c_int;
#[doc(hidden)]
pub fn _err<E>(e: E) -> Error
where
- E: Into<Option<ffi::cubeb_error_code>>
+ E: Into<Option<c_int>>,
{
match e.into() {
Some(e) => unsafe { Error::from_raw(e) },
- None => Error::new()
+ None => Error::error(),
}
}
#[macro_export]
macro_rules! send_recv {
($rpc:expr, $smsg:ident => $rmsg:ident) => {{
let resp = send_recv!(__send $rpc, $smsg);
send_recv!(__recv resp, $rmsg)
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -5,116 +5,135 @@
use {assert_not_in_callback, set_in_callback};
use ClientContext;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
use audioipc::rpc;
use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
-use cubeb_backend::Stream;
-use cubeb_core::{ffi, Result};
+use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
use std::ffi::CString;
use std::fs::File;
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
use std::os::unix::net;
use std::ptr;
use std::sync::mpsc;
use tokio_uds::UnixStream;
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+pub struct Device(ffi::cubeb_device);
+
+impl Drop for Device {
+ fn drop(&mut self) {
+ unsafe {
+ if !self.0.input_name.is_null() {
+ let _ = CString::from_raw(self.0.input_name as *mut _);
+ }
+ if !self.0.output_name.is_null() {
+ let _ = CString::from_raw(self.0.output_name as *mut _);
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
pub struct ClientStream<'ctx> {
- // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
+ // This must be a reference to Context for cubeb, cubeb accesses
+ // stream methods via stream->context->ops
context: &'ctx ClientContext,
- token: usize
+ token: usize,
}
struct CallbackServer {
input_shm: SharedMemSlice,
output_shm: SharedMemMutSlice,
data_cb: ffi::cubeb_data_callback,
state_cb: ffi::cubeb_state_callback,
user_ptr: usize,
- cpu_pool: CpuPool
+ cpu_pool: CpuPool,
}
impl rpc::Server for CallbackServer {
type Request = CallbackReq;
type Response = CallbackResp;
type Future = CpuFuture<Self::Response, ()>;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
match req {
CallbackReq::Data(nframes, frame_size) => {
- info!(
+ debug!(
"stream_thread: Data Callback: nframes={} frame_size={}",
- nframes,
- frame_size
+ nframes, frame_size
);
// Clone values that need to be moved into the cpu pool thread.
let input_shm = unsafe { self.input_shm.clone_view() };
let mut output_shm = unsafe { self.output_shm.clone_view() };
let user_ptr = self.user_ptr;
- let cb = self.data_cb;
+ let cb = self.data_cb.unwrap();
self.cpu_pool.spawn_fn(move || {
// TODO: This is proof-of-concept. Make it better.
let input_ptr: *const u8 = input_shm
.get_slice(nframes as usize * frame_size)
.unwrap()
.as_ptr();
let output_ptr: *mut u8 = output_shm
.get_mut_slice(nframes as usize * frame_size)
.unwrap()
.as_mut_ptr();
set_in_callback(true);
- let nframes = cb(
- ptr::null_mut(),
- user_ptr as *mut c_void,
- input_ptr as *const _,
- output_ptr as *mut _,
- nframes as _
- );
+ let nframes = unsafe {
+ cb(
+ ptr::null_mut(),
+ user_ptr as *mut c_void,
+ input_ptr as *const _,
+ output_ptr as *mut _,
+ nframes as _,
+ )
+ };
set_in_callback(false);
Ok(CallbackResp::Data(nframes as isize))
})
- },
+ }
CallbackReq::State(state) => {
- info!("stream_thread: State Callback: {:?}", state);
+ debug!("stream_thread: State Callback: {:?}", state);
let user_ptr = self.user_ptr;
- let cb = self.state_cb;
+ let cb = self.state_cb.unwrap();
self.cpu_pool.spawn_fn(move || {
set_in_callback(true);
- cb(ptr::null_mut(), user_ptr as *mut _, state);
+ unsafe {
+ cb(ptr::null_mut(), user_ptr as *mut _, state);
+ }
set_in_callback(false);
Ok(CallbackResp::State)
})
}
}
}
}
impl<'ctx> ClientStream<'ctx> {
fn init(
ctx: &'ctx ClientContext,
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void
- ) -> Result<*mut ffi::cubeb_stream> {
+ user_ptr: *mut c_void,
+ ) -> Result<Stream> {
assert_not_in_callback();
let rpc = ctx.rpc();
let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
trace!("token = {}, fds = {:?}", data.token, data.fds);
let stm = data.fds[0];
@@ -133,124 +152,121 @@ impl<'ctx> ClientStream<'ctx> {
let cpu_pool = ctx.cpu_pool();
let server = CallbackServer {
input_shm: input_shm,
output_shm: output_shm,
data_cb: data_callback,
state_cb: state_callback,
user_ptr: user_data,
- cpu_pool: cpu_pool
+ cpu_pool: cpu_pool,
};
let (wait_tx, wait_rx) = mpsc::channel();
ctx.remote().spawn(move |handle| {
let stream = UnixStream::from_stream(stream, handle).unwrap();
let transport = framed(stream, Default::default());
rpc::bind_server(transport, server, handle);
wait_tx.send(()).unwrap();
Ok(())
});
wait_rx.recv().unwrap();
- Ok(Box::into_raw(Box::new(ClientStream {
+ let stream = Box::into_raw(Box::new(ClientStream {
context: ctx,
- token: data.token
- })) as _)
+ token: data.token,
+ }));
+ Ok(unsafe { Stream::from_ptr(stream as *mut _) })
}
}
impl<'ctx> Drop for ClientStream<'ctx> {
fn drop(&mut self) {
trace!("ClientStream drop...");
let rpc = self.context.rpc();
let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
}
}
-impl<'ctx> Stream for ClientStream<'ctx> {
- fn start(&self) -> Result<()> {
+impl<'ctx> StreamOps for ClientStream<'ctx> {
+ fn start(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStart(self.token) => StreamStarted)
}
- fn stop(&self) -> Result<()> {
+ fn stop(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStop(self.token) => StreamStopped)
}
- fn reset_default_device(&self) -> Result<()> {
+ fn reset_default_device(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
}
- fn position(&self) -> Result<u64> {
+ fn position(&mut self) -> Result<u64> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
}
- fn latency(&self) -> Result<u32> {
+ fn latency(&mut self) -> Result<u32> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
}
- fn set_volume(&self, volume: f32) -> Result<()> {
+ fn set_volume(&mut self, volume: f32) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
}
- fn set_panning(&self, panning: f32) -> Result<()> {
+ fn set_panning(&mut self, panning: f32) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet)
}
- fn current_device(&self) -> Result<*const ffi::cubeb_device> {
+ fn current_device(&mut self) -> Result<&DeviceRef> {
assert_not_in_callback();
let rpc = self.context.rpc();
match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
- Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
- Err(e) => Err(e)
+ Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
+ Err(e) => Err(e),
}
}
- fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+ fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
assert_not_in_callback();
// It's all unsafe...
- if !device.is_null() {
+ if device.as_ptr().is_null() {
+ Err(Error::error())
+ } else {
unsafe {
- if !(*device).output_name.is_null() {
- let _ = CString::from_raw((*device).output_name as *mut _);
- }
- if !(*device).input_name.is_null() {
- let _ = CString::from_raw((*device).input_name as *mut _);
- }
- let _: Box<ffi::cubeb_device> = Box::from_raw(device as *mut _);
+ let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
}
+ Ok(())
}
- Ok(())
}
// TODO: How do we call this back? On what thread?
fn register_device_changed_callback(
- &self,
- _device_changed_callback: ffi::cubeb_device_changed_callback
+ &mut self,
+ _device_changed_callback: ffi::cubeb_device_changed_callback,
) -> Result<()> {
assert_not_in_callback();
Ok(())
}
}
pub fn init(
ctx: &ClientContext,
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
- user_ptr: *mut c_void
-) -> Result<*mut ffi::cubeb_stream> {
+ user_ptr: *mut c_void,
+) -> Result<Stream> {
ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
}
--- a/media/audioipc/gecko.patch
+++ b/media/audioipc/gecko.patch
@@ -7,54 +7,9 @@ Subject: gecko: Change paths to vendored
diff --git a/media/audioipc/Cargo.toml b/media/audioipc/Cargo.toml
index ede6064..d0a1979 100644
--- a/media/audioipc/Cargo.toml
+++ b/media/audioipc/Cargo.toml
@@ -1,2 +1,2 @@
[workspace]
-members = ["audioipc", "client", "server", "ipctest"]
+members = ["audioipc", "client", "server"]
-diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml
-index 669c6ff..308cb5c 100644
---- a/media/audioipc/audioipc/Cargo.toml
-+++ b/media/audioipc/audioipc/Cargo.toml
-@@ -8,7 +8,7 @@ authors = [
- description = "Remote Cubeb IPC"
-
- [dependencies]
--cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
- bincode = "0.8"
- bytes = "0.4"
- # rayon-core in Gecko uses futures 0.1.13
-diff --git a/client/Cargo.toml b/client/Cargo.toml
-index c81b19a..9e3f8a5 100644
---- a/media/audioipc/client/Cargo.toml
-+++ b/media/audioipc/client/Cargo.toml
-@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
-
- [dependencies]
- audioipc = { path="../audioipc" }
--cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
--cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
-+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
- # rayon-core in Gecko uses futures 0.1.13
- futures = { version="=0.1.13", default-features=false, features=["use_std"] }
- # futures-cpupool 0.1.5 matches futures 0.1.13
-diff --git a/server/Cargo.toml b/server/Cargo.toml
-index 5b79b83..01463be 100644
---- a/media/audioipc/server/Cargo.toml
-+++ b/media/audioipc/server/Cargo.toml
-@@ -9,8 +9,8 @@ description = "Remote cubeb server"
-
- [dependencies]
- audioipc = { path = "../audioipc" }
--cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
--cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.2" }
-+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-+cubeb = { path = "../../cubeb-rs/cubeb-api" }
- bytes = "0.4"
- lazycell = "^0.4"
- libc = "0.2"
---
-2.10.2
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -4,18 +4,17 @@ version = "0.2.1"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
]
description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-cubeb = { path = "../../cubeb-rs/cubeb-api" }
+cubeb = "0.4"
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"
log = "^0.3.6"
slab = "0.3.0"
# rayon-core in Gecko uses futures 0.1.13
futures = "=0.1.13"
tokio-core = "0.1"
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -2,34 +2,31 @@
extern crate error_chain;
#[macro_use]
extern crate log;
extern crate audioipc;
extern crate bytes;
extern crate cubeb;
-extern crate cubeb_core;
extern crate futures;
extern crate lazycell;
extern crate libc;
extern crate slab;
extern crate tokio_core;
extern crate tokio_uds;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use audioipc::frame::{framed, Framed};
-use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage,
- StreamCreate, StreamInitParams, StreamParams};
+use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo,
+ ServerMessage, StreamCreate, StreamInitParams, StreamParams};
use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
-use cubeb_core::binding::Binding;
-use cubeb_core::ffi;
use futures::Future;
use futures::future::{self, FutureResult};
use futures::sync::oneshot;
use std::{ptr, slice};
use std::cell::RefCell;
use std::convert::From;
use std::error::Error;
use std::os::raw::c_void;
@@ -39,35 +36,36 @@ use tokio_core::reactor::Remote;
use tokio_uds::UnixStream;
pub mod errors {
error_chain! {
links {
AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
}
foreign_links {
- Cubeb(::cubeb_core::Error);
+ Cubeb(::cubeb::Error);
Io(::std::io::Error);
Canceled(::futures::sync::oneshot::Canceled);
}
}
}
use errors::*;
-thread_local!(static CONTEXT_KEY: RefCell<Option<cubeb::Result<cubeb::Context>>> = RefCell::new(None));
+type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
+thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
fn with_local_context<T, F>(f: F) -> T
where
- F: FnOnce(&cubeb::Result<cubeb::Context>) -> T
+ F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
{
CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut();
if context.is_none() {
- *context = Some(cubeb::Context::init("AudioIPC Server", None));
+ *context = Some(cubeb::init("AudioIPC Server"));
}
f(context.as_ref().unwrap())
})
}
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
@@ -77,130 +75,56 @@ const STREAM_CONN_CHUNK_SIZE: usize = 64
struct CallbackClient;
impl rpc::Client for CallbackClient {
type Request = CallbackReq;
type Response = CallbackResp;
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
-// TODO: this should forward to the client.
-struct Callback {
- /// Size of input frame in bytes
- input_frame_size: u16,
- /// Size of output frame in bytes
- output_frame_size: u16,
- input_shm: SharedMemWriter,
- output_shm: SharedMemReader,
- rpc: rpc::ClientProxy<CallbackReq, CallbackResp>
-}
-
-impl cubeb::StreamCallback for Callback {
- type Frame = u8;
-
- fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
- trace!("Stream data callback: {} {}", input.len(), output.len());
-
- // len is of input and output is frame len. Turn these into the real lengths.
- let real_input = unsafe {
- let size_bytes = input.len() * self.input_frame_size as usize;
- slice::from_raw_parts(input.as_ptr(), size_bytes)
- };
- let real_output = unsafe {
- let size_bytes = output.len() * self.output_frame_size as usize;
- trace!("Resize output to {}", size_bytes);
- slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
- };
-
- self.input_shm.write(real_input).unwrap();
-
- let r = self.rpc
- .call(CallbackReq::Data(
- output.len() as isize,
- self.output_frame_size as usize
- ))
- .wait();
-
- match r {
- Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 {
- let len = cb_result as usize * self.output_frame_size as usize;
- self.output_shm.read(&mut real_output[..len]).unwrap();
- cb_result
- } else {
- cb_result
- },
- _ => {
- debug!("Unexpected message {:?} during data_callback", r);
- -1
- }
- }
- }
-
- fn state_callback(&mut self, state: cubeb::State) {
- info!("Stream state callback: {:?}", state);
- // TODO: Share this conversion with the same code in cubeb-rs?
- let state = match state {
- cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
- cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
- cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
- cubeb::State::Error => ffi::CUBEB_STATE_ERROR
- };
-
- let r = self.rpc.call(CallbackReq::State(state)).wait();
-
- match r {
- Ok(CallbackResp::State) => {},
- _ => {
- debug!("Unexpected message {:?} during callback", r);
- }
- };
- }
-}
-
-type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
+type StreamSlab = slab::Slab<cubeb::Stream<u8>, usize>;
pub struct CubebServer {
cb_remote: Remote,
- streams: StreamSlab
+ streams: StreamSlab,
}
impl rpc::Server for CubebServer {
type Request = ServerMessage;
type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>;
- type Transport =
- FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+ type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context {
- Err(_) => error(cubeb::Error::new()),
- Ok(ref context) => self.process_msg(context, &req)
+ Err(_) => error(cubeb::Error::error()),
+ Ok(ref context) => self.process_msg(context, &req),
});
future::ok(resp)
}
}
impl CubebServer {
pub fn new(cb_remote: Remote) -> Self {
CubebServer {
cb_remote: cb_remote,
- streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE)
+ streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
}
}
// Process a request coming from the client.
fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect => panic!("already connected"),
ServerMessage::ClientDisconnect => {
// TODO:
//self.connection.client_disconnect();
ClientMessage::ClientDisconnected
- },
+ }
ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
ServerMessage::ContextGetMaxChannelCount => context
.max_channel_count()
.map(ClientMessage::ContextMaxChannelCount)
.unwrap_or_else(error),
@@ -214,58 +138,58 @@ impl CubebServer {
.channels(u32::from(params.channels))
.layout(layout)
.take();
context
.min_latency(¶ms)
.map(ClientMessage::ContextMinLatency)
.unwrap_or_else(error)
- },
+ }
ServerMessage::ContextGetPreferredSampleRate => context
.preferred_sample_rate()
.map(ClientMessage::ContextPreferredSampleRate)
.unwrap_or_else(error),
ServerMessage::ContextGetPreferredChannelLayout => context
.preferred_channel_layout()
.map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
.unwrap_or_else(error),
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
- let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
+ let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
ClientMessage::ContextEnumeratedDevices(v)
})
.unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
- .unwrap_or_else(|_| error(cubeb::Error::new())),
+ .unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
self.streams.remove(stm_tok);
ClientMessage::StreamDestroyed
- },
+ }
ServerMessage::StreamStart(stm_tok) => {
let _ = self.streams[stm_tok].start();
ClientMessage::StreamStarted
- },
+ }
ServerMessage::StreamStop(stm_tok) => {
let _ = self.streams[stm_tok].stop();
ClientMessage::StreamStopped
- },
+ }
ServerMessage::StreamResetDefaultDevice(stm_tok) => {
let _ = self.streams[stm_tok].reset_default_device();
ClientMessage::StreamDefaultDeviceReset
- },
+ }
ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
.latency()
@@ -279,86 +203,60 @@ impl CubebServer {
ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
.set_panning(panning)
.map(|_| ClientMessage::StreamPanningSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
.current_device()
- .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
- .unwrap_or_else(error)
+ .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
+ .unwrap_or_else(error),
};
debug!("process_msg: req={:?}, resp={:?}", msg, resp);
resp
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
- params: &StreamInitParams
+ params: &StreamInitParams,
) -> Result<ClientMessage> {
- fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
- params.and_then(|p| {
- let raw = ffi::cubeb_stream_params::from(p);
- Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
- })
- }
-
- fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+ fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
- let sample_size = match p.format() {
+ let format = p.format.into();
+ let sample_size = match format {
cubeb::SampleFormat::S16LE
| cubeb::SampleFormat::S16BE
| cubeb::SampleFormat::S16NE => 2,
cubeb::SampleFormat::Float32LE
| cubeb::SampleFormat::Float32BE
- | cubeb::SampleFormat::Float32NE => 4
+ | cubeb::SampleFormat::Float32NE => 4,
};
- let channel_count = p.channels() as u16;
+ let channel_count = p.channels as u16;
sample_size * channel_count
})
.unwrap_or(0u16)
}
// TODO: Yuck!
- let input_device =
- unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
- let output_device =
- unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+ let input_device = params.input_device as *const _;
+ let output_device = params.output_device as *const _;
let latency = params.latency_frames;
- let mut builder = cubeb::StreamInitOptionsBuilder::new();
- builder
- .input_device(input_device)
- .output_device(output_device)
- .latency(latency);
- if let Some(ref stream_name) = params.stream_name {
- builder.stream_name(stream_name);
- }
- let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
- if let Some(ref isp) = input_stream_params {
- builder.input_stream_param(isp);
- }
- let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
- if let Some(ref osp) = output_stream_params {
- builder.output_stream_param(osp);
- }
- let params = builder.take();
-
- let input_frame_size = frame_size_in_bytes(input_stream_params);
- let output_frame_size = frame_size_in_bytes(output_stream_params);
+ let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
+ let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
- info!("Created callback pair: {:?}-{:?}", stm1, stm2);
- let (input_shm, input_file) =
+ debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
+ let (mut input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on cb_remote.
@@ -372,69 +270,128 @@ impl CubebServer {
assert_ne!(id, handle.id());
let stream = UnixStream::from_stream(stm2, handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
drop(tx.send(rpc));
Ok(())
});
- let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
+ let rpc_data: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc,
- Err(_) => bail!("Failed to create callback rpc.")
+ Err(_) => bail!("Failed to create callback rpc."),
};
+ let rpc_state = rpc_data.clone();
+
+ let mut builder = cubeb::StreamBuilder::new();
+
+ if let Some(ref stream_name) = params.stream_name {
+ builder.name(stream_name.clone());
+ }
+
+ if let Some(ref isp) = params.input_stream_params {
+ let input_stream_params =
+ unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) };
+ builder.input(input_device, input_stream_params);
+ }
+
+ if let Some(ref osp) = params.output_stream_params {
+ let output_stream_params =
+ unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) };
+ builder.output(output_device, output_stream_params);
+ }
+
+ builder
+ .latency(latency)
+ .data_callback(move |input, output| {
+ trace!("Stream data callback: {} {}", input.len(), output.len());
+
+ // len is of input and output is frame len. Turn these into the real lengths.
+ let real_input = unsafe {
+ let nbytes = input.len() * input_frame_size as usize;
+ slice::from_raw_parts(input.as_ptr(), nbytes)
+ };
+
+ input_shm.write(real_input).unwrap();
- context
- .stream_init(
- ¶ms,
- Callback {
- input_frame_size: input_frame_size,
- output_frame_size: output_frame_size,
- input_shm: input_shm,
- output_shm: output_shm,
- rpc: rpc
+ let r = rpc_data
+ .call(CallbackReq::Data(
+ output.len() as isize,
+ output_frame_size as usize,
+ ))
+ .wait();
+
+ match r {
+ Ok(CallbackResp::Data(frames)) => {
+ if frames >= 0 {
+ let nbytes = frames as usize * output_frame_size as usize;
+ let real_output = unsafe {
+ trace!("Resize output to {}", nbytes);
+ slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
+ };
+ output_shm.read(&mut real_output[..nbytes]).unwrap();
+ }
+ frames
+ }
+ _ => {
+ debug!("Unexpected message {:?} during data_callback", r);
+ -1
+ }
}
- )
+ })
+ .state_callback(move |state| {
+ trace!("Stream state callback: {:?}", state);
+ let r = rpc_state.call(CallbackReq::State(state.into())).wait();
+ match r {
+ Ok(CallbackResp::State) => {}
+ _ => {
+ debug!("Unexpected message {:?} during callback", r);
+ }
+ }
+ });
+
+ builder
+ .init(context)
.and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
debug!("Registering stream {:?}", entry.index(),);
entry.insert(stream).index()
- },
+ }
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
}
};
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
- ]
+ ],
}))
})
.map_err(|e| e.into())
}
}
struct ServerWrapper {
core_thread: core::CoreThread,
- callback_thread: core::CoreThread
+ callback_thread: core::CoreThread,
}
fn run() -> Result<ServerWrapper> {
trace!("Starting up cubeb audio server event loop thread...");
let callback_thread = try!(
core::spawn_thread("AudioIPC Callback RPC", || {
trace!("Starting up cubeb audio callback event loop thread...");
@@ -455,25 +412,25 @@ fn run() -> Result<ServerWrapper> {
e.description()
);
Err(e)
})
);
Ok(ServerWrapper {
core_thread: core_thread,
- callback_thread: callback_thread
+ callback_thread: callback_thread,
})
}
#[no_mangle]
pub extern "C" fn audioipc_server_start() -> *mut c_void {
match run() {
Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
- Err(_) => ptr::null_mut() as *mut _
+ Err(_) => ptr::null_mut() as *mut _,
}
}
#[no_mangle]
pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> libc::c_int {
let (wait_tx, wait_rx) = oneshot::channel();
let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };