imported patch cubeb-pulse-improvement draft
authorAlex Chronopoulos <achronop@gmail.com>
Mon, 04 Jan 2016 19:18:16 +0200
changeset 321205 07aeae5b63b4e37cce9b1b8153d2156427f3de96
parent 321204 ffe4d752ee669b0efdfae105bacdd51e6bd1e697
child 321206 d0c7f02c8e47f6e385d597cbd32c9d7148fd73ae
push id9349
push userrjesup@wgate.com
push dateWed, 13 Jan 2016 06:48:48 +0000
milestone46.0a1
imported patch cubeb-pulse-improvement
media/libcubeb/src/cubeb_pulse.c
--- a/media/libcubeb/src/cubeb_pulse.c
+++ b/media/libcubeb/src/cubeb_pulse.c
@@ -70,25 +70,31 @@
   X(pa_threaded_mainloop_unlock)                \
   X(pa_threaded_mainloop_wait)                  \
   X(pa_usec_to_bytes)                           \
   X(pa_stream_set_read_callback)                \
   X(pa_stream_connect_record)                   \
   X(pa_stream_readable_size)                    \
   X(pa_stream_peek)                             \
   X(pa_stream_drop)                             \
-  X(pa_stream_writable_size)                    \
-  X(pa_stream_trigger)                          \
+  X(pa_stream_get_buffer_attr)                  \
 
 #define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x;
 LIBPULSE_API_VISIT(MAKE_TYPEDEF);
 #undef MAKE_TYPEDEF
 #endif
 
-#define MIN_LATENCY_MS 30
+//#define LOGGING_ENABLED
+#ifdef LOGGING_ENABLED
+#define LOG(...) do {                           \
+    fprintf(stderr, __VA_ARGS__);               \
+  } while(0)
+#else
+#define LOG(...)
+#endif
 
 static struct cubeb_ops const pulse_ops;
 
 struct cubeb {
   struct cubeb_ops const * ops;
   void * libpulse;
   pa_threaded_mainloop * mainloop;
   pa_context * context;
@@ -101,16 +107,17 @@ struct cubeb_stream {
   cubeb * context;
   pa_stream * output_stream;
   pa_stream * input_stream;
   cubeb_data_callback data_callback;
   cubeb_state_callback state_callback;
   void * user_ptr;
   pa_time_event * drain_timer;
   pa_sample_spec output_sample_spec;
+  pa_sample_spec input_sample_spec;
   int shutdown;
   float volume;
 };
 
 const float PULSE_NO_GAIN = -1.0;
 
 enum cork_state {
   UNCORK = 0,
@@ -182,37 +189,43 @@ stream_state_callback(pa_stream * s, voi
 
 static void
 write_to_output(pa_stream * s, void* input_data, size_t nbytes, cubeb_stream * stm)
 {
   void * buffer;
   size_t size;
   int r;
   long got;
-  size_t towrite;
+  size_t towrite, read_offset;
   size_t frame_size;
 
   frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
   assert(nbytes % frame_size == 0);
 
   towrite = nbytes;
+  read_offset = 0;
   while (towrite) {
     size = towrite;
     r = WRAP(pa_stream_begin_write)(s, &buffer, &size);
     assert(r == 0);
     assert(size > 0);
     assert(size % frame_size == 0);
 
-//    printf("data cb buffer size %zd\n", size);
-    got = stm->data_callback(stm, stm->user_ptr, input_data, buffer, size / frame_size);
+    LOG("Data callback offered output buffer size %zd, read_offset=%zd\n", size, read_offset);
+    got = stm->data_callback(stm, stm->user_ptr, (uint8_t*)input_data + read_offset, buffer, size / frame_size);
     if (got < 0) {
       WRAP(pa_stream_cancel_write)(s);
       stm->shutdown = 1;
       return;
     }
+    // If more iterations move offset of read buffer
+    if (input_data){
+      size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
+      read_offset += (size / frame_size) * in_frame_size;
+    }
 
     if (stm->volume != PULSE_NO_GAIN) {
       uint32_t samples =  size * stm->output_sample_spec.channels / frame_size ;
 
       if (stm->output_sample_spec.format == PA_SAMPLE_S16BE ||
           stm->output_sample_spec.format == PA_SAMPLE_S16LE) {
         short * b = buffer;
         for (uint32_t i = 0; i < samples; i++) {
@@ -245,87 +258,86 @@ write_to_output(pa_stream * s, void* inp
     }
 
     towrite -= size;
   }
 
   assert(towrite == 0);
 }
 
+static int
+read_from_input(pa_stream* s, const void** buffer, size_t* size)
+{
+  if (WRAP(pa_stream_readable_size)(s) > 0) {
+    if (WRAP(pa_stream_peek)(s, buffer, size) < 0) {
+      return -1;
+    }
+  }
+  return WRAP(pa_stream_readable_size)(s);
+}
+
 static void
-stream_request_callback(pa_stream * s, size_t nbytes, void * u)
+stream_write_callback(pa_stream * s, size_t nbytes, void * u)
 {
-//  printf("-- write size %zd -->\n", nbytes);
+  LOG("Output callback to be written buffer size %zd\n", nbytes);
   cubeb_stream * stm;
   stm = u;
   if (stm->shutdown) {
     return;
   }
 
-  // input/capture + output/record operation
-  // return here it is handled by read callback
-  if (stm->input_stream) {
-    return;
+  if (!stm->input_stream){
+    // Output/record only operation.
+    // Write directly to output
+    assert(!stm->input_stream && stm->output_stream);
+    write_to_output(s, NULL, nbytes, stm);
   }
-
-  // Output/record only operation
-  assert(!stm->input_stream && stm->output_stream);
-  write_to_output(s, NULL, nbytes, stm);
 }
 
 static void
 stream_read_callback(pa_stream * s, size_t nbytes, void * u)
 {
-//  printf("<-- read size %zd -- ", nbytes);
+  LOG("Input callback buffer size %zd\n", nbytes);
+
   cubeb_stream * stm;
   stm = u;
   if (stm->shutdown) {
     return;
   }
 
-  while (WRAP(pa_stream_readable_size)(s) > 0) {
-    const void *data;
-    size_t read_size;
-    if (WRAP(pa_stream_peek)(s, &data, &read_size) < 0) {
-      return;
-    }
-
+  const void *read_data=NULL;
+  size_t read_size;
+  while (read_from_input(s, &read_data, &read_size) > 0) {
     const pa_sample_spec* in_ss =  WRAP(pa_stream_get_sample_spec)(s);
     size_t in_frame_size = WRAP(pa_frame_size)(in_ss);
     size_t read_frames = read_size / in_frame_size;
 
     cubeb_stream* stm = u;
     if (stm->output_stream) {
       // input/capture + output/record operation
+      void* read_buffer = (void*)read_data;
+      // Write directly to output
       size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
       size_t write_size = read_frames * out_frame_size;
-      size_t writable_size = WRAP(pa_stream_writable_size)(stm->output_stream);
-
-//      printf("writable size %zd\n", writable_size);
-      void* read_buffer = (void*)data;
-      if (writable_size< write_size) {
-        // Trancate read
-        write_to_output(stm->output_stream, read_buffer, writable_size, stm);
-        // Trigger to play output stream.
-        WRAP(pa_stream_trigger)(stm->output_stream, NULL, NULL);
-      } else {
-        write_to_output(stm->output_stream, read_buffer, write_size, stm);
-      }
+      // Write input buffer directly to output
+      write_to_output(stm->output_stream, read_buffer, write_size, stm);
     } else {
-//      printf("\n");
       // input/capture only operation. Call callback directly
-      void* read_buffer = (void*)data;
+      void* read_buffer = (void*)read_data;
       if (stm->data_callback(stm, stm->user_ptr, read_buffer, NULL, read_frames) < 0) {
         WRAP(pa_stream_cancel_write)(s);
         stm->shutdown = 1;
-        return;
+        break;
       }
     }
+    WRAP(pa_stream_drop)(s);
 
-    WRAP(pa_stream_drop)(s);
+    if (stm->shutdown) {
+      return;
+    }
   }
 }
 
 static int
 wait_until_context_ready(cubeb * ctx)
 {
   for (;;) {
     pa_context_state_t state = WRAP(pa_context_get_state)(ctx->context);
@@ -673,66 +685,89 @@ pulse_stream_init(cubeb * context,
   battr.minreq = -1;
   battr.fragsize = -1;
 
   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
   if (output_stream_params) {
     ss.format = cube_to_pulse_format(output_stream_params->format);
     ss.rate = output_stream_params->rate;
     ss.channels = output_stream_params->channels;
+    stm->output_sample_spec = ss;
 
     // update buffer attributes
-    stm->output_sample_spec = ss;
-    unsigned int latency_min = (latency < MIN_LATENCY_MS)? MIN_LATENCY_MS: latency;
-    battr.tlength = WRAP(pa_usec_to_bytes)(latency_min * PA_USEC_PER_MSEC, &stm->output_sample_spec);
+    battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, &stm->output_sample_spec);
     battr.minreq = battr.tlength / 4;
     battr.fragsize = battr.minreq;
 
+    LOG("Stream init requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",battr.maxlength, battr.tlength,
+    		battr.prebuf, battr.minreq, battr.fragsize);
+
     stm->output_stream = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL);
     if (!stm->output_stream) {
       WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
       pulse_stream_destroy(stm);
       return CUBEB_ERROR;
     }
     WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm);
-    WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_request_callback, stm);
+    WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm);
     WRAP(pa_stream_connect_playback)(stm->output_stream,
-    								 output_stream_params->devid,
-									 &battr,
+    								                 output_stream_params->devid,
+    								                 &battr,
                                      PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
-                                     PA_STREAM_START_CORKED,
+                                     PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY,
                                      NULL, NULL);
   }
 
   // Set up input stream
   if (input_stream_params){
     pa_sample_spec in_ss;
     in_ss.channels = input_stream_params->channels;
     in_ss.rate = input_stream_params->rate;
     in_ss.format = cube_to_pulse_format(input_stream_params->format);
+    stm->input_sample_spec = in_ss;
+
     stm->input_stream = WRAP(pa_stream_new)(stm->context->context, "input stream", &in_ss, NULL);
     if (!stm->input_stream) {
       WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
       pulse_stream_destroy(stm);
       return CUBEB_ERROR;
     }
     WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm);
     WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm);
     WRAP(pa_stream_connect_record)(stm->input_stream,
-    							   input_stream_params->devid,
-								   &battr,
-                                   PA_STREAM_START_CORKED);
+                                   input_stream_params->devid,
+                                   &battr,
+                                   PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
+                                   PA_STREAM_NOT_MONOTONIC |
+                                   PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY);
   }
 
   r = wait_until_stream_ready(stm);
   if (r == 0) {
     /* force a timing update now, otherwise timing info does not become valid
        until some point after initialization has completed. */
     r = stream_update_timing_info(stm);
   }
+
+#ifdef LOGGING_ENABLED
+  if (output_stream_params){
+    const pa_buffer_attr* output_att;
+    output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream);
+    LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",output_att->maxlength, output_att->tlength,
+        output_att->prebuf, output_att->minreq, output_att->fragsize);
+  }
+
+  if (input_stream_params){
+    const pa_buffer_attr* input_att;
+    input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream);
+    LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",input_att->maxlength, input_att->tlength,
+        input_att->prebuf, input_att->minreq, input_att->fragsize);
+  }
+#endif
+
   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
 
   if (r != 0) {
     pulse_stream_destroy(stm);
     return CUBEB_ERROR;
   }
 
   *stream = stm;
@@ -941,19 +976,21 @@ pulse_ensure_dev_list_data_list_size (pu
         sizeof(cubeb_device_info) * list_data->max);
   }
 }
 
 static cubeb_device_state
 pulse_get_state_from_sink_port(pa_sink_port_info * info)
 {
   if (info != NULL) {
+#if PA_CHECK_VERSION(2, 0, 0)
     if (info->available == PA_PORT_AVAILABLE_NO)
       return CUBEB_DEVICE_STATE_UNPLUGGED;
     else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
+#endif
       return CUBEB_DEVICE_STATE_ENABLED;
   }
 
   return CUBEB_DEVICE_STATE_DISABLED;
 }
 
 static void
 pulse_sink_info_cb(pa_context * context, const pa_sink_info * info,
@@ -997,19 +1034,21 @@ pulse_sink_info_cb(pa_context * context,
   pulse_ensure_dev_list_data_list_size (list_data);
   list_data->devinfo[list_data->count++] = devinfo;
 }
 
 static cubeb_device_state
 pulse_get_state_from_source_port(pa_source_port_info * info)
 {
   if (info != NULL) {
+#if PA_CHECK_VERSION(2, 0, 0)
     if (info->available == PA_PORT_AVAILABLE_NO)
       return CUBEB_DEVICE_STATE_UNPLUGGED;
     else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
+#endif
       return CUBEB_DEVICE_STATE_ENABLED;
   }
 
   return CUBEB_DEVICE_STATE_DISABLED;
 }
 
 static void
 pulse_source_info_cb(pa_context * context, const pa_source_info * info,
@@ -1096,17 +1135,17 @@ pulse_enumerate_devices(cubeb * context,
         pulse_source_info_cb, &user_data);
     if (o) {
       operation_wait(context, NULL, o);
       WRAP(pa_operation_unref)(o);
     }
   }
 
   *collection = malloc(sizeof(cubeb_device_collection) +
-      sizeof(cubeb_device_info*) * user_data.count);
+      sizeof(cubeb_device_info*) * (user_data.count > 0 ? user_data.count - 1 : 0));
   (*collection)->count = user_data.count;
   for (i = 0; i < user_data.count; i++)
     (*collection)->device[i] = user_data.devinfo[i];
 
   free(user_data.default_sink_name);
   free(user_data.default_source_name);
   free(user_data.devinfo);
   return CUBEB_OK;