From ecac2b775a7ccf360e022b17f91b962d3ce89303 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 18:53:27 +0200 Subject: [PATCH 01/10] refactor: extract ModelDownloadManager, DeviceRoutingManager, TranscriptionStreamCoordinator Extract three managers from TranscriptionEngine god object: - ModelDownloadManager: model download, loading, progress tracking - DeviceRoutingManager: CoreAudio device listeners, restart coordination - TranscriptionStreamCoordinator: audio stream capture, transcriber orchestration Key fixes from code review: - Centralized DownloadProgressDetail in ModelDownloadManager - Added storeRestartState() for proper restart state management - Added finalizeMicStream/finalizeSystemStream with draining - Fixed Sendable closure isolation with @MainActor annotations - Removed Sendable conformance from @MainActor classes - Fixed clearCache() to use .info log level - Added deinit cleanup for CoreAudio listeners --- .../Transcription/DeviceRoutingManager.swift | 320 +++++++++++++++++ .../Transcription/ModelDownloadManager.swift | 215 ++++++++++++ .../TranscriptionStreamCoordinator.swift | 327 ++++++++++++++++++ 3 files changed, 862 insertions(+) create mode 100644 OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift create mode 100644 OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift create mode 100644 OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift diff --git a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift new file mode 100644 index 00000000..3c05ad2e --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift @@ -0,0 +1,320 @@ +import CoreAudio +import Foundation +import os + +/// Manages CoreAudio device listeners and coordinates audio stream restarts +/// when the default input or output device changes. 
+@MainActor +final class DeviceRoutingManager { + /// Called when a mic restart is requested with the target device ID. + var onMicRestartRequested: (@Sendable (AudioDeviceID) -> Void)? + + /// Called when a system audio restart is requested. + var onSystemRestartRequested: (@Sendable () -> Void)? + + /// Tracks whether user selected "System Default" (0) or a specific device. + private var userSelectedDeviceID: AudioDeviceID = 0 + + /// Tracks the resolved mic device ID currently in use. + private var currentMicDeviceID: AudioDeviceID = 0 + + /// Queued mic restart device ID (nil if no pending restart). + private var pendingMicDeviceID: AudioDeviceID? + + /// Queued system audio restart flag. + private var pendingSystemAudioRestart = false + + /// Active mic restart task. + private var micRestartTask: Task? + + /// Active system restart task. + private var sysRestartTask: Task? + + /// Listens for default input device changes at the OS level. + private var defaultDeviceListenerBlock: AudioObjectPropertyListenerBlock? + + /// Listens for default output device changes at the OS level. + private var defaultOutputDeviceListenerBlock: AudioObjectPropertyListenerBlock? + + /// Whether device listeners are currently installed. + private var isListening = false + + /// Stored state for restarts (populated by TranscriptionEngine) + private(set) var currentLocale: Locale? + private(set) var currentVadManager: VadManager? + private(set) var currentMicBackend: (any TranscriptionBackend)? + private(set) var currentSystemBackend: (any TranscriptionBackend)? + private(set) var currentFlushInterval: Int? + private(set) weak var currentTranscriptStore: TranscriptStore? + + /// Store restart state for later use during device changes. 
+ func storeRestartState( + locale: Locale, + vadManager: VadManager, + micBackend: any TranscriptionBackend, + systemBackend: any TranscriptionBackend, + flushInterval: Int, + transcriptStore: TranscriptStore + ) { + currentLocale = locale + currentVadManager = vadManager + currentMicBackend = micBackend + currentSystemBackend = systemBackend + currentFlushInterval = flushInterval + currentTranscriptStore = transcriptStore + } + + /// Start listening for device changes. + /// - Parameter isUsingDefaultDevice: True if the user has selected "System Default" (deviceID = 0) + func startListening(isUsingDefaultDevice: Bool) { + guard !isListening else { return } + isListening = true + + // Store the default device selection state + if isUsingDefaultDevice { + userSelectedDeviceID = 0 + } + + installDefaultDeviceListener() + installDefaultOutputDeviceListener() + } + + /// Stop listening for device changes and cancel any pending restarts. + func stopListening() { + isListening = false + + removeDefaultDeviceListener() + removeDefaultOutputDeviceListener() + + micRestartTask?.cancel() + sysRestartTask?.cancel() + micRestartTask = nil + sysRestartTask = nil + pendingMicDeviceID = nil + pendingSystemAudioRestart = false + } + + deinit { + // Ensure cleanup happens even if stopListening wasn't called explicitly + if isListening { + removeDefaultDeviceListener() + removeDefaultOutputDeviceListener() + } + } + + /// Request a mic restart with a new device. + /// Pass 0 to use the system default device. 
+ func requestMicRestart(deviceID: AudioDeviceID) { + guard isListening else { return } + + pendingMicDeviceID = deviceID + + if micRestartTask != nil { + Log.transcription.debug("queued mic restart for device \(deviceID, privacy: .public)") + return + } + + micRestartTask = Task { @MainActor [weak self] in + guard let self else { return } + defer { self.micRestartTask = nil } + + while !Task.isCancelled, let requestedDeviceID = self.pendingMicDeviceID { + self.pendingMicDeviceID = nil + await self.performMicRestart(deviceID: requestedDeviceID) + } + } + } + + /// Request a system audio restart (e.g., when output device changes). + func requestSystemAudioRestart() { + guard isListening else { return } + + pendingSystemAudioRestart = true + + if sysRestartTask != nil { + Log.transcription.debug("queued system audio restart") + return + } + + sysRestartTask = Task { @MainActor [weak self] in + guard let self else { return } + defer { self.sysRestartTask = nil } + + while !Task.isCancelled, self.pendingSystemAudioRestart { + self.pendingSystemAudioRestart = false + await self.performSystemAudioRestart() + } + } + } + + /// Update the tracked device IDs after a successful restart. + func updateCurrentDeviceID(_ deviceID: AudioDeviceID, isUserSelection: Bool) { + if isUserSelection { + userSelectedDeviceID = deviceID + } + currentMicDeviceID = deviceID + } + + /// Get the current mic device ID. + var currentMicID: AudioDeviceID { currentMicDeviceID } + + /// Get the user selected device ID (0 = system default). 
+ var selectedDeviceID: AudioDeviceID { userSelectedDeviceID } + + // MARK: - Default Device Listeners + + private func installDefaultDeviceListener() { + guard defaultDeviceListenerBlock == nil else { return } + + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultInputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + + // Use a background queue for the CoreAudio callback, then hop to MainActor via Task + let queue = DispatchQueue.global(qos: .utility) + + let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in + guard let self else { return } + Task { @MainActor in + // Only auto-restart if user is using system default + guard self.userSelectedDeviceID == 0 else { return } + self.requestMicRestart(deviceID: 0) + } + } + defaultDeviceListenerBlock = block + + AudioObjectAddPropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + queue, + block + ) + } + + private func removeDefaultDeviceListener() { + guard let block = defaultDeviceListenerBlock else { return } + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultInputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + AudioObjectRemovePropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DispatchQueue.global(qos: .utility), + block + ) + defaultDeviceListenerBlock = nil + } + + private func installDefaultOutputDeviceListener() { + guard defaultOutputDeviceListenerBlock == nil else { return } + + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + + let queue = DispatchQueue.global(qos: .utility) + + let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in + guard let self else { return } + Task { @MainActor in + 
self.requestSystemAudioRestart() + } + } + defaultOutputDeviceListenerBlock = block + + AudioObjectAddPropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + queue, + block + ) + } + + private func removeDefaultOutputDeviceListener() { + guard let block = defaultOutputDeviceListenerBlock else { return } + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + AudioObjectRemovePropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DispatchQueue.global(qos: .utility), + block + ) + defaultOutputDeviceListenerBlock = nil + } + + // MARK: - Restart Implementation + + private func performMicRestart(deviceID: AudioDeviceID) async { + guard !Task.isCancelled else { return } + + userSelectedDeviceID = deviceID + + guard let targetMicID = resolvedMicDeviceID(for: deviceID) else { + Log.transcription.error("mic swap failed: device unavailable") + return + } + + guard targetMicID != currentMicDeviceID else { + Log.transcription.debug("mic swap skipped, same device \(targetMicID, privacy: .public)") + return + } + + Log.transcription.info("switching mic from \(self.currentMicDeviceID, privacy: .public) to \(targetMicID, privacy: .public)") + + // Notify the engine to perform the actual restart + // Note: State is updated only after successful restart via updateCurrentDeviceID + onMicRestartRequested?(targetMicID) + } + + private func performSystemAudioRestart() async { + guard !Task.isCancelled else { return } + + Log.transcription.info("restarting system audio stream") + + // Notify the engine to perform the actual restart + onSystemRestartRequested?() + } + + // MARK: - Device Resolution + + /// Generate an error message for when a mic device is unavailable. 
+ func unavailableMicMessage(for inputDeviceID: AudioDeviceID) -> String { + if inputDeviceID > 0 { + return "The selected microphone is no longer available." + } + + return "No default microphone is currently available." + } + + /// Check if a specific device ID is available. + func isDeviceAvailable(_ deviceID: AudioDeviceID) -> Bool { + if deviceID == 0 { + return MicCapture.defaultInputDeviceID() != nil + } + let available = Set(MicCapture.availableInputDevices().map(\.id)) + return available.contains(deviceID) + } +} + +// MARK: - Public Extension for Engine Access + +extension DeviceRoutingManager { + /// Resolve the target mic device ID, handling "System Default" (0) selection. + func resolvedMicDeviceID(for inputDeviceID: AudioDeviceID) -> AudioDeviceID? { + if inputDeviceID > 0 { + let availableDeviceIDs = Set(MicCapture.availableInputDevices().map(\.id)) + return availableDeviceIDs.contains(inputDeviceID) ? inputDeviceID : nil + } + return MicCapture.defaultInputDeviceID() + } +} diff --git a/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift new file mode 100644 index 00000000..0d0fc8f3 --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift @@ -0,0 +1,215 @@ +import AVFoundation +import FluidAudio +import Foundation +import os + +/// Enriched download progress info computed from fraction changes over time. +struct DownloadProgressDetail: Sendable { + let fraction: Double + /// Formatted string like "142 MB / 800 MB" + let sizeText: String? + /// Formatted string like "3.5 MB/s" + let speedText: String? + /// Formatted string like "2m 15s remaining" + let etaText: String? +} + +/// Loaded backend instances for mic and system audio transcription. +struct LoadedBackends { + let mic: any TranscriptionBackend + let system: any TranscriptionBackend +} + +/// Manages transcription model download, loading, and progress tracking. 
+@MainActor +final class ModelDownloadManager { + private let settings: AppSettings + + /// Combined progress for UI binding. + private(set) var needsDownload: Bool = false + private(set) var downloadProgress: Double? + private(set) var downloadDetail: DownloadProgressDetail? + private(set) var isLoading: Bool = false + + /// Callbacks for status updates. + var onStatusUpdate: (@Sendable (String) -> Void)? + var onProgressUpdate: (@Sendable (Double) -> Void)? + + // Progress tracking state (not observed) + private var downloadStartTime: Date? + private var downloadTotalBytes: Int64? + + init(settings: AppSettings) { + self.settings = settings + } + + /// Check if the specified model needs to be downloaded. + /// - Returns: true if a download is needed + @discardableResult + func checkAvailability(for model: TranscriptionModel) -> Bool { + needsDownload = Self.modelNeedsDownload(model) + return needsDownload + } + + /// Load the specified model and return configured backends for mic and system audio. 
+ /// - Parameters: + /// - model: The transcription model to load + /// - customVocabulary: Custom vocabulary string to pass to the backend + /// - Returns: Loaded backends for mic and system audio + func loadModel( + _ model: TranscriptionModel, + customVocabulary: String + ) async throws -> LoadedBackends { + guard !isLoading else { + throw TranscriptionBackendError.notPrepared + } + + isLoading = true + defer { + isLoading = false + // Clear progress state on both success and failure + downloadProgress = nil + downloadDetail = nil + downloadStartTime = nil + downloadTotalBytes = nil + } + + let isDownloading = needsDownload + + if isDownloading { + downloadProgress = 0 + downloadStartTime = Date() + downloadTotalBytes = model.estimatedDownloadBytes + downloadDetail = DownloadProgressDetail( + fraction: 0, + sizeText: nil, + speedText: nil, + etaText: nil + ) + } + + Log.transcription.info("loading transcription model \(model.rawValue, privacy: .public)") + + // Load mic backend + let mic = model.makeBackend(customVocabulary: customVocabulary) + try await mic.prepare( + onStatus: { [weak self] status in + Task { @MainActor in + self?.onStatusUpdate?(status) + } + }, + onProgress: { [weak self] fraction in + Task { @MainActor in + self?.downloadProgress = fraction + self?.updateDownloadDetail(fraction: fraction) + self?.onProgressUpdate?(fraction) + } + } + ) + + // Parakeet needs a separate backend for system audio (mutable decoder state). + // Qwen3 is actor-based and thread-safe, so reuse the same instance. + let system: any TranscriptionBackend + if model == .qwen3ASR06B { + system = mic + } else { + let sys = model.makeBackend(customVocabulary: customVocabulary) + try await sys.prepare { _ in } + system = sys + } + + // Clear download progress on success + needsDownload = false + + Log.transcription.info("transcription model loaded") + + return LoadedBackends(mic: mic, system: system) + } + + /// Clear the model cache for the specified model. 
+ func clearCache(for model: TranscriptionModel) { + model.makeBackend().clearModelCache() + Log.transcription.info("cleared model cache for \(model.rawValue, privacy: .public)") + } + + // MARK: - Private Helpers + + private static func modelNeedsDownload(_ model: TranscriptionModel) -> Bool { + let backend = model.makeBackend() + if case .needsDownload = backend.checkStatus() { + return true + } + return false + } + + private func updateDownloadDetail(fraction: Double) { + guard let startTime = downloadStartTime else { + downloadDetail = DownloadProgressDetail( + fraction: fraction, + sizeText: nil, + speedText: nil, + etaText: nil + ) + return + } + + let elapsed = Date().timeIntervalSince(startTime) + let totalBytes = downloadTotalBytes + + // Size text: "142 MB / 800 MB" (only when total is known) + var sizeText: String? + if let totalBytes { + let downloaded = Int64(fraction * Double(totalBytes)) + sizeText = "\(Self.formatBytes(downloaded)) / \(Self.formatBytes(totalBytes))" + } + + // Speed and ETA need enough elapsed time to be meaningful + var speedText: String? + var etaText: String? 
+ if elapsed > 1, fraction > 0.01 { + // Speed from fraction progress rate + known total + if let totalBytes { + let bytesDownloaded = fraction * Double(totalBytes) + let bytesPerSecond = bytesDownloaded / elapsed + speedText = "\(Self.formatBytes(Int64(bytesPerSecond)))/s" + + let remaining = Double(totalBytes) - bytesDownloaded + if bytesPerSecond > 0 { + let secondsLeft = remaining / bytesPerSecond + etaText = Self.formatDuration(secondsLeft) + } + } else { + // No total bytes known — estimate ETA from fraction rate alone + let fractionPerSecond = fraction / elapsed + if fractionPerSecond > 0 { + let remainingFraction = 1.0 - fraction + let secondsLeft = remainingFraction / fractionPerSecond + etaText = Self.formatDuration(secondsLeft) + } + } + } + + downloadDetail = DownloadProgressDetail( + fraction: fraction, + sizeText: sizeText, + speedText: speedText, + etaText: etaText + ) + } + + private static func formatBytes(_ bytes: Int64) -> String { + if bytes >= 1_000_000_000 { + return String(format: "%.1f GB", Double(bytes) / 1_000_000_000) + } else { + return String(format: "%.0f MB", Double(bytes) / 1_000_000) + } + } + + private static func formatDuration(_ seconds: Double) -> String { + let s = Int(seconds) + if s < 60 { return "\(s)s remaining" } + let m = s / 60 + let rem = s % 60 + return rem > 0 ? "\(m)m \(rem)s remaining" : "\(m)m remaining" + } +} diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift new file mode 100644 index 00000000..3c83a036 --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -0,0 +1,327 @@ +import AVFoundation +import FluidAudio +import Foundation +import os + +/// Coordinates audio stream capture, transcribers, and optional recording. +/// Manages the lifecycle of mic and system audio transcription tasks. 
+@MainActor +final class TranscriptionStreamCoordinator { + private let micCapture = MicCapture() + private let systemCapture = SystemAudioCapture() + + /// Audio recorder for tapping streams (set externally when recording is enabled). + weak var audioRecorder: AudioRecorder? + + /// Combined audio level (mic + system) for the UI meter. + nonisolated var audioLevel: Float { + max(micCapture.audioLevel, systemCapture.audioLevel) + } + + /// Mute/unmute the microphone. When muted, mic audio is not transcribed + /// and the audio level reads as 0. System audio continues normally. + nonisolated var isMicMuted: Bool { + get { micCapture.isMuted } + set { micCapture.isMuted = newValue } + } + + /// Mic transcription task. + private var micTask: Task? + + /// System audio transcription task. + private var sysTask: Task? + + /// Health check task for mic audio. + private var micHealthTask: Task? + + /// Track if mic is currently running for health checks. + private var isMicRunning = false + + /// Start the mic audio stream and transcription. + /// - Returns: The transcription task, or nil if setup failed + @discardableResult + func startMicStream( + locale: Locale, + deviceID: AudioDeviceID, + backend: any TranscriptionBackend, + vadManager: VadManager, + transcriptStore: TranscriptStore, + flushInterval: Int, + useAEC: Bool + ) -> Task? 
{ + // Store state for potential restarts + isMicRunning = true + + var micStream = micCapture.bufferStream(deviceID: deviceID, echoCancellation: useAEC) + + // Add recording tap if recorder is set + if let recorder = audioRecorder { + micStream = Self.tappedStream(micStream) { buffer in + recorder.writeMicBuffer(buffer) + } + } + + guard let micTranscriber = makeTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: .you, + transcriptStore: transcriptStore, + flushInterval: flushInterval + ) else { + Log.transcription.error("Failed to create mic transcriber") + isMicRunning = false + return nil + } + + micTask = Task { [weak self] in + await micTranscriber.run(stream: micStream) + await self?.markMicStopped() + } + + // Health check: if mic produces no audio within 5 seconds, log the issue + micHealthTask?.cancel() + micHealthTask = Task { @MainActor [weak self] in + try? await Task.sleep(for: .seconds(5)) + guard let self, self.isMicRunning else { return } + if !self.micCapture.hasCapturedFrames && self.micCapture.captureError == nil { + Log.transcription.error("no mic audio after 5s") + } + } + + return micTask + } + + /// Mark mic as stopped - MainActor-isolated to prevent concurrency violations + private func markMicStopped() { + isMicRunning = false + } + + /// Stop the mic audio stream and transcription. + func stopMicStream() { + micHealthTask?.cancel() + micHealthTask = nil + micCapture.finishStream() + micTask?.cancel() + micTask = nil + micCapture.stop() + isMicRunning = false + } + + /// Finalize mic stream, waiting for transcriber to drain. + func finalizeMicStream() async { + micHealthTask?.cancel() + micHealthTask = nil + micCapture.finishStream() + await micTask?.value + micCapture.stop() + micTask = nil + isMicRunning = false + } + + /// Start the system audio stream and transcription. 
+ /// - Returns: The transcription task, or nil if setup failed + @discardableResult + func startSystemStream( + locale: Locale, + backend: any TranscriptionBackend, + vadManager: VadManager, + diarizationManager: DiarizationManager?, + transcriptStore: TranscriptStore, + flushInterval: Int + ) async -> Task? { + Log.transcription.info("starting system audio capture") + + let sysStreams: SystemAudioCapture.CaptureStreams + do { + sysStreams = try await systemCapture.bufferStream() + Log.transcription.info("system audio capture started") + } catch { + Log.transcription.error("Failed to start system audio: \(error.localizedDescription, privacy: .public)") + return nil + } + + var sysStream = sysStreams.systemAudio + + // Track cumulative audio time for diarizer speaker attribution + let sysAudioTime = SyncDouble() + + // Tee system audio to diarization manager if enabled + if let dm = diarizationManager { + let diarFlushSize = 16000 + let originalSysStream = sysStream + let (diarTapped, diarContinuation) = AsyncStream.makeStream() + + Task { + nonisolated(unsafe) let safeDm = dm + var diarBuf: [Float] = [] + for await buffer in originalSysStream { + nonisolated(unsafe) let b = buffer + diarContinuation.yield(b) + guard let channelData = buffer.floatChannelData else { continue } + let frameCount = Int(buffer.frameLength) + sysAudioTime.add(Double(frameCount) / buffer.format.sampleRate) + diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) + if diarBuf.count >= diarFlushSize { + let batch = diarBuf + diarBuf.removeAll(keepingCapacity: true) + try? await safeDm.feedAudio(batch) + } + } + // Flush tail + if !diarBuf.isEmpty { + try? 
await safeDm.feedAudio(diarBuf) + } + diarContinuation.finish() + } + sysStream = diarTapped + } + + // Add recording tap if recorder is set + if let recorder = audioRecorder { + sysStream = Self.tappedStream(sysStream) { buffer in + recorder.writeSysBuffer(buffer) + } + } + + guard let sysTranscriber = makeTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: .them, + transcriptStore: transcriptStore, + flushInterval: flushInterval, + diarizationManager: diarizationManager, + audioTime: diarizationManager != nil ? sysAudioTime : nil + ) else { + Log.transcription.error("Failed to create system audio transcriber") + return nil + } + + sysTask = Task { + await sysTranscriber.run(stream: sysStream) + } + + return sysTask + } + + /// Stop the system audio stream and transcription. + func stopSystemStream() { + systemCapture.finishStream() + sysTask?.cancel() + sysTask = nil + Task { await systemCapture.stop() } + } + + /// Finalize system stream, waiting for transcriber to drain. + func finalizeSystemStream() async { + systemCapture.finishStream() + await sysTask?.value + await systemCapture.stop() + sysTask = nil + } + + /// Stop all audio streams and transcription. + func stopAll() { + stopMicStream() + stopSystemStream() + } + + /// Finalize all streams and wait for tasks to complete. + func finalize() async { + micHealthTask?.cancel() + micHealthTask = nil + + micCapture.finishStream() + systemCapture.finishStream() + + await micTask?.value + await sysTask?.value + + micCapture.stop() + await systemCapture.stop() + + micTask = nil + sysTask = nil + isMicRunning = false + } + + // MARK: - Private Helpers + + private func makeTranscriber( + backend: any TranscriptionBackend, + locale: Locale, + vadManager: VadManager, + speaker: Speaker, + transcriptStore: TranscriptStore, + flushInterval: Int, + diarizationManager: DiarizationManager? = nil, + audioTime: SyncDouble? = nil + ) -> StreamingTranscriber? 
{ + let store = transcriptStore + + let onPartial: @Sendable (String) -> Void + let onFinal: @Sendable (String) -> Void + + if speaker == .you { + onPartial = { text in + Task { @MainActor in store.volatileYouText = text } + } + onFinal = { text in + Task { @MainActor in + store.volatileYouText = "" + store.append(Utterance(text: text, speaker: .you)) + } + } + } else { + let dm = diarizationManager + let time = audioTime + onPartial = { text in + Task { @MainActor in store.volatileThemText = text } + } + onFinal = { text in + Task { @MainActor in + store.volatileThemText = "" + let finalSpeaker: Speaker + if let dm, let t = time { + let endTime = t.value + let startTime = max(0, endTime - 5.0) + finalSpeaker = await dm.dominantSpeaker(from: startTime, to: endTime) + } else { + finalSpeaker = .them + } + store.append(Utterance(text: text, speaker: finalSpeaker)) + } + } + } + + return StreamingTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: speaker, + flushInterval: flushInterval, + onPartial: onPartial, + onFinal: onFinal + ) + } + + /// Wrap an audio stream to forward each buffer to a synchronous tap before yielding it downstream. 
+ private nonisolated static func tappedStream( + _ stream: AsyncStream, + tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void + ) -> AsyncStream { + struct Box: @unchecked Sendable { let stream: AsyncStream } + let box = Box(stream: stream) + let (output, continuation) = AsyncStream.makeStream() + Task { + for await buffer in box.stream { + tap(buffer) + nonisolated(unsafe) let b = buffer + continuation.yield(b) + } + continuation.finish() + } + return output + } +} From e96c6dff282dbbf2a5ae30855f128d6cac0b9500 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:02:18 +0200 Subject: [PATCH 02/10] fix: address critical compilation and memory leak issues from code review - Add missing import FluidAudio to DeviceRoutingManager - Add Utils/Logging.swift for centralized Log definitions - Clear restart state in stopListening() to prevent memory leaks (VadManager, backends now properly released on session end) --- .../OpenOats/Transcription/DeviceRoutingManager.swift | 9 +++++++++ OpenOats/Sources/OpenOats/Utils/Logging.swift | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift index 3c05ad2e..d1ad2807 100644 --- a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift +++ b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift @@ -1,4 +1,5 @@ import CoreAudio +import FluidAudio import Foundation import os @@ -92,6 +93,14 @@ final class DeviceRoutingManager { sysRestartTask = nil pendingMicDeviceID = nil pendingSystemAudioRestart = false + + // Clear restart state to free heavy objects (VadManager, backends) + currentLocale = nil + currentVadManager = nil + currentMicBackend = nil + currentSystemBackend = nil + currentFlushInterval = nil + currentTranscriptStore = nil } deinit { diff --git a/OpenOats/Sources/OpenOats/Utils/Logging.swift b/OpenOats/Sources/OpenOats/Utils/Logging.swift 
index eabe2f59..8300ab4f 100644 --- a/OpenOats/Sources/OpenOats/Utils/Logging.swift +++ b/OpenOats/Sources/OpenOats/Utils/Logging.swift @@ -1,6 +1,13 @@ import Foundation import os +/// Centralized logger factory. One subsystem, per-component categories. +/// +/// Usage: `Log.mic.debug("buffer received")` +/// +/// Filter in Terminal: +/// log stream --predicate 'subsystem == "com.openoats.app"' --level debug +/// log stream --predicate 'subsystem == "com.openoats.app" AND category == "MicCapture"' enum Log { static let mic = Logger(subsystem: subsystem, category: "MicCapture") static let recorder = Logger(subsystem: subsystem, category: "AudioRecorder") From 11b84796fb10c2fff7d0d7f75b9322d58d3d0015 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:03:20 +0200 Subject: [PATCH 03/10] fix: track and cancel diarization task properly - Add diarizationTask property to track diarization audio feed - Cancel and await diarization task in stop/finalize paths - Check Task.isCancelled in diarization loop to exit promptly - Prevent old audio from draining into diarizer after session stop --- .../TranscriptionStreamCoordinator.swift | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift index 3c83a036..01f17a4c 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -34,6 +34,9 @@ final class TranscriptionStreamCoordinator { /// Health check task for mic audio. private var micHealthTask: Task? + /// Diarization audio feed task. + private var diarizationTask: Task? + /// Track if mic is currently running for health checks. 
private var isMicRunning = false @@ -152,10 +155,13 @@ final class TranscriptionStreamCoordinator { let originalSysStream = sysStream let (diarTapped, diarContinuation) = AsyncStream.makeStream() - Task { + diarizationTask?.cancel() + diarizationTask = Task { [weak self] in nonisolated(unsafe) let safeDm = dm var diarBuf: [Float] = [] for await buffer in originalSysStream { + // Check for cancellation + guard !Task.isCancelled else { break } nonisolated(unsafe) let b = buffer diarContinuation.yield(b) guard let channelData = buffer.floatChannelData else { continue } @@ -169,10 +175,11 @@ final class TranscriptionStreamCoordinator { } } // Flush tail - if !diarBuf.isEmpty { + if !Task.isCancelled, !diarBuf.isEmpty { try? await safeDm.feedAudio(diarBuf) } diarContinuation.finish() + self?.diarizationTask = nil } sysStream = diarTapped } @@ -207,18 +214,26 @@ final class TranscriptionStreamCoordinator { /// Stop the system audio stream and transcription. func stopSystemStream() { + diarizationTask?.cancel() systemCapture.finishStream() sysTask?.cancel() sysTask = nil - Task { await systemCapture.stop() } + Task { + await diarizationTask?.value + await systemCapture.stop() + diarizationTask = nil + } } /// Finalize system stream, waiting for transcriber to drain. func finalizeSystemStream() async { + diarizationTask?.cancel() systemCapture.finishStream() await sysTask?.value + await diarizationTask?.value await systemCapture.stop() sysTask = nil + diarizationTask = nil } /// Stop all audio streams and transcription. 
@@ -232,17 +247,21 @@ final class TranscriptionStreamCoordinator { micHealthTask?.cancel() micHealthTask = nil + diarizationTask?.cancel() + micCapture.finishStream() systemCapture.finishStream() await micTask?.value await sysTask?.value + await diarizationTask?.value micCapture.stop() await systemCapture.stop() micTask = nil sysTask = nil + diarizationTask = nil isMicRunning = false } From 6bbf41ecfbabb40600b64e155f4cb8f1169a9fa7 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:12:01 +0200 Subject: [PATCH 04/10] refactor: convert DeviceRoutingManager callbacks to async Change onMicRestartRequested and onSystemRestartRequested from synchronous to async closures to prevent overlapping restarts when the engine responds with async work. - Change callback types to (@Sendable ...) async -> Void - Await callbacks in performMicRestart() and performSystemAudioRestart() - Fix deinit to dispatch cleanup to MainActor Fixes fire-and-forget callback issue (lines 10, 117, 139, 276, 285) --- .../Transcription/DeviceRoutingManager.swift | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift index d1ad2807..919446e9 100644 --- a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift +++ b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift @@ -8,10 +8,10 @@ import os @MainActor final class DeviceRoutingManager { /// Called when a mic restart is requested with the target device ID. - var onMicRestartRequested: (@Sendable (AudioDeviceID) -> Void)? + var onMicRestartRequested: (@Sendable (AudioDeviceID) async -> Void)? /// Called when a system audio restart is requested. - var onSystemRestartRequested: (@Sendable () -> Void)? + var onSystemRestartRequested: (@Sendable () async -> Void)? /// Tracks whether user selected "System Default" (0) or a specific device. 
private var userSelectedDeviceID: AudioDeviceID = 0 @@ -105,9 +105,13 @@ final class DeviceRoutingManager { deinit { // Ensure cleanup happens even if stopListening wasn't called explicitly + // Dispatch to MainActor since these methods are MainActor-isolated if isListening { - removeDefaultDeviceListener() - removeDefaultOutputDeviceListener() + Task { @MainActor [weak self] in + guard let self else { return } + self.removeDefaultDeviceListener() + self.removeDefaultOutputDeviceListener() + } } } @@ -282,7 +286,7 @@ final class DeviceRoutingManager { // Notify the engine to perform the actual restart // Note: State is updated only after successful restart via updateCurrentDeviceID - onMicRestartRequested?(targetMicID) + await onMicRestartRequested?(targetMicID) } private func performSystemAudioRestart() async { @@ -291,7 +295,7 @@ final class DeviceRoutingManager { Log.transcription.info("restarting system audio stream") // Notify the engine to perform the actual restart - onSystemRestartRequested?() + await onSystemRestartRequested?() } // MARK: - Device Resolution From 65b666de36e59ca9429176f8b6950a6fe0a85343 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:12:07 +0200 Subject: [PATCH 05/10] refactor: add bounded buffering and error propagation to coordinator Add bounded buffering to AsyncStream to prevent unbounded memory growth when transcription/diarization falls behind. Add proper error propagation with user-facing error messages. 
- Use .bufferingNewest(1) for all AsyncStream.makeStream() calls - Add TranscriptionStreamError enum with localized error descriptions - Change startMicStream/startSystemStream to return Result type - Provide user-friendly error messages for common failure modes Fixes unbounded buffering (lines 153, 316) and error propagation issues --- .../TranscriptionStreamCoordinator.swift | 50 +++++++++++++++---- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift index 01f17a4c..2f393db4 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -3,6 +3,27 @@ import FluidAudio import Foundation import os +/// Errors that can occur during stream coordination with user-facing messages. +enum TranscriptionStreamError: LocalizedError { + case microphonePermissionDenied + case microphoneUnavailable + case systemAudioCaptureFailed(Error) + case transcriberCreationFailed(Speaker) + + var errorDescription: String? { + switch self { + case .microphonePermissionDenied: + return "Microphone access was denied. Enable it in System Settings > Privacy & Security > Microphone." + case .microphoneUnavailable: + return "The selected microphone is no longer available." + case .systemAudioCaptureFailed: + return "System audio capture failed to start." + case .transcriberCreationFailed(let speaker): + return "Failed to create the \(speaker == .you ? "microphone" : "system audio") transcriber. Try restarting." + } + } +} + /// Coordinates audio stream capture, transcribers, and optional recording. /// Manages the lifecycle of mic and system audio transcription tasks. 
@MainActor @@ -41,7 +62,7 @@ final class TranscriptionStreamCoordinator { private var isMicRunning = false /// Start the mic audio stream and transcription. - /// - Returns: The transcription task, or nil if setup failed + /// - Returns: The transcription task on success, or an error on failure @discardableResult func startMicStream( locale: Locale, @@ -51,12 +72,19 @@ final class TranscriptionStreamCoordinator { transcriptStore: TranscriptStore, flushInterval: Int, useAEC: Bool - ) -> Task<Void, Never>? { + ) -> Result<Task<Void, Never>, TranscriptionStreamError> { // Store state for potential restarts isMicRunning = true var micStream = micCapture.bufferStream(deviceID: deviceID, echoCancellation: useAEC) + // Check for immediate mic capture failure + if let micError = micCapture.captureError { + Log.transcription.error("Mic capture setup error: \(micError)") + isMicRunning = false + return .failure(.microphoneUnavailable) + } + // Add recording tap if recorder is set if let recorder = audioRecorder { micStream = Self.tappedStream(micStream) { buffer in @@ -74,7 +102,7 @@ final class TranscriptionStreamCoordinator { ) else { Log.transcription.error("Failed to create mic transcriber") isMicRunning = false - return nil + return .failure(.transcriberCreationFailed(.you)) } micTask = Task { [weak self] in @@ -92,7 +120,7 @@ final class TranscriptionStreamCoordinator { } } - return micTask + return .success(micTask!) } /// Mark mic as stopped - MainActor-isolated to prevent concurrency violations @@ -123,7 +151,7 @@ final class TranscriptionStreamCoordinator { } /// Start the system audio stream and transcription. - /// - Returns: The transcription task, or nil if setup failed + /// - Returns: The transcription task on success, or an error on failure @discardableResult func startSystemStream( locale: Locale, @@ -132,7 +160,7 @@ final class TranscriptionStreamCoordinator { diarizationManager: DiarizationManager?, transcriptStore: TranscriptStore, flushInterval: Int - ) async -> Task<Void, Never>?
{ + ) async -> Result<Task<Void, Never>, TranscriptionStreamError> { Log.transcription.info("starting system audio capture") let sysStreams: SystemAudioCapture.CaptureStreams @@ -141,7 +169,7 @@ final class TranscriptionStreamCoordinator { Log.transcription.info("system audio capture started") } catch { Log.transcription.error("Failed to start system audio: \(error.localizedDescription, privacy: .public)") - return nil + return .failure(.systemAudioCaptureFailed(error)) } var sysStream = sysStreams.systemAudio @@ -153,7 +181,7 @@ final class TranscriptionStreamCoordinator { if let dm = diarizationManager { let diarFlushSize = 16000 let originalSysStream = sysStream - let (diarTapped, diarContinuation) = AsyncStream<AVAudioPCMBuffer>.makeStream() + let (diarTapped, diarContinuation) = AsyncStream<AVAudioPCMBuffer>.makeStream(bufferingPolicy: .bufferingNewest(1)) diarizationTask?.cancel() diarizationTask = Task { [weak self] in @@ -202,14 +230,14 @@ final class TranscriptionStreamCoordinator { audioTime: diarizationManager != nil ? sysAudioTime : nil ) else { Log.transcription.error("Failed to create system audio transcriber") - return nil + return .failure(.transcriberCreationFailed(.them)) } sysTask = Task { await sysTranscriber.run(stream: sysStream) } - return sysTask + return .success(sysTask!) } /// Stop the system audio stream and transcription.
@@ -332,7 +360,7 @@ final class TranscriptionStreamCoordinator { ) -> AsyncStream<AVAudioPCMBuffer> { struct Box: @unchecked Sendable { let stream: AsyncStream<AVAudioPCMBuffer> } let box = Box(stream: stream) - let (output, continuation) = AsyncStream<AVAudioPCMBuffer>.makeStream() + let (output, continuation) = AsyncStream<AVAudioPCMBuffer>.makeStream(bufferingPolicy: .bufferingNewest(1)) Task { for await buffer in box.stream { tap(buffer) From 5f2dd8e3152a882589badffd742776a41f811768 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:12:12 +0200 Subject: [PATCH 06/10] fix: recompute model availability at start of loadModel() Fix stale state dependency where loadModel() trusted the cached needsDownload flag instead of recomputing from the model argument. If the caller changed models between calls, the manager misreported state. - Recompute availability from model parameter at top of loadModel() - Rename local variable to avoid shadowing property Fixes stale state dependency (lines 49, 77) --- .../OpenOats/Transcription/ModelDownloadManager.swift | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift index 0d0fc8f3..73289154 100644 --- a/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift +++ b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift @@ -64,6 +64,10 @@ final class ModelDownloadManager { throw TranscriptionBackendError.notPrepared } + // Recompute availability based on the model parameter, not cached flag + let modelNeedsDownload = Self.modelNeedsDownload(model) + self.needsDownload = modelNeedsDownload + isLoading = true defer { isLoading = false @@ -74,7 +78,7 @@ final class ModelDownloadManager { downloadTotalBytes = nil } - let isDownloading = needsDownload + let isDownloading = modelNeedsDownload if isDownloading { downloadProgress = 0 From e90e685436fa7f05afa84331277237ef78272692 Mon Sep 17 00:00:00 2001 From:
Szymon Sypniewicz Date: Sun, 29 Mar 2026 19:12:17 +0200 Subject: [PATCH 07/10] cleanup: remove duplicate DownloadProgressDetail definition Remove the duplicate DownloadProgressDetail struct from TranscriptionEngine. The type is now defined in ModelDownloadManager and shared across the module. Fixes invalid redeclaration compilation error. --- .../OpenOats/Transcription/TranscriptionEngine.swift | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift index e48da036..3a373d77 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift @@ -15,17 +15,6 @@ enum TranscriptionEngineError: LocalizedError { } } -/// Enriched download progress info computed from fraction changes over time. -struct DownloadProgressDetail: Sendable { - let fraction: Double - /// Formatted string like "142 MB / 800 MB" - let sizeText: String? - /// Formatted string like "3.5 MB/s" - let speedText: String? - /// Formatted string like "2m 15s remaining" - let etaText: String? -} - /// Orchestrates dual StreamingTranscriber instances for mic (you) and system audio (them). @Observable @MainActor From 13a50b2071236259857cad30bcb73f904d71f457 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 22:43:15 +0200 Subject: [PATCH 08/10] fix: Swift 6 concurrency safety in TranscriptionStreamCoordinator - Remove nonisolated(unsafe) from diarization task (capture dm properly) - Remove @unchecked Sendable Box struct from tappedStream - Make MicCapture and SystemAudioCapture injectable via init All unsafe concurrency patterns have been removed. 
--- .../TranscriptionStreamCoordinator.swift | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift index 2f393db4..eacb8826 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -28,8 +28,13 @@ enum TranscriptionStreamError: LocalizedError { /// Manages the lifecycle of mic and system audio transcription tasks. @MainActor final class TranscriptionStreamCoordinator { - private let micCapture = MicCapture() - private let systemCapture = SystemAudioCapture() + private let micCapture: MicCapture + private let systemCapture: SystemAudioCapture + + init(micCapture: MicCapture = MicCapture(), systemCapture: SystemAudioCapture = SystemAudioCapture()) { + self.micCapture = micCapture + self.systemCapture = systemCapture + } /// Audio recorder for tapping streams (set externally when recording is enabled). weak var audioRecorder: AudioRecorder? 
@@ -184,14 +189,12 @@ final class TranscriptionStreamCoordinator { let (diarTapped, diarContinuation) = AsyncStream<AVAudioPCMBuffer>.makeStream(bufferingPolicy: .bufferingNewest(1)) diarizationTask?.cancel() - diarizationTask = Task { [weak self] in - nonisolated(unsafe) let safeDm = dm + diarizationTask = Task { [weak self, dm] in var diarBuf: [Float] = [] for await buffer in originalSysStream { // Check for cancellation guard !Task.isCancelled else { break } - nonisolated(unsafe) let b = buffer - diarContinuation.yield(b) + diarContinuation.yield(buffer) guard let channelData = buffer.floatChannelData else { continue } let frameCount = Int(buffer.frameLength) sysAudioTime.add(Double(frameCount) / buffer.format.sampleRate) @@ -199,12 +202,12 @@ final class TranscriptionStreamCoordinator { if diarBuf.count >= diarFlushSize { let batch = diarBuf diarBuf.removeAll(keepingCapacity: true) - try? await safeDm.feedAudio(batch) + try? await dm.feedAudio(batch) } } // Flush tail if !Task.isCancelled, !diarBuf.isEmpty { - try? await safeDm.feedAudio(diarBuf) + try?
await dm.feedAudio(diarBuf) } diarContinuation.finish() self?.diarizationTask = nil @@ -358,14 +361,11 @@ final class TranscriptionStreamCoordinator { _ stream: AsyncStream<AVAudioPCMBuffer>, tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void ) -> AsyncStream<AVAudioPCMBuffer> { - struct Box: @unchecked Sendable { let stream: AsyncStream<AVAudioPCMBuffer> } - let box = Box(stream: stream) let (output, continuation) = AsyncStream<AVAudioPCMBuffer>.makeStream(bufferingPolicy: .bufferingNewest(1)) Task { - for await buffer in box.stream { + for await buffer in stream { tap(buffer) - nonisolated(unsafe) let b = buffer - continuation.yield(b) + continuation.yield(buffer) } continuation.finish() } From fc3478ea9d14f2dc10c3843d98be048b9f1cb178 Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 22:48:54 +0200 Subject: [PATCH 09/10] refactor: integrate new managers into TranscriptionEngine - TranscriptionEngine reduced from 949 to 551 lines (42% reduction) - Delegates to ModelDownloadManager for model lifecycle - Delegates to DeviceRoutingManager for device listeners/restarts - Delegates to TranscriptionStreamCoordinator for stream orchestration - Public API preserved for SwiftUI compatibility --- .../Transcription/TranscriptionEngine.swift | 780 +++++------------- 1 file changed, 191 insertions(+), 589 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift index 3a373d77..68f7fa79 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift @@ -76,22 +76,20 @@ final class TranscriptionEngine { set { withMutation(keyPath: \.downloadDetail) { _downloadDetail = newValue } } } - // Progress tracking state (not observed) - @ObservationIgnored private var downloadStartTime: Date? - @ObservationIgnored private var downloadTotalBytes: Int64?
- - private let systemCapture = SystemAudioCapture() - private let micCapture = MicCapture() private let transcriptStore: TranscriptStore private let settings: AppSettings private let mode: Mode + private let modelDownloadManager: ModelDownloadManager + private let deviceRoutingManager: DeviceRoutingManager + private let streamCoordinator: TranscriptionStreamCoordinator + /// Combined audio level (mic + system) for the UI meter. /// nonisolated is safe here — both audioLevel properties are thread-safe (NSLock). nonisolated var audioLevel: Float { switch mode { case .live: - max(micCapture.audioLevel, systemCapture.audioLevel) + streamCoordinator.audioLevel case .scripted: _isRunning ? 0.35 : 0 } @@ -100,8 +98,8 @@ final class TranscriptionEngine { /// Mute/unmute the microphone. When muted, mic audio is not transcribed /// and the audio level reads as 0. System audio continues normally. nonisolated var isMicMuted: Bool { - get { micCapture.isMuted } - set { micCapture.isMuted = newValue } + get { streamCoordinator.isMicMuted } + set { streamCoordinator.isMicMuted = newValue } } private var micTask: Task? @@ -118,33 +116,35 @@ final class TranscriptionEngine { private var vadManager: VadManager? /// Audio recorder for tapping streams (set by ContentView when recording is enabled). - var audioRecorder: AudioRecorder? + var audioRecorder: AudioRecorder? { + didSet { + streamCoordinator.audioRecorder = audioRecorder + } + } /// Speaker diarization manager for system audio (nil when diarization is disabled). private var diarizationManager: DiarizationManager? - /// Tracks the resolved mic device ID currently in use. - private var currentMicDeviceID: AudioDeviceID = 0 - - /// Tracks whether user selected "System Default" (0) or a specific device. - private var userSelectedDeviceID: AudioDeviceID = 0 - - /// Listens for default input device changes at the OS level. - private var defaultDeviceListenerBlock: AudioObjectPropertyListenerBlock? 
- /// Listens for default output device changes at the OS level. - private var defaultOutputDeviceListenerBlock: AudioObjectPropertyListenerBlock? - private var micRestartTask: Task? - private var sysRestartTask: Task? - private var pendingMicDeviceID: AudioDeviceID? - private var pendingSystemAudioRestart = false - init(transcriptStore: TranscriptStore, settings: AppSettings, mode: Mode = .live) { self.transcriptStore = transcriptStore self.settings = settings self.mode = mode + + self.modelDownloadManager = ModelDownloadManager(settings: settings) + self.deviceRoutingManager = DeviceRoutingManager() + self.streamCoordinator = TranscriptionStreamCoordinator() + + // Wire up callbacks + self.deviceRoutingManager.onMicRestartRequested = { [weak self] deviceID in + await self?.performMicRestart(deviceID: deviceID) + } + self.deviceRoutingManager.onSystemRestartRequested = { [weak self] in + await self?.performSystemAudioRestart() + } + switch mode { case .live: - self.needsModelDownload = Self.modelNeedsDownload(settings.transcriptionModel) + self.needsModelDownload = modelDownloadManager.checkAvailability(for: settings.transcriptionModel) case .scripted: self.needsModelDownload = false } @@ -153,7 +153,7 @@ final class TranscriptionEngine { func refreshModelAvailability() { switch mode { case .live: - needsModelDownload = Self.modelNeedsDownload(settings.transcriptionModel) + needsModelDownload = modelDownloadManager.checkAvailability(for: settings.transcriptionModel) case .scripted: needsModelDownload = false } @@ -197,160 +197,133 @@ final class TranscriptionEngine { isRunning = true - // 1. Load transcription models via backend protocol - let isDownloading = needsModelDownload - assetStatus = isDownloading - ? "Downloading \(transcriptionModel.displayName)..." - : "Loading \(transcriptionModel.displayName)..." 
- if isDownloading { - downloadProgress = 0 - downloadStartTime = Date() - downloadTotalBytes = transcriptionModel.estimatedDownloadBytes - downloadDetail = DownloadProgressDetail(fraction: 0, sizeText: nil, speedText: nil, etaText: nil) - } - Log.transcription.info("Loading transcription model \(transcriptionModel.rawValue, privacy: .public)") + // 1. Load transcription models via manager + assetStatus = "Loading transcription model..." + let backends: LoadedBackends do { - let vocab = settings.transcriptionCustomVocabulary - let mic = transcriptionModel.makeBackend(customVocabulary: vocab) - try await mic.prepare( - onStatus: { [weak self] status in - Task { @MainActor in - self?.assetStatus = status - } - }, - onProgress: { [weak self] fraction in - Task { @MainActor in - self?.downloadProgress = fraction - self?.updateDownloadDetail(fraction: fraction) - } - } + backends = try await modelDownloadManager.loadModel( + settings.transcriptionModel, + customVocabulary: settings.transcriptionCustomVocabulary ) - self.micBackend = mic - - // Parakeet needs a separate backend for system audio (mutable decoder state). - // Qwen3 is actor-based and thread-safe, so reuse the same instance. - if transcriptionModel == .qwen3ASR06B { - self.systemBackend = mic - } else { - let sys = transcriptionModel.makeBackend(customVocabulary: vocab) - try await sys.prepare { _ in } - self.systemBackend = sys - } + self.micBackend = backends.mic + self.systemBackend = backends.system + } catch { + lastError = "Failed to load models: \(error.localizedDescription)" + assetStatus = "Ready" + isRunning = false + modelDownloadManager.clearCache(for: settings.transcriptionModel) + return + } + + // Sync observable properties from manager + self.needsModelDownload = modelDownloadManager.needsDownload + self.downloadProgress = modelDownloadManager.downloadProgress + self.downloadDetail = modelDownloadManager.downloadDetail - assetStatus = "Loading VAD model..." 
- Log.transcription.info("Loading VAD model") + // Load VAD model + assetStatus = "Loading VAD model..." + Log.transcription.info("Loading VAD model") + do { let vad = try await VadManager() self.vadManager = vad + } catch { + lastError = "Failed to load VAD model: \(error.localizedDescription)" + assetStatus = "Ready" + isRunning = false + return + } - // Optionally load speaker diarization model - if settings.enableDiarization { - assetStatus = "Loading diarization model..." - Log.transcription.info("Loading LS-EEND diarization model") - let dm = DiarizationManager() - let variant = LSEENDVariant(rawValue: settings.diarizationVariant.rawValue) ?? .dihard3 + // Optionally load speaker diarization model + if settings.enableDiarization { + assetStatus = "Loading diarization model..." + Log.transcription.info("Loading LS-EEND diarization model") + let dm = DiarizationManager() + let variant = LSEENDVariant(rawValue: settings.diarizationVariant.rawValue) ?? .dihard3 + do { try await dm.load(variant: variant) self.diarizationManager = dm Log.transcription.info("Diarization model loaded") - } else { + } catch { + Log.transcription.error("Failed to load diarization model: \(error, privacy: .public)") + // Non-fatal: continue without diarization self.diarizationManager = nil } - - needsModelDownload = false - downloadConfirmed = false - downloadProgress = nil - downloadDetail = nil - downloadStartTime = nil - downloadTotalBytes = nil - assetStatus = "Models ready" - Log.transcription.info("Transcription model loaded") - } catch { - let msg = "Failed to load models: \(error.localizedDescription)" - Log.transcription.error("Failed to load models: \(msg, privacy: .public)") - lastError = msg - assetStatus = "Ready" - isRunning = false - downloadProgress = nil - downloadDetail = nil - downloadStartTime = nil - downloadTotalBytes = nil - // Clear corrupt cache so the next attempt triggers a fresh download - settings.transcriptionModel.makeBackend().clearModelCache() - 
Log.transcription.info("Cleared model cache for \(self.settings.transcriptionModel.rawValue, privacy: .public)") - needsModelDownload = true - downloadConfirmed = false - return + } else { + self.diarizationManager = nil } + assetStatus = "Models ready" + Log.transcription.info("Transcription model loaded") + guard let vadManager else { return } - // 2. Start mic capture - userSelectedDeviceID = inputDeviceID - guard let targetMicID = resolvedMicDeviceID(for: inputDeviceID) else { - let msg = unavailableMicMessage(for: inputDeviceID) + // 2. Resolve mic device and start listening for device changes + guard let targetMicID = deviceRoutingManager.resolvedMicDeviceID(for: inputDeviceID) else { + let msg = deviceRoutingManager.unavailableMicMessage(for: inputDeviceID) Log.transcription.error("Mic unavailable: \(msg, privacy: .public)") lastError = msg assetStatus = "Ready" isRunning = false return } - currentMicDeviceID = targetMicID - // AEC (voice processing) conflicts with system audio capture on macOS — - // both cause CoreAudio aggregate-device reconfiguration that can stall the - // mic stream. Since system audio capture is always active during recording, - // AEC must be disabled to prevent capture failures. + + deviceRoutingManager.startListening(isUsingDefaultDevice: inputDeviceID == 0) + deviceRoutingManager.updateCurrentDeviceID(targetMicID, isUserSelection: true) + + // AEC (voice processing) conflicts with system audio capture on macOS let useAEC = false if settings.enableEchoCancellation { Log.transcription.info("AEC disabled - conflicts with system audio capture") } + // 3. 
Start mic stream via coordinator Log.transcription.info("Starting mic capture, targetMicID=\(targetMicID, privacy: .public), aec=\(useAEC, privacy: .public)") - startMicStream( + let micResult = streamCoordinator.startMicStream( locale: locale, - vadManager: vadManager, deviceID: targetMicID, - echoCancellation: useAEC + backend: micBackend!, + vadManager: vadManager, + transcriptStore: transcriptStore, + flushInterval: settings.transcriptionModel.flushIntervalSamples, + useAEC: useAEC ) - - // Check for immediate mic capture failure - if let micError = micCapture.captureError { - Log.transcription.error("Mic capture error: \(micError, privacy: .public)") - lastError = micError + switch micResult { + case .success(let task): + self.micTask = task + case .failure(let error): + lastError = error.localizedDescription + isRunning = false + return } - // Health check: if mic produces no audio within 5 seconds, retry once - // without AEC before surfacing the error. - Task { @MainActor [weak self] in - try? await Task.sleep(for: .seconds(5)) - guard let self, self.isRunning else { return } - if !self.micCapture.hasCapturedFrames && self.micCapture.captureError == nil { - if useAEC { - Log.transcription.error("No mic audio after 5s with AEC, retrying without") - self.micCapture.finishStream() - await self.micTask?.value - self.micTask = nil - self.micCapture.stop() - self.startMicStream( - locale: locale, - vadManager: vadManager, - deviceID: targetMicID, - echoCancellation: false - ) - } else { - Log.transcription.error("No mic audio after 5s") - self.lastError = "Microphone is not producing audio. Check your input device in System Settings." - } - } + // 4. 
Start system audio stream via coordinator + let sysResult = await streamCoordinator.startSystemStream( + locale: locale, + backend: systemBackend!, + vadManager: vadManager, + diarizationManager: diarizationManager, + transcriptStore: transcriptStore, + flushInterval: settings.transcriptionModel.flushIntervalSamples + ) + switch sysResult { + case .success(let task): + self.sysTask = task + case .failure(let error): + lastError = error.localizedDescription } - // 3. Start system audio capture - await startSystemAudioStream(locale: locale, vadManager: vadManager) - assetStatus = "Transcribing (\(micBackend?.displayName ?? transcriptionModel.displayName))" Log.transcription.info("All transcription tasks started") - // Install CoreAudio listeners for live device routing changes - installDefaultDeviceListener() - installDefaultOutputDeviceListener() + // Store state for restarts + deviceRoutingManager.storeRestartState( + locale: locale, + vadManager: vadManager, + micBackend: micBackend!, + systemBackend: systemBackend!, + flushInterval: settings.transcriptionModel.flushIntervalSamples, + transcriptStore: transcriptStore + ) } /// Restart only the mic capture with a new device, keeping system audio and models intact. 
@@ -358,108 +331,7 @@ final class TranscriptionEngine { func restartMic(inputDeviceID: AudioDeviceID) { if case .scripted = mode { return } guard isRunning else { return } - pendingMicDeviceID = inputDeviceID - - if micRestartTask != nil { - Log.transcription.info("Queued mic restart for device \(inputDeviceID, privacy: .public)") - return - } - - micRestartTask = Task { @MainActor [weak self] in - guard let self else { return } - defer { self.micRestartTask = nil } - - while self.isRunning, let requestedDeviceID = self.pendingMicDeviceID { - self.pendingMicDeviceID = nil - await self.performMicRestart(inputDeviceID: requestedDeviceID) - } - } - } - - // MARK: - Default Device Listener - - private func installDefaultDeviceListener() { - guard defaultDeviceListenerBlock == nil else { return } - - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultInputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in - guard let self else { return } - Task { @MainActor in - guard self.isRunning, self.userSelectedDeviceID == 0 else { return } - self.restartMic(inputDeviceID: 0) - } - } - defaultDeviceListenerBlock = block - - AudioObjectAddPropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - } - - private func removeDefaultDeviceListener() { - guard let block = defaultDeviceListenerBlock else { return } - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultInputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - AudioObjectRemovePropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - defaultDeviceListenerBlock = nil - } - - private func installDefaultOutputDeviceListener() { - guard defaultOutputDeviceListenerBlock == nil else { return 
} - - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultOutputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in - guard let self else { return } - Task { @MainActor in - guard self.isRunning else { return } - self.restartSystemAudio() - } - } - defaultOutputDeviceListenerBlock = block - - AudioObjectAddPropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - } - - private func removeDefaultOutputDeviceListener() { - guard let block = defaultOutputDeviceListenerBlock else { return } - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultOutputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - AudioObjectRemovePropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - defaultOutputDeviceListenerBlock = nil + deviceRoutingManager.requestMicRestart(deviceID: inputDeviceID) } private func ensureMicrophonePermission() async -> Bool { @@ -485,6 +357,8 @@ final class TranscriptionEngine { } func finalize() async { + Log.transcription.info("finalize() called") + if case .scripted = mode { isRunning = false assetStatus = "Ready" @@ -493,30 +367,15 @@ final class TranscriptionEngine { return } - removeDefaultDeviceListener() - removeDefaultOutputDeviceListener() - micRestartTask?.cancel() - sysRestartTask?.cancel() - micRestartTask = nil - sysRestartTask = nil - pendingMicDeviceID = nil - pendingSystemAudioRestart = false - micKeepAliveTask?.cancel() - - micCapture.finishStream() - systemCapture.finishStream() + isRunning = false + assetStatus = "Finalizing..." 
- await micTask?.value - await sysTask?.value + // Stop listening for device changes + deviceRoutingManager.stopListening() - micCapture.stop() - await systemCapture.stop() + // Finalize streams via coordinator + await streamCoordinator.finalize() - micTask = nil - sysTask = nil - pendingMicDeviceID = nil - micKeepAliveTask = nil - currentMicDeviceID = 0 // Finalize and release diarization manager if let dm = diarizationManager { await dm.finalize() @@ -525,13 +384,18 @@ final class TranscriptionEngine { micBackend = nil systemBackend = nil + micTask = nil + sysTask = nil transcriptStore.volatileYouText = "" transcriptStore.volatileThemText = "" - isRunning = false + assetStatus = "Ready" + Log.transcription.info("finalize() completed") } func stop() { + Log.transcription.info("stop() called") + if case .scripted = mode { isRunning = false assetStatus = "Ready" @@ -540,87 +404,73 @@ final class TranscriptionEngine { return } - removeDefaultDeviceListener() - removeDefaultOutputDeviceListener() - micRestartTask?.cancel() - sysRestartTask?.cancel() - micRestartTask = nil - sysRestartTask = nil - pendingMicDeviceID = nil - pendingSystemAudioRestart = false - micTask?.cancel() - sysTask?.cancel() + isRunning = false + assetStatus = "Ready" + micKeepAliveTask?.cancel() + micKeepAliveTask = nil + + // Stop device listeners + deviceRoutingManager.stopListening() + + // Stop streams via coordinator + streamCoordinator.stopAll() + micTask = nil sysTask = nil - micKeepAliveTask = nil - Task { await systemCapture.stop() } - micCapture.stop() - currentMicDeviceID = 0 micBackend = nil systemBackend = nil + diarizationManager = nil transcriptStore.volatileYouText = "" transcriptStore.volatileThemText = "" - isRunning = false - assetStatus = "Ready" + + Log.transcription.info("stop() completed") } - private func performMicRestart(inputDeviceID: AudioDeviceID) async { - guard isRunning, let vadManager else { return } + // MARK: - Restart Implementation (Called by 
DeviceRoutingManager) - userSelectedDeviceID = inputDeviceID + private func performMicRestart(deviceID: AudioDeviceID) async { + guard isRunning, let vadManager else { return } - guard let targetMicID = resolvedMicDeviceID(for: inputDeviceID) else { - let msg = unavailableMicMessage(for: inputDeviceID) - Log.transcription.error("Mic swap failed: \(msg, privacy: .public)") - lastError = msg + guard let targetMicID = deviceRoutingManager.resolvedMicDeviceID(for: deviceID) else { + Log.transcription.error("Mic swap failed: device unavailable") return } - guard targetMicID != currentMicDeviceID else { + guard targetMicID != deviceRoutingManager.currentMicID else { Log.transcription.debug("Mic swap skipped, same device \(targetMicID, privacy: .public)") return } - Log.transcription.info("Switching mic from \(self.currentMicDeviceID, privacy: .public) to \(targetMicID, privacy: .public)") + Log.transcription.info("Switching mic to \(targetMicID, privacy: .public)") - micCapture.finishStream() - await micTask?.value + // Stop current mic stream + streamCoordinator.stopMicStream() + micTask = nil if Task.isCancelled || !isRunning { return } - micTask = nil - micCapture.stop() - startMicStream( - locale: settings.locale, + // Start new mic stream + guard let micBackend else { return } + let micResult = streamCoordinator.startMicStream( + locale: deviceRoutingManager.currentLocale ?? settings.locale, + deviceID: targetMicID, + backend: micBackend, vadManager: vadManager, - deviceID: targetMicID + transcriptStore: transcriptStore, + flushInterval: deviceRoutingManager.currentFlushInterval ?? 
settings.transcriptionModel.flushIntervalSamples, + useAEC: false ) - currentMicDeviceID = targetMicID - lastError = nil - - Log.transcription.info("Mic restarted on device \(targetMicID, privacy: .public)") - } - - private func restartSystemAudio() { - guard isRunning else { return } - pendingSystemAudioRestart = true - - if sysRestartTask != nil { - Log.transcription.info("Queued system audio restart") - return - } - - sysRestartTask = Task { @MainActor [weak self] in - guard let self else { return } - defer { self.sysRestartTask = nil } - - while self.isRunning, self.pendingSystemAudioRestart { - self.pendingSystemAudioRestart = false - await self.performSystemAudioRestart() - } + switch micResult { + case .success(let task): + self.micTask = task + deviceRoutingManager.updateCurrentDeviceID(targetMicID, isUserSelection: false) + lastError = nil + Log.transcription.info("Mic restarted on device \(targetMicID, privacy: .public)") + case .failure(let error): + lastError = error.localizedDescription } } @@ -629,214 +479,34 @@ final class TranscriptionEngine { Log.transcription.info("Restarting system audio stream") - systemCapture.finishStream() - await sysTask?.value - - if Task.isCancelled || !isRunning { - return - } - + // Stop current system stream + await streamCoordinator.finalizeSystemStream() sysTask = nil - await systemCapture.stop() - await startSystemAudioStream(locale: settings.locale, vadManager: vadManager) - - Log.transcription.info("System audio stream restarted") - } - - private func startMicStream( - locale: Locale, - vadManager: VadManager, - deviceID: AudioDeviceID, - echoCancellation: Bool = false - ) { - var micStream = micCapture.bufferStream(deviceID: deviceID, echoCancellation: echoCancellation) - if let recorder = audioRecorder { - micStream = Self.tappedStream(micStream) { buffer in - recorder.writeMicBuffer(buffer) - } - } - let store = transcriptStore - guard let micTranscriber = makeTranscriber( - locale: locale, - speaker: .you, - 
vadManager: vadManager, - onPartial: { text in - Task { @MainActor in store.volatileYouText = text } - }, - onFinal: { text in - Task { @MainActor in - store.volatileYouText = "" - store.append(Utterance(text: text, speaker: .you)) - } - } - ) else { - lastError = "Failed to create transcriber. Try restarting." - isRunning = false - assetStatus = "Ready" - return - } - micTask = Task.detached { - await micTranscriber.run(stream: micStream) - } - } - private func startSystemAudioStream( - locale: Locale, - vadManager: VadManager - ) async { - Log.transcription.info("Starting system audio capture") - - let sysStreams: SystemAudioCapture.CaptureStreams - do { - sysStreams = try await systemCapture.bufferStream() - Log.transcription.info("System audio capture started") - clearSystemAudioErrorIfPresent() - } catch { - let msg = "Failed to start system audio: \(error.localizedDescription)" - Log.transcription.error("Failed to start system audio: \(msg, privacy: .public)") - lastError = msg - return - } - - var sysStream = sysStreams.systemAudio - if let recorder = audioRecorder { - sysStream = Self.tappedStream(sysStream) { buffer in - recorder.writeSysBuffer(buffer) - } - } - - // Track cumulative audio time for diarizer speaker attribution - let sysAudioTime = SyncDouble() - - // Tee system audio to diarization manager if enabled - if let dm = diarizationManager { - let diarFlushSize = 16000 - let originalSysStream = sysStream - let (diarTapped, diarContinuation) = AsyncStream.makeStream() - Task { - nonisolated(unsafe) let safeDm = dm - var diarBuf: [Float] = [] - for await buffer in originalSysStream { - nonisolated(unsafe) let b = buffer - diarContinuation.yield(b) - guard let channelData = buffer.floatChannelData else { continue } - let frameCount = Int(buffer.frameLength) - sysAudioTime.add(Double(frameCount) / buffer.format.sampleRate) - diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) - if diarBuf.count >= diarFlushSize { 
- let batch = diarBuf - diarBuf.removeAll(keepingCapacity: true) - try? await safeDm.feedAudio(batch) - } - } - // Flush tail - if !diarBuf.isEmpty { - try? await safeDm.feedAudio(diarBuf) - } - diarContinuation.finish() - } - sysStream = diarTapped - } - - let store = transcriptStore - guard let sysTranscriber = makeTranscriber( - locale: locale, - speaker: .them, - vadManager: vadManager, - onPartial: { text in - Task { @MainActor in store.volatileThemText = text } - }, - onFinal: { [weak self] text in - Task { @MainActor in - store.volatileThemText = "" - let speaker: Speaker - if let dm = self?.diarizationManager { - // Estimate segment time: each onFinal is ~3-5s of speech - let endTime = sysAudioTime.value - let startTime = max(0, endTime - 5.0) - speaker = await dm.dominantSpeaker(from: startTime, to: endTime) - } else { - speaker = .them - } - store.append(Utterance(text: text, speaker: speaker)) - } - } - ) else { - lastError = "Failed to create the system-audio transcriber. Try restarting." + if Task.isCancelled || !isRunning { return } - sysTask = Task.detached { - await sysTranscriber.run(stream: sysStream) - } - } - - private func makeTranscriber( - locale: Locale, - speaker: Speaker, - vadManager: VadManager, - onPartial: @escaping @Sendable (String) -> Void, - onFinal: @escaping @Sendable (String) -> Void - ) -> StreamingTranscriber? { - let backend = speaker == .you ? micBackend : systemBackend - guard let backend else { - Log.transcription.error("makeTranscriber called without initialized backend for \(speaker.storageKey, privacy: .public)") - return nil - } - return StreamingTranscriber( - backend: backend, - locale: locale, + // Start new system stream + guard let systemBackend else { return } + let sysResult = await streamCoordinator.startSystemStream( + locale: deviceRoutingManager.currentLocale ?? 
settings.locale, + backend: systemBackend, vadManager: vadManager, - speaker: speaker, - flushInterval: settings.transcriptionModel.flushIntervalSamples, - onPartial: onPartial, - onFinal: onFinal + diarizationManager: diarizationManager, + transcriptStore: transcriptStore, + flushInterval: deviceRoutingManager.currentFlushInterval ?? settings.transcriptionModel.flushIntervalSamples ) - } - - private func resolvedMicDeviceID(for inputDeviceID: AudioDeviceID) -> AudioDeviceID? { - if inputDeviceID > 0 { - let availableDeviceIDs = Set(MicCapture.availableInputDevices().map(\.id)) - return availableDeviceIDs.contains(inputDeviceID) ? inputDeviceID : nil - } - - return MicCapture.defaultInputDeviceID() - } - - private func unavailableMicMessage(for inputDeviceID: AudioDeviceID) -> String { - if inputDeviceID > 0 { - return "The selected microphone is no longer available." - } - - return "No default microphone is currently available." - } - - private static func modelNeedsDownload(_ model: TranscriptionModel) -> Bool { - let backend = model.makeBackend() - if case .needsDownload = backend.checkStatus() { - return true + switch sysResult { + case .success(let task): + self.sysTask = task + Log.transcription.info("System audio stream restarted") + case .failure(let error): + lastError = error.localizedDescription } - return false } - /// Wrap an audio stream to forward each buffer to a synchronous tap before yielding it downstream. 
- private nonisolated static func tappedStream( - _ stream: AsyncStream, - tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void - ) -> AsyncStream { - struct Box: @unchecked Sendable { let stream: AsyncStream } - let box = Box(stream: stream) - let (output, continuation) = AsyncStream.makeStream() - Task { - for await buffer in box.stream { - tap(buffer) - nonisolated(unsafe) let b = buffer - continuation.yield(b) - } - continuation.finish() - } - return output - } + // MARK: - Utility Methods private func localeMismatchMessage( for locale: Locale, @@ -865,72 +535,4 @@ final class TranscriptionEngine { self.lastError = nil } } - - // MARK: - Download Progress Detail - - private func updateDownloadDetail(fraction: Double) { - guard let startTime = downloadStartTime else { - downloadDetail = DownloadProgressDetail(fraction: fraction, sizeText: nil, speedText: nil, etaText: nil) - return - } - - let elapsed = Date().timeIntervalSince(startTime) - let totalBytes = downloadTotalBytes - - // Size text: "142 MB / 800 MB" (only when total is known) - var sizeText: String? - if let totalBytes { - let downloaded = Int64(fraction * Double(totalBytes)) - sizeText = "\(Self.formatBytes(downloaded)) / \(Self.formatBytes(totalBytes))" - } - - // Speed and ETA need enough elapsed time to be meaningful - var speedText: String? - var etaText: String? 
- if elapsed > 1, fraction > 0.01 { - // Speed from fraction progress rate + known total - if let totalBytes { - let bytesDownloaded = fraction * Double(totalBytes) - let bytesPerSecond = bytesDownloaded / elapsed - speedText = "\(Self.formatBytes(Int64(bytesPerSecond)))/s" - - let remaining = Double(totalBytes) - bytesDownloaded - if bytesPerSecond > 0 { - let secondsLeft = remaining / bytesPerSecond - etaText = Self.formatDuration(secondsLeft) - } - } else { - // No total bytes known — estimate ETA from fraction rate alone - let fractionPerSecond = fraction / elapsed - if fractionPerSecond > 0 { - let remainingFraction = 1.0 - fraction - let secondsLeft = remainingFraction / fractionPerSecond - etaText = Self.formatDuration(secondsLeft) - } - } - } - - downloadDetail = DownloadProgressDetail( - fraction: fraction, - sizeText: sizeText, - speedText: speedText, - etaText: etaText - ) - } - - private static func formatBytes(_ bytes: Int64) -> String { - if bytes >= 1_000_000_000 { - return String(format: "%.1f GB", Double(bytes) / 1_000_000_000) - } else { - return String(format: "%.0f MB", Double(bytes) / 1_000_000) - } - } - - private static func formatDuration(_ seconds: Double) -> String { - let s = Int(seconds) - if s < 60 { return "\(s)s remaining" } - let m = s / 60 - let rem = s % 60 - return rem > 0 ? 
"\(m)m \(rem)s remaining" : "\(m)m remaining" - } } From d6bfcba9e6c9746ff3f3c0931dec58407a7be0cf Mon Sep 17 00:00:00 2001 From: Szymon Sypniewicz Date: Sun, 29 Mar 2026 23:32:01 +0200 Subject: [PATCH 10/10] fix: resolve 10 review bugs and 3 Swift 6 concurrency errors Critical: - Wire ModelDownloadManager progress callbacks to engine UI state - Reset downloadConfirmed/needsModelDownload on model load failure - Remove broken deinit (weak self always nil), document stopListening contract High: - Use shared static queue for CoreAudio listener add/remove calls - Replace 4 force unwraps with guard let - Remove duplicate micTask/sysTask ownership (coordinator is sole owner) - Use bufferingOldest(128) instead of bufferingNewest(1) to prevent audio drops Medium: - Nil diarizationTask synchronously in stopSystemStream to prevent race - Surface mic health check failure to user via lastError callback - Remove dead clearSystemAudioErrorIfPresent, inline at restart success Concurrency: - Extract data from buffer before yielding to diarization continuation - Use nonisolated(unsafe) for read-only AVAudioPCMBuffer across send boundaries - Wrap tappedStream iteration in Sendable TapForwarder struct --- .../Transcription/DeviceRoutingManager.swift | 32 ++++----- .../Transcription/TranscriptionEngine.swift | 72 ++++++++++--------- .../TranscriptionStreamCoordinator.swift | 57 ++++++++++----- 3 files changed, 90 insertions(+), 71 deletions(-) diff --git a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift index 919446e9..616702f0 100644 --- a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift +++ b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift @@ -7,6 +7,10 @@ import os /// when the default input or output device changes. @MainActor final class DeviceRoutingManager { + /// Shared queue for all CoreAudio property listener registrations. 
+ /// Must use the same queue instance for Add and Remove calls. + private static let listenerQueue = DispatchQueue(label: "com.openoats.device-routing", qos: .utility) + /// Called when a mic restart is requested with the target device ID. var onMicRestartRequested: (@Sendable (AudioDeviceID) async -> Void)? @@ -103,17 +107,10 @@ final class DeviceRoutingManager { currentTranscriptStore = nil } - deinit { - // Ensure cleanup happens even if stopListening wasn't called explicitly - // Dispatch to MainActor since these methods are MainActor-isolated - if isListening { - Task { @MainActor [weak self] in - guard let self else { return } - self.removeDefaultDeviceListener() - self.removeDefaultOutputDeviceListener() - } - } - } + // No deinit: Swift 6 strict concurrency prevents accessing non-Sendable + // AudioObjectPropertyListenerBlock from nonisolated deinit. The original + // deinit used [weak self] which was always nil anyway. + // Contract: callers MUST call stopListening() before releasing this object. /// Request a mic restart with a new device. /// Pass 0 to use the system default device. 
@@ -185,9 +182,6 @@ final class DeviceRoutingManager { mElement: kAudioObjectPropertyElementMain ) - // Use a background queue for the CoreAudio callback, then hop to MainActor via Task - let queue = DispatchQueue.global(qos: .utility) - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in guard let self else { return } Task { @MainActor in @@ -201,7 +195,7 @@ final class DeviceRoutingManager { AudioObjectAddPropertyListenerBlock( AudioObjectID(kAudioObjectSystemObject), &address, - queue, + DeviceRoutingManager.listenerQueue, block ) } @@ -216,7 +210,7 @@ final class DeviceRoutingManager { AudioObjectRemovePropertyListenerBlock( AudioObjectID(kAudioObjectSystemObject), &address, - DispatchQueue.global(qos: .utility), + DeviceRoutingManager.listenerQueue, block ) defaultDeviceListenerBlock = nil @@ -231,8 +225,6 @@ final class DeviceRoutingManager { mElement: kAudioObjectPropertyElementMain ) - let queue = DispatchQueue.global(qos: .utility) - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in guard let self else { return } Task { @MainActor in @@ -244,7 +236,7 @@ final class DeviceRoutingManager { AudioObjectAddPropertyListenerBlock( AudioObjectID(kAudioObjectSystemObject), &address, - queue, + DeviceRoutingManager.listenerQueue, block ) } @@ -259,7 +251,7 @@ final class DeviceRoutingManager { AudioObjectRemovePropertyListenerBlock( AudioObjectID(kAudioObjectSystemObject), &address, - DispatchQueue.global(qos: .utility), + DeviceRoutingManager.listenerQueue, block ) defaultOutputDeviceListenerBlock = nil diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift index 68f7fa79..107012dc 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift @@ -102,8 +102,6 @@ final class TranscriptionEngine { set { streamCoordinator.isMicMuted = newValue } } - private 
var micTask: Task? - private var sysTask: Task? /// Keeps the mic stream alive for the audio level meter when transcription isn't running. private var micKeepAliveTask: Task? @@ -134,7 +132,18 @@ final class TranscriptionEngine { self.deviceRoutingManager = DeviceRoutingManager() self.streamCoordinator = TranscriptionStreamCoordinator() - // Wire up callbacks + // Wire download progress to observable properties + self.modelDownloadManager.onStatusUpdate = { [weak self] status in + Task { @MainActor in self?.assetStatus = status } + } + self.modelDownloadManager.onProgressUpdate = { [weak self] fraction in + Task { @MainActor in + self?.downloadProgress = fraction + self?.downloadDetail = self?.modelDownloadManager.downloadDetail + } + } + + // Wire up device routing callbacks self.deviceRoutingManager.onMicRestartRequested = { [weak self] deviceID in await self?.performMicRestart(deviceID: deviceID) } @@ -142,6 +151,13 @@ final class TranscriptionEngine { await self?.performSystemAudioRestart() } + // Wire mic health check callback + self.streamCoordinator.onMicHealthCheckFailed = { [weak self] in + Task { @MainActor in + self?.lastError = "Microphone is not producing audio. Check your input device in System Settings." 
+ } + } + switch mode { case .live: self.needsModelDownload = modelDownloadManager.checkAvailability(for: settings.transcriptionModel) @@ -208,17 +224,16 @@ final class TranscriptionEngine { self.micBackend = backends.mic self.systemBackend = backends.system } catch { - lastError = "Failed to load models: \(error.localizedDescription)" + lastError = "Failed to load models: \(error)" assetStatus = "Ready" isRunning = false modelDownloadManager.clearCache(for: settings.transcriptionModel) + needsModelDownload = true + downloadConfirmed = false return } - // Sync observable properties from manager self.needsModelDownload = modelDownloadManager.needsDownload - self.downloadProgress = modelDownloadManager.downloadProgress - self.downloadDetail = modelDownloadManager.downloadDetail // Load VAD model assetStatus = "Loading VAD model..." @@ -256,6 +271,7 @@ final class TranscriptionEngine { Log.transcription.info("Transcription model loaded") guard let vadManager else { return } + guard let micBackend, let systemBackend else { return } // 2. Resolve mic device and start listening for device changes guard let targetMicID = deviceRoutingManager.resolvedMicDeviceID(for: inputDeviceID) else { @@ -281,15 +297,15 @@ final class TranscriptionEngine { let micResult = streamCoordinator.startMicStream( locale: locale, deviceID: targetMicID, - backend: micBackend!, + backend: micBackend, vadManager: vadManager, transcriptStore: transcriptStore, flushInterval: settings.transcriptionModel.flushIntervalSamples, useAEC: useAEC ) switch micResult { - case .success(let task): - self.micTask = task + case .success: + break case .failure(let error): lastError = error.localizedDescription isRunning = false @@ -299,28 +315,28 @@ final class TranscriptionEngine { // 4. 
Start system audio stream via coordinator let sysResult = await streamCoordinator.startSystemStream( locale: locale, - backend: systemBackend!, + backend: systemBackend, vadManager: vadManager, diarizationManager: diarizationManager, transcriptStore: transcriptStore, flushInterval: settings.transcriptionModel.flushIntervalSamples ) switch sysResult { - case .success(let task): - self.sysTask = task + case .success: + break case .failure(let error): lastError = error.localizedDescription } - assetStatus = "Transcribing (\(micBackend?.displayName ?? transcriptionModel.displayName))" + assetStatus = "Transcribing (\(micBackend.displayName))" Log.transcription.info("All transcription tasks started") // Store state for restarts deviceRoutingManager.storeRestartState( locale: locale, vadManager: vadManager, - micBackend: micBackend!, - systemBackend: systemBackend!, + micBackend: micBackend, + systemBackend: systemBackend, flushInterval: settings.transcriptionModel.flushIntervalSamples, transcriptStore: transcriptStore ) @@ -384,8 +400,6 @@ final class TranscriptionEngine { micBackend = nil systemBackend = nil - micTask = nil - sysTask = nil transcriptStore.volatileYouText = "" transcriptStore.volatileThemText = "" @@ -416,8 +430,6 @@ final class TranscriptionEngine { // Stop streams via coordinator streamCoordinator.stopAll() - micTask = nil - sysTask = nil micBackend = nil systemBackend = nil diarizationManager = nil @@ -446,7 +458,6 @@ final class TranscriptionEngine { // Stop current mic stream streamCoordinator.stopMicStream() - micTask = nil if Task.isCancelled || !isRunning { return @@ -464,8 +475,7 @@ final class TranscriptionEngine { useAEC: false ) switch micResult { - case .success(let task): - self.micTask = task + case .success: deviceRoutingManager.updateCurrentDeviceID(targetMicID, isUserSelection: false) lastError = nil Log.transcription.info("Mic restarted on device \(targetMicID, privacy: .public)") @@ -481,7 +491,6 @@ final class TranscriptionEngine { 
// Stop current system stream await streamCoordinator.finalizeSystemStream() - sysTask = nil if Task.isCancelled || !isRunning { return @@ -498,8 +507,12 @@ final class TranscriptionEngine { flushInterval: deviceRoutingManager.currentFlushInterval ?? settings.transcriptionModel.flushIntervalSamples ) switch sysResult { - case .success(let task): - self.sysTask = task + case .success: + // Clear any previous system audio errors + if let lastError, lastError.localizedCaseInsensitiveContains("system audio") || + lastError.localizedCaseInsensitiveContains("audio output device") { + self.lastError = nil + } Log.transcription.info("System audio stream restarted") case .failure(let error): lastError = error.localizedDescription @@ -528,11 +541,4 @@ final class TranscriptionEngine { return identifier.split(separator: "-").first.map { String($0).lowercased() } } - private func clearSystemAudioErrorIfPresent() { - guard let lastError else { return } - if lastError.localizedCaseInsensitiveContains("system audio") || - lastError.localizedCaseInsensitiveContains("audio output device") { - self.lastError = nil - } - } } diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift index eacb8826..888257ad 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -39,6 +39,9 @@ final class TranscriptionStreamCoordinator { /// Audio recorder for tapping streams (set externally when recording is enabled). weak var audioRecorder: AudioRecorder? + /// Called when mic health check detects no audio after timeout. + var onMicHealthCheckFailed: (@Sendable () -> Void)? + /// Combined audio level (mic + system) for the UI meter. 
nonisolated var audioLevel: Float { max(micCapture.audioLevel, systemCapture.audioLevel) @@ -122,6 +125,7 @@ final class TranscriptionStreamCoordinator { guard let self, self.isMicRunning else { return } if !self.micCapture.hasCapturedFrames && self.micCapture.captureError == nil { Log.transcription.error("no mic audio after 5s") + self.onMicHealthCheckFailed?() } } @@ -186,24 +190,29 @@ final class TranscriptionStreamCoordinator { if let dm = diarizationManager { let diarFlushSize = 16000 let originalSysStream = sysStream - let (diarTapped, diarContinuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingNewest(1)) + let (diarTapped, diarContinuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingOldest(128)) diarizationTask?.cancel() diarizationTask = Task { [weak self, dm] in var diarBuf: [Float] = [] for await buffer in originalSysStream { + // AVAudioPCMBuffer is non-Sendable but used read-only here. + // nonisolated(unsafe) lets us read then yield without copying. + nonisolated(unsafe) let buffer = buffer // Check for cancellation guard !Task.isCancelled else { break } - diarContinuation.yield(buffer) - guard let channelData = buffer.floatChannelData else { continue } - let frameCount = Int(buffer.frameLength) - sysAudioTime.add(Double(frameCount) / buffer.format.sampleRate) - diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) - if diarBuf.count >= diarFlushSize { - let batch = diarBuf - diarBuf.removeAll(keepingCapacity: true) - try? await dm.feedAudio(batch) + if let channelData = buffer.floatChannelData { + let frameCount = Int(buffer.frameLength) + let sampleRate = buffer.format.sampleRate + sysAudioTime.add(Double(frameCount) / sampleRate) + diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) + if diarBuf.count >= diarFlushSize { + let batch = diarBuf + diarBuf.removeAll(keepingCapacity: true) + try? 
await dm.feedAudio(batch) + } } + diarContinuation.yield(buffer) } // Flush tail if !Task.isCancelled, !diarBuf.isEmpty { @@ -246,14 +255,15 @@ final class TranscriptionStreamCoordinator { /// Stop the system audio stream and transcription. func stopSystemStream() { diarizationTask?.cancel() + diarizationTask = nil systemCapture.finishStream() sysTask?.cancel() sysTask = nil - Task { - await diarizationTask?.value - await systemCapture.stop() - diarizationTask = nil - } + // systemCapture.stop() is async but non-critical for synchronous teardown. + // finishStream() already ends the audio stream; stop() releases the process tap. + // Safe to fire-and-forget since startSystemStream() calls bufferStream() + // which internally awaits any pending stop. + Task { await systemCapture.stop() } } /// Finalize system stream, waiting for transcriber to drain. @@ -361,14 +371,25 @@ final class TranscriptionStreamCoordinator { _ stream: AsyncStream, tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void ) -> AsyncStream { - let (output, continuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingNewest(1)) - Task { + let (output, continuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingOldest(128)) + let forwarder = TapForwarder(stream: stream, continuation: continuation, tap: tap) + Task { await forwarder.run() } + return output + } + + /// Wraps the iteration in a Sendable closure to satisfy strict concurrency on the Task capture. + private struct TapForwarder: Sendable { + nonisolated(unsafe) let stream: AsyncStream + let continuation: AsyncStream.Continuation + let tap: @Sendable (AVAudioPCMBuffer) -> Void + + func run() async { for await buffer in stream { + nonisolated(unsafe) let buffer = buffer tap(buffer) continuation.yield(buffer) } continuation.finish() } - return output } }