diff --git a/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift new file mode 100644 index 00000000..616702f0 --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/DeviceRoutingManager.swift @@ -0,0 +1,325 @@ +import CoreAudio +import FluidAudio +import Foundation +import os + +/// Manages CoreAudio device listeners and coordinates audio stream restarts +/// when the default input or output device changes. +@MainActor +final class DeviceRoutingManager { + /// Shared queue for all CoreAudio property listener registrations. + /// Must use the same queue instance for Add and Remove calls. + private static let listenerQueue = DispatchQueue(label: "com.openoats.device-routing", qos: .utility) + + /// Called when a mic restart is requested with the target device ID. + var onMicRestartRequested: (@Sendable (AudioDeviceID) async -> Void)? + + /// Called when a system audio restart is requested. + var onSystemRestartRequested: (@Sendable () async -> Void)? + + /// Tracks whether user selected "System Default" (0) or a specific device. + private var userSelectedDeviceID: AudioDeviceID = 0 + + /// Tracks the resolved mic device ID currently in use. + private var currentMicDeviceID: AudioDeviceID = 0 + + /// Queued mic restart device ID (nil if no pending restart). + private var pendingMicDeviceID: AudioDeviceID? + + /// Queued system audio restart flag. + private var pendingSystemAudioRestart = false + + /// Active mic restart task. + private var micRestartTask: Task? + + /// Active system restart task. + private var sysRestartTask: Task? + + /// Listens for default input device changes at the OS level. + private var defaultDeviceListenerBlock: AudioObjectPropertyListenerBlock? + + /// Listens for default output device changes at the OS level. + private var defaultOutputDeviceListenerBlock: AudioObjectPropertyListenerBlock? + + /// Whether device listeners are currently installed. + private var isListening = false + + /// Stored state for restarts (populated by TranscriptionEngine) + private(set) var currentLocale: Locale? + private(set) var currentVadManager: VadManager? + private(set) var currentMicBackend: (any TranscriptionBackend)? + private(set) var currentSystemBackend: (any TranscriptionBackend)? + private(set) var currentFlushInterval: Int? + private(set) weak var currentTranscriptStore: TranscriptStore? + + /// Store restart state for later use during device changes. + func storeRestartState( + locale: Locale, + vadManager: VadManager, + micBackend: any TranscriptionBackend, + systemBackend: any TranscriptionBackend, + flushInterval: Int, + transcriptStore: TranscriptStore + ) { + currentLocale = locale + currentVadManager = vadManager + currentMicBackend = micBackend + currentSystemBackend = systemBackend + currentFlushInterval = flushInterval + currentTranscriptStore = transcriptStore + } + + /// Start listening for device changes. + /// - Parameter isUsingDefaultDevice: True if the user has selected "System Default" (deviceID = 0) + func startListening(isUsingDefaultDevice: Bool) { + guard !isListening else { return } + isListening = true + + // Store the default device selection state + if isUsingDefaultDevice { + userSelectedDeviceID = 0 + } + + installDefaultDeviceListener() + installDefaultOutputDeviceListener() + } + + /// Stop listening for device changes and cancel any pending restarts. + func stopListening() { + isListening = false + + removeDefaultDeviceListener() + removeDefaultOutputDeviceListener() + + micRestartTask?.cancel() + sysRestartTask?.cancel() + micRestartTask = nil + sysRestartTask = nil + pendingMicDeviceID = nil + pendingSystemAudioRestart = false + + // Clear restart state to free heavy objects (VadManager, backends) + currentLocale = nil + currentVadManager = nil + currentMicBackend = nil + currentSystemBackend = nil + currentFlushInterval = nil + currentTranscriptStore = nil + } + + // No deinit: Swift 6 strict concurrency prevents accessing non-Sendable + // AudioObjectPropertyListenerBlock from nonisolated deinit. The original + // deinit used [weak self] which was always nil anyway. + // Contract: callers MUST call stopListening() before releasing this object. + + /// Request a mic restart with a new device. + /// Pass 0 to use the system default device. + func requestMicRestart(deviceID: AudioDeviceID) { + guard isListening else { return } + + pendingMicDeviceID = deviceID + + if micRestartTask != nil { + Log.transcription.debug("queued mic restart for device \(deviceID, privacy: .public)") + return + } + + micRestartTask = Task { @MainActor [weak self] in + guard let self else { return } + defer { self.micRestartTask = nil } + + while !Task.isCancelled, let requestedDeviceID = self.pendingMicDeviceID { + self.pendingMicDeviceID = nil + await self.performMicRestart(deviceID: requestedDeviceID) + } + } + } + + /// Request a system audio restart (e.g., when output device changes). + func requestSystemAudioRestart() { + guard isListening else { return } + + pendingSystemAudioRestart = true + + if sysRestartTask != nil { + Log.transcription.debug("queued system audio restart") + return + } + + sysRestartTask = Task { @MainActor [weak self] in + guard let self else { return } + defer { self.sysRestartTask = nil } + + while !Task.isCancelled, self.pendingSystemAudioRestart { + self.pendingSystemAudioRestart = false + await self.performSystemAudioRestart() + } + } + } + + /// Update the tracked device IDs after a successful restart. + func updateCurrentDeviceID(_ deviceID: AudioDeviceID, isUserSelection: Bool) { + if isUserSelection { + userSelectedDeviceID = deviceID + } + currentMicDeviceID = deviceID + } + + /// Get the current mic device ID. + var currentMicID: AudioDeviceID { currentMicDeviceID } + + /// Get the user selected device ID (0 = system default). + var selectedDeviceID: AudioDeviceID { userSelectedDeviceID } + + // MARK: - Default Device Listeners + + private func installDefaultDeviceListener() { + guard defaultDeviceListenerBlock == nil else { return } + + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultInputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + + let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in + guard let self else { return } + Task { @MainActor in + // Only auto-restart if user is using system default + guard self.userSelectedDeviceID == 0 else { return } + self.requestMicRestart(deviceID: 0) + } + } + defaultDeviceListenerBlock = block + + AudioObjectAddPropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DeviceRoutingManager.listenerQueue, + block + ) + } + + private func removeDefaultDeviceListener() { + guard let block = defaultDeviceListenerBlock else { return } + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultInputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + AudioObjectRemovePropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DeviceRoutingManager.listenerQueue, + block + ) + defaultDeviceListenerBlock = nil + } + + private func installDefaultOutputDeviceListener() { + guard defaultOutputDeviceListenerBlock == nil else { return } + + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + + let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in + guard let self else { return } + Task { @MainActor in + self.requestSystemAudioRestart() + } + } + defaultOutputDeviceListenerBlock = block + + AudioObjectAddPropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DeviceRoutingManager.listenerQueue, + block + ) + } + + private func removeDefaultOutputDeviceListener() { + guard let block = defaultOutputDeviceListenerBlock else { return } + var address = AudioObjectPropertyAddress( + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain + ) + AudioObjectRemovePropertyListenerBlock( + AudioObjectID(kAudioObjectSystemObject), + &address, + DeviceRoutingManager.listenerQueue, + block + ) + defaultOutputDeviceListenerBlock = nil + } + + // MARK: - Restart Implementation + + private func performMicRestart(deviceID: AudioDeviceID) async { + guard !Task.isCancelled else { return } + + userSelectedDeviceID = deviceID + + guard let targetMicID = resolvedMicDeviceID(for: deviceID) else { + Log.transcription.error("mic swap failed: device unavailable") + return + } + + guard targetMicID != currentMicDeviceID else { + Log.transcription.debug("mic swap skipped, same device \(targetMicID, privacy: .public)") + return + } + + Log.transcription.info("switching mic from \(self.currentMicDeviceID, privacy: .public) to \(targetMicID, privacy: .public)") + + // Notify the engine to perform the actual restart + // Note: State is updated only after successful restart via updateCurrentDeviceID + await onMicRestartRequested?(targetMicID) + } + + private func performSystemAudioRestart() async { + guard !Task.isCancelled else { return } + + Log.transcription.info("restarting system audio stream") + + // Notify the engine to perform the actual restart + await onSystemRestartRequested?() + } + + // MARK: - Device Resolution + + /// Generate an error message for when a mic device is unavailable. + func unavailableMicMessage(for inputDeviceID: AudioDeviceID) -> String { + if inputDeviceID > 0 { + return "The selected microphone is no longer available." + } + + return "No default microphone is currently available." + } + + /// Check if a specific device ID is available. + func isDeviceAvailable(_ deviceID: AudioDeviceID) -> Bool { + if deviceID == 0 { + return MicCapture.defaultInputDeviceID() != nil + } + let available = Set(MicCapture.availableInputDevices().map(\.id)) + return available.contains(deviceID) + } +} + +// MARK: - Public Extension for Engine Access + +extension DeviceRoutingManager { + /// Resolve the target mic device ID, handling "System Default" (0) selection. + func resolvedMicDeviceID(for inputDeviceID: AudioDeviceID) -> AudioDeviceID? { + if inputDeviceID > 0 { + let availableDeviceIDs = Set(MicCapture.availableInputDevices().map(\.id)) + return availableDeviceIDs.contains(inputDeviceID) ? inputDeviceID : nil + } + return MicCapture.defaultInputDeviceID() + } +} diff --git a/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift new file mode 100644 index 00000000..73289154 --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/ModelDownloadManager.swift @@ -0,0 +1,219 @@ +import AVFoundation +import FluidAudio +import Foundation +import os + +/// Enriched download progress info computed from fraction changes over time. +struct DownloadProgressDetail: Sendable { + let fraction: Double + /// Formatted string like "142 MB / 800 MB" + let sizeText: String? + /// Formatted string like "3.5 MB/s" + let speedText: String? + /// Formatted string like "2m 15s remaining" + let etaText: String? +} + +/// Loaded backend instances for mic and system audio transcription. +struct LoadedBackends { + let mic: any TranscriptionBackend + let system: any TranscriptionBackend +} + +/// Manages transcription model download, loading, and progress tracking. +@MainActor +final class ModelDownloadManager { + private let settings: AppSettings + + /// Combined progress for UI binding. + private(set) var needsDownload: Bool = false + private(set) var downloadProgress: Double? + private(set) var downloadDetail: DownloadProgressDetail? + private(set) var isLoading: Bool = false + + /// Callbacks for status updates. + var onStatusUpdate: (@Sendable (String) -> Void)? + var onProgressUpdate: (@Sendable (Double) -> Void)? + + // Progress tracking state (not observed) + private var downloadStartTime: Date? + private var downloadTotalBytes: Int64? + + init(settings: AppSettings) { + self.settings = settings + } + + /// Check if the specified model needs to be downloaded. + /// - Returns: true if a download is needed + @discardableResult + func checkAvailability(for model: TranscriptionModel) -> Bool { + needsDownload = Self.modelNeedsDownload(model) + return needsDownload + } + + /// Load the specified model and return configured backends for mic and system audio. + /// - Parameters: + /// - model: The transcription model to load + /// - customVocabulary: Custom vocabulary string to pass to the backend + /// - Returns: Loaded backends for mic and system audio + func loadModel( + _ model: TranscriptionModel, + customVocabulary: String + ) async throws -> LoadedBackends { + guard !isLoading else { + throw TranscriptionBackendError.notPrepared + } + + // Recompute availability based on the model parameter, not cached flag + let modelNeedsDownload = Self.modelNeedsDownload(model) + self.needsDownload = modelNeedsDownload + + isLoading = true + defer { + isLoading = false + // Clear progress state on both success and failure + downloadProgress = nil + downloadDetail = nil + downloadStartTime = nil + downloadTotalBytes = nil + } + + let isDownloading = modelNeedsDownload + + if isDownloading { + downloadProgress = 0 + downloadStartTime = Date() + downloadTotalBytes = model.estimatedDownloadBytes + downloadDetail = DownloadProgressDetail( + fraction: 0, + sizeText: nil, + speedText: nil, + etaText: nil + ) + } + + Log.transcription.info("loading transcription model \(model.rawValue, privacy: .public)") + + // Load mic backend + let mic = model.makeBackend(customVocabulary: customVocabulary) + try await mic.prepare( + onStatus: { [weak self] status in + Task { @MainActor in + self?.onStatusUpdate?(status) + } + }, + onProgress: { [weak self] fraction in + Task { @MainActor in + self?.downloadProgress = fraction + self?.updateDownloadDetail(fraction: fraction) + self?.onProgressUpdate?(fraction) + } + } + ) + + // Parakeet needs a separate backend for system audio (mutable decoder state). + // Qwen3 is actor-based and thread-safe, so reuse the same instance. + let system: any TranscriptionBackend + if model == .qwen3ASR06B { + system = mic + } else { + let sys = model.makeBackend(customVocabulary: customVocabulary) + try await sys.prepare { _ in } + system = sys + } + + // Clear download progress on success + needsDownload = false + + Log.transcription.info("transcription model loaded") + + return LoadedBackends(mic: mic, system: system) + } + + /// Clear the model cache for the specified model. + func clearCache(for model: TranscriptionModel) { + model.makeBackend().clearModelCache() + Log.transcription.info("cleared model cache for \(model.rawValue, privacy: .public)") + } + + // MARK: - Private Helpers + + private static func modelNeedsDownload(_ model: TranscriptionModel) -> Bool { + let backend = model.makeBackend() + if case .needsDownload = backend.checkStatus() { + return true + } + return false + } + + private func updateDownloadDetail(fraction: Double) { + guard let startTime = downloadStartTime else { + downloadDetail = DownloadProgressDetail( + fraction: fraction, + sizeText: nil, + speedText: nil, + etaText: nil + ) + return + } + + let elapsed = Date().timeIntervalSince(startTime) + let totalBytes = downloadTotalBytes + + // Size text: "142 MB / 800 MB" (only when total is known) + var sizeText: String? + if let totalBytes { + let downloaded = Int64(fraction * Double(totalBytes)) + sizeText = "\(Self.formatBytes(downloaded)) / \(Self.formatBytes(totalBytes))" + } + + // Speed and ETA need enough elapsed time to be meaningful + var speedText: String? + var etaText: String? + if elapsed > 1, fraction > 0.01 { + // Speed from fraction progress rate + known total + if let totalBytes { + let bytesDownloaded = fraction * Double(totalBytes) + let bytesPerSecond = bytesDownloaded / elapsed + speedText = "\(Self.formatBytes(Int64(bytesPerSecond)))/s" + + let remaining = Double(totalBytes) - bytesDownloaded + if bytesPerSecond > 0 { + let secondsLeft = remaining / bytesPerSecond + etaText = Self.formatDuration(secondsLeft) + } + } else { + // No total bytes known — estimate ETA from fraction rate alone + let fractionPerSecond = fraction / elapsed + if fractionPerSecond > 0 { + let remainingFraction = 1.0 - fraction + let secondsLeft = remainingFraction / fractionPerSecond + etaText = Self.formatDuration(secondsLeft) + } + } + } + + downloadDetail = DownloadProgressDetail( + fraction: fraction, + sizeText: sizeText, + speedText: speedText, + etaText: etaText + ) + } + + private static func formatBytes(_ bytes: Int64) -> String { + if bytes >= 1_000_000_000 { + return String(format: "%.1f GB", Double(bytes) / 1_000_000_000) + } else { + return String(format: "%.0f MB", Double(bytes) / 1_000_000) + } + } + + private static func formatDuration(_ seconds: Double) -> String { + let s = Int(seconds) + if s < 60 { return "\(s)s remaining" } + let m = s / 60 + let rem = s % 60 + return rem > 0 ? "\(m)m \(rem)s remaining" : "\(m)m remaining" + } +} diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift index e48da036..107012dc 100644 --- a/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionEngine.swift @@ -15,17 +15,6 @@ enum TranscriptionEngineError: LocalizedError { } } -/// Enriched download progress info computed from fraction changes over time. -struct DownloadProgressDetail: Sendable { - let fraction: Double - /// Formatted string like "142 MB / 800 MB" - let sizeText: String? - /// Formatted string like "3.5 MB/s" - let speedText: String? - /// Formatted string like "2m 15s remaining" - let etaText: String? -} - /// Orchestrates dual StreamingTranscriber instances for mic (you) and system audio (them). @Observable @MainActor @@ -87,22 +76,20 @@ final class TranscriptionEngine { set { withMutation(keyPath: \.downloadDetail) { _downloadDetail = newValue } } } - // Progress tracking state (not observed) - @ObservationIgnored private var downloadStartTime: Date? - @ObservationIgnored private var downloadTotalBytes: Int64? - - private let systemCapture = SystemAudioCapture() - private let micCapture = MicCapture() private let transcriptStore: TranscriptStore private let settings: AppSettings private let mode: Mode + private let modelDownloadManager: ModelDownloadManager + private let deviceRoutingManager: DeviceRoutingManager + private let streamCoordinator: TranscriptionStreamCoordinator + /// Combined audio level (mic + system) for the UI meter. /// nonisolated is safe here — both audioLevel properties are thread-safe (NSLock). nonisolated var audioLevel: Float { switch mode { case .live: - max(micCapture.audioLevel, systemCapture.audioLevel) + streamCoordinator.audioLevel case .scripted: _isRunning ? 0.35 : 0 } @@ -111,12 +98,10 @@ final class TranscriptionEngine { /// Mute/unmute the microphone. When muted, mic audio is not transcribed /// and the audio level reads as 0. System audio continues normally. nonisolated var isMicMuted: Bool { - get { micCapture.isMuted } - set { micCapture.isMuted = newValue } + get { streamCoordinator.isMicMuted } + set { streamCoordinator.isMicMuted = newValue } } - private var micTask: Task? - private var sysTask: Task? /// Keeps the mic stream alive for the audio level meter when transcription isn't running. private var micKeepAliveTask: Task? @@ -129,33 +114,53 @@ final class TranscriptionEngine { private var vadManager: VadManager? /// Audio recorder for tapping streams (set by ContentView when recording is enabled). - var audioRecorder: AudioRecorder? + var audioRecorder: AudioRecorder? { + didSet { + streamCoordinator.audioRecorder = audioRecorder + } + } /// Speaker diarization manager for system audio (nil when diarization is disabled). private var diarizationManager: DiarizationManager? - /// Tracks the resolved mic device ID currently in use. - private var currentMicDeviceID: AudioDeviceID = 0 - - /// Tracks whether user selected "System Default" (0) or a specific device. - private var userSelectedDeviceID: AudioDeviceID = 0 - - /// Listens for default input device changes at the OS level. - private var defaultDeviceListenerBlock: AudioObjectPropertyListenerBlock? - /// Listens for default output device changes at the OS level. - private var defaultOutputDeviceListenerBlock: AudioObjectPropertyListenerBlock? - private var micRestartTask: Task? - private var sysRestartTask: Task? - private var pendingMicDeviceID: AudioDeviceID? - private var pendingSystemAudioRestart = false - init(transcriptStore: TranscriptStore, settings: AppSettings, mode: Mode = .live) { self.transcriptStore = transcriptStore self.settings = settings self.mode = mode + + self.modelDownloadManager = ModelDownloadManager(settings: settings) + self.deviceRoutingManager = DeviceRoutingManager() + self.streamCoordinator = TranscriptionStreamCoordinator() + + // Wire download progress to observable properties + self.modelDownloadManager.onStatusUpdate = { [weak self] status in + Task { @MainActor in self?.assetStatus = status } + } + self.modelDownloadManager.onProgressUpdate = { [weak self] fraction in + Task { @MainActor in + self?.downloadProgress = fraction + self?.downloadDetail = self?.modelDownloadManager.downloadDetail + } + } + + // Wire up device routing callbacks + self.deviceRoutingManager.onMicRestartRequested = { [weak self] deviceID in + await self?.performMicRestart(deviceID: deviceID) + } + self.deviceRoutingManager.onSystemRestartRequested = { [weak self] in + await self?.performSystemAudioRestart() + } + + // Wire mic health check callback + self.streamCoordinator.onMicHealthCheckFailed = { [weak self] in + Task { @MainActor in + self?.lastError = "Microphone is not producing audio. Check your input device in System Settings." + } + } + switch mode { case .live: - self.needsModelDownload = Self.modelNeedsDownload(settings.transcriptionModel) + self.needsModelDownload = modelDownloadManager.checkAvailability(for: settings.transcriptionModel) case .scripted: self.needsModelDownload = false } @@ -164,7 +169,7 @@ final class TranscriptionEngine { func refreshModelAvailability() { switch mode { case .live: - needsModelDownload = Self.modelNeedsDownload(settings.transcriptionModel) + needsModelDownload = modelDownloadManager.checkAvailability(for: settings.transcriptionModel) case .scripted: needsModelDownload = false } @@ -208,160 +213,133 @@ final class TranscriptionEngine { isRunning = true - // 1. Load transcription models via backend protocol - let isDownloading = needsModelDownload - assetStatus = isDownloading - ? "Downloading \(transcriptionModel.displayName)..." - : "Loading \(transcriptionModel.displayName)..." - if isDownloading { - downloadProgress = 0 - downloadStartTime = Date() - downloadTotalBytes = transcriptionModel.estimatedDownloadBytes - downloadDetail = DownloadProgressDetail(fraction: 0, sizeText: nil, speedText: nil, etaText: nil) - } - Log.transcription.info("Loading transcription model \(transcriptionModel.rawValue, privacy: .public)") + // 1. Load transcription models via manager + assetStatus = "Loading transcription model..." + let backends: LoadedBackends do { - let vocab = settings.transcriptionCustomVocabulary - let mic = transcriptionModel.makeBackend(customVocabulary: vocab) - try await mic.prepare( - onStatus: { [weak self] status in - Task { @MainActor in - self?.assetStatus = status - } - }, - onProgress: { [weak self] fraction in - Task { @MainActor in - self?.downloadProgress = fraction - self?.updateDownloadDetail(fraction: fraction) - } - } + backends = try await modelDownloadManager.loadModel( + settings.transcriptionModel, + customVocabulary: settings.transcriptionCustomVocabulary ) - self.micBackend = mic - - // Parakeet needs a separate backend for system audio (mutable decoder state). - // Qwen3 is actor-based and thread-safe, so reuse the same instance. - if transcriptionModel == .qwen3ASR06B { - self.systemBackend = mic - } else { - let sys = transcriptionModel.makeBackend(customVocabulary: vocab) - try await sys.prepare { _ in } - self.systemBackend = sys - } + self.micBackend = backends.mic + self.systemBackend = backends.system + } catch { + lastError = "Failed to load models: \(error)" + assetStatus = "Ready" + isRunning = false + modelDownloadManager.clearCache(for: settings.transcriptionModel) + needsModelDownload = true + downloadConfirmed = false + return + } - assetStatus = "Loading VAD model..." - Log.transcription.info("Loading VAD model") + self.needsModelDownload = modelDownloadManager.needsDownload + + // Load VAD model + assetStatus = "Loading VAD model..." + Log.transcription.info("Loading VAD model") + do { let vad = try await VadManager() self.vadManager = vad + } catch { + lastError = "Failed to load VAD model: \(error.localizedDescription)" + assetStatus = "Ready" + isRunning = false + return + } - // Optionally load speaker diarization model - if settings.enableDiarization { - assetStatus = "Loading diarization model..." - Log.transcription.info("Loading LS-EEND diarization model") - let dm = DiarizationManager() - let variant = LSEENDVariant(rawValue: settings.diarizationVariant.rawValue) ?? .dihard3 + // Optionally load speaker diarization model + if settings.enableDiarization { + assetStatus = "Loading diarization model..." + Log.transcription.info("Loading LS-EEND diarization model") + let dm = DiarizationManager() + let variant = LSEENDVariant(rawValue: settings.diarizationVariant.rawValue) ?? .dihard3 + do { try await dm.load(variant: variant) self.diarizationManager = dm Log.transcription.info("Diarization model loaded") - } else { + } catch { + Log.transcription.error("Failed to load diarization model: \(error, privacy: .public)") + // Non-fatal: continue without diarization self.diarizationManager = nil } - - needsModelDownload = false - downloadConfirmed = false - downloadProgress = nil - downloadDetail = nil - downloadStartTime = nil - downloadTotalBytes = nil - assetStatus = "Models ready" - Log.transcription.info("Transcription model loaded") - } catch { - let msg = "Failed to load models: \(error.localizedDescription)" - Log.transcription.error("Failed to load models: \(msg, privacy: .public)") - lastError = msg - assetStatus = "Ready" - isRunning = false - downloadProgress = nil - downloadDetail = nil - downloadStartTime = nil - downloadTotalBytes = nil - // Clear corrupt cache so the next attempt triggers a fresh download - settings.transcriptionModel.makeBackend().clearModelCache() - Log.transcription.info("Cleared model cache for \(self.settings.transcriptionModel.rawValue, privacy: .public)") - needsModelDownload = true - downloadConfirmed = false - return + } else { + self.diarizationManager = nil } + assetStatus = "Models ready" + Log.transcription.info("Transcription model loaded") + guard let vadManager else { return } + guard let micBackend, let systemBackend else { return } - // 2. Start mic capture - userSelectedDeviceID = inputDeviceID - guard let targetMicID = resolvedMicDeviceID(for: inputDeviceID) else { - let msg = unavailableMicMessage(for: inputDeviceID) + // 2. Resolve mic device and start listening for device changes + guard let targetMicID = deviceRoutingManager.resolvedMicDeviceID(for: inputDeviceID) else { + let msg = deviceRoutingManager.unavailableMicMessage(for: inputDeviceID) Log.transcription.error("Mic unavailable: \(msg, privacy: .public)") lastError = msg assetStatus = "Ready" isRunning = false return } - currentMicDeviceID = targetMicID - // AEC (voice processing) conflicts with system audio capture on macOS — - // both cause CoreAudio aggregate-device reconfiguration that can stall the - // mic stream. Since system audio capture is always active during recording, - // AEC must be disabled to prevent capture failures. + + deviceRoutingManager.startListening(isUsingDefaultDevice: inputDeviceID == 0) + deviceRoutingManager.updateCurrentDeviceID(targetMicID, isUserSelection: true) + + // AEC (voice processing) conflicts with system audio capture on macOS let useAEC = false if settings.enableEchoCancellation { Log.transcription.info("AEC disabled - conflicts with system audio capture") } + // 3. Start mic stream via coordinator Log.transcription.info("Starting mic capture, targetMicID=\(targetMicID, privacy: .public), aec=\(useAEC, privacy: .public)") - startMicStream( + let micResult = streamCoordinator.startMicStream( locale: locale, - vadManager: vadManager, deviceID: targetMicID, - echoCancellation: useAEC + backend: micBackend, + vadManager: vadManager, + transcriptStore: transcriptStore, + flushInterval: settings.transcriptionModel.flushIntervalSamples, + useAEC: useAEC ) - - // Check for immediate mic capture failure - if let micError = micCapture.captureError { - Log.transcription.error("Mic capture error: \(micError, privacy: .public)") - lastError = micError + switch micResult { + case .success: + break + case .failure(let error): + lastError = error.localizedDescription + isRunning = false + return } - // Health check: if mic produces no audio within 5 seconds, retry once - // without AEC before surfacing the error. - Task { @MainActor [weak self] in - try? await Task.sleep(for: .seconds(5)) - guard let self, self.isRunning else { return } - if !self.micCapture.hasCapturedFrames && self.micCapture.captureError == nil { - if useAEC { - Log.transcription.error("No mic audio after 5s with AEC, retrying without") - self.micCapture.finishStream() - await self.micTask?.value - self.micTask = nil - self.micCapture.stop() - self.startMicStream( - locale: locale, - vadManager: vadManager, - deviceID: targetMicID, - echoCancellation: false - ) - } else { - Log.transcription.error("No mic audio after 5s") - self.lastError = "Microphone is not producing audio. Check your input device in System Settings." - } - } + // 4. Start system audio stream via coordinator + let sysResult = await streamCoordinator.startSystemStream( + locale: locale, + backend: systemBackend, + vadManager: vadManager, + diarizationManager: diarizationManager, + transcriptStore: transcriptStore, + flushInterval: settings.transcriptionModel.flushIntervalSamples + ) + switch sysResult { + case .success: + break + case .failure(let error): + lastError = error.localizedDescription } - // 3. Start system audio capture - await startSystemAudioStream(locale: locale, vadManager: vadManager) - - assetStatus = "Transcribing (\(micBackend?.displayName ?? transcriptionModel.displayName))" + assetStatus = "Transcribing (\(micBackend.displayName))" Log.transcription.info("All transcription tasks started") - // Install CoreAudio listeners for live device routing changes - installDefaultDeviceListener() - installDefaultOutputDeviceListener() + // Store state for restarts + deviceRoutingManager.storeRestartState( + locale: locale, + vadManager: vadManager, + micBackend: micBackend, + systemBackend: systemBackend, + flushInterval: settings.transcriptionModel.flushIntervalSamples, + transcriptStore: transcriptStore + ) } /// Restart only the mic capture with a new device, keeping system audio and models intact. @@ -369,108 +347,7 @@ final class TranscriptionEngine { func restartMic(inputDeviceID: AudioDeviceID) { if case .scripted = mode { return } guard isRunning else { return } - pendingMicDeviceID = inputDeviceID - - if micRestartTask != nil { - Log.transcription.info("Queued mic restart for device \(inputDeviceID, privacy: .public)") - return - } - - micRestartTask = Task { @MainActor [weak self] in - guard let self else { return } - defer { self.micRestartTask = nil } - - while self.isRunning, let requestedDeviceID = self.pendingMicDeviceID { - self.pendingMicDeviceID = nil - await self.performMicRestart(inputDeviceID: requestedDeviceID) - } - } - } - - // MARK: - Default Device Listener - - private func installDefaultDeviceListener() { - guard defaultDeviceListenerBlock == nil else { return } - - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultInputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in - guard let self else { return } - Task { @MainActor in - guard self.isRunning, self.userSelectedDeviceID == 0 else { return } - self.restartMic(inputDeviceID: 0) - } - } - defaultDeviceListenerBlock = block - - AudioObjectAddPropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - } - - private func removeDefaultDeviceListener() { - guard let block = defaultDeviceListenerBlock else { return } - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultInputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - AudioObjectRemovePropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - defaultDeviceListenerBlock = nil - } - - private func installDefaultOutputDeviceListener() { - guard defaultOutputDeviceListenerBlock == nil else { return } - - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultOutputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - - let block: AudioObjectPropertyListenerBlock = { [weak self] _, _ in - guard let self else { return } - Task { @MainActor in - guard self.isRunning else { return } - self.restartSystemAudio() - } - } - defaultOutputDeviceListenerBlock = block - - AudioObjectAddPropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - } - - private func removeDefaultOutputDeviceListener() { - guard let block = defaultOutputDeviceListenerBlock else { return } - var address = AudioObjectPropertyAddress( - mSelector: kAudioHardwarePropertyDefaultOutputDevice, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain - ) - AudioObjectRemovePropertyListenerBlock( - AudioObjectID(kAudioObjectSystemObject), - &address, - DispatchQueue.main, - block - ) - defaultOutputDeviceListenerBlock = nil + deviceRoutingManager.requestMicRestart(deviceID: inputDeviceID) } private func ensureMicrophonePermission() async -> Bool { @@ -496,6 +373,8 @@ final class TranscriptionEngine { } func finalize() async { + Log.transcription.info("finalize() called") + if case .scripted = mode { isRunning = false assetStatus = "Ready" @@ -504,30 +383,15 @@ final class TranscriptionEngine { return } - removeDefaultDeviceListener() - removeDefaultOutputDeviceListener() - micRestartTask?.cancel() - sysRestartTask?.cancel() - micRestartTask = nil - sysRestartTask = nil - pendingMicDeviceID = nil - pendingSystemAudioRestart = false - micKeepAliveTask?.cancel() - - micCapture.finishStream() - systemCapture.finishStream() + isRunning = false + assetStatus = "Finalizing..." - await micTask?.value - await sysTask?.value + // Stop listening for device changes + deviceRoutingManager.stopListening() - micCapture.stop() - await systemCapture.stop() + // Finalize streams via coordinator + await streamCoordinator.finalize() - micTask = nil - sysTask = nil - pendingMicDeviceID = nil - micKeepAliveTask = nil - currentMicDeviceID = 0 // Finalize and release diarization manager if let dm = diarizationManager { await dm.finalize() @@ -538,11 +402,14 @@ final class TranscriptionEngine { systemBackend = nil transcriptStore.volatileYouText = "" transcriptStore.volatileThemText = "" - isRunning = false + assetStatus = "Ready" + Log.transcription.info("finalize() completed") } func stop() { + Log.transcription.info("stop() called") + if case .scripted = mode { isRunning = false assetStatus = "Ready" @@ -551,87 +418,69 @@ final class TranscriptionEngine { return } - removeDefaultDeviceListener() - removeDefaultOutputDeviceListener() - micRestartTask?.cancel() - sysRestartTask?.cancel() - micRestartTask = nil - sysRestartTask = nil - pendingMicDeviceID = nil - pendingSystemAudioRestart = false - micTask?.cancel() - sysTask?.cancel() + isRunning = false + assetStatus = "Ready" + micKeepAliveTask?.cancel() - micTask = nil - sysTask = nil micKeepAliveTask = nil - Task { await systemCapture.stop() } - micCapture.stop() - currentMicDeviceID = 0 + + // Stop device listeners + deviceRoutingManager.stopListening() + + // Stop streams via coordinator + streamCoordinator.stopAll() + micBackend = nil systemBackend = nil + diarizationManager = nil transcriptStore.volatileYouText = "" transcriptStore.volatileThemText = "" - isRunning = false - assetStatus = "Ready" + + Log.transcription.info("stop() completed") } - private func performMicRestart(inputDeviceID: AudioDeviceID) async { - guard isRunning, let vadManager else { return } + // MARK: - Restart Implementation (Called by DeviceRoutingManager) - userSelectedDeviceID = inputDeviceID + private func performMicRestart(deviceID: AudioDeviceID) async { + guard isRunning, let vadManager else { return } - guard let targetMicID = resolvedMicDeviceID(for: inputDeviceID) else { - let msg = unavailableMicMessage(for: inputDeviceID) - Log.transcription.error("Mic swap failed: \(msg, privacy: .public)") - lastError = msg + guard let targetMicID = deviceRoutingManager.resolvedMicDeviceID(for: deviceID) else { + Log.transcription.error("Mic swap failed: device unavailable") return } - guard targetMicID != currentMicDeviceID else { + guard targetMicID != deviceRoutingManager.currentMicID else { Log.transcription.debug("Mic swap skipped, same device \(targetMicID, privacy: .public)") return } - Log.transcription.info("Switching mic from \(self.currentMicDeviceID, privacy: .public) to \(targetMicID, privacy: .public)") + Log.transcription.info("Switching mic to \(targetMicID, privacy: .public)") - micCapture.finishStream() - await micTask?.value + // Stop current mic stream + streamCoordinator.stopMicStream() if Task.isCancelled || !isRunning { return } - micTask = nil - micCapture.stop() - startMicStream( - locale: settings.locale, + // Start new mic stream + guard let micBackend else { return } + let micResult = streamCoordinator.startMicStream( + locale: deviceRoutingManager.currentLocale ?? settings.locale, + deviceID: targetMicID, + backend: micBackend, vadManager: vadManager, - deviceID: targetMicID + transcriptStore: transcriptStore, + flushInterval: deviceRoutingManager.currentFlushInterval ?? settings.transcriptionModel.flushIntervalSamples, + useAEC: false ) - currentMicDeviceID = targetMicID - lastError = nil - - Log.transcription.info("Mic restarted on device \(targetMicID, privacy: .public)") - } - - private func restartSystemAudio() { - guard isRunning else { return } - pendingSystemAudioRestart = true - - if sysRestartTask != nil { - Log.transcription.info("Queued system audio restart") - return - } - - sysRestartTask = Task { @MainActor [weak self] in - guard let self else { return } - defer { self.sysRestartTask = nil } - - while self.isRunning, self.pendingSystemAudioRestart { - self.pendingSystemAudioRestart = false - await self.performSystemAudioRestart() - } + switch micResult { + case .success: + deviceRoutingManager.updateCurrentDeviceID(targetMicID, isUserSelection: false) + lastError = nil + Log.transcription.info("Mic restarted on device \(targetMicID, privacy: .public)") + case .failure(let error): + lastError = error.localizedDescription } } @@ -640,215 +489,38 @@ final class TranscriptionEngine { Log.transcription.info("Restarting system audio stream") - systemCapture.finishStream() - await sysTask?.value + // Stop current system stream + await streamCoordinator.finalizeSystemStream() if Task.isCancelled || !isRunning { return } - sysTask = nil - await systemCapture.stop() - await startSystemAudioStream(locale: settings.locale, vadManager: vadManager) - - Log.transcription.info("System audio stream restarted") - } - - private func startMicStream( - locale: Locale, - vadManager: VadManager, - deviceID: AudioDeviceID, - echoCancellation: Bool = false - ) { - var micStream = micCapture.bufferStream(deviceID: deviceID, echoCancellation: echoCancellation) - if let recorder = audioRecorder { - micStream = Self.tappedStream(micStream) { buffer in - recorder.writeMicBuffer(buffer) - } - } - let store = transcriptStore - guard let micTranscriber = makeTranscriber( - locale: locale, - speaker: .you, - vadManager: vadManager, - onPartial: { text in - Task { @MainActor in store.volatileYouText = text } - }, - onFinal: { text in - Task { @MainActor in - store.volatileYouText = "" - store.append(Utterance(text: text, speaker: .you)) - } - } - ) else { - lastError = "Failed to create transcriber. Try restarting." - isRunning = false - assetStatus = "Ready" - return - } - micTask = Task.detached { - await micTranscriber.run(stream: micStream) - } - } - - private func startSystemAudioStream( - locale: Locale, - vadManager: VadManager - ) async { - Log.transcription.info("Starting system audio capture") - - let sysStreams: SystemAudioCapture.CaptureStreams - do { - sysStreams = try await systemCapture.bufferStream() - Log.transcription.info("System audio capture started") - clearSystemAudioErrorIfPresent() - } catch { - let msg = "Failed to start system audio: \(error.localizedDescription)" - Log.transcription.error("Failed to start system audio: \(msg, privacy: .public)") - lastError = msg - return - } - - var sysStream = sysStreams.systemAudio - if let recorder = audioRecorder { - sysStream = Self.tappedStream(sysStream) { buffer in - recorder.writeSysBuffer(buffer) - } - } - - // Track cumulative audio time for diarizer speaker attribution - let sysAudioTime = SyncDouble() - - // Tee system audio to diarization manager if enabled - if let dm = diarizationManager { - let diarFlushSize = 16000 - let originalSysStream = sysStream - let (diarTapped, diarContinuation) = AsyncStream.makeStream() - Task { - nonisolated(unsafe) let safeDm = dm - var diarBuf: [Float] = [] - for await buffer in originalSysStream { - nonisolated(unsafe) let b = buffer - diarContinuation.yield(b) - guard let channelData = buffer.floatChannelData else { continue } - let frameCount = Int(buffer.frameLength) - sysAudioTime.add(Double(frameCount) / buffer.format.sampleRate) - diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) - if diarBuf.count >= diarFlushSize { - let batch = diarBuf - diarBuf.removeAll(keepingCapacity: true) - try? await safeDm.feedAudio(batch) - } - } - // Flush tail - if !diarBuf.isEmpty { - try? await safeDm.feedAudio(diarBuf) - } - diarContinuation.finish() - } - sysStream = diarTapped - } - - let store = transcriptStore - guard let sysTranscriber = makeTranscriber( - locale: locale, - speaker: .them, - vadManager: vadManager, - onPartial: { text in - Task { @MainActor in store.volatileThemText = text } - }, - onFinal: { [weak self] text in - Task { @MainActor in - store.volatileThemText = "" - let speaker: Speaker - if let dm = self?.diarizationManager { - // Estimate segment time: each onFinal is ~3-5s of speech - let endTime = sysAudioTime.value - let startTime = max(0, endTime - 5.0) - speaker = await dm.dominantSpeaker(from: startTime, to: endTime) - } else { - speaker = .them - } - store.append(Utterance(text: text, speaker: speaker)) - } - } - ) else { - lastError = "Failed to create the system-audio transcriber. Try restarting." - return - } - - sysTask = Task.detached { - await sysTranscriber.run(stream: sysStream) - } - } - - private func makeTranscriber( - locale: Locale, - speaker: Speaker, - vadManager: VadManager, - onPartial: @escaping @Sendable (String) -> Void, - onFinal: @escaping @Sendable (String) -> Void - ) -> StreamingTranscriber? { - let backend = speaker == .you ? micBackend : systemBackend - guard let backend else { - Log.transcription.error("makeTranscriber called without initialized backend for \(speaker.storageKey, privacy: .public)") - return nil - } - return StreamingTranscriber( - backend: backend, - locale: locale, + // Start new system stream + guard let systemBackend else { return } + let sysResult = await streamCoordinator.startSystemStream( + locale: deviceRoutingManager.currentLocale ?? settings.locale, + backend: systemBackend, vadManager: vadManager, - speaker: speaker, - flushInterval: settings.transcriptionModel.flushIntervalSamples, - onPartial: onPartial, - onFinal: onFinal + diarizationManager: diarizationManager, + transcriptStore: transcriptStore, + flushInterval: deviceRoutingManager.currentFlushInterval ?? settings.transcriptionModel.flushIntervalSamples ) - } - - private func resolvedMicDeviceID(for inputDeviceID: AudioDeviceID) -> AudioDeviceID? { - if inputDeviceID > 0 { - let availableDeviceIDs = Set(MicCapture.availableInputDevices().map(\.id)) - return availableDeviceIDs.contains(inputDeviceID) ? inputDeviceID : nil - } - - return MicCapture.defaultInputDeviceID() - } - - private func unavailableMicMessage(for inputDeviceID: AudioDeviceID) -> String { - if inputDeviceID > 0 { - return "The selected microphone is no longer available." - } - - return "No default microphone is currently available." - } - - private static func modelNeedsDownload(_ model: TranscriptionModel) -> Bool { - let backend = model.makeBackend() - if case .needsDownload = backend.checkStatus() { - return true - } - return false - } - - /// Wrap an audio stream to forward each buffer to a synchronous tap before yielding it downstream. - private nonisolated static func tappedStream( - _ stream: AsyncStream, - tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void - ) -> AsyncStream { - struct Box: @unchecked Sendable { let stream: AsyncStream } - let box = Box(stream: stream) - let (output, continuation) = AsyncStream.makeStream() - Task { - for await buffer in box.stream { - tap(buffer) - nonisolated(unsafe) let b = buffer - continuation.yield(b) + switch sysResult { + case .success: + // Clear any previous system audio errors + if let lastError, lastError.localizedCaseInsensitiveContains("system audio") || + lastError.localizedCaseInsensitiveContains("audio output device") { + self.lastError = nil } - continuation.finish() + Log.transcription.info("System audio stream restarted") + case .failure(let error): + lastError = error.localizedDescription } - return output } + // MARK: - Utility Methods + private func localeMismatchMessage( for locale: Locale, transcriptionModel: TranscriptionModel @@ -869,79 +541,4 @@ final class TranscriptionEngine { return identifier.split(separator: "-").first.map { String($0).lowercased() } } - private func clearSystemAudioErrorIfPresent() { - guard let lastError else { return } - if lastError.localizedCaseInsensitiveContains("system audio") || - lastError.localizedCaseInsensitiveContains("audio output device") { - self.lastError = nil - } - } - - // MARK: - Download Progress Detail - - private func updateDownloadDetail(fraction: Double) { - guard let startTime = downloadStartTime else { - downloadDetail = DownloadProgressDetail(fraction: fraction, sizeText: nil, speedText: nil, etaText: nil) - return - } - - let elapsed = Date().timeIntervalSince(startTime) - let totalBytes = downloadTotalBytes - - // Size text: "142 MB / 800 MB" (only when total is known) - var sizeText: String? - if let totalBytes { - let downloaded = Int64(fraction * Double(totalBytes)) - sizeText = "\(Self.formatBytes(downloaded)) / \(Self.formatBytes(totalBytes))" - } - - // Speed and ETA need enough elapsed time to be meaningful - var speedText: String? - var etaText: String? - if elapsed > 1, fraction > 0.01 { - // Speed from fraction progress rate + known total - if let totalBytes { - let bytesDownloaded = fraction * Double(totalBytes) - let bytesPerSecond = bytesDownloaded / elapsed - speedText = "\(Self.formatBytes(Int64(bytesPerSecond)))/s" - - let remaining = Double(totalBytes) - bytesDownloaded - if bytesPerSecond > 0 { - let secondsLeft = remaining / bytesPerSecond - etaText = Self.formatDuration(secondsLeft) - } - } else { - // No total bytes known — estimate ETA from fraction rate alone - let fractionPerSecond = fraction / elapsed - if fractionPerSecond > 0 { - let remainingFraction = 1.0 - fraction - let secondsLeft = remainingFraction / fractionPerSecond - etaText = Self.formatDuration(secondsLeft) - } - } - } - - downloadDetail = DownloadProgressDetail( - fraction: fraction, - sizeText: sizeText, - speedText: speedText, - etaText: etaText - ) - } - - private static func formatBytes(_ bytes: Int64) -> String { - if bytes >= 1_000_000_000 { - return String(format: "%.1f GB", Double(bytes) / 1_000_000_000) - } else { - return String(format: "%.0f MB", Double(bytes) / 1_000_000) - } - } - - private static func formatDuration(_ seconds: Double) -> String { - let s = Int(seconds) - if s < 60 { return "\(s)s remaining" } - let m = s / 60 - let rem = s % 60 - return rem > 0 ? "\(m)m \(rem)s remaining" : "\(m)m remaining" - } } diff --git a/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift new file mode 100644 index 00000000..888257ad --- /dev/null +++ b/OpenOats/Sources/OpenOats/Transcription/TranscriptionStreamCoordinator.swift @@ -0,0 +1,395 @@ +import AVFoundation +import FluidAudio +import Foundation +import os + +/// Errors that can occur during stream coordination with user-facing messages. +enum TranscriptionStreamError: LocalizedError { + case microphonePermissionDenied + case microphoneUnavailable + case systemAudioCaptureFailed(Error) + case transcriberCreationFailed(Speaker) + + var errorDescription: String? { + switch self { + case .microphonePermissionDenied: + return "Microphone access was denied. Enable it in System Settings > Privacy & Security > Microphone." + case .microphoneUnavailable: + return "The selected microphone is no longer available." + case .systemAudioCaptureFailed: + return "System audio capture failed to start." + case .transcriberCreationFailed(let speaker): + return "Failed to create the \(speaker == .you ? "microphone" : "system audio") transcriber. Try restarting." + } + } +} + +/// Coordinates audio stream capture, transcribers, and optional recording. +/// Manages the lifecycle of mic and system audio transcription tasks. +@MainActor +final class TranscriptionStreamCoordinator { + private let micCapture: MicCapture + private let systemCapture: SystemAudioCapture + + init(micCapture: MicCapture = MicCapture(), systemCapture: SystemAudioCapture = SystemAudioCapture()) { + self.micCapture = micCapture + self.systemCapture = systemCapture + } + + /// Audio recorder for tapping streams (set externally when recording is enabled). + weak var audioRecorder: AudioRecorder? + + /// Called when mic health check detects no audio after timeout. + var onMicHealthCheckFailed: (@Sendable () -> Void)? + + /// Combined audio level (mic + system) for the UI meter. + nonisolated var audioLevel: Float { + max(micCapture.audioLevel, systemCapture.audioLevel) + } + + /// Mute/unmute the microphone. When muted, mic audio is not transcribed + /// and the audio level reads as 0. System audio continues normally. + nonisolated var isMicMuted: Bool { + get { micCapture.isMuted } + set { micCapture.isMuted = newValue } + } + + /// Mic transcription task. + private var micTask: Task? + + /// System audio transcription task. + private var sysTask: Task? + + /// Health check task for mic audio. + private var micHealthTask: Task? + + /// Diarization audio feed task. + private var diarizationTask: Task? + + /// Track if mic is currently running for health checks. + private var isMicRunning = false + + /// Start the mic audio stream and transcription. + /// - Returns: The transcription task on success, or an error on failure + @discardableResult + func startMicStream( + locale: Locale, + deviceID: AudioDeviceID, + backend: any TranscriptionBackend, + vadManager: VadManager, + transcriptStore: TranscriptStore, + flushInterval: Int, + useAEC: Bool + ) -> Result, TranscriptionStreamError> { + // Store state for potential restarts + isMicRunning = true + + var micStream = micCapture.bufferStream(deviceID: deviceID, echoCancellation: useAEC) + + // Check for immediate mic capture failure + if let micError = micCapture.captureError { + Log.transcription.error("Mic capture setup error: \(micError)") + isMicRunning = false + return .failure(.microphoneUnavailable) + } + + // Add recording tap if recorder is set + if let recorder = audioRecorder { + micStream = Self.tappedStream(micStream) { buffer in + recorder.writeMicBuffer(buffer) + } + } + + guard let micTranscriber = makeTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: .you, + transcriptStore: transcriptStore, + flushInterval: flushInterval + ) else { + Log.transcription.error("Failed to create mic transcriber") + isMicRunning = false + return .failure(.transcriberCreationFailed(.you)) + } + + micTask = Task { [weak self] in + await micTranscriber.run(stream: micStream) + await self?.markMicStopped() + } + + // Health check: if mic produces no audio within 5 seconds, log the issue + micHealthTask?.cancel() + micHealthTask = Task { @MainActor [weak self] in + try? await Task.sleep(for: .seconds(5)) + guard let self, self.isMicRunning else { return } + if !self.micCapture.hasCapturedFrames && self.micCapture.captureError == nil { + Log.transcription.error("no mic audio after 5s") + self.onMicHealthCheckFailed?() + } + } + + return .success(micTask!) + } + + /// Mark mic as stopped - MainActor-isolated to prevent concurrency violations + private func markMicStopped() { + isMicRunning = false + } + + /// Stop the mic audio stream and transcription. + func stopMicStream() { + micHealthTask?.cancel() + micHealthTask = nil + micCapture.finishStream() + micTask?.cancel() + micTask = nil + micCapture.stop() + isMicRunning = false + } + + /// Finalize mic stream, waiting for transcriber to drain. + func finalizeMicStream() async { + micHealthTask?.cancel() + micHealthTask = nil + micCapture.finishStream() + await micTask?.value + micCapture.stop() + micTask = nil + isMicRunning = false + } + + /// Start the system audio stream and transcription. + /// - Returns: The transcription task on success, or an error on failure + @discardableResult + func startSystemStream( + locale: Locale, + backend: any TranscriptionBackend, + vadManager: VadManager, + diarizationManager: DiarizationManager?, + transcriptStore: TranscriptStore, + flushInterval: Int + ) async -> Result, TranscriptionStreamError> { + Log.transcription.info("starting system audio capture") + + let sysStreams: SystemAudioCapture.CaptureStreams + do { + sysStreams = try await systemCapture.bufferStream() + Log.transcription.info("system audio capture started") + } catch { + Log.transcription.error("Failed to start system audio: \(error.localizedDescription, privacy: .public)") + return .failure(.systemAudioCaptureFailed(error)) + } + + var sysStream = sysStreams.systemAudio + + // Track cumulative audio time for diarizer speaker attribution + let sysAudioTime = SyncDouble() + + // Tee system audio to diarization manager if enabled + if let dm = diarizationManager { + let diarFlushSize = 16000 + let originalSysStream = sysStream + let (diarTapped, diarContinuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingOldest(128)) + + diarizationTask?.cancel() + diarizationTask = Task { [weak self, dm] in + var diarBuf: [Float] = [] + for await buffer in originalSysStream { + // AVAudioPCMBuffer is non-Sendable but used read-only here. + // nonisolated(unsafe) lets us read then yield without copying. + nonisolated(unsafe) let buffer = buffer + // Check for cancellation + guard !Task.isCancelled else { break } + if let channelData = buffer.floatChannelData { + let frameCount = Int(buffer.frameLength) + let sampleRate = buffer.format.sampleRate + sysAudioTime.add(Double(frameCount) / sampleRate) + diarBuf.append(contentsOf: UnsafeBufferPointer(start: channelData[0], count: frameCount)) + if diarBuf.count >= diarFlushSize { + let batch = diarBuf + diarBuf.removeAll(keepingCapacity: true) + try? await dm.feedAudio(batch) + } + } + diarContinuation.yield(buffer) + } + // Flush tail + if !Task.isCancelled, !diarBuf.isEmpty { + try? await dm.feedAudio(diarBuf) + } + diarContinuation.finish() + self?.diarizationTask = nil + } + sysStream = diarTapped + } + + // Add recording tap if recorder is set + if let recorder = audioRecorder { + sysStream = Self.tappedStream(sysStream) { buffer in + recorder.writeSysBuffer(buffer) + } + } + + guard let sysTranscriber = makeTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: .them, + transcriptStore: transcriptStore, + flushInterval: flushInterval, + diarizationManager: diarizationManager, + audioTime: diarizationManager != nil ? sysAudioTime : nil + ) else { + Log.transcription.error("Failed to create system audio transcriber") + return .failure(.transcriberCreationFailed(.them)) + } + + sysTask = Task { + await sysTranscriber.run(stream: sysStream) + } + + return .success(sysTask!) + } + + /// Stop the system audio stream and transcription. + func stopSystemStream() { + diarizationTask?.cancel() + diarizationTask = nil + systemCapture.finishStream() + sysTask?.cancel() + sysTask = nil + // systemCapture.stop() is async but non-critical for synchronous teardown. + // finishStream() already ends the audio stream; stop() releases the process tap. + // Safe to fire-and-forget since startSystemStream() calls bufferStream() + // which internally awaits any pending stop. + Task { await systemCapture.stop() } + } + + /// Finalize system stream, waiting for transcriber to drain. + func finalizeSystemStream() async { + diarizationTask?.cancel() + systemCapture.finishStream() + await sysTask?.value + await diarizationTask?.value + await systemCapture.stop() + sysTask = nil + diarizationTask = nil + } + + /// Stop all audio streams and transcription. + func stopAll() { + stopMicStream() + stopSystemStream() + } + + /// Finalize all streams and wait for tasks to complete. + func finalize() async { + micHealthTask?.cancel() + micHealthTask = nil + + diarizationTask?.cancel() + + micCapture.finishStream() + systemCapture.finishStream() + + await micTask?.value + await sysTask?.value + await diarizationTask?.value + + micCapture.stop() + await systemCapture.stop() + + micTask = nil + sysTask = nil + diarizationTask = nil + isMicRunning = false + } + + // MARK: - Private Helpers + + private func makeTranscriber( + backend: any TranscriptionBackend, + locale: Locale, + vadManager: VadManager, + speaker: Speaker, + transcriptStore: TranscriptStore, + flushInterval: Int, + diarizationManager: DiarizationManager? = nil, + audioTime: SyncDouble? = nil + ) -> StreamingTranscriber? { + let store = transcriptStore + + let onPartial: @Sendable (String) -> Void + let onFinal: @Sendable (String) -> Void + + if speaker == .you { + onPartial = { text in + Task { @MainActor in store.volatileYouText = text } + } + onFinal = { text in + Task { @MainActor in + store.volatileYouText = "" + store.append(Utterance(text: text, speaker: .you)) + } + } + } else { + let dm = diarizationManager + let time = audioTime + onPartial = { text in + Task { @MainActor in store.volatileThemText = text } + } + onFinal = { text in + Task { @MainActor in + store.volatileThemText = "" + let finalSpeaker: Speaker + if let dm, let t = time { + let endTime = t.value + let startTime = max(0, endTime - 5.0) + finalSpeaker = await dm.dominantSpeaker(from: startTime, to: endTime) + } else { + finalSpeaker = .them + } + store.append(Utterance(text: text, speaker: finalSpeaker)) + } + } + } + + return StreamingTranscriber( + backend: backend, + locale: locale, + vadManager: vadManager, + speaker: speaker, + flushInterval: flushInterval, + onPartial: onPartial, + onFinal: onFinal + ) + } + + /// Wrap an audio stream to forward each buffer to a synchronous tap before yielding it downstream. + private nonisolated static func tappedStream( + _ stream: AsyncStream, + tap: @escaping @Sendable (AVAudioPCMBuffer) -> Void + ) -> AsyncStream { + let (output, continuation) = AsyncStream.makeStream(bufferingPolicy: .bufferingOldest(128)) + let forwarder = TapForwarder(stream: stream, continuation: continuation, tap: tap) + Task { await forwarder.run() } + return output + } + + /// Wraps the iteration in a Sendable closure to satisfy strict concurrency on the Task capture. + private struct TapForwarder: Sendable { + nonisolated(unsafe) let stream: AsyncStream + let continuation: AsyncStream.Continuation + let tap: @Sendable (AVAudioPCMBuffer) -> Void + + func run() async { + for await buffer in stream { + nonisolated(unsafe) let buffer = buffer + tap(buffer) + continuation.yield(buffer) + } + continuation.finish() + } + } +} diff --git a/OpenOats/Sources/OpenOats/Utils/Logging.swift b/OpenOats/Sources/OpenOats/Utils/Logging.swift index eabe2f59..8300ab4f 100644 --- a/OpenOats/Sources/OpenOats/Utils/Logging.swift +++ b/OpenOats/Sources/OpenOats/Utils/Logging.swift @@ -1,6 +1,13 @@ import Foundation import os +/// Centralized logger factory. One subsystem, per-component categories. +/// +/// Usage: `Log.mic.debug("buffer received")` +/// +/// Filter in Terminal: +/// log stream --predicate 'subsystem == "com.openoats.app"' --level debug +/// log stream --predicate 'subsystem == "com.openoats.app" AND category == "MicCapture"' enum Log { static let mic = Logger(subsystem: subsystem, category: "MicCapture") static let recorder = Logger(subsystem: subsystem, category: "AudioRecorder")