Bug 1410107 - Grow audioipc server slabs as required. r?kinetik draft
authorDan Glastonbury <dan.glastonbury@gmail.com>
Wed, 25 Oct 2017 09:14:45 +1000
changeset 685622 d458cab5582f26d26f783466e4e294a4040b6caf
parent 685069 9056f2ee492fa481aa86146aba236c074628e9fd
child 737216 2e358bb4a7e4eeb3d2eebf495b3fef263ccfafea
push id86003
push userbmo:dglastonbury@mozilla.com
push dateTue, 24 Oct 2017 23:19:46 +0000
reviewerskinetik
bugs1410107
milestone58.0a1
Bug 1410107 - Grow audioipc server slabs as required. r?kinetik MozReview-Commit-ID: zFfex3LX7K
media/audioipc/server/src/lib.rs
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -48,16 +48,22 @@ pub mod errors {
     }
 }
 
 use errors::*;
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
+// The size in which the server conns slab is grown.
+const SERVER_CONN_CHUNK_SIZE: usize = 16;
+
+// The size in which the stream slab is grown.
+const STREAM_CONN_CHUNK_SIZE: usize = 64;
+
 // TODO: this should forward to the client.
 struct Callback {
     /// Size of input frame in bytes
     input_frame_size: u16,
     /// Size of output frame in bytes
     output_frame_size: u16,
     connection: audioipc::Connection,
     input_shm: SharedMemWriter,
@@ -158,17 +164,17 @@ struct ServerConn {
 }
 
 impl ServerConn {
     fn new(io: UnixStream) -> ServerConn {
         let mut sc = ServerConn {
             io: io,
             token: None,
             // TODO: Handle increasing slab size. Pick a good default size.
-            streams: StreamSlab::with_capacity(64),
+            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
             decoder: Decoder::new(),
             recv_buffer: BytesMut::with_capacity(4096),
             send_buffer: BytesMut::with_capacity(4096),
             pending_send: VecDeque::new(),
             device_ids: HashSet::new()
         };
         sc.device_ids.insert(0); // nullptr is always a valid (default) device id.
         sc
@@ -430,16 +436,24 @@ impl ServerConn {
                 input_frame_size: input_frame_size,
                 output_frame_size: output_frame_size,
                 connection: conn2,
                 input_shm: input_shm,
                 output_shm: output_shm
             }
         ) {
             Ok(stream) => {
+                if !self.streams.has_available() {
+                    trace!(
+                        "server connection ran out of stream slots. reserving {} more.",
+                        STREAM_CONN_CHUNK_SIZE
+                    );
+                    self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
+                }
+
                 let stm_tok = match self.streams.vacant_entry() {
                     Some(entry) => {
                         debug!(
                             "Registering stream {:?}",
                             entry.index(),
                         );
 
                         entry.insert(stream).index()
@@ -577,31 +591,40 @@ pub struct Server {
     conns: Slab<ServerConn>
 }
 
 impl Server {
     pub fn new(socket: UnixListener) -> Server {
         Server {
             socket: socket,
             context: None,
-            conns: Slab::with_capacity(16)
+            conns: Slab::with_capacity(SERVER_CONN_CHUNK_SIZE)
         }
     }
 
     fn accept(&mut self, poll: &mut mio::Poll) -> Result<()> {
         debug!("Server accepting connection");
 
         let client_socket = match self.socket.accept() {
             Err(e) => {
                 error!("server accept error: {}", e);
                 return Err(e.into());
             },
             Ok(None) => panic!("accept returned EAGAIN unexpectedly"),
             Ok(Some((socket, _))) => socket,
         };
+
+        if !self.conns.has_available() {
+            trace!(
+                "server ran out of connection slots. reserving {} more.",
+                SERVER_CONN_CHUNK_SIZE
+            );
+            self.conns.reserve_exact(SERVER_CONN_CHUNK_SIZE);
+        }
+
         let token = match self.conns.vacant_entry() {
             Some(entry) => {
                 debug!("registering {:?}", entry.index());
                 let cxn = ServerConn::new(client_socket);
                 entry.insert(cxn).index()
             },
             None => {
                 panic!("failed to insert connection");