From 9cb77923d6a05e24ad49df7d38b0d4d3a8dbe5c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 12 Jun 2025 02:43:32 +0200 Subject: [PATCH] pulse: connect recording stream only when recording is allowed This makes streams list in pulseaudio/pipewire less cluttered (no streams that wouldn't really record), but also fixes recording indicator in xfce (which looks at connected streams, regardless of their corked state). Move stream creation to a separate function (connect_disconnect_rec_stream) and call it instead of just uncorking the stream. And also, still call it on pulseaudio connection, if recording was allowed initially. Since the stream connection is asynchronous, the "allow recording" logic is split into two parts, and both are put into connect_disconnect_rec_stream - selected by the state_callback parameter. This mechanism is a bit fragile, as there are quite a few states. Introduce rec_stream_connect_in_progress variable that is set when stream connect/disconnect is in progress. In that case, do not schedule another connect/disconnect, but at stream state change callback will check if the requested state didn't change in the meantime. This also means that if rec_stream_connect_in_progress is set, the rec_allowed value is unreliable (it may be a stream that is created but not connected yet for example) - so, if rec_stream_connect_in_progress is set, do not send any data. Since recording stream isn't initially connected now, decouple adding recording vchan callback from it - since that vchan is also used for commands. And adjust that callback to not assume the stream is always connected (do not use pa_stream_is_corked, don't assert for stream before checking if recording is allowed etc). This change allows commands to be handled before playback stream is connected, so add a variable to keep "corked" state to be set/restored on stream connection (connecting the stream resets its corked state). This should also fix preserving corked state in case of pulseaudio daemon crash/restart. And also drop pa_stream_cork in vchan error hander - the process quits few lines below anyway. Fixes QubesOS/qubes-issues#9999 --- pulse/pacat-simple-vchan.c | 174 +++++++++++++++++++++++++------------ pulse/pacat-simple-vchan.h | 3 + 2 files changed, 122 insertions(+), 55 deletions(-) diff --git a/pulse/pacat-simple-vchan.c b/pulse/pacat-simple-vchan.c index da06e9a5..cb592f44 100644 --- a/pulse/pacat-simple-vchan.c +++ b/pulse/pacat-simple-vchan.c @@ -92,6 +92,9 @@ const pa_buffer_attr * bufattr = NULL; static int verbose = 1; +static bool connect_disconnect_rec_stream( + struct userdata *u, bool state_callback, bool new_rec_allowed); + static void context_state_callback(pa_context *c, void *userdata); static void playback_stream_drain(struct userdata *u); @@ -332,6 +335,7 @@ static void playback_stream_drain_cb(pa_stream *s, int success, void *userdata) pacat_log("Playback vchan empty and playback stream drained. Corking playback stream."); u->pending_play_cork = false; pa_stream_cork(u->play_stream, 1, NULL, u); + u->play_corked = true; } else if (space_in_vchan < 0) { pacat_log("libvchan_data_ready() failed: return value %d", space_in_vchan); quit(u, 1); @@ -354,12 +358,13 @@ static void send_rec_data(pa_stream *s, struct userdata *u, bool discard_overrun size_t rec_buffer_length, rec_buffer_index = 0; int l, vchan_buffer_space; - assert(s); assert(u); - if (!u->rec_allowed) + if (!u->rec_allowed || u->rec_stream_connect_in_progress) return; + assert(s); + if (pa_stream_readable_size(s) <= 0) return; @@ -448,8 +453,6 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), uint32_t cmd; while (libvchan_data_ready(u->rec_ctrl) >= (int)sizeof(cmd)) { if (libvchan_read(u->rec_ctrl, (char*)&cmd, sizeof(cmd)) != sizeof(cmd)) { - if (!pa_stream_is_corked(u->rec_stream)) - pa_stream_cork(u->rec_stream, 1, NULL, u); fprintf(stderr, "Failed to read from vchan\n"); quit(u, 1); return; @@ -461,10 +464,10 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), if (!qdb_write(u->qdb, u->qdb_request_path, "1", 1)) { pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno)); } - if (u->rec_allowed) { + if (u->rec_allowed && !u->rec_stream_connect_in_progress) { pacat_log("Recording start"); pa_stream_cork(u->rec_stream, 0, NULL, u); - } else + } else if (!u->rec_allowed) pacat_log("Recording requested but not allowed"); g_mutex_unlock(&u->prop_mutex); break; @@ -474,13 +477,19 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), if (!qdb_write(u->qdb, u->qdb_request_path, "0", 1)) { pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno)); } - if (!pa_stream_is_corked(u->rec_stream)) { + if (u->rec_allowed && !u->rec_stream_connect_in_progress) { pacat_log("Recording stop"); pa_stream_cork(u->rec_stream, 1, NULL, u); } g_mutex_unlock(&u->prop_mutex); break; case QUBES_PA_SINK_CORK_CMD: + if (pa_stream_is_corked(u->play_stream)) { + /* store it in case the stream isn't connected yet */ + u->play_corked = true; + pacat_log("Stream already corked"); + break; + } u->pending_play_cork = true; if (libvchan_data_ready(u->play_ctrl) > 0) { pacat_log("Deferred stream drain"); @@ -496,6 +505,7 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a), * do not clear u->draining, as draining while a drain * operation is in progress is not safe */ + u->play_corked = false; pa_stream_cork(u->play_stream, 0, NULL, u); break; default: @@ -520,10 +530,11 @@ static void stream_state_callback(pa_stream *s, void *userdata) { assert(u->mainloop_api); u->mainloop_api->io_free(u->play_stdio_event); } - if (u->rec_stdio_event && u->rec_stream == s) { + if (u->rec_stream == s) { pacat_log("rec stream terminated"); - assert(u->mainloop_api); - u->mainloop_api->io_free(u->rec_stdio_event); + pa_stream_unref(u->rec_stream); + u->rec_stream = NULL; + connect_disconnect_rec_stream(u, true, false); } break; @@ -557,13 +568,16 @@ static void stream_state_callback(pa_stream *s, void *userdata) { pacat_log("io_new play failed"); quit(u, 1); } + /* restore corked state after PA (re)connect */ + if (u->play_corked) + pa_stream_cork(u->play_stream, 1, NULL, u); } if (u->rec_stream == s) { - u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api, - libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u); - if (!u->rec_stdio_event) { - pacat_log("io_new rec failed"); - quit(u, 1); + if (connect_disconnect_rec_stream(u, true, true)) { + if (u->rec_requested) { + pacat_log("Recording start"); + pa_stream_cork(u->rec_stream, 0, NULL, NULL); + } } } break; @@ -638,13 +652,84 @@ static void stream_event_callback(pa_stream *s, const char *name, pa_proplist *p pa_xfree(t); } +/* + * Connect/disconnect rec stream based on new_rec_allowed. This can be called + * from a state callback (if state_callback=true) to finalize + * connecting/disconnecting. + * Returns if stream is in the desired state. + */ +static bool connect_disconnect_rec_stream_locked( + struct userdata *u, bool state_callback, bool new_rec_allowed) +{ + if (state_callback) { + if (new_rec_allowed == u->rec_allowed) { + if (!qdb_write(u->qdb, u->qdb_status_path, u->rec_allowed ? "1" : "0", 1)) { + pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno)); + } + u->rec_stream_connect_in_progress = false; + return true; + } + } + u->rec_stream_connect_in_progress = true; + if (new_rec_allowed) { + pa_stream_flags_t flags = + PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY; + + /* setup recording stream */ + assert(!u->rec_stream); + + if (!(u->rec_stream = pa_stream_new_with_proplist(u->context, "record", &sample_spec, &u->channel_map, u->proplist))) { + pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(u->context))); + quit(u, 1); + } + + pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u); + /* pa_stream_set_write_callback */ + pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u); + pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u); + pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u); + pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u); + pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u); + pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u); + pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u); + pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u); + + if (pa_stream_connect_record(u->rec_stream, + u->rec_device, + bufattr, + flags) < 0) { + pacat_log("pa_stream_connect_record() failed: %s", + pa_strerror(pa_context_errno(u->context))); + u->rec_allowed = false; + u->rec_stream_connect_in_progress = false; + } + } else { + if (pa_stream_disconnect(u->rec_stream) < 0) { + pacat_log("pa_stream_disconnect() failed: %s", + pa_strerror(pa_context_errno(u->context))); + u->rec_stream_connect_in_progress = false; + } + } + return false; +} + +static bool connect_disconnect_rec_stream( + struct userdata *u, bool state_callback, bool new_rec_allowed) +{ + bool ret; + + g_mutex_lock(&u->prop_mutex); + ret = connect_disconnect_rec_stream_locked(u, state_callback, new_rec_allowed); + g_mutex_unlock(&u->prop_mutex); + return ret; +} + /* This is called whenever the context status changes */ static void context_state_callback(pa_context *c, void *userdata) { struct userdata *u = userdata; pa_stream_flags_t flags = 0; - pa_channel_map channel_map; assert(c); @@ -656,9 +741,9 @@ static void context_state_callback(pa_context *c, void *userdata) { case PA_CONTEXT_READY: - pa_channel_map_init_extend(&channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT); + pa_channel_map_init_extend(&u->channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT); - if (!pa_channel_map_compatible(&channel_map, &sample_spec)) { + if (!pa_channel_map_compatible(&u->channel_map, &sample_spec)) { pacat_log("Channel map doesn't match sample specification"); goto fail; } @@ -669,7 +754,7 @@ static void context_state_callback(pa_context *c, void *userdata) { if (verbose) pacat_log("Connection established.%s", CLEAR_LINE); - if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &channel_map, u->proplist))) { + if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &u->channel_map, u->proplist))) { pacat_log("play pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c))); goto fail; } @@ -693,37 +778,25 @@ static void context_state_callback(pa_context *c, void *userdata) { goto fail; } - /* setup recording stream */ - assert(!u->rec_stream); - - if (!(u->rec_stream = pa_stream_new_with_proplist(c, "record", &sample_spec, &channel_map, u->proplist))) { - pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c))); - goto fail; - } - - pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u); - /* pa_stream_set_write_callback */ - pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u); - pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u); - pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u); - pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u); - pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u); - pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u); - pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u); - pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u); - - flags = PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY; u->rec_requested = 0; - if (pa_stream_connect_record(u->rec_stream, u->rec_device, bufattr, flags) < 0) { - pacat_log("pa_stream_connect_record() failed: %s", pa_strerror(pa_context_errno(c))); - goto fail; + /* and start watching for recording requests */ + u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api, + libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u); + if (!u->rec_stdio_event) { + pacat_log("io_new rec failed"); + quit(u, 1); } - + if (u->rec_allowed) + connect_disconnect_rec_stream(u, false, u->rec_allowed); break; case PA_CONTEXT_TERMINATED: pacat_log("pulseaudio connection terminated"); + if (u->rec_stdio_event) { + assert(u->mainloop_api); + u->mainloop_api->io_free(u->rec_stdio_event); + } quit(u, 0); break; @@ -865,18 +938,9 @@ static void control_socket_callback(pa_mainloop_api *UNUSED(a), g_mutex_lock(&u->prop_mutex); if (new_rec_allowed != u->rec_allowed) { u->rec_allowed = new_rec_allowed; + if (!u->rec_stream_connect_in_progress) + connect_disconnect_rec_stream_locked(u, false, new_rec_allowed); pacat_log("Setting audio-input to %s", u->rec_allowed ? "enabled" : "disabled"); - if (u->rec_allowed && u->rec_requested) { - pacat_log("Recording start"); - pa_stream_cork(u->rec_stream, 0, NULL, NULL); - } else if (!u->rec_allowed && u->rec_stream && - (u->rec_requested || !pa_stream_is_corked(u->rec_stream))) { - pacat_log("Recording stop"); - pa_stream_cork(u->rec_stream, 1, NULL, NULL); - } - if (!qdb_write(u->qdb, u->qdb_status_path, new_rec_allowed ? "1" : "0", 1)) { - pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno)); - } } g_mutex_unlock(&u->prop_mutex); } @@ -929,7 +993,7 @@ static int setup_control(struct userdata *u) { if (socket_fd < 0) goto fail; - rec_allowed = is_rec_allowed_from_qdb(u); + rec_allowed = is_rec_allowed_from_qdb(u); if (rec_allowed >= 0) { pacat_log("mic allowed: initial value read from Qubes DB '%d'", rec_allowed); u->rec_allowed = rec_allowed; diff --git a/pulse/pacat-simple-vchan.h b/pulse/pacat-simple-vchan.h index 07e26a2a..3451b4c7 100644 --- a/pulse/pacat-simple-vchan.h +++ b/pulse/pacat-simple-vchan.h @@ -30,6 +30,7 @@ struct userdata { pa_io_event* play_ctrl_event; pa_io_event* rec_ctrl_event; + pa_channel_map channel_map; GMutex prop_mutex; qdb_handle_t qdb; qdb_handle_t watch_qdb; // separate connection for watches @@ -38,11 +39,13 @@ struct userdata { char *qdb_request_path; int control_socket_fd; pa_io_event* control_socket_event; + bool rec_stream_connect_in_progress; bool rec_allowed; bool rec_requested; bool never_block; bool pending_play_cork; bool draining; + bool play_corked; int domid; int play_watch_fd, rec_watch_fd;