diff --git a/crates/jp_cli/src/cmd/query.rs b/crates/jp_cli/src/cmd/query.rs index 231517a..97a9ae7 100644 --- a/crates/jp_cli/src/cmd/query.rs +++ b/crates/jp_cli/src/cmd/query.rs @@ -363,10 +363,13 @@ impl Query { // Keep track of the number of events in the stream, so that we can // later append new events to the end. - let current_events = stream.len(); + let mut current_events = stream.len(); let mut thread = build_thread(stream, attachments, &cfg.assistant, &tools)?; + let root = ctx.workspace.root().to_path_buf(); + let workspace = &mut ctx.workspace; + let mut result = Output::Ok(Success::Ok); if let Some(schema) = self.schema.clone() { result = handle_structured_output( @@ -385,10 +388,12 @@ impl Query { &cfg, &mut ctx.signals.receiver, &ctx.mcp_client, - ctx.workspace.root().to_path_buf(), + root, ctx.term.is_tty, &mut turn_state, &mut thread, + &mut current_events, + workspace, cfg.assistant.tool_choice.clone(), tools, conversation_id, @@ -581,11 +586,30 @@ impl Query { is_tty: bool, turn_state: &mut TurnState, thread: &mut Thread, + thread_skip: &mut usize, + workspace: &mut Workspace, tool_choice: ToolChoice, tools: Vec, conversation_id: ConversationId, printer: Arc, ) -> Result<()> { + // Append any new events to the end of the stream. + // + // By doing this in our `handle_stream` loop, we allow the in-memory + // state to be live-updated, which in turn allows us to call + // `Workspace::persist_active_conversation` in the loop, ensuring data + // is persisted even if the process is interrupted. + for event in thread.events.iter().skip(*thread_skip) { + *thread_skip += 1; + + workspace + .get_events_mut(&conversation_id) + .expect("TODO: add this invariant to the type system") + .push_with_config_delta(event); + } + + workspace.persist_active_conversation()?; + let mut result = Ok(()); let mut cancelled = false; turn_state.request_count += 1; @@ -689,6 +713,8 @@ impl Query { turn_state, provider.as_ref(), thread, + thread_skip, + workspace, &tool_choice, &tools, &mut response_handler, @@ -738,6 +764,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, tool_choice, tools, conversation_id, @@ -814,6 +842,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, // After the first tool call, we revert back to letting the LLM // decide if/which tool to use. ToolChoice::Auto, @@ -839,6 +869,8 @@ impl Query { turn_state: &mut TurnState, provider: &dyn provider::Provider, thread: &mut Thread, + thread_skip: &mut usize, + workspace: &mut Workspace, tool_choice: &ToolChoice, tools: &[ToolDefinition], response_handler: &mut ResponseHandler, @@ -870,6 +902,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, tool_choice.clone(), tools.to_vec(), conversation_id, diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index d2c117d..5c15ae2 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -260,6 +260,33 @@ impl Workspace { Ok(()) } + /// Persists the active conversation to disk. + /// + /// This can be used continuously while the CLI is running, to persist the + /// active conversation to disk without having to wait for the CLI to exit. + /// This guards against a long-running LLM conversation not being persisted + /// to disk at the end, if the CLI is terminated early. + pub fn persist_active_conversation(&mut self) -> Result<()> { + if self.disable_persistence { + return Ok(()); + } + + let active_id = self.active_conversation_id(); + let Some(storage) = self.storage.as_mut() else { + return Ok(()); + }; + + storage.persist_conversations_and_events( + &TombMap::new(), + &self.state.local.events, + &active_id, + &self.state.local.active_conversation, + )?; + + info!(path = %self.root.display(), "Persisted active conversation."); + Ok(()) + } + /// Gets the ID of the active conversation. #[must_use] pub fn active_conversation_id(&self) -> ConversationId { @@ -630,7 +657,7 @@ mod tests { use jp_storage::{CONVERSATIONS_DIR, METADATA_FILE, value::read_json}; use tempfile::tempdir; use test_log::test; - use time::UtcDateTime; + use time::{UtcDateTime, macros::utc_datetime}; use super::*; @@ -857,4 +884,41 @@ mod tests { active_conversation ); } + + #[test] + fn test_workspace_persist_active_conversation() { + jp_id::global::set("foo".to_owned()); + + let tmp = tempdir().unwrap(); + let root = tmp.path().join("root"); + let storage = root.join("storage"); + + let mut workspace = Workspace::new(&root).persisted_at(&storage).unwrap(); + let config = Arc::new(AppConfig::new_test()); + + let id1 = ConversationId::try_from(utc_datetime!(2023-01-01 0:00)).unwrap(); + let id2 = ConversationId::try_from(utc_datetime!(2023-01-02 0:00)).unwrap(); + + workspace.create_conversation_with_id(id1, Conversation::default(), config.clone()); + workspace.create_conversation_with_id(id2, Conversation::default(), config.clone()); + workspace + .set_active_conversation_id(id1, UtcDateTime::UNIX_EPOCH) + .unwrap(); + + workspace.persist_active_conversation().unwrap(); + assert!(storage.is_dir()); + + let id1_metadata_file = storage + .join(CONVERSATIONS_DIR) + .join(id1.to_dirname(None)) + .join(METADATA_FILE); + + let id2_metadata_file = storage + .join(CONVERSATIONS_DIR) + .join(id2.to_dirname(None)) + .join(METADATA_FILE); + + assert!(id1_metadata_file.is_file()); + assert!(!id2_metadata_file.is_file()); + } }