From fc9d34ccafc69d5dba6d7b74911ee6e364aee154 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 13:54:20 -0700 Subject: [PATCH 1/7] feat: enhance local AI asset state management and error handling - Updated the Home component to improve readiness checks for local AI assets, ensuring a more robust UI experience. - Refactored the LocalAiService to provide clearer state management for STT and TTS models, including handling on-demand downloads and error logging. - Enhanced the voice status function to accurately reflect the availability of transcription backends, improving overall system reliability. - Introduced warnings for on-demand model downloads to inform users about potential delays in functionality. --- app/src/pages/Home.tsx | 2 + src/openhuman/local_ai/service/assets.rs | 77 +++++++++++++++++++----- src/openhuman/voice/ops.rs | 9 ++- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/app/src/pages/Home.tsx b/app/src/pages/Home.tsx index c3a8e5938..74f9f3849 100644 --- a/app/src/pages/Home.tsx +++ b/app/src/pages/Home.tsx @@ -184,6 +184,7 @@ const Home = () => { if (localAiStatus.state !== 'ready') return false; const isDone = (state: string | undefined | null): boolean => state === 'ready' || state === 'disabled' || state === 'ondemand'; + return ( isDone(localAiAssets.chat?.state) && isDone(localAiAssets.vision?.state) && @@ -192,6 +193,7 @@ const Home = () => { isDone(localAiAssets.tts?.state) ); }, [localAiStatus, localAiAssets]); + const isInstalling = localAiStatus?.state === 'installing'; const indeterminateDownload = isInstalling || diff --git a/src/openhuman/local_ai/service/assets.rs b/src/openhuman/local_ai/service/assets.rs index 3be7b8c6e..7d7f2ed99 100644 --- a/src/openhuman/local_ai/service/assets.rs +++ b/src/openhuman/local_ai/service/assets.rs @@ -4,6 +4,8 @@ use futures_util::TryStreamExt; use crate::openhuman::config::Config; use crate::openhuman::local_ai::model_ids; +use log::debug; + use crate::openhuman::local_ai::paths::{ resolve_stt_model_path, resolve_tts_voice_path, stt_model_target_path, tts_model_target_path, }; @@ -25,8 +27,61 @@ impl LocalAiService { let chat_ready = self.has_model(&chat_model).await.unwrap_or(false); let vision_ready = self.has_model(&vision_model).await.unwrap_or(false); let embedding_ready = self.has_model(&embedding_model).await.unwrap_or(false); - let stt_path = resolve_stt_model_path(config).ok(); - let tts_path = resolve_tts_voice_path(config).ok(); + let stt_resolve = resolve_stt_model_path(config); + let tts_resolve = resolve_tts_voice_path(config); + + let stt_path = stt_resolve.as_ref().ok().cloned(); + let tts_path = tts_resolve.as_ref().ok().cloned(); + + // STT and TTS are downloaded on-demand (first transcription / first + // synthesis). When the model file is not yet on disk but a download + // URL is configured, report "ondemand" instead of "missing" so the + // UI can treat the capability as non-blocking. + let has_stt_url = config + .local_ai + .stt_download_url + .as_deref() + .is_some_and(|v| !v.trim().is_empty()); + let has_tts_url = config + .local_ai + .tts_download_url + .as_deref() + .is_some_and(|v| !v.trim().is_empty()); + + let stt_state = if stt_path.is_some() { + "ready" + } else if has_stt_url { + "ondemand" + } else { + "missing" + }; + let tts_state = if tts_path.is_some() { + "ready" + } else if has_tts_url { + "ondemand" + } else { + "missing" + }; + + if let Err(ref err) = stt_resolve { + debug!( + "[local_ai::assets_status] STT resolve failed (state={stt_state}): {err}" + ); + } + if let Err(ref err) = tts_resolve { + debug!( + "[local_ai::assets_status] TTS resolve failed (state={tts_state}): {err}" + ); + } + + let stt_warning = match stt_state { + "ondemand" => Some("STT model will download on first transcription request.".to_string()), + _ => None, + }; + let tts_warning = match tts_state { + "ondemand" => Some("TTS voice will download on first synthesis request.".to_string()), + _ => None, + }; let vision_mode = presets::vision_mode_for_config(&config.local_ai); Ok(LocalAiAssetsStatus { @@ -67,28 +122,18 @@ impl LocalAiService { warning: None, }, stt: LocalAiAssetStatus { - state: if stt_path.is_some() { - "ready" - } else { - "missing" - } - .to_string(), + state: stt_state.to_string(), id: stt_model, provider: "whisper.cpp".to_string(), path: stt_path, - warning: None, + warning: stt_warning, }, tts: LocalAiAssetStatus { - state: if tts_path.is_some() { - "ready" - } else { - "missing" - } - .to_string(), + state: tts_state.to_string(), id: tts_voice, provider: "piper".to_string(), path: tts_path, - warning: None, + warning: tts_warning, }, quantization: model_ids::effective_quantization(config), }) diff --git a/src/openhuman/voice/ops.rs b/src/openhuman/voice/ops.rs index 3380317de..34324af28 100644 --- a/src/openhuman/voice/ops.rs +++ b/src/openhuman/voice/ops.rs @@ -33,7 +33,14 @@ pub async fn voice_status(config: &Config) -> Result, St let service = local_ai::global(config); let whisper_in_process = whisper_engine::is_loaded(&service.whisper); - let stt_available = whisper_in_process || (whisper_bin.is_some() && stt_model.is_some()); + // STT is available when ANY transcription backend can work: + // 1. The in-process whisper engine is already loaded, OR + // 2. In-process whisper is enabled in config and the model file exists + // (the engine will load the model on first use), OR + // 3. The whisper-cli binary is installed and the model file exists. + let stt_available = whisper_in_process + || (config.local_ai.whisper_in_process && stt_model.is_some()) + || (whisper_bin.is_some() && stt_model.is_some()); let tts_available = piper_bin.is_some() && tts_voice.is_some(); debug!( From 269de552c30d40d3cd6f8d657f9ab1dbc0b589d8 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 14:05:53 -0700 Subject: [PATCH 2/7] feat: implement polling for voice server status in OverlayApp - Added a polling mechanism to periodically check the voice server status every 2 seconds, ensuring the overlay remains in sync with the server state. - Introduced logic to activate or dismiss the speech-to-text (STT) mode based on the server's recording or transcribing state, enhancing user experience and responsiveness. - Utilized the existing `callCoreRpc` function to fetch server status, improving reliability in state management during brief disconnects or reconnections. --- app/src/overlay/OverlayApp.tsx | 66 ++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/app/src/overlay/OverlayApp.tsx b/app/src/overlay/OverlayApp.tsx index 75b31d3c5..0c08b3702 100644 --- a/app/src/overlay/OverlayApp.tsx +++ b/app/src/overlay/OverlayApp.tsx @@ -34,6 +34,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { io, Socket } from 'socket.io-client'; import RotatingTetrahedronCanvas from '../components/RotatingTetrahedronCanvas'; +import { callCoreRpc } from '../services/coreRpcClient'; import { CORE_RPC_URL } from '../utils/config'; const OVERLAY_IDLE_WIDTH = 50; @@ -325,6 +326,71 @@ export default function OverlayApp() { }; }, [clearDismissTimer, handleAttention, handleDictationToggle, handleDictationTranscription]); + // ── Poll voice server status as fallback sync ───────────────────────── + // Socket events are the primary state driver, but if an event is missed + // (reconnect, brief disconnect) the overlay can get stuck. Polling the + // actual server state every 2s corrects any drift. + const modeRef = useRef(mode); + useEffect(() => { + modeRef.current = mode; + }, [mode]); + + useEffect(() => { + let disposed = false; + + const poll = async () => { + try { + const res = await callCoreRpc<{ + state: string; + hotkey: string; + activation_mode: string; + transcription_count: number; + last_error: string | null; + }>({ method: 'openhuman.voice_server_status' }); + + if (disposed) return; + + const serverState = res.state; // 'stopped' | 'idle' | 'recording' | 'transcribing' + const currentMode = modeRef.current; + + // Server is actively recording/transcribing but overlay is idle → show stt + if ( + (serverState === 'recording' || serverState === 'transcribing') && + currentMode === 'idle' + ) { + console.debug(`[overlay] poll sync: server=${serverState}, overlay=idle → activating stt`); + setMode('stt'); + setBubble({ + id: `stt-poll-${Date.now()}`, + text: serverState === 'transcribing' ? '"Transcribing…"' : STT_LISTENING_PLACEHOLDER, + tone: 'accent', + compact: true, + }); + } + + // Server is idle/stopped but overlay thinks it's in stt → dismiss + if ( + (serverState === 'idle' || serverState === 'stopped') && + currentMode === 'stt' + ) { + console.debug( + `[overlay] poll sync: server=${serverState}, overlay=stt → dismissing` + ); + goIdle(); + } + } catch { + // Core not reachable — ignore, will retry on next poll + } + }; + + void poll(); + const id = window.setInterval(() => void poll(), 2000); + return () => { + disposed = true; + window.clearInterval(id); + }; + }, [goIdle]); + // ── Window framing: resize / reposition on mode change ──────────────── const status: 'idle' | 'active' = mode === 'idle' ? 'idle' : 'active'; From 98b13c463b15b7ee124e150f36d1b4549a75b2bd Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 14:07:14 -0700 Subject: [PATCH 3/7] feat: refine prompt handling in LocalAiService - Updated the prompt construction logic in `LocalAiService` to enhance clarity and functionality. - When `no_think` is set, the system prompt now includes a directive for the model to respond with only the final answer, improving response accuracy. - Refactored the prompt and system parameters to ensure they are correctly formatted and passed to the `OllamaGenerateRequest`, enhancing overall request handling. --- .../local_ai/service/public_infer.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/openhuman/local_ai/service/public_infer.rs b/src/openhuman/local_ai/service/public_infer.rs index dce194365..36e23f507 100644 --- a/src/openhuman/local_ai/service/public_infer.rs +++ b/src/openhuman/local_ai/service/public_infer.rs @@ -308,16 +308,22 @@ impl LocalAiService { } let started = std::time::Instant::now(); - let mut combined_prompt = String::new(); - if no_think { - combined_prompt.push_str("Respond with only the final answer. No reasoning.\\n\\n"); - } - combined_prompt.push_str(prompt); + + // When `no_think` is set, append the instruction to the system + // prompt so the model treats it as a directive rather than content + // it might parrot back. + let effective_system = if no_think { + format!( + "{system}\n\nRespond with only the final answer. No reasoning, no preamble." + ) + } else { + system.to_string() + }; let body = OllamaGenerateRequest { model: model_ids::effective_chat_model_id(config), - prompt: combined_prompt, - system: Some(system.to_string()), + prompt: prompt.to_string(), + system: Some(effective_system), images: None, stream: false, options: Some(OllamaGenerateOptions { From de21f8b5a120a28718f1de6d4673054588087572 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 14:43:19 -0700 Subject: [PATCH 4/7] feat: enhance STT readiness checks in VoicePanel and useVoiceSkillStatus - Updated the STT readiness logic in `VoicePanel` to account for both 'ready' and 'ondemand' states, improving the accuracy of readiness assessments. - Refined the `useVoiceSkillStatus` hook to ensure it only blocks when the local AI's STT state is explicitly 'missing', enhancing overall system reliability and user experience. --- app/src/components/settings/panels/VoicePanel.tsx | 4 +++- app/src/features/voice/useVoiceSkillStatus.ts | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/app/src/components/settings/panels/VoicePanel.tsx b/app/src/components/settings/panels/VoicePanel.tsx index d4db8be88..afdf55fb7 100644 --- a/app/src/components/settings/panels/VoicePanel.tsx +++ b/app/src/components/settings/panels/VoicePanel.tsx @@ -65,7 +65,9 @@ const VoicePanel = () => { setSavedSettings(settingsResponse.result); setServerStatus(serverResponse); setVoiceStatus(voiceResponse); - setSttReady(assetsResponse.result.stt?.state === 'ready' && voiceResponse.stt_available); + const sttAssetState = assetsResponse.result.stt?.state; + const sttAssetOk = sttAssetState === 'ready' || sttAssetState === 'ondemand'; + setSttReady(sttAssetOk && voiceResponse.stt_available); setError(null); } catch (err) { const message = err instanceof Error ? err.message : 'Failed to load voice settings'; diff --git a/app/src/features/voice/useVoiceSkillStatus.ts b/app/src/features/voice/useVoiceSkillStatus.ts index 39b3f193c..26d75faff 100644 --- a/app/src/features/voice/useVoiceSkillStatus.ts +++ b/app/src/features/voice/useVoiceSkillStatus.ts @@ -60,8 +60,12 @@ export function useVoiceSkillStatus(): VoiceSkillStatus { const sttReady = useMemo(() => { if (!voiceStatus) return false; if (!voiceStatus.stt_available) return false; - // Also check Local AI asset state if available - if (localAi && localAi.stt_state !== 'ready') return false; + // The in-memory stt_state starts as "idle" and only flips to "ready" + // after the first download or transcription. The authoritative check + // is `voiceStatus.stt_available` (which inspects the filesystem and + // engine readiness). Only block when stt_state is explicitly an error + // state — "missing" means the model file really isn't on disk. + if (localAi && localAi.stt_state === 'missing') return false; return true; }, [voiceStatus, localAi]); From d62c898514d5eac6e1c1bd04435d34ac7b8cd5d5 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 15:40:18 -0700 Subject: [PATCH 5/7] feat(composio): implement ComposeIO trigger history component and hook - Added `ComposeioTriggerHistory` component to display a list of ComposeIO trigger events with formatted timestamps and payloads. - Introduced `useComposeioTriggerHistory` hook to manage fetching and state of trigger history entries, including error handling and loading states. - Updated `Webhooks` page to integrate the new component and hook, replacing previous webhook activity display with ComposeIO trigger history. - Created utility functions for formatting timestamps and payloads for better readability in the UI. - Established backend support for fetching trigger history through new Tauri commands, ensuring robust data handling and storage. --- .../webhooks/ComposeioTriggerHistory.tsx | 81 ++++++ app/src/hooks/useComposeioTriggerHistory.ts | 78 ++++++ app/src/pages/Webhooks.tsx | 67 ++--- app/src/utils/tauriCommands/composio.ts | 30 +++ app/src/utils/tauriCommands/index.ts | 1 + src/core/jsonrpc.rs | 7 +- src/openhuman/about_app/catalog.rs | 3 +- src/openhuman/composio/bus.rs | 20 ++ src/openhuman/composio/mod.rs | 7 +- src/openhuman/composio/ops.rs | 32 +++ src/openhuman/composio/schemas.rs | 37 +++ src/openhuman/composio/trigger_history.rs | 235 ++++++++++++++++++ src/openhuman/composio/types.rs | 26 ++ 13 files changed, 590 insertions(+), 34 deletions(-) create mode 100644 app/src/components/webhooks/ComposeioTriggerHistory.tsx create mode 100644 app/src/hooks/useComposeioTriggerHistory.ts create mode 100644 app/src/utils/tauriCommands/composio.ts create mode 100644 src/openhuman/composio/trigger_history.rs diff --git a/app/src/components/webhooks/ComposeioTriggerHistory.tsx b/app/src/components/webhooks/ComposeioTriggerHistory.tsx new file mode 100644 index 000000000..b25e4b7de --- /dev/null +++ b/app/src/components/webhooks/ComposeioTriggerHistory.tsx @@ -0,0 +1,81 @@ +import type { ComposioTriggerHistoryEntry } from '../../utils/tauriCommands'; + +interface ComposeioTriggerHistoryProps { + entries: ComposioTriggerHistoryEntry[]; +} + +function formatTimestamp(ts: number): string { + return new Date(ts).toLocaleString(undefined, { + year: 'numeric', + month: 'short', + day: '2-digit', + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + }); +} + +function formatPayload(payload: unknown): string { + try { + return JSON.stringify(payload, null, 2); + } catch { + return String(payload); + } +} + +export default function ComposeioTriggerHistory({ entries }: ComposeioTriggerHistoryProps) { + if (entries.length === 0) { + return ( +
+

ComposeIO Trigger History

+

+ No ComposeIO triggers have been captured yet. +

+
+ ); + } + + return ( +
+

+ ComposeIO Trigger History{' '} + ({entries.length}) +

+
+ {entries.map(entry => ( +
+
+ + {entry.toolkit} + + + {entry.trigger} + + {formatTimestamp(entry.received_at_ms)} +
+ +
+
+
Metadata ID
+
{entry.metadata_id}
+
+
+
Metadata UUID
+
{entry.metadata_uuid}
+
+
+ +
+
Payload
+
+                {formatPayload(entry.payload)}
+              
+
+
+ ))} +
+
+ ); +} diff --git a/app/src/hooks/useComposeioTriggerHistory.ts b/app/src/hooks/useComposeioTriggerHistory.ts new file mode 100644 index 000000000..5f281f604 --- /dev/null +++ b/app/src/hooks/useComposeioTriggerHistory.ts @@ -0,0 +1,78 @@ +import debug from 'debug'; +import { useCallback, useEffect, useState } from 'react'; + +import { useCoreState } from '../providers/CoreStateProvider'; +import { + openhumanComposioListTriggerHistory, + type ComposioTriggerHistoryEntry, +} from '../utils/tauriCommands'; + +const log = debug('composio:history'); +const POLL_MS = 5000; + +export interface ComposeioTriggerHistoryState { + archiveDir: string | null; + currentDayFile: string | null; + entries: ComposioTriggerHistoryEntry[]; + loading: boolean; + error: string | null; + coreConnected: boolean; + refresh: () => Promise; +} + +export function useComposeioTriggerHistory(limit = 100): ComposeioTriggerHistoryState { + const { snapshot } = useCoreState(); + const [archiveDir, setArchiveDir] = useState(null); + const [currentDayFile, setCurrentDayFile] = useState(null); + const [entries, setEntries] = useState([]); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [coreConnected, setCoreConnected] = useState(false); + + const refresh = useCallback(async () => { + setLoading(true); + try { + const response = await openhumanComposioListTriggerHistory(limit); + const result = response.result.result; + setArchiveDir(result.archive_dir); + setCurrentDayFile(result.current_day_file); + setEntries(result.entries); + setError(null); + setCoreConnected(true); + log('loaded %d composio trigger entries', result.entries.length); + } catch (refreshError) { + const message = + refreshError instanceof Error ? refreshError.message : 'Failed to load ComposeIO history'; + setError(message); + setCoreConnected(false); + log('failed to load trigger history: %s', message); + } finally { + setLoading(false); + } + }, [limit]); + + useEffect(() => { + if (!snapshot.sessionToken) { + return; + } + + void refresh(); + const timer = window.setInterval(() => { + void refresh(); + }, POLL_MS); + + return () => { + window.clearInterval(timer); + }; + }, [snapshot.sessionToken, refresh]); + + return { + archiveDir, + currentDayFile, + entries, + loading, + error, + coreConnected, + refresh, + }; +} diff --git a/app/src/pages/Webhooks.tsx b/app/src/pages/Webhooks.tsx index e9fb27ba8..3d73621c0 100644 --- a/app/src/pages/Webhooks.tsx +++ b/app/src/pages/Webhooks.tsx @@ -1,28 +1,16 @@ -import TunnelList from '../components/webhooks/TunnelList'; -import WebhookActivity from '../components/webhooks/WebhookActivity'; -import { useWebhooks } from '../hooks/useWebhooks'; +import ComposeioTriggerHistory from '../components/webhooks/ComposeioTriggerHistory'; +import { useComposeioTriggerHistory } from '../hooks/useComposeioTriggerHistory'; export default function Webhooks() { - const { - tunnels, - registrations, - activity, - loading, - error, - coreConnected, - createTunnel, - deleteTunnel, - refreshTunnels, - registerEcho, - unregisterEcho, - } = useWebhooks(); + const { archiveDir, currentDayFile, entries, loading, error, coreConnected, refresh } = + useComposeioTriggerHistory(100); - if (loading && tunnels.length === 0) { + if (loading && entries.length === 0) { return (
- Loading webhooks… + Loading ComposeIO trigger history…
); @@ -32,8 +20,8 @@ export default function Webhooks() {
{/* Connection status */} -
-

Webhooks

+
+

ComposeIO Triggers

{coreConnected ? 'Connected' : 'Disconnected'} +
{error &&
{error}
}
- +
+

Archive

+

+ Every ComposeIO trigger is appended to a daily JSONL file. Files are labeled by UTC + day. +

+
+
+
Archive Directory
+
+ {archiveDir ?? 'Not available yet'} +
+
+
+
Today's File
+
+ {currentDayFile ?? 'Not available yet'} +
+
+
+
- +
diff --git a/app/src/utils/tauriCommands/composio.ts b/app/src/utils/tauriCommands/composio.ts new file mode 100644 index 000000000..2587a4f66 --- /dev/null +++ b/app/src/utils/tauriCommands/composio.ts @@ -0,0 +1,30 @@ +import { callCoreRpc } from '../../services/coreRpcClient'; +import { CommandResponse, isTauri } from './common'; + +export interface ComposioTriggerHistoryEntry { + received_at_ms: number; + toolkit: string; + trigger: string; + metadata_id: string; + metadata_uuid: string; + payload: unknown; +} + +export interface ComposioTriggerHistoryResult { + archive_dir: string; + current_day_file: string; + entries: ComposioTriggerHistoryEntry[]; +} + +export async function openhumanComposioListTriggerHistory( + limit = 100 +): Promise> { + if (!isTauri()) { + throw new Error('Not running in Tauri'); + } + + return await callCoreRpc>({ + method: 'openhuman.composio_list_trigger_history', + params: { limit }, + }); +} diff --git a/app/src/utils/tauriCommands/index.ts b/app/src/utils/tauriCommands/index.ts index ec7af23ef..c5724022a 100644 --- a/app/src/utils/tauriCommands/index.ts +++ b/app/src/utils/tauriCommands/index.ts @@ -7,6 +7,7 @@ export * from './window'; export * from './core'; export * from './memory'; export * from './webhooks'; +export * from './composio'; export * from './conscious'; export * from './subconscious'; export * from './localAi'; diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index 03b584cf7..6cc9bed7b 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -826,8 +826,13 @@ fn register_domain_subscribers(workspace_dir: std::path::PathBuf) { crate::openhuman::health::bus::register_health_subscriber(); crate::openhuman::memory::conversations::register_conversation_persistence_subscriber( - workspace_dir, + workspace_dir.clone(), ); + if let Err(error) = crate::openhuman::composio::init_composio_trigger_history( + workspace_dir.clone(), + ) { + log::warn!("[composio][history] failed to initialize trigger archive: {error}"); + } crate::openhuman::composio::register_composio_trigger_subscriber(); crate::openhuman::composio::start_periodic_sync(); diff --git a/src/openhuman/about_app/catalog.rs b/src/openhuman/about_app/catalog.rs index 41987fc5b..b15acdd2f 100644 --- a/src/openhuman/about_app/catalog.rs +++ b/src/openhuman/about_app/catalog.rs @@ -576,7 +576,8 @@ const CAPABILITIES: &[Capability] = &[ name: "Debug Webhooks", domain: "settings", category: CapabilityCategory::Settings, - description: "Inspect registered webhook tunnels and captured request and response logs.", + description: + "Inspect ComposeIO trigger history and find the daily JSONL archive files stored by the app.", how_to: "Settings > Developer Options > Webhooks", status: CapabilityStatus::Beta, }, diff --git a/src/openhuman/composio/bus.rs b/src/openhuman/composio/bus.rs index 3bd14c18a..47e6aeaa7 100644 --- a/src/openhuman/composio/bus.rs +++ b/src/openhuman/composio/bus.rs @@ -54,6 +54,7 @@ use async_trait::async_trait; use crate::core::event_bus::{subscribe_global, DomainEvent, EventHandler, SubscriptionHandle}; use crate::openhuman::agent::triage::{apply_decision, run_triage, TriggerEnvelope}; use crate::openhuman::config::rpc as config_rpc; +use crate::openhuman::composio::trigger_history; use super::client::ComposioClient; use super::providers::{get_provider, ProviderContext}; @@ -165,6 +166,25 @@ impl EventHandler for ComposioTriggerSubscriber { "[composio:bus] trigger received" ); + if let Some(store) = trigger_history::global() { + if let Err(error) = + store.record_trigger(toolkit, trigger, metadata_id, metadata_uuid, payload) + { + tracing::warn!( + toolkit = %toolkit, + trigger = %trigger, + error = %error, + "[composio][history] failed to archive trigger" + ); + } + } else { + tracing::debug!( + toolkit = %toolkit, + trigger = %trigger, + "[composio][history] archive store not initialized" + ); + } + if triage_disabled() { tracing::debug!( toolkit = %toolkit, diff --git a/src/openhuman/composio/mod.rs b/src/openhuman/composio/mod.rs index 8fdddc6fd..ddd17b4e4 100644 --- a/src/openhuman/composio/mod.rs +++ b/src/openhuman/composio/mod.rs @@ -42,6 +42,7 @@ pub mod periodic; pub mod providers; pub mod schemas; pub mod tools; +pub mod trigger_history; pub mod types; pub use bus::{register_composio_trigger_subscriber, ComposioTriggerSubscriber}; @@ -57,8 +58,12 @@ pub use schemas::{ all_registered_controllers as all_composio_registered_controllers, }; pub use tools::all_composio_agent_tools; +pub use trigger_history::{ + global as global_composio_trigger_history, init_global as init_composio_trigger_history, +}; pub use types::{ ComposioAuthorizeResponse, ComposioConnection, ComposioConnectionsResponse, ComposioDeleteResponse, ComposioExecuteResponse, ComposioToolFunction, ComposioToolSchema, - ComposioToolkitsResponse, ComposioToolsResponse, ComposioTriggerEvent, ComposioTriggerMetadata, + ComposioToolkitsResponse, ComposioToolsResponse, ComposioTriggerEvent, + ComposioTriggerHistoryEntry, ComposioTriggerHistoryResult, ComposioTriggerMetadata, }; diff --git a/src/openhuman/composio/ops.rs b/src/openhuman/composio/ops.rs index 3db2a238b..f3c3ddf15 100644 --- a/src/openhuman/composio/ops.rs +++ b/src/openhuman/composio/ops.rs @@ -28,6 +28,7 @@ use super::providers::{ use super::types::{ ComposioAuthorizeResponse, ComposioConnectionsResponse, ComposioDeleteResponse, ComposioExecuteResponse, ComposioToolkitsResponse, ComposioToolsResponse, + ComposioTriggerHistoryResult, }; /// Resolve a [`ComposioClient`] from the root config, or return an @@ -194,6 +195,37 @@ pub async fn composio_execute( } } +// ── Trigger history ──────────────────────────────────────────────── + +pub async fn composio_list_trigger_history( + config: &Config, + limit: Option, +) -> OpResult> { + let requested_limit = limit.unwrap_or(100).clamp(1, 500); + tracing::debug!( + limit = requested_limit, + workspace = %config.workspace_dir.display(), + "[composio] rpc list_trigger_history" + ); + + let store = super::trigger_history::global().ok_or_else(|| { + "[composio] trigger history unavailable: archive store is not initialized".to_string() + })?; + + let history = store + .list_recent(requested_limit) + .map_err(|error| format!("[composio] list_trigger_history failed: {error}"))?; + let count = history.entries.len(); + let archive_dir = history.archive_dir.clone(); + + Ok(RpcOutcome::new( + history, + vec![format!( + "composio: {count} trigger history entrie(s) loaded from {archive_dir}" + )], + )) +} + // ── Provider-backed ops ───────────────────────────────────────────── // // `composio_get_user_profile` and `composio_sync` route through the diff --git a/src/openhuman/composio/schemas.rs b/src/openhuman/composio/schemas.rs index 38079d648..74e437781 100644 --- a/src/openhuman/composio/schemas.rs +++ b/src/openhuman/composio/schemas.rs @@ -19,6 +19,11 @@ use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; use crate::openhuman::config::rpc as config_rpc; use crate::rpc::RpcOutcome; +#[derive(Debug, serde::Deserialize)] +struct TriggerHistoryParams { + limit: Option, +} + pub fn all_controller_schemas() -> Vec { vec![ schemas("list_toolkits"), @@ -29,6 +34,7 @@ pub fn all_controller_schemas() -> Vec { schemas("execute"), schemas("get_user_profile"), schemas("sync"), + schemas("list_trigger_history"), ] } @@ -66,6 +72,10 @@ pub fn all_registered_controllers() -> Vec { schema: schemas("sync"), handler: handle_sync, }, + RegisteredController { + schema: schemas("list_trigger_history"), + handler: handle_list_trigger_history, + }, ] } @@ -230,6 +240,24 @@ pub fn schemas(function: &str) -> ControllerSchema { required: true, }], }, + "list_trigger_history" => ControllerSchema { + namespace: "composio", + function: "list_trigger_history", + description: + "List recent ComposeIO trigger events archived by the core and report the daily JSONL archive paths.", + inputs: vec![FieldSchema { + name: "limit", + ty: TypeSchema::Option(Box::new(TypeSchema::U64)), + comment: "Maximum number of archived trigger events to return.", + required: false, + }], + outputs: vec![FieldSchema { + name: "result", + ty: TypeSchema::Json, + comment: "Trigger history payload: { archive_dir, current_day_file, entries }.", + required: true, + }], + }, _other => ControllerSchema { namespace: "composio", function: "unknown", @@ -299,6 +327,15 @@ fn handle_execute(params: Map) -> ControllerFuture { }) } +fn handle_list_trigger_history(params: Map) -> ControllerFuture { + Box::pin(async move { + let config = config_rpc::load_config_with_timeout().await?; + let payload: TriggerHistoryParams = + serde_json::from_value(Value::Object(params)).map_err(|e| format!("invalid params: {e}"))?; + to_json(super::ops::composio_list_trigger_history(&config, payload.limit).await?) + }) +} + fn handle_get_user_profile(params: Map) -> ControllerFuture { Box::pin(async move { let config = config_rpc::load_config_with_timeout().await?; diff --git a/src/openhuman/composio/trigger_history.rs b/src/openhuman/composio/trigger_history.rs new file mode 100644 index 000000000..b4d8029eb --- /dev/null +++ b/src/openhuman/composio/trigger_history.rs @@ -0,0 +1,235 @@ +//! Persistent ComposeIO trigger history. +//! +//! Stores every incoming ComposeIO trigger as a JSONL record partitioned by +//! UTC day under `/state/triggers/YYYY-MM-DD.jsonl`. + +use std::fs::{self, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, OnceLock}; + +use chrono::Utc; + +use super::types::{ComposioTriggerHistoryEntry, ComposioTriggerHistoryResult}; + +static GLOBAL_TRIGGER_HISTORY: OnceLock> = OnceLock::new(); + +const TRIGGER_ARCHIVE_DIR: &str = "triggers"; + +pub fn init_global(workspace_dir: PathBuf) -> Result<(), String> { + if GLOBAL_TRIGGER_HISTORY.get().is_some() { + return Ok(()); + } + + let store = Arc::new(ComposioTriggerHistoryStore::new(&workspace_dir)?); + let _ = GLOBAL_TRIGGER_HISTORY.set(store); + Ok(()) +} + +pub fn global() -> Option> { + GLOBAL_TRIGGER_HISTORY.get().cloned() +} + +pub struct ComposioTriggerHistoryStore { + archive_dir: PathBuf, +} + +impl ComposioTriggerHistoryStore { + pub fn new(workspace_dir: &Path) -> Result { + let archive_dir = workspace_dir.join("state").join(TRIGGER_ARCHIVE_DIR); + fs::create_dir_all(&archive_dir).map_err(|error| { + format!( + "[composio][history] failed to create archive directory {}: {error}", + archive_dir.display() + ) + })?; + + tracing::debug!( + archive_dir = %archive_dir.display(), + "[composio][history] archive initialized" + ); + + Ok(Self { archive_dir }) + } + + pub fn record_trigger( + &self, + toolkit: &str, + trigger: &str, + metadata_id: &str, + metadata_uuid: &str, + payload: &serde_json::Value, + ) -> Result { + let entry = ComposioTriggerHistoryEntry { + received_at_ms: now_ms(), + toolkit: toolkit.to_string(), + trigger: trigger.to_string(), + metadata_id: metadata_id.to_string(), + metadata_uuid: metadata_uuid.to_string(), + payload: payload.clone(), + }; + + let path = self.current_day_file_path(); + let line = serde_json::to_string(&entry) + .map_err(|error| format!("[composio][history] failed to serialize trigger: {error}"))?; + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .map_err(|error| { + format!( + "[composio][history] failed to open archive file {}: {error}", + path.display() + ) + })?; + + writeln!(file, "{line}").map_err(|error| { + format!( + "[composio][history] failed to append archive file {}: {error}", + path.display() + ) + })?; + + tracing::debug!( + toolkit = %entry.toolkit, + trigger = %entry.trigger, + metadata_id = %entry.metadata_id, + archive_file = %path.display(), + "[composio][history] trigger archived" + ); + + Ok(entry) + } + + pub fn list_recent(&self, limit: usize) -> Result { + let limit = limit.max(1); + let mut day_files = self.list_day_files()?; + day_files.sort_by(|left, right| right.cmp(left)); + + let mut entries = Vec::new(); + for file in day_files { + let mut file_entries = self.read_day_file(&file)?; + file_entries.reverse(); + for entry in file_entries { + entries.push(entry); + if entries.len() >= limit { + break; + } + } + if entries.len() >= limit { + break; + } + } + + Ok(ComposioTriggerHistoryResult { + archive_dir: self.archive_dir.display().to_string(), + current_day_file: self.current_day_file_path().display().to_string(), + entries, + }) + } + + fn list_day_files(&self) -> Result, String> { + let dir = fs::read_dir(&self.archive_dir).map_err(|error| { + format!( + "[composio][history] failed to read archive directory {}: {error}", + self.archive_dir.display() + ) + })?; + + Ok(dir + .filter_map(|entry| entry.ok().map(|value| value.path())) + .filter(|path| path.extension().is_some_and(|ext| ext == "jsonl")) + .collect()) + } + + fn read_day_file(&self, path: &Path) -> Result, String> { + let file = OpenOptions::new().read(true).open(path).map_err(|error| { + format!( + "[composio][history] failed to open archive file {}: {error}", + path.display() + ) + })?; + + let reader = BufReader::new(file); + let mut entries = Vec::new(); + + for line in reader.lines() { + let line = match line { + Ok(line) if !line.trim().is_empty() => line, + Ok(_) => continue, + Err(error) => { + tracing::warn!( + archive_file = %path.display(), + error = %error, + "[composio][history] failed to read line" + ); + continue; + } + }; + + match serde_json::from_str::(&line) { + Ok(entry) => entries.push(entry), + Err(error) => { + tracing::warn!( + archive_file = %path.display(), + error = %error, + "[composio][history] failed to parse archived trigger line" + ); + } + } + } + + Ok(entries) + } + + fn current_day_file_path(&self) -> PathBuf { + self.archive_dir + .join(format!("{}.jsonl", Utc::now().format("%Y-%m-%d"))) + } +} + +fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|duration| duration.as_millis() as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn archives_triggers_in_daily_jsonl_and_lists_latest_first() { + let temp = tempfile::tempdir().expect("tempdir"); + let workspace = temp.path().join("workspace"); + fs::create_dir_all(&workspace).expect("workspace dir"); + + let store = ComposioTriggerHistoryStore::new(&workspace).expect("store"); + store + .record_trigger( + "gmail", + "GMAIL_NEW_GMAIL_MESSAGE", + "id-1", + "uuid-1", + &serde_json::json!({ "subject": "hello" }), + ) + .expect("record first"); + store + .record_trigger( + "notion", + "NOTION_NEW_PAGE", + "id-2", + "uuid-2", + &serde_json::json!({ "title": "roadmap" }), + ) + .expect("record second"); + + let history = store.list_recent(10).expect("list"); + assert_eq!(history.entries.len(), 2); + assert_eq!(history.entries[0].metadata_id, "id-2"); + assert_eq!(history.entries[1].metadata_id, "id-1"); + assert!(PathBuf::from(&history.current_day_file).exists()); + } +} diff --git a/src/openhuman/composio/types.rs b/src/openhuman/composio/types.rs index f2d7bd515..e23024580 100644 --- a/src/openhuman/composio/types.rs +++ b/src/openhuman/composio/types.rs @@ -145,3 +145,29 @@ pub struct ComposioTriggerMetadata { #[serde(default)] pub uuid: String, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ComposioTriggerHistoryEntry { + /// Unix timestamp in milliseconds when the trigger reached the core. + pub received_at_ms: u64, + /// Toolkit slug, e.g. `"gmail"`. + pub toolkit: String, + /// Trigger slug, e.g. `"GMAIL_NEW_GMAIL_MESSAGE"`. + pub trigger: String, + /// Backend metadata id for this event. + pub metadata_id: String, + /// Backend metadata UUID for this event. + pub metadata_uuid: String, + /// Raw provider payload as forwarded by the backend socket event. + pub payload: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ComposioTriggerHistoryResult { + /// Directory containing daily JSONL archives. + pub archive_dir: String, + /// Today's JSONL file path. + pub current_day_file: String, + /// Recent triggers, newest first. + pub entries: Vec, +} From 078485c1076b75596be59e61345dde0191b7c065 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 15:43:55 -0700 Subject: [PATCH 6/7] style: apply repo formatting fixes --- .../components/webhooks/ComposeioTriggerHistory.tsx | 4 +++- app/src/hooks/useComposeioTriggerHistory.ts | 12 ++---------- app/src/overlay/OverlayApp.tsx | 13 +++++-------- app/src/pages/Webhooks.tsx | 8 ++++++-- src/openhuman/composio/bus.rs | 2 +- src/openhuman/composio/schemas.rs | 4 ++-- src/openhuman/local_ai/service/assets.rs | 12 +++++------- src/openhuman/local_ai/service/public_infer.rs | 4 +--- 8 files changed, 25 insertions(+), 34 deletions(-) diff --git a/app/src/components/webhooks/ComposeioTriggerHistory.tsx b/app/src/components/webhooks/ComposeioTriggerHistory.tsx index b25e4b7de..a7ac6ea0f 100644 --- a/app/src/components/webhooks/ComposeioTriggerHistory.tsx +++ b/app/src/components/webhooks/ComposeioTriggerHistory.tsx @@ -53,7 +53,9 @@ export default function ComposeioTriggerHistory({ entries }: ComposeioTriggerHis {entry.trigger} - {formatTimestamp(entry.received_at_ms)} + + {formatTimestamp(entry.received_at_ms)} +
diff --git a/app/src/hooks/useComposeioTriggerHistory.ts b/app/src/hooks/useComposeioTriggerHistory.ts index 5f281f604..1a8c2a177 100644 --- a/app/src/hooks/useComposeioTriggerHistory.ts +++ b/app/src/hooks/useComposeioTriggerHistory.ts @@ -3,8 +3,8 @@ import { useCallback, useEffect, useState } from 'react'; import { useCoreState } from '../providers/CoreStateProvider'; import { - openhumanComposioListTriggerHistory, type ComposioTriggerHistoryEntry, + openhumanComposioListTriggerHistory, } from '../utils/tauriCommands'; const log = debug('composio:history'); @@ -66,13 +66,5 @@ export function useComposeioTriggerHistory(limit = 100): ComposeioTriggerHistory }; }, [snapshot.sessionToken, refresh]); - return { - archiveDir, - currentDayFile, - entries, - loading, - error, - coreConnected, - refresh, - }; + return { archiveDir, currentDayFile, entries, loading, error, coreConnected, refresh }; } diff --git a/app/src/overlay/OverlayApp.tsx b/app/src/overlay/OverlayApp.tsx index 0c08b3702..364e48edb 100644 --- a/app/src/overlay/OverlayApp.tsx +++ b/app/src/overlay/OverlayApp.tsx @@ -358,7 +358,9 @@ export default function OverlayApp() { (serverState === 'recording' || serverState === 'transcribing') && currentMode === 'idle' ) { - console.debug(`[overlay] poll sync: server=${serverState}, overlay=idle → activating stt`); + console.debug( + `[overlay] poll sync: server=${serverState}, overlay=idle → activating stt` + ); setMode('stt'); setBubble({ id: `stt-poll-${Date.now()}`, @@ -369,13 +371,8 @@ export default function OverlayApp() { } // Server is idle/stopped but overlay thinks it's in stt → dismiss - if ( - (serverState === 'idle' || serverState === 'stopped') && - currentMode === 'stt' - ) { - console.debug( - `[overlay] poll sync: server=${serverState}, overlay=stt → dismissing` - ); + if ((serverState === 'idle' || serverState === 'stopped') && currentMode === 'stt') { + console.debug(`[overlay] poll sync: server=${serverState}, overlay=stt → dismissing`); goIdle(); } } catch { diff --git a/app/src/pages/Webhooks.tsx b/app/src/pages/Webhooks.tsx index 3d73621c0..bd207ab61 100644 --- a/app/src/pages/Webhooks.tsx +++ b/app/src/pages/Webhooks.tsx @@ -52,13 +52,17 @@ export default function Webhooks() {

-
Archive Directory
+
+ Archive Directory +
{archiveDir ?? 'Not available yet'}
-
Today's File
+
+ Today's File +
{currentDayFile ?? 'Not available yet'}
diff --git a/src/openhuman/composio/bus.rs b/src/openhuman/composio/bus.rs index 47e6aeaa7..6d3b2a882 100644 --- a/src/openhuman/composio/bus.rs +++ b/src/openhuman/composio/bus.rs @@ -53,8 +53,8 @@ use async_trait::async_trait; use crate::core::event_bus::{subscribe_global, DomainEvent, EventHandler, SubscriptionHandle}; use crate::openhuman::agent::triage::{apply_decision, run_triage, TriggerEnvelope}; -use crate::openhuman::config::rpc as config_rpc; use crate::openhuman::composio::trigger_history; +use crate::openhuman::config::rpc as config_rpc; use super::client::ComposioClient; use super::providers::{get_provider, ProviderContext}; diff --git a/src/openhuman/composio/schemas.rs b/src/openhuman/composio/schemas.rs index 74e437781..2ee97533f 100644 --- a/src/openhuman/composio/schemas.rs +++ b/src/openhuman/composio/schemas.rs @@ -330,8 +330,8 @@ fn handle_execute(params: Map) -> ControllerFuture { fn handle_list_trigger_history(params: Map) -> ControllerFuture { Box::pin(async move { let config = config_rpc::load_config_with_timeout().await?; - let payload: TriggerHistoryParams = - serde_json::from_value(Value::Object(params)).map_err(|e| format!("invalid params: {e}"))?; + let payload: TriggerHistoryParams = serde_json::from_value(Value::Object(params)) + .map_err(|e| format!("invalid params: {e}"))?; to_json(super::ops::composio_list_trigger_history(&config, payload.limit).await?) }) } diff --git a/src/openhuman/local_ai/service/assets.rs b/src/openhuman/local_ai/service/assets.rs index 7d7f2ed99..04a36ba8c 100644 --- a/src/openhuman/local_ai/service/assets.rs +++ b/src/openhuman/local_ai/service/assets.rs @@ -64,18 +64,16 @@ impl LocalAiService { }; if let Err(ref err) = stt_resolve { - debug!( - "[local_ai::assets_status] STT resolve failed (state={stt_state}): {err}" - ); + debug!("[local_ai::assets_status] STT resolve failed (state={stt_state}): {err}"); } if let Err(ref err) = tts_resolve { - debug!( - "[local_ai::assets_status] TTS resolve failed (state={tts_state}): {err}" - ); + debug!("[local_ai::assets_status] TTS resolve failed (state={tts_state}): {err}"); } let stt_warning = match stt_state { - "ondemand" => Some("STT model will download on first transcription request.".to_string()), + "ondemand" => { + Some("STT model will download on first transcription request.".to_string()) + } _ => None, }; let tts_warning = match tts_state { diff --git a/src/openhuman/local_ai/service/public_infer.rs b/src/openhuman/local_ai/service/public_infer.rs index 36e23f507..1917541db 100644 --- a/src/openhuman/local_ai/service/public_infer.rs +++ b/src/openhuman/local_ai/service/public_infer.rs @@ -313,9 +313,7 @@ impl LocalAiService { // prompt so the model treats it as a directive rather than content // it might parrot back. let effective_system = if no_think { - format!( - "{system}\n\nRespond with only the final answer. No reasoning, no preamble." - ) + format!("{system}\n\nRespond with only the final answer. No reasoning, no preamble.") } else { system.to_string() }; From aa8d9a9329e3832675c4f48d3e243386f53590cc Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 16:31:55 -0700 Subject: [PATCH 7/7] feat: add fs2 dependency and enhance ComposeIO trigger history handling - Introduced the `fs2` crate to manage file locking, improving the reliability of file operations in the ComposeIO trigger history. - Updated `ComposioTriggerHistoryStore` to utilize exclusive file locks during archive writing, ensuring data integrity. - Enhanced error handling for file operations, providing clearer logging for failures related to file access and locking. - Refactored the initialization logic for global trigger history to prevent duplicate setups, improving overall stability. --- Cargo.lock | 11 ++++ Cargo.toml | 1 + .../components/settings/panels/VoicePanel.tsx | 7 +++ .../webhooks/ComposeioTriggerHistory.tsx | 3 +- app/src/hooks/useComposeioTriggerHistory.ts | 47 ++++++++++++++- app/src/overlay/OverlayApp.tsx | 18 ++++-- app/src/utils/tauriCommands/composio.ts | 2 +- src/openhuman/about_app/catalog.rs | 2 +- src/openhuman/composio/bus.rs | 41 ++++++++++--- src/openhuman/composio/ops.rs | 10 +++- src/openhuman/composio/trigger_history.rs | 58 +++++++++++++++++-- .../local_ai/service/public_infer.rs | 17 +++++- 12 files changed, 188 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e4224f00..cb88a7345 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2184,6 +2184,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -4658,6 +4668,7 @@ dependencies = [ "env_logger", "fantoccini", "fastembed", + "fs2", "futures", "futures-util", "hex", diff --git a/Cargo.toml b/Cargo.toml index 441155bce..9d9577e96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ hound = "3.5" enigo = "0.3" arboard = "3" rdev = "0.5" +fs2 = "0.4" matrix-sdk = { version = "0.16", optional = true, default-features = false, features = ["e2e-encryption", "rustls-tls", "markdown"] } fantoccini = { version = "0.22.0", optional = true, default-features = false, features = ["rustls-tls"] } diff --git a/app/src/components/settings/panels/VoicePanel.tsx b/app/src/components/settings/panels/VoicePanel.tsx index afdf55fb7..1d4023be5 100644 --- a/app/src/components/settings/panels/VoicePanel.tsx +++ b/app/src/components/settings/panels/VoicePanel.tsx @@ -67,6 +67,13 @@ const VoicePanel = () => { setVoiceStatus(voiceResponse); const sttAssetState = assetsResponse.result.stt?.state; const sttAssetOk = sttAssetState === 'ready' || sttAssetState === 'ondemand'; + if (process.env.NODE_ENV !== 'production') { + console.debug('[VoicePanel:stt] readiness decision', { + sttAssetState, + sttAssetOk, + sttAvailable: voiceResponse.stt_available, + }); + } setSttReady(sttAssetOk && voiceResponse.stt_available); setError(null); } catch (err) { diff --git a/app/src/components/webhooks/ComposeioTriggerHistory.tsx b/app/src/components/webhooks/ComposeioTriggerHistory.tsx index a7ac6ea0f..261ec799b 100644 --- a/app/src/components/webhooks/ComposeioTriggerHistory.tsx +++ b/app/src/components/webhooks/ComposeioTriggerHistory.tsx @@ -17,7 +17,8 @@ function formatTimestamp(ts: number): string { function formatPayload(payload: unknown): string { try { - return JSON.stringify(payload, null, 2); + const formatted = JSON.stringify(payload, null, 2); + return formatted ?? String(payload); } catch { return String(payload); } diff --git a/app/src/hooks/useComposeioTriggerHistory.ts b/app/src/hooks/useComposeioTriggerHistory.ts index 1a8c2a177..53a4dc514 100644 --- a/app/src/hooks/useComposeioTriggerHistory.ts +++ b/app/src/hooks/useComposeioTriggerHistory.ts @@ -1,5 +1,5 @@ import debug from 'debug'; -import { useCallback, useEffect, useState } from 'react'; +import { useCallback, useEffect, useRef, useState } from 'react'; import { useCoreState } from '../providers/CoreStateProvider'; import { @@ -28,11 +28,39 @@ export function useComposeioTriggerHistory(limit = 100): ComposeioTriggerHistory const [loading, setLoading] = useState(false); const [error, setError] = useState(null); const [coreConnected, setCoreConnected] = useState(false); + const isRefreshingRef = useRef(false); + const sessionTokenRef = useRef(snapshot.sessionToken); + + const clearHistory = useCallback(() => { + setArchiveDir(null); + setCurrentDayFile(null); + setEntries([]); + setLoading(false); + setError(null); + setCoreConnected(false); + }, []); + + useEffect(() => { + sessionTokenRef.current = snapshot.sessionToken; + }, [snapshot.sessionToken]); const refresh = useCallback(async () => { + if (isRefreshingRef.current) { + return; + } + if (!snapshot.sessionToken) { + clearHistory(); + return; + } + + const requestToken = snapshot.sessionToken; + isRefreshingRef.current = true; setLoading(true); try { const response = await openhumanComposioListTriggerHistory(limit); + if (!sessionTokenRef.current || sessionTokenRef.current !== requestToken) { + return; + } const result = response.result.result; setArchiveDir(result.archive_dir); setCurrentDayFile(result.current_day_file); @@ -41,18 +69,31 @@ export function useComposeioTriggerHistory(limit = 100): ComposeioTriggerHistory setCoreConnected(true); log('loaded %d composio trigger entries', result.entries.length); } catch (refreshError) { + if (!sessionTokenRef.current || sessionTokenRef.current !== requestToken) { + return; + } const message = refreshError instanceof Error ? refreshError.message : 'Failed to load ComposeIO history'; setError(message); setCoreConnected(false); log('failed to load trigger history: %s', message); } finally { + isRefreshingRef.current = false; setLoading(false); } - }, [limit]); + }, [clearHistory, limit, snapshot.sessionToken]); + + useEffect(() => { + if (snapshot.sessionToken) { + return; + } + + clearHistory(); + }, [clearHistory, snapshot.sessionToken]); useEffect(() => { if (!snapshot.sessionToken) { + clearHistory(); return; } @@ -64,7 +105,7 @@ export function useComposeioTriggerHistory(limit = 100): ComposeioTriggerHistory return () => { window.clearInterval(timer); }; - }, [snapshot.sessionToken, refresh]); + }, [clearHistory, refresh, snapshot.sessionToken]); return { archiveDir, currentDayFile, entries, loading, error, coreConnected, refresh }; } diff --git a/app/src/overlay/OverlayApp.tsx b/app/src/overlay/OverlayApp.tsx index 364e48edb..ca32a1c42 100644 --- a/app/src/overlay/OverlayApp.tsx +++ b/app/src/overlay/OverlayApp.tsx @@ -52,6 +52,7 @@ const DEFAULT_ATTENTION_TTL_MS = 6000; const STT_RELEASE_LINGER_MS = 1500; /** Placeholder bubble text while waiting for the first transcription. */ const STT_LISTENING_PLACEHOLDER = '"Listening…"'; +let lastPollDebugTs = 0; // ── State model ────────────────────────────────────────────────────────── @@ -356,11 +357,12 @@ export default function OverlayApp() { // Server is actively recording/transcribing but overlay is idle → show stt if ( (serverState === 'recording' || serverState === 'transcribing') && - currentMode === 'idle' + currentMode !== 'stt' ) { console.debug( - `[overlay] poll sync: server=${serverState}, overlay=idle → activating stt` + `[overlay] poll sync: server=${serverState}, overlay=${currentMode} → activating stt` ); + clearDismissTimer(); setMode('stt'); setBubble({ id: `stt-poll-${Date.now()}`, @@ -375,8 +377,14 @@ export default function OverlayApp() { console.debug(`[overlay] poll sync: server=${serverState}, overlay=stt → dismissing`); goIdle(); } - } catch { - // Core not reachable — ignore, will retry on next poll + } catch (err) { + if (process.env.NODE_ENV !== 'production') { + const now = Date.now(); + if (now - lastPollDebugTs > 5000) { + lastPollDebugTs = now; + console.debug('[overlay] RPC poll failed', err); + } + } } }; @@ -386,7 +394,7 @@ export default function OverlayApp() { disposed = true; window.clearInterval(id); }; - }, [goIdle]); + }, [clearDismissTimer, goIdle]); // ── Window framing: resize / reposition on mode change ──────────────── const status: 'idle' | 'active' = mode === 'idle' ? 'idle' : 'active'; diff --git a/app/src/utils/tauriCommands/composio.ts b/app/src/utils/tauriCommands/composio.ts index 2587a4f66..9882a21b4 100644 --- a/app/src/utils/tauriCommands/composio.ts +++ b/app/src/utils/tauriCommands/composio.ts @@ -1,5 +1,5 @@ import { callCoreRpc } from '../../services/coreRpcClient'; -import { CommandResponse, isTauri } from './common'; +import { type CommandResponse, isTauri } from './common'; export interface ComposioTriggerHistoryEntry { received_at_ms: number; diff --git a/src/openhuman/about_app/catalog.rs b/src/openhuman/about_app/catalog.rs index b15acdd2f..f9c0a27b4 100644 --- a/src/openhuman/about_app/catalog.rs +++ b/src/openhuman/about_app/catalog.rs @@ -577,7 +577,7 @@ const CAPABILITIES: &[Capability] = &[ domain: "settings", category: CapabilityCategory::Settings, description: - "Inspect ComposeIO trigger history and find the daily JSONL archive files stored by the app.", + "Inspect Composio trigger history and find the daily JSONL archive files stored by the app.", how_to: "Settings > Developer Options > Webhooks", status: CapabilityStatus::Beta, }, diff --git a/src/openhuman/composio/bus.rs b/src/openhuman/composio/bus.rs index 6d3b2a882..bf2ec5b19 100644 --- a/src/openhuman/composio/bus.rs +++ b/src/openhuman/composio/bus.rs @@ -167,15 +167,40 @@ impl EventHandler for ComposioTriggerSubscriber { ); if let Some(store) = trigger_history::global() { - if let Err(error) = - store.record_trigger(toolkit, trigger, metadata_id, metadata_uuid, payload) + let toolkit_owned = toolkit.clone(); + let trigger_owned = trigger.clone(); + let metadata_id_owned = metadata_id.clone(); + let metadata_uuid_owned = metadata_uuid.clone(); + let payload_owned = payload.clone(); + + match tokio::task::spawn_blocking(move || { + store.record_trigger( + &toolkit_owned, + &trigger_owned, + &metadata_id_owned, + &metadata_uuid_owned, + &payload_owned, + ) + }) + .await { - tracing::warn!( - toolkit = %toolkit, - trigger = %trigger, - error = %error, - "[composio][history] failed to archive trigger" - ); + Ok(Ok(_)) => {} + Ok(Err(error)) => { + tracing::warn!( + toolkit = %toolkit, + trigger = %trigger, + error = %error, + "[composio][history] failed to archive trigger" + ); + } + Err(error) => { + tracing::warn!( + toolkit = %toolkit, + trigger = %trigger, + error = %error, + "[composio][history] failed to join archive task" + ); + } } } else { tracing::debug!( diff --git a/src/openhuman/composio/ops.rs b/src/openhuman/composio/ops.rs index f3c3ddf15..ce1bff468 100644 --- a/src/openhuman/composio/ops.rs +++ b/src/openhuman/composio/ops.rs @@ -202,9 +202,14 @@ pub async fn composio_list_trigger_history( limit: Option, ) -> OpResult> { let requested_limit = limit.unwrap_or(100).clamp(1, 500); + let workspace_label = config + .workspace_dir + .file_name() + .and_then(|value| value.to_str()) + .unwrap_or(""); tracing::debug!( limit = requested_limit, - workspace = %config.workspace_dir.display(), + workspace = workspace_label, "[composio] rpc list_trigger_history" ); @@ -216,12 +221,11 @@ pub async fn composio_list_trigger_history( .list_recent(requested_limit) .map_err(|error| format!("[composio] list_trigger_history failed: {error}"))?; let count = history.entries.len(); - let archive_dir = history.archive_dir.clone(); Ok(RpcOutcome::new( history, vec![format!( - "composio: {count} trigger history entrie(s) loaded from {archive_dir}" + "composio: {count} trigger history entrie(s) loaded (archive present)" )], )) } diff --git a/src/openhuman/composio/trigger_history.rs b/src/openhuman/composio/trigger_history.rs index b4d8029eb..f98f41fec 100644 --- a/src/openhuman/composio/trigger_history.rs +++ b/src/openhuman/composio/trigger_history.rs @@ -9,6 +9,7 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock}; use chrono::Utc; +use fs2::FileExt; use super::types::{ComposioTriggerHistoryEntry, ComposioTriggerHistoryResult}; @@ -17,13 +18,41 @@ static GLOBAL_TRIGGER_HISTORY: OnceLock> = Once const TRIGGER_ARCHIVE_DIR: &str = "triggers"; pub fn init_global(workspace_dir: PathBuf) -> Result<(), String> { - if GLOBAL_TRIGGER_HISTORY.get().is_some() { - return Ok(()); + let expected_archive_dir = workspace_dir.join("state").join(TRIGGER_ARCHIVE_DIR); + if let Some(existing) = GLOBAL_TRIGGER_HISTORY.get() { + if existing.archive_dir == expected_archive_dir { + return Ok(()); + } + + return Err(format!( + "[composio][history] global store already initialized for {} while attempting {}", + existing.archive_dir.display(), + expected_archive_dir.display() + )); } let store = Arc::new(ComposioTriggerHistoryStore::new(&workspace_dir)?); - let _ = GLOBAL_TRIGGER_HISTORY.set(store); - Ok(()) + match GLOBAL_TRIGGER_HISTORY.set(store.clone()) { + Ok(()) => Ok(()), + Err(_) => { + if let Some(existing) = GLOBAL_TRIGGER_HISTORY.get() { + if existing.archive_dir == store.archive_dir { + return Ok(()); + } + + return Err(format!( + "[composio][history] global store already initialized for {} while attempting {}", + existing.archive_dir.display(), + store.archive_dir.display() + )); + } + + Err(format!( + "[composio][history] failed to initialize global store for {}", + store.archive_dir.display() + )) + } + } } pub fn global() -> Option> { @@ -84,12 +113,29 @@ impl ComposioTriggerHistoryStore { ) })?; - writeln!(file, "{line}").map_err(|error| { + file.lock_exclusive().map_err(|error| { format!( - "[composio][history] failed to append archive file {}: {error}", + "[composio][history] failed to lock archive file {}: {error}", path.display() ) })?; + let write_result = writeln!(file, "{line}") + .and_then(|_| file.flush()) + .map_err(|error| { + format!( + "[composio][history] failed to append archive file {}: {error}", + path.display() + ) + }); + let unlock_result = file.unlock().map_err(|error| { + format!( + "[composio][history] failed to unlock archive file {}: {error}", + path.display() + ) + }); + + write_result?; + unlock_result?; tracing::debug!( toolkit = %entry.toolkit, diff --git a/src/openhuman/local_ai/service/public_infer.rs b/src/openhuman/local_ai/service/public_infer.rs index 1917541db..2f99615a5 100644 --- a/src/openhuman/local_ai/service/public_infer.rs +++ b/src/openhuman/local_ai/service/public_infer.rs @@ -308,18 +308,33 @@ impl LocalAiService { } let started = std::time::Instant::now(); + let model_id = model_ids::effective_chat_model_id(config); // When `no_think` is set, append the instruction to the system // prompt so the model treats it as a directive rather than content // it might parrot back. let effective_system = if no_think { + tracing::debug!( + no_think = true, + max_tokens = ?max_tokens, + allow_empty = allow_empty, + model = %model_id, + "[local_ai:infer] selected no_think prompt branch" + ); format!("{system}\n\nRespond with only the final answer. No reasoning, no preamble.") } else { + tracing::debug!( + no_think = false, + max_tokens = ?max_tokens, + allow_empty = allow_empty, + model = %model_id, + "[local_ai:infer] selected standard prompt branch" + ); system.to_string() }; let body = OllamaGenerateRequest { - model: model_ids::effective_chat_model_id(config), + model: model_id, prompt: prompt.to_string(), system: Some(effective_system), images: None,