--- a/media/libcubeb/src/cubeb_pulse.c
+++ b/media/libcubeb/src/cubeb_pulse.c
@@ -70,25 +70,31 @@
X(pa_threaded_mainloop_unlock) \
X(pa_threaded_mainloop_wait) \
X(pa_usec_to_bytes) \
X(pa_stream_set_read_callback) \
X(pa_stream_connect_record) \
X(pa_stream_readable_size) \
X(pa_stream_peek) \
X(pa_stream_drop) \
- X(pa_stream_writable_size) \
- X(pa_stream_trigger) \
+ X(pa_stream_get_buffer_attr) \
#define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x;
LIBPULSE_API_VISIT(MAKE_TYPEDEF);
#undef MAKE_TYPEDEF
#endif
-#define MIN_LATENCY_MS 30
+//#define LOGGING_ENABLED
+#ifdef LOGGING_ENABLED
+#define LOG(...) do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while(0)
+#else
+#define LOG(...)
+#endif
static struct cubeb_ops const pulse_ops;
struct cubeb {
struct cubeb_ops const * ops;
void * libpulse;
pa_threaded_mainloop * mainloop;
pa_context * context;
@@ -101,16 +107,17 @@ struct cubeb_stream {
cubeb * context;
pa_stream * output_stream;
pa_stream * input_stream;
cubeb_data_callback data_callback;
cubeb_state_callback state_callback;
void * user_ptr;
pa_time_event * drain_timer;
pa_sample_spec output_sample_spec;
+ pa_sample_spec input_sample_spec;
int shutdown;
float volume;
};
const float PULSE_NO_GAIN = -1.0;
enum cork_state {
UNCORK = 0,
@@ -182,37 +189,43 @@ stream_state_callback(pa_stream * s, voi
static void
write_to_output(pa_stream * s, void* input_data, size_t nbytes, cubeb_stream * stm)
{
void * buffer;
size_t size;
int r;
long got;
- size_t towrite;
+ size_t towrite, read_offset;
size_t frame_size;
frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
assert(nbytes % frame_size == 0);
towrite = nbytes;
+ read_offset = 0;
while (towrite) {
size = towrite;
r = WRAP(pa_stream_begin_write)(s, &buffer, &size);
assert(r == 0);
assert(size > 0);
assert(size % frame_size == 0);
-// printf("data cb buffer size %zd\n", size);
- got = stm->data_callback(stm, stm->user_ptr, input_data, buffer, size / frame_size);
+ LOG("Data callback offered output buffer size %zd, read_offset=%zd\n", size, read_offset);
+ got = stm->data_callback(stm, stm->user_ptr, (uint8_t*)input_data + read_offset, buffer, size / frame_size);
if (got < 0) {
WRAP(pa_stream_cancel_write)(s);
stm->shutdown = 1;
return;
}
+ // If more iterations move offset of read buffer
+ if (input_data){
+ size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
+ read_offset += (size / frame_size) * in_frame_size;
+ }
if (stm->volume != PULSE_NO_GAIN) {
uint32_t samples = size * stm->output_sample_spec.channels / frame_size ;
if (stm->output_sample_spec.format == PA_SAMPLE_S16BE ||
stm->output_sample_spec.format == PA_SAMPLE_S16LE) {
short * b = buffer;
for (uint32_t i = 0; i < samples; i++) {
@@ -245,87 +258,86 @@ write_to_output(pa_stream * s, void* inp
}
towrite -= size;
}
assert(towrite == 0);
}
+static int
+read_from_input(pa_stream* s, const void** buffer, size_t* size)
+{
+ if (WRAP(pa_stream_readable_size)(s) > 0) {
+ if (WRAP(pa_stream_peek)(s, buffer, size) < 0) {
+ return -1;
+ }
+ }
+ return WRAP(pa_stream_readable_size)(s);
+}
+
static void
-stream_request_callback(pa_stream * s, size_t nbytes, void * u)
+stream_write_callback(pa_stream * s, size_t nbytes, void * u)
{
-// printf("-- write size %zd -->\n", nbytes);
+ LOG("Output callback to be written buffer size %zd\n", nbytes);
cubeb_stream * stm;
stm = u;
if (stm->shutdown) {
return;
}
- // input/capture + output/record operation
- // return here it is handled by read callback
- if (stm->input_stream) {
- return;
+ if (!stm->input_stream){
+ // Output/record only operation.
+ // Write directly to output
+ assert(!stm->input_stream && stm->output_stream);
+ write_to_output(s, NULL, nbytes, stm);
}
-
- // Output/record only operation
- assert(!stm->input_stream && stm->output_stream);
- write_to_output(s, NULL, nbytes, stm);
}
static void
stream_read_callback(pa_stream * s, size_t nbytes, void * u)
{
-// printf("<-- read size %zd -- ", nbytes);
+ LOG("Input callback buffer size %zd\n", nbytes);
+
cubeb_stream * stm;
stm = u;
if (stm->shutdown) {
return;
}
- while (WRAP(pa_stream_readable_size)(s) > 0) {
- const void *data;
- size_t read_size;
- if (WRAP(pa_stream_peek)(s, &data, &read_size) < 0) {
- return;
- }
-
+ const void *read_data=NULL;
+ size_t read_size;
+ while (read_from_input(s, &read_data, &read_size) > 0) {
const pa_sample_spec* in_ss = WRAP(pa_stream_get_sample_spec)(s);
size_t in_frame_size = WRAP(pa_frame_size)(in_ss);
size_t read_frames = read_size / in_frame_size;
cubeb_stream* stm = u;
if (stm->output_stream) {
// input/capture + output/record operation
+ void* read_buffer = (void*)read_data;
+ // Write directly to output
size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
size_t write_size = read_frames * out_frame_size;
- size_t writable_size = WRAP(pa_stream_writable_size)(stm->output_stream);
-
-// printf("writable size %zd\n", writable_size);
- void* read_buffer = (void*)data;
- if (writable_size< write_size) {
- // Trancate read
- write_to_output(stm->output_stream, read_buffer, writable_size, stm);
- // Trigger to play output stream.
- WRAP(pa_stream_trigger)(stm->output_stream, NULL, NULL);
- } else {
- write_to_output(stm->output_stream, read_buffer, write_size, stm);
- }
+ // Write input buffer directly to output
+ write_to_output(stm->output_stream, read_buffer, write_size, stm);
} else {
-// printf("\n");
// input/capture only operation. Call callback directly
- void* read_buffer = (void*)data;
+ void* read_buffer = (void*)read_data;
if (stm->data_callback(stm, stm->user_ptr, read_buffer, NULL, read_frames) < 0) {
WRAP(pa_stream_cancel_write)(s);
stm->shutdown = 1;
- return;
+ break;
}
}
+ WRAP(pa_stream_drop)(s);
- WRAP(pa_stream_drop)(s);
+ if (stm->shutdown) {
+ return;
+ }
}
}
static int
wait_until_context_ready(cubeb * ctx)
{
for (;;) {
pa_context_state_t state = WRAP(pa_context_get_state)(ctx->context);
@@ -673,66 +685,89 @@ pulse_stream_init(cubeb * context,
battr.minreq = -1;
battr.fragsize = -1;
WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
if (output_stream_params) {
ss.format = cube_to_pulse_format(output_stream_params->format);
ss.rate = output_stream_params->rate;
ss.channels = output_stream_params->channels;
+ stm->output_sample_spec = ss;
// update buffer attributes
- stm->output_sample_spec = ss;
- unsigned int latency_min = (latency < MIN_LATENCY_MS)? MIN_LATENCY_MS: latency;
- battr.tlength = WRAP(pa_usec_to_bytes)(latency_min * PA_USEC_PER_MSEC, &stm->output_sample_spec);
+ battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, &stm->output_sample_spec);
battr.minreq = battr.tlength / 4;
battr.fragsize = battr.minreq;
+ LOG("Stream init requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",battr.maxlength, battr.tlength,
+ battr.prebuf, battr.minreq, battr.fragsize);
+
stm->output_stream = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL);
if (!stm->output_stream) {
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
pulse_stream_destroy(stm);
return CUBEB_ERROR;
}
WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm);
- WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_request_callback, stm);
+ WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm);
WRAP(pa_stream_connect_playback)(stm->output_stream,
- output_stream_params->devid,
- &battr,
+ output_stream_params->devid,
+ &battr,
PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
- PA_STREAM_START_CORKED,
+ PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY,
NULL, NULL);
}
// Set up input stream
if (input_stream_params){
pa_sample_spec in_ss;
in_ss.channels = input_stream_params->channels;
in_ss.rate = input_stream_params->rate;
in_ss.format = cube_to_pulse_format(input_stream_params->format);
+ stm->input_sample_spec = in_ss;
+
stm->input_stream = WRAP(pa_stream_new)(stm->context->context, "input stream", &in_ss, NULL);
if (!stm->input_stream) {
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
pulse_stream_destroy(stm);
return CUBEB_ERROR;
}
WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm);
WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm);
WRAP(pa_stream_connect_record)(stm->input_stream,
- input_stream_params->devid,
- &battr,
- PA_STREAM_START_CORKED);
+ input_stream_params->devid,
+ &battr,
+ PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
+ PA_STREAM_NOT_MONOTONIC |
+ PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY);
}
r = wait_until_stream_ready(stm);
if (r == 0) {
/* force a timing update now, otherwise timing info does not become valid
until some point after initialization has completed. */
r = stream_update_timing_info(stm);
}
+
+#ifdef LOGGING_ENABLED
+ if (output_stream_params){
+ const pa_buffer_attr* output_att;
+ output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream);
+ LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",output_att->maxlength, output_att->tlength,
+ output_att->prebuf, output_att->minreq, output_att->fragsize);
+ }
+
+ if (input_stream_params){
+ const pa_buffer_attr* input_att;
+ input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream);
+ LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",input_att->maxlength, input_att->tlength,
+ input_att->prebuf, input_att->minreq, input_att->fragsize);
+ }
+#endif
+
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
if (r != 0) {
pulse_stream_destroy(stm);
return CUBEB_ERROR;
}
*stream = stm;
@@ -941,19 +976,21 @@ pulse_ensure_dev_list_data_list_size (pu
sizeof(cubeb_device_info) * list_data->max);
}
}
static cubeb_device_state
pulse_get_state_from_sink_port(pa_sink_port_info * info)
{
if (info != NULL) {
+#if PA_CHECK_VERSION(2, 0, 0)
if (info->available == PA_PORT_AVAILABLE_NO)
return CUBEB_DEVICE_STATE_UNPLUGGED;
else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
+#endif
return CUBEB_DEVICE_STATE_ENABLED;
}
return CUBEB_DEVICE_STATE_DISABLED;
}
static void
pulse_sink_info_cb(pa_context * context, const pa_sink_info * info,
@@ -997,19 +1034,21 @@ pulse_sink_info_cb(pa_context * context,
pulse_ensure_dev_list_data_list_size (list_data);
list_data->devinfo[list_data->count++] = devinfo;
}
static cubeb_device_state
pulse_get_state_from_source_port(pa_source_port_info * info)
{
if (info != NULL) {
+#if PA_CHECK_VERSION(2, 0, 0)
if (info->available == PA_PORT_AVAILABLE_NO)
return CUBEB_DEVICE_STATE_UNPLUGGED;
else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
+#endif
return CUBEB_DEVICE_STATE_ENABLED;
}
return CUBEB_DEVICE_STATE_DISABLED;
}
static void
pulse_source_info_cb(pa_context * context, const pa_source_info * info,
@@ -1096,17 +1135,17 @@ pulse_enumerate_devices(cubeb * context,
pulse_source_info_cb, &user_data);
if (o) {
operation_wait(context, NULL, o);
WRAP(pa_operation_unref)(o);
}
}
*collection = malloc(sizeof(cubeb_device_collection) +
- sizeof(cubeb_device_info*) * user_data.count);
+ sizeof(cubeb_device_info*) * (user_data.count > 0 ? user_data.count - 1 : 0));
(*collection)->count = user_data.count;
for (i = 0; i < user_data.count; i++)
(*collection)->device[i] = user_data.devinfo[i];
free(user_data.default_sink_name);
free(user_data.default_source_name);
free(user_data.devinfo);
return CUBEB_OK;