From 63176497d471f53d12d1c0f349b6626094c55907 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Fri, 16 Jan 2026 13:14:14 +0100 Subject: [PATCH 1/5] enhance(cli, workspace): Persist events incrementally during streams The `jp query` command now persists conversation events to disk as they are received during the LLM stream. Previously, persistence occurred only after the entire command completed, which could lead to data loss if the process was interrupted during a long-running interaction or complex tool use. The `Workspace` now exposes a `persist_active_conversation` method to allow manual triggering of the persistence logic. Within the `query` command, the `handle_stream` function tracks new events in the current `Thread` and updates the workspace state before persisting, ensuring the local history remains up-to-date even during recursive tool calls. Signed-off-by: Jean Mertz --- crates/jp_cli/src/cmd/query.rs | 38 ++++++++++++++++++++++++++++++++-- crates/jp_workspace/src/lib.rs | 27 ++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/crates/jp_cli/src/cmd/query.rs b/crates/jp_cli/src/cmd/query.rs index 231517a..97a9ae7 100644 --- a/crates/jp_cli/src/cmd/query.rs +++ b/crates/jp_cli/src/cmd/query.rs @@ -363,10 +363,13 @@ impl Query { // Keep track of the number of events in the stream, so that we can // later append new events to the end. - let current_events = stream.len(); + let mut current_events = stream.len(); let mut thread = build_thread(stream, attachments, &cfg.assistant, &tools)?; + let root = ctx.workspace.root().to_path_buf(); + let workspace = &mut ctx.workspace; + let mut result = Output::Ok(Success::Ok); if let Some(schema) = self.schema.clone() { result = handle_structured_output( @@ -385,10 +388,12 @@ impl Query { &cfg, &mut ctx.signals.receiver, &ctx.mcp_client, - ctx.workspace.root().to_path_buf(), + root, ctx.term.is_tty, &mut turn_state, &mut thread, + &mut current_events, + workspace, cfg.assistant.tool_choice.clone(), tools, conversation_id, @@ -581,11 +586,30 @@ impl Query { is_tty: bool, turn_state: &mut TurnState, thread: &mut Thread, + thread_skip: &mut usize, + workspace: &mut Workspace, tool_choice: ToolChoice, tools: Vec, conversation_id: ConversationId, printer: Arc, ) -> Result<()> { + // Append any new events to the end of the stream. + // + // By doing this in our `handle_stream` loop, we allow the in-memory + // state to be live-updated, which in turn allows us to call + // `Workspace::persist_active_conversation` in the loop, ensuring data + // is persisted even if the process is interrupted. + for event in thread.events.iter().skip(*thread_skip) { + *thread_skip += 1; + + workspace + .get_events_mut(&conversation_id) + .expect("TODO: add this invariant to the type system") + .push_with_config_delta(event); + } + + workspace.persist_active_conversation()?; + let mut result = Ok(()); let mut cancelled = false; turn_state.request_count += 1; @@ -689,6 +713,8 @@ impl Query { turn_state, provider.as_ref(), thread, + thread_skip, + workspace, &tool_choice, &tools, &mut response_handler, @@ -738,6 +764,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, tool_choice, tools, conversation_id, @@ -814,6 +842,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, // After the first tool call, we revert back to letting the LLM // decide if/which tool to use. ToolChoice::Auto, @@ -839,6 +869,8 @@ impl Query { turn_state: &mut TurnState, provider: &dyn provider::Provider, thread: &mut Thread, + thread_skip: &mut usize, + workspace: &mut Workspace, tool_choice: &ToolChoice, tools: &[ToolDefinition], response_handler: &mut ResponseHandler, @@ -870,6 +902,8 @@ impl Query { is_tty, turn_state, thread, + thread_skip, + workspace, tool_choice.clone(), tools.to_vec(), conversation_id, diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index d2c117d..f19f4c0 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -260,6 +260,33 @@ impl Workspace { Ok(()) } + /// Persists the active conversation to disk. + /// + /// This can be used continuously while the CLI is running, to persist the + /// active conversation to disk without having to wait for the CLI to exit. + /// This guards against a long-running LLM conversation not being persisted + /// to disk at the end, if the CLI is terminated early. + pub fn persist_active_conversation(&mut self) -> Result<()> { + if self.disable_persistence { + return Ok(()); + } + + let active_id = self.active_conversation_id(); + let Some(storage) = self.storage.as_mut() else { + return Ok(()); + }; + + storage.persist_conversations_and_events( + &TombMap::new(), + &self.state.local.events, + &active_id, + &self.state.local.active_conversation, + )?; + + info!(path = %self.root.display(), "Persisted active conversation."); + Ok(()) + } + /// Gets the ID of the active conversation. #[must_use] pub fn active_conversation_id(&self) -> ConversationId { From 02e53093c370078829f281f9a7525d774003ee9d Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Fri, 16 Jan 2026 13:21:11 +0100 Subject: [PATCH 2/5] fixup! enhance(cli, workspace): Persist events incrementally during streams Signed-off-by: Jean Mertz --- crates/jp_workspace/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index f19f4c0..9399a3c 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -271,7 +271,6 @@ impl Workspace { return Ok(()); } - let active_id = self.active_conversation_id(); let Some(storage) = self.storage.as_mut() else { return Ok(()); }; @@ -279,7 +278,11 @@ impl Workspace { storage.persist_conversations_and_events( &TombMap::new(), &self.state.local.events, - &active_id, + &self + .state + .user + .conversations_metadata + .active_conversation_id, &self.state.local.active_conversation, )?; From bdf48aff9292231348772f0d3993693a1e44a6f2 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Fri, 16 Jan 2026 13:21:40 +0100 Subject: [PATCH 3/5] fixup! enhance(cli, workspace): Persist events incrementally during streams Signed-off-by: Jean Mertz --- crates/jp_workspace/src/lib.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index 9399a3c..f19f4c0 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -271,6 +271,7 @@ impl Workspace { return Ok(()); } + let active_id = self.active_conversation_id(); let Some(storage) = self.storage.as_mut() else { return Ok(()); }; @@ -278,11 +279,7 @@ impl Workspace { storage.persist_conversations_and_events( &TombMap::new(), &self.state.local.events, - &self - .state - .user - .conversations_metadata - .active_conversation_id, + &active_id, &self.state.local.active_conversation, )?; From b6e47135eecb9efe3c24923a0122b2247f6460fa Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Fri, 16 Jan 2026 13:49:38 +0100 Subject: [PATCH 4/5] fixup! enhance(cli, workspace): Persist events incrementally during streams Signed-off-by: Jean Mertz --- crates/jp_workspace/src/lib.rs | 37 ++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index f19f4c0..291d92c 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -884,4 +884,41 @@ mod tests { active_conversation ); } + + #[test] + fn test_workspace_persist_active_conversation() { + jp_id::global::set("foo".to_owned()); + + let tmp = tempdir().unwrap(); + let root = tmp.path().join("root"); + let storage = root.join("storage"); + + let mut workspace = Workspace::new(&root).persisted_at(&storage).unwrap(); + let config = Arc::new(AppConfig::new_test()); + + let id1 = ConversationId::try_from(utc_datetime!(2023-01-01 0:00)).unwrap(); + let id2 = ConversationId::try_from(utc_datetime!(2023-01-02 0:00)).unwrap(); + + workspace.create_conversation_with_id(id1, Conversation::default(), config.clone()); + workspace.create_conversation_with_id(id2, Conversation::default(), config.clone()); + workspace + .set_active_conversation_id(id1, UtcDateTime::UNIX_EPOCH) + .unwrap(); + + workspace.persist_active_conversation().unwrap(); + assert!(storage.is_dir()); + + let id1_metadata_file = storage + .join(CONVERSATIONS_DIR) + .join(id1.to_dirname(None)) + .join(METADATA_FILE); + + let id2_metadata_file = storage + .join(CONVERSATIONS_DIR) + .join(id2.to_dirname(None)) + .join(METADATA_FILE); + + assert!(id1_metadata_file.is_file()); + assert!(!id2_metadata_file.is_file()); + } } From 3f8b43f1017a02bf4e241a295194074b2daa17a8 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Fri, 16 Jan 2026 13:52:03 +0100 Subject: [PATCH 5/5] fixup! enhance(cli, workspace): Persist events incrementally during streams Signed-off-by: Jean Mertz --- crates/jp_workspace/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/jp_workspace/src/lib.rs b/crates/jp_workspace/src/lib.rs index 291d92c..5c15ae2 100644 --- a/crates/jp_workspace/src/lib.rs +++ b/crates/jp_workspace/src/lib.rs @@ -657,7 +657,7 @@ mod tests { use jp_storage::{CONVERSATIONS_DIR, METADATA_FILE, value::read_json}; use tempfile::tempdir; use test_log::test; - use time::UtcDateTime; + use time::{UtcDateTime, macros::utc_datetime}; use super::*;