Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions crates/jp_cli/src/cmd/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,13 @@ impl Query {

// Keep track of the number of events in the stream, so that we can
// later append new events to the end.
let current_events = stream.len();
let mut current_events = stream.len();

let mut thread = build_thread(stream, attachments, &cfg.assistant, &tools)?;

let root = ctx.workspace.root().to_path_buf();
let workspace = &mut ctx.workspace;

let mut result = Output::Ok(Success::Ok);
if let Some(schema) = self.schema.clone() {
result = handle_structured_output(
Expand All @@ -385,10 +388,12 @@ impl Query {
&cfg,
&mut ctx.signals.receiver,
&ctx.mcp_client,
ctx.workspace.root().to_path_buf(),
root,
ctx.term.is_tty,
&mut turn_state,
&mut thread,
&mut current_events,
workspace,
cfg.assistant.tool_choice.clone(),
tools,
conversation_id,
Expand Down Expand Up @@ -581,11 +586,30 @@ impl Query {
is_tty: bool,
turn_state: &mut TurnState,
thread: &mut Thread,
thread_skip: &mut usize,
workspace: &mut Workspace,
tool_choice: ToolChoice,
tools: Vec<ToolDefinition>,
conversation_id: ConversationId,
printer: Arc<Printer>,
) -> Result<()> {
// Append any new events to the end of the stream.
//
// By doing this in our `handle_stream` loop, we allow the in-memory
// state to be live-updated, which in turn allows us to call
// `Workspace::persist_active_conversation` in the loop, ensuring data
// is persisted even if the process is interrupted.
for event in thread.events.iter().skip(*thread_skip) {
*thread_skip += 1;

workspace
.get_events_mut(&conversation_id)
.expect("TODO: add this invariant to the type system")
.push_with_config_delta(event);
}

workspace.persist_active_conversation()?;

let mut result = Ok(());
let mut cancelled = false;
turn_state.request_count += 1;
Expand Down Expand Up @@ -689,6 +713,8 @@ impl Query {
turn_state,
provider.as_ref(),
thread,
thread_skip,
workspace,
&tool_choice,
&tools,
&mut response_handler,
Expand Down Expand Up @@ -738,6 +764,8 @@ impl Query {
is_tty,
turn_state,
thread,
thread_skip,
workspace,
tool_choice,
tools,
conversation_id,
Expand Down Expand Up @@ -814,6 +842,8 @@ impl Query {
is_tty,
turn_state,
thread,
thread_skip,
workspace,
// After the first tool call, we revert back to letting the LLM
// decide if/which tool to use.
ToolChoice::Auto,
Expand All @@ -839,6 +869,8 @@ impl Query {
turn_state: &mut TurnState,
provider: &dyn provider::Provider,
thread: &mut Thread,
thread_skip: &mut usize,
workspace: &mut Workspace,
tool_choice: &ToolChoice,
tools: &[ToolDefinition],
response_handler: &mut ResponseHandler,
Expand Down Expand Up @@ -870,6 +902,8 @@ impl Query {
is_tty,
turn_state,
thread,
thread_skip,
workspace,
tool_choice.clone(),
tools.to_vec(),
conversation_id,
Expand Down
66 changes: 65 additions & 1 deletion crates/jp_workspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,33 @@ impl Workspace {
Ok(())
}

/// Persists the active conversation to disk.
///
/// This can be used continuously while the CLI is running, to persist the
/// active conversation to disk without having to wait for the CLI to exit.
/// This guards against a long-running LLM conversation not being persisted
/// to disk at the end, if the CLI is terminated early.
pub fn persist_active_conversation(&mut self) -> Result<()> {
if self.disable_persistence {
return Ok(());
}

let active_id = self.active_conversation_id();
let Some(storage) = self.storage.as_mut() else {
return Ok(());
};

storage.persist_conversations_and_events(
&TombMap::new(),
&self.state.local.events,
&active_id,
&self.state.local.active_conversation,
)?;

info!(path = %self.root.display(), "Persisted active conversation.");
Ok(())
}

/// Gets the ID of the active conversation.
#[must_use]
pub fn active_conversation_id(&self) -> ConversationId {
Expand Down Expand Up @@ -630,7 +657,7 @@ mod tests {
use jp_storage::{CONVERSATIONS_DIR, METADATA_FILE, value::read_json};
use tempfile::tempdir;
use test_log::test;
use time::UtcDateTime;
use time::{UtcDateTime, macros::utc_datetime};

use super::*;

Expand Down Expand Up @@ -857,4 +884,41 @@ mod tests {
active_conversation
);
}

#[test]
fn test_workspace_persist_active_conversation() {
jp_id::global::set("foo".to_owned());

let tmp = tempdir().unwrap();
let root = tmp.path().join("root");
let storage = root.join("storage");

let mut workspace = Workspace::new(&root).persisted_at(&storage).unwrap();
let config = Arc::new(AppConfig::new_test());

let id1 = ConversationId::try_from(utc_datetime!(2023-01-01 0:00)).unwrap();
let id2 = ConversationId::try_from(utc_datetime!(2023-01-02 0:00)).unwrap();

workspace.create_conversation_with_id(id1, Conversation::default(), config.clone());
workspace.create_conversation_with_id(id2, Conversation::default(), config.clone());
workspace
.set_active_conversation_id(id1, UtcDateTime::UNIX_EPOCH)
.unwrap();

workspace.persist_active_conversation().unwrap();
assert!(storage.is_dir());

let id1_metadata_file = storage
.join(CONVERSATIONS_DIR)
.join(id1.to_dirname(None))
.join(METADATA_FILE);

let id2_metadata_file = storage
.join(CONVERSATIONS_DIR)
.join(id2.to_dirname(None))
.join(METADATA_FILE);

assert!(id1_metadata_file.is_file());
assert!(!id2_metadata_file.is_file());
}
}
Loading