Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 119 additions & 55 deletions pulse/pacat-simple-vchan.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const pa_buffer_attr * bufattr = NULL;

static int verbose = 1;

static bool connect_disconnect_rec_stream(
struct userdata *u, bool state_callback, bool new_rec_allowed);

static void context_state_callback(pa_context *c, void *userdata);

static void playback_stream_drain(struct userdata *u);
Expand Down Expand Up @@ -332,6 +335,7 @@ static void playback_stream_drain_cb(pa_stream *s, int success, void *userdata)
pacat_log("Playback vchan empty and playback stream drained. Corking playback stream.");
u->pending_play_cork = false;
pa_stream_cork(u->play_stream, 1, NULL, u);
u->play_corked = true;
} else if (space_in_vchan < 0) {
pacat_log("libvchan_data_ready() failed: return value %d", space_in_vchan);
quit(u, 1);
Expand All @@ -354,12 +358,13 @@ static void send_rec_data(pa_stream *s, struct userdata *u, bool discard_overrun
size_t rec_buffer_length, rec_buffer_index = 0;
int l, vchan_buffer_space;

assert(s);
assert(u);

if (!u->rec_allowed)
if (!u->rec_allowed || u->rec_stream_connect_in_progress)
return;

assert(s);

if (pa_stream_readable_size(s) <= 0)
return;

Expand Down Expand Up @@ -448,8 +453,6 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
uint32_t cmd;
while (libvchan_data_ready(u->rec_ctrl) >= (int)sizeof(cmd)) {
if (libvchan_read(u->rec_ctrl, (char*)&cmd, sizeof(cmd)) != sizeof(cmd)) {
if (!pa_stream_is_corked(u->rec_stream))
pa_stream_cork(u->rec_stream, 1, NULL, u);
fprintf(stderr, "Failed to read from vchan\n");
quit(u, 1);
return;
Expand All @@ -461,10 +464,10 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
if (!qdb_write(u->qdb, u->qdb_request_path, "1", 1)) {
pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno));
}
if (u->rec_allowed) {
if (u->rec_allowed && !u->rec_stream_connect_in_progress) {
pacat_log("Recording start");
pa_stream_cork(u->rec_stream, 0, NULL, u);
} else
} else if (!u->rec_allowed)
pacat_log("Recording requested but not allowed");
g_mutex_unlock(&u->prop_mutex);
break;
Expand All @@ -474,13 +477,19 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
if (!qdb_write(u->qdb, u->qdb_request_path, "0", 1)) {
pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno));
}
if (!pa_stream_is_corked(u->rec_stream)) {
if (u->rec_allowed && !u->rec_stream_connect_in_progress) {
pacat_log("Recording stop");
pa_stream_cork(u->rec_stream, 1, NULL, u);
}
g_mutex_unlock(&u->prop_mutex);
break;
case QUBES_PA_SINK_CORK_CMD:
if (pa_stream_is_corked(u->play_stream)) {
/* store it in case the stream isn't connected yet */
u->play_corked = true;
pacat_log("Stream already corked");
break;
}
u->pending_play_cork = true;
if (libvchan_data_ready(u->play_ctrl) > 0) {
pacat_log("Deferred stream drain");
Expand All @@ -496,6 +505,7 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
* do not clear u->draining, as draining while a drain
* operation is in progress is not safe
*/
u->play_corked = false;
pa_stream_cork(u->play_stream, 0, NULL, u);
break;
default:
Expand All @@ -520,10 +530,11 @@ static void stream_state_callback(pa_stream *s, void *userdata) {
assert(u->mainloop_api);
u->mainloop_api->io_free(u->play_stdio_event);
}
if (u->rec_stdio_event && u->rec_stream == s) {
if (u->rec_stream == s) {
pacat_log("rec stream terminated");
assert(u->mainloop_api);
u->mainloop_api->io_free(u->rec_stdio_event);
pa_stream_unref(u->rec_stream);
u->rec_stream = NULL;
connect_disconnect_rec_stream(u, true, false);
}
break;

Expand Down Expand Up @@ -557,13 +568,16 @@ static void stream_state_callback(pa_stream *s, void *userdata) {
pacat_log("io_new play failed");
quit(u, 1);
}
/* restore corked state after PA (re)connect */
if (u->play_corked)
pa_stream_cork(u->play_stream, 1, NULL, u);
}
if (u->rec_stream == s) {
u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api,
libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u);
if (!u->rec_stdio_event) {
pacat_log("io_new rec failed");
quit(u, 1);
if (connect_disconnect_rec_stream(u, true, true)) {
if (u->rec_requested) {
pacat_log("Recording start");
pa_stream_cork(u->rec_stream, 0, NULL, NULL);
}
}
}
break;
Expand Down Expand Up @@ -638,13 +652,84 @@ static void stream_event_callback(pa_stream *s, const char *name, pa_proplist *p
pa_xfree(t);
}

/*
* Connect/disconnect rec stream based on new_rec_allowed. This can be called
* from a state callback (if state_callback=true) to finalize
* connecting/disconnecting.
* Returns if stream is in the desired state.
*/
static bool connect_disconnect_rec_stream_locked(
struct userdata *u, bool state_callback, bool new_rec_allowed)
{
if (state_callback) {
if (new_rec_allowed == u->rec_allowed) {
if (!qdb_write(u->qdb, u->qdb_status_path, u->rec_allowed ? "1" : "0", 1)) {
pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno));
}
u->rec_stream_connect_in_progress = false;
return true;
}
}
u->rec_stream_connect_in_progress = true;
if (new_rec_allowed) {
pa_stream_flags_t flags =
PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY;

/* setup recording stream */
assert(!u->rec_stream);

if (!(u->rec_stream = pa_stream_new_with_proplist(u->context, "record", &sample_spec, &u->channel_map, u->proplist))) {
pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(u->context)));
quit(u, 1);
}

pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u);
/* pa_stream_set_write_callback */
pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u);
pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u);
pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u);
pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u);
pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u);
pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u);
pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u);
pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u);

if (pa_stream_connect_record(u->rec_stream,
u->rec_device,
bufattr,
flags) < 0) {
pacat_log("pa_stream_connect_record() failed: %s",
pa_strerror(pa_context_errno(u->context)));
u->rec_allowed = false;
u->rec_stream_connect_in_progress = false;
}
} else {
if (pa_stream_disconnect(u->rec_stream) < 0) {
pacat_log("pa_stream_disconnect() failed: %s",
pa_strerror(pa_context_errno(u->context)));
u->rec_stream_connect_in_progress = false;
}
}
return false;
}

static bool connect_disconnect_rec_stream(
struct userdata *u, bool state_callback, bool new_rec_allowed)
{
bool ret;

g_mutex_lock(&u->prop_mutex);
ret = connect_disconnect_rec_stream_locked(u, state_callback, new_rec_allowed);
g_mutex_unlock(&u->prop_mutex);
return ret;
}



/* This is called whenever the context status changes */
static void context_state_callback(pa_context *c, void *userdata) {
struct userdata *u = userdata;
pa_stream_flags_t flags = 0;
pa_channel_map channel_map;

assert(c);

Expand All @@ -656,9 +741,9 @@ static void context_state_callback(pa_context *c, void *userdata) {

case PA_CONTEXT_READY:

pa_channel_map_init_extend(&channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT);
pa_channel_map_init_extend(&u->channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT);

if (!pa_channel_map_compatible(&channel_map, &sample_spec)) {
if (!pa_channel_map_compatible(&u->channel_map, &sample_spec)) {
pacat_log("Channel map doesn't match sample specification");
goto fail;
}
Expand All @@ -669,7 +754,7 @@ static void context_state_callback(pa_context *c, void *userdata) {
if (verbose)
pacat_log("Connection established.%s", CLEAR_LINE);

if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &channel_map, u->proplist))) {
if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &u->channel_map, u->proplist))) {
pacat_log("play pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c)));
goto fail;
}
Expand All @@ -693,37 +778,25 @@ static void context_state_callback(pa_context *c, void *userdata) {
goto fail;
}

/* setup recording stream */
assert(!u->rec_stream);

if (!(u->rec_stream = pa_stream_new_with_proplist(c, "record", &sample_spec, &channel_map, u->proplist))) {
pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c)));
goto fail;
}

pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u);
/* pa_stream_set_write_callback */
pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u);
pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u);
pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u);
pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u);
pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u);
pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u);
pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u);
pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u);

flags = PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY;
u->rec_requested = 0;

if (pa_stream_connect_record(u->rec_stream, u->rec_device, bufattr, flags) < 0) {
pacat_log("pa_stream_connect_record() failed: %s", pa_strerror(pa_context_errno(c)));
goto fail;
/* and start watching for recording requests */
u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api,
libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u);
if (!u->rec_stdio_event) {
pacat_log("io_new rec failed");
quit(u, 1);
}

if (u->rec_allowed)
connect_disconnect_rec_stream(u, false, u->rec_allowed);
break;

case PA_CONTEXT_TERMINATED:
pacat_log("pulseaudio connection terminated");
if (u->rec_stdio_event) {
assert(u->mainloop_api);
u->mainloop_api->io_free(u->rec_stdio_event);
}
quit(u, 0);
break;

Expand Down Expand Up @@ -865,18 +938,9 @@ static void control_socket_callback(pa_mainloop_api *UNUSED(a),
g_mutex_lock(&u->prop_mutex);
if (new_rec_allowed != u->rec_allowed) {
u->rec_allowed = new_rec_allowed;
if (!u->rec_stream_connect_in_progress)
connect_disconnect_rec_stream_locked(u, false, new_rec_allowed);
pacat_log("Setting audio-input to %s", u->rec_allowed ? "enabled" : "disabled");
if (u->rec_allowed && u->rec_requested) {
pacat_log("Recording start");
pa_stream_cork(u->rec_stream, 0, NULL, NULL);
} else if (!u->rec_allowed && u->rec_stream &&
(u->rec_requested || !pa_stream_is_corked(u->rec_stream))) {
pacat_log("Recording stop");
pa_stream_cork(u->rec_stream, 1, NULL, NULL);
}
if (!qdb_write(u->qdb, u->qdb_status_path, new_rec_allowed ? "1" : "0", 1)) {
pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno));
}
}
g_mutex_unlock(&u->prop_mutex);
}
Expand Down Expand Up @@ -929,7 +993,7 @@ static int setup_control(struct userdata *u) {
if (socket_fd < 0)
goto fail;

rec_allowed = is_rec_allowed_from_qdb(u);
rec_allowed = is_rec_allowed_from_qdb(u);
if (rec_allowed >= 0) {
pacat_log("mic allowed: initial value read from Qubes DB '%d'", rec_allowed);
u->rec_allowed = rec_allowed;
Expand Down
3 changes: 3 additions & 0 deletions pulse/pacat-simple-vchan.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct userdata {
pa_io_event* play_ctrl_event;
pa_io_event* rec_ctrl_event;

pa_channel_map channel_map;
GMutex prop_mutex;
qdb_handle_t qdb;
qdb_handle_t watch_qdb; // separate connection for watches
Expand All @@ -38,11 +39,13 @@ struct userdata {
char *qdb_request_path;
int control_socket_fd;
pa_io_event* control_socket_event;
bool rec_stream_connect_in_progress;
bool rec_allowed;
bool rec_requested;
bool never_block;
bool pending_play_cork;
bool draining;
bool play_corked;

int domid;
int play_watch_fd, rec_watch_fd;
Expand Down