diff --git a/pulse/pacat-simple-vchan.c b/pulse/pacat-simple-vchan.c index da06e9a5..cb592f44 100644 --- a/pulse/pacat-simple-vchan.c +++ b/pulse/pacat-simple-vchan.c @@ -92,6 +92,9 @@ const pa_buffer_attr * bufattr = NULL; static int verbose = 1; +static bool connect_disconnect_rec_stream( + struct userdata *u, bool state_callback, bool new_rec_allowed); + static void context_state_callback(pa_context *c, void *userdata); static void playback_stream_drain(struct userdata *u); @@ -332,6 +335,7 @@ static void playback_stream_drain_cb(pa_stream *s, int success, void *userdata) pacat_log("Playback vchan empty and playback stream drained. Corking playback stream."); u->pending_play_cork = false; pa_stream_cork(u->play_stream, 1, NULL, u); + u->play_corked = true; } else if (space_in_vchan < 0) { pacat_log("libvchan_data_ready() failed: return value %d", space_in_vchan); quit(u, 1); @@ -354,12 +358,13 @@ static void send_rec_data(pa_stream *s, struct userdata *u, bool discard_overrun size_t rec_buffer_length, rec_buffer_index = 0; int l, vchan_buffer_space; - assert(s); assert(u); - if (!u->rec_allowed) + if (!u->rec_allowed || u->rec_stream_connect_in_progress) return; + assert(s); + if (pa_stream_readable_size(s) <= 0) return; @@ -448,8 +453,6 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), uint32_t cmd; while (libvchan_data_ready(u->rec_ctrl) >= (int)sizeof(cmd)) { if (libvchan_read(u->rec_ctrl, (char*)&cmd, sizeof(cmd)) != sizeof(cmd)) { - if (!pa_stream_is_corked(u->rec_stream)) - pa_stream_cork(u->rec_stream, 1, NULL, u); fprintf(stderr, "Failed to read from vchan\n"); quit(u, 1); return; @@ -461,10 +464,10 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), if (!qdb_write(u->qdb, u->qdb_request_path, "1", 1)) { pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno)); } - if (u->rec_allowed) { + if (u->rec_allowed && !u->rec_stream_connect_in_progress) { pacat_log("Recording start"); pa_stream_cork(u->rec_stream, 0, NULL, u); - } else + } else if (!u->rec_allowed) pacat_log("Recording requested but not allowed"); g_mutex_unlock(&u->prop_mutex); break; @@ -474,13 +477,19 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), if (!qdb_write(u->qdb, u->qdb_request_path, "0", 1)) { pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno)); } - if (!pa_stream_is_corked(u->rec_stream)) { + if (u->rec_allowed && !u->rec_stream_connect_in_progress) { pacat_log("Recording stop"); pa_stream_cork(u->rec_stream, 1, NULL, u); } g_mutex_unlock(&u->prop_mutex); break; case QUBES_PA_SINK_CORK_CMD: + if (pa_stream_is_corked(u->play_stream)) { + /* store it in case the stream isn't connected yet */ + u->play_corked = true; + pacat_log("Stream already corked"); + break; + } u->pending_play_cork = true; if (libvchan_data_ready(u->play_ctrl) > 0) { pacat_log("Deferred stream drain"); @@ -496,6 +505,7 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), * do not clear u->draining, as draining while a drain * operation is in progress is not safe */ + u->play_corked = false; pa_stream_cork(u->play_stream, 0, NULL, u); break; default: @@ -520,10 +530,11 @@ static void stream_state_callback(pa_stream *s, void *userdata) { assert(u->mainloop_api); u->mainloop_api->io_free(u->play_stdio_event); } - if (u->rec_stdio_event && u->rec_stream == s) { + if (u->rec_stream == s) { pacat_log("rec stream terminated"); - assert(u->mainloop_api); - u->mainloop_api->io_free(u->rec_stdio_event); + pa_stream_unref(u->rec_stream); + u->rec_stream = NULL; + connect_disconnect_rec_stream(u, true, false); } break; @@ -557,13 +568,16 @@ static void stream_state_callback(pa_stream *s, void *userdata) { pacat_log("io_new play failed"); quit(u, 1); } + /* restore corked state after PA (re)connect */ + if (u->play_corked) + pa_stream_cork(u->play_stream, 1, NULL, u); } if (u->rec_stream == s) { - u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api, - libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u); - if (!u->rec_stdio_event) { - pacat_log("io_new rec failed"); - quit(u, 1); + if (connect_disconnect_rec_stream(u, true, true)) { + if (u->rec_requested) { + pacat_log("Recording start"); + pa_stream_cork(u->rec_stream, 0, NULL, NULL); + } } } break; @@ -638,13 +652,84 @@ static void stream_event_callback(pa_stream *s, const char *name, pa_proplist *p pa_xfree(t); } +/* + * Connect/disconnect rec stream based on new_rec_allowed. This can be called + * from a state callback (if state_callback=true) to finalize + * connecting/disconnecting. + * Returns if stream is in the desired state. + */ +static bool connect_disconnect_rec_stream_locked( + struct userdata *u, bool state_callback, bool new_rec_allowed) +{ + if (state_callback) { + if (new_rec_allowed == u->rec_allowed) { + if (!qdb_write(u->qdb, u->qdb_status_path, u->rec_allowed ? "1" : "0", 1)) { + pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno)); + } + u->rec_stream_connect_in_progress = false; + return true; + } + } + u->rec_stream_connect_in_progress = true; + if (new_rec_allowed) { + pa_stream_flags_t flags = + PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY; + + /* setup recording stream */ + assert(!u->rec_stream); + + if (!(u->rec_stream = pa_stream_new_with_proplist(u->context, "record", &sample_spec, &u->channel_map, u->proplist))) { + pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(u->context))); + quit(u, 1); + } + + pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u); + /* pa_stream_set_write_callback */ + pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u); + pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u); + pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u); + pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u); + pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u); + pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u); + pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u); + pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u); + + if (pa_stream_connect_record(u->rec_stream, + u->rec_device, + bufattr, + flags) < 0) { + pacat_log("pa_stream_connect_record() failed: %s", + pa_strerror(pa_context_errno(u->context))); + u->rec_allowed = false; + u->rec_stream_connect_in_progress = false; + } + } else { + if (pa_stream_disconnect(u->rec_stream) < 0) { + pacat_log("pa_stream_disconnect() failed: %s", + pa_strerror(pa_context_errno(u->context))); + u->rec_stream_connect_in_progress = false; + } + } + return false; +} + +static bool connect_disconnect_rec_stream( + struct userdata *u, bool state_callback, bool new_rec_allowed) +{ + bool ret; + + g_mutex_lock(&u->prop_mutex); + ret = connect_disconnect_rec_stream_locked(u, state_callback, new_rec_allowed); + g_mutex_unlock(&u->prop_mutex); + return ret; +} + /* This is called whenever the context status changes */ static void context_state_callback(pa_context *c, void *userdata) { struct userdata *u = userdata; pa_stream_flags_t flags = 0; - pa_channel_map channel_map; assert(c); @@ -656,9 +741,9 @@ static void context_state_callback(pa_context *c, void *userdata) { case PA_CONTEXT_READY: - pa_channel_map_init_extend(&channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT); + pa_channel_map_init_extend(&u->channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT); - if (!pa_channel_map_compatible(&channel_map, &sample_spec)) { + if (!pa_channel_map_compatible(&u->channel_map, &sample_spec)) { pacat_log("Channel map doesn't match sample specification"); goto fail; } @@ -669,7 +754,7 @@ static void context_state_callback(pa_context *c, void *userdata) { if (verbose) pacat_log("Connection established.%s", CLEAR_LINE); - if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &channel_map, u->proplist))) { + if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &u->channel_map, u->proplist))) { pacat_log("play pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c))); goto fail; } @@ -693,37 +778,25 @@ static void context_state_callback(pa_context *c, void *userdata) { goto fail; } - /* setup recording stream */ - assert(!u->rec_stream); - - if (!(u->rec_stream = pa_stream_new_with_proplist(c, "record", &sample_spec, &channel_map, u->proplist))) { - pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c))); - goto fail; - } - - pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u); - /* pa_stream_set_write_callback */ - pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u); - pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u); - pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u); - pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u); - pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u); - pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u); - pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u); - pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u); - - flags = PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY; u->rec_requested = 0; - if (pa_stream_connect_record(u->rec_stream, u->rec_device, bufattr, flags) < 0) { - pacat_log("pa_stream_connect_record() failed: %s", pa_strerror(pa_context_errno(c))); - goto fail; + /* and start watching for recording requests */ + u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api, + libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u); + if (!u->rec_stdio_event) { + pacat_log("io_new rec failed"); + quit(u, 1); } - + if (u->rec_allowed) + connect_disconnect_rec_stream(u, false, u->rec_allowed); break; case PA_CONTEXT_TERMINATED: pacat_log("pulseaudio connection terminated"); + if (u->rec_stdio_event) { + assert(u->mainloop_api); + u->mainloop_api->io_free(u->rec_stdio_event); + } quit(u, 0); break; @@ -865,18 +938,9 @@ static void control_socket_callback(pa_mainloop_api *UNUSED(a), g_mutex_lock(&u->prop_mutex); if (new_rec_allowed != u->rec_allowed) { u->rec_allowed = new_rec_allowed; + if (!u->rec_stream_connect_in_progress) + connect_disconnect_rec_stream_locked(u, false, new_rec_allowed); pacat_log("Setting audio-input to %s", u->rec_allowed ? "enabled" : "disabled"); - if (u->rec_allowed && u->rec_requested) { - pacat_log("Recording start"); - pa_stream_cork(u->rec_stream, 0, NULL, NULL); - } else if (!u->rec_allowed && u->rec_stream && - (u->rec_requested || !pa_stream_is_corked(u->rec_stream))) { - pacat_log("Recording stop"); - pa_stream_cork(u->rec_stream, 1, NULL, NULL); - } - if (!qdb_write(u->qdb, u->qdb_status_path, new_rec_allowed ? "1" : "0", 1)) { - pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno)); - } } g_mutex_unlock(&u->prop_mutex); } @@ -929,7 +993,7 @@ static int setup_control(struct userdata *u) { if (socket_fd < 0) goto fail; - rec_allowed = is_rec_allowed_from_qdb(u); + rec_allowed = is_rec_allowed_from_qdb(u); if (rec_allowed >= 0) { pacat_log("mic allowed: initial value read from Qubes DB '%d'", rec_allowed); u->rec_allowed = rec_allowed; diff --git a/pulse/pacat-simple-vchan.h b/pulse/pacat-simple-vchan.h index 07e26a2a..3451b4c7 100644 --- a/pulse/pacat-simple-vchan.h +++ b/pulse/pacat-simple-vchan.h @@ -30,6 +30,7 @@ struct userdata { pa_io_event* play_ctrl_event; pa_io_event* rec_ctrl_event; + pa_channel_map channel_map; GMutex prop_mutex; qdb_handle_t qdb; qdb_handle_t watch_qdb; // separate connection for watches @@ -38,11 +39,13 @@ struct userdata { char *qdb_request_path; int control_socket_fd; pa_io_event* control_socket_event; + bool rec_stream_connect_in_progress; bool rec_allowed; bool rec_requested; bool never_block; bool pending_play_cork; bool draining; + bool play_corked; int domid; int play_watch_fd, rec_watch_fd;