diff --git a/refact-agent/engine/src/cloud/experts_req.rs b/refact-agent/engine/src/cloud/experts_req.rs index 4a698e393..222c98b51 100644 --- a/refact-agent/engine/src/cloud/experts_req.rs +++ b/refact-agent/engine/src/cloud/experts_req.rs @@ -112,7 +112,9 @@ pub async fn expert_choice_consequences( let query = r#" query GetExpertModel($fexp_id: String!, $inside_fgroup_id: String!) { expert_choice_consequences(fexp_id: $fexp_id, inside_fgroup_id: $inside_fgroup_id) { - provm_name + models { + provm_name + } } } "#; @@ -129,7 +131,11 @@ pub async fn expert_choice_consequences( }); info!("expert_choice_consequences: address={}, fexp_id={}, inside_fgroup_id={}", config.address, fexp_id, fgroup_id); - let result: Vec = execute_graphql( + #[derive(Deserialize, Debug)] + struct Consequences { + models: Vec, + } + let result: Consequences = execute_graphql( config, query, variables, @@ -138,9 +144,9 @@ pub async fn expert_choice_consequences( .await .map_err(|e| e.to_string())?; - if result.is_empty() { + if result.models.is_empty() { return Err(format!("No models found for the expert with name {}", fexp_id)); } - Ok(result[0].provm_name.clone()) + Ok(result.models[0].provm_name.clone()) } diff --git a/refact-agent/engine/src/cloud/messages_req.rs b/refact-agent/engine/src/cloud/messages_req.rs index 73eab1629..cd2c28159 100644 --- a/refact-agent/engine/src/cloud/messages_req.rs +++ b/refact-agent/engine/src/cloud/messages_req.rs @@ -303,3 +303,48 @@ pub async fn get_tool_names_from_openai_format( } Ok(tool_names) } + +pub async fn thread_message_patch_app_specific( + cmd_address_url: &str, + api_key: &str, + ftm_belongs_to_ft_id: &str, + ftm_alt: i64, + ftm_num: i64, + ftm_app_specific: Value, +) -> Result<(), String> { + let mutation = r#" + mutation ThreadMessagePatch($input: FThreadMessagePatch!) { + thread_message_patch(input: $input) + } + "#; + + let variables = json!({ + "input": { + "ftm_belongs_to_ft_id": ftm_belongs_to_ft_id, + "ftm_alt": ftm_alt, + "ftm_num": ftm_num, + "ftm_app_specific": serde_json::to_string(&ftm_app_specific).unwrap() + } + }); + + let config = GraphQLRequestConfig { + address: cmd_address_url.to_string(), + api_key: api_key.to_string(), + user_agent: Some("refact-lsp".to_string()), + additional_headers: None, + }; + + tracing::info!( + "thread_message_patch_app_specific: address={}, ftm_belongs_to_ft_id={}, ftm_alt={}, ftm_num={}", + config.address, ftm_belongs_to_ft_id, ftm_alt, ftm_num + ); + + execute_graphql_no_result( + config, + mutation, + variables, + "thread_message_update" + ) + .await + .map_err(graphql_error_to_string) +} diff --git a/refact-agent/engine/src/cloud/subchat.rs b/refact-agent/engine/src/cloud/subchat.rs index 9981f35f7..574f13f8f 100644 --- a/refact-agent/engine/src/cloud/subchat.rs +++ b/refact-agent/engine/src/cloud/subchat.rs @@ -2,13 +2,14 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use futures::StreamExt; use crate::at_commands::at_commands::AtCommandsContext; +use crate::global_context::APP_CAPTURE_ID; use tokio::sync::Mutex as AMutex; use crate::call_validation::{ChatMessage, ReasoningEffort}; use crate::cloud::{threads_req, messages_req}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio_tungstenite::tungstenite::Message; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::cloud::threads_sub::{initialize_connection, ThreadPayload}; #[derive(Serialize, Deserialize, Debug)] @@ -63,7 +64,7 @@ pub async fn subchat( ccx.lock().await.chat_id.clone(), ) }; - + let model_name = crate::cloud::experts_req::expert_choice_consequences(&cmd_address_url, &api_key, ft_fexp_id, &located_fgroup_id).await?; let preferences = build_preferences(&model_name, temperature, max_new_tokens, 1, reasoning_effort); let existing_threads = crate::cloud::threads_req::get_threads_app_captured( @@ -71,7 +72,8 @@ pub async fn subchat( &api_key, &located_fgroup_id, &app_searchable_id, - tool_call_id + APP_CAPTURE_ID, + Some(tool_call_id), ).await?; let thread = if !existing_threads.is_empty() { info!("There are already existing threads for this tool_id: {:?}", existing_threads); @@ -83,14 +85,13 @@ pub async fn subchat( &located_fgroup_id, ft_fexp_id, &format!("subchat_{}", ft_fexp_id), - &tool_call_id, + APP_CAPTURE_ID, &app_searchable_id, serde_json::json!({ - "tool_call_id": tool_call_id, - "ft_fexp_id": ft_fexp_id, - }), + "tool_call_id": tool_call_id, + "ft_fexp_id": ft_fexp_id, + }), None, - Some(parent_thread_id) ).await?; let thread_messages = messages_req::convert_messages_to_thread_messages( messages, 100, 100, 1, &thread.ft_id, Some(preferences) @@ -100,9 +101,9 @@ pub async fn subchat( ).await?; thread }; - + let thread_id = thread.ft_id.clone(); - let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => return Err(format!("Failed to initialize WebSocket connection: {}", err)), @@ -125,20 +126,31 @@ pub async fn subchat( "data" => { if let Some(payload) = response["payload"].as_object() { let data = &payload["data"]; - let threads_in_group = &data["threads_in_group"]; - let news_action = threads_in_group["news_action"].as_str().unwrap_or(""); - if news_action != "INSERT" && news_action != "UPDATE" { - continue; - } - if let Ok(payload) = serde_json::from_value::(threads_in_group["news_payload"].clone()) { - if payload.ft_id != thread_id { + let threads_and_calls_subs = &data["bot_threads_and_calls_subs"]; + let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or(""); + let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or(""); + + if news_about == "flexus_thread" { + + if news_action != "INSERT" && news_action != "UPDATE" { continue; } - if payload.ft_error.is_some() { - break; + + if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread.clone())) { + if payload.ft_id != thread_id { + continue; + } + if payload.ft_error.is_some() { + break; + } + } else { + info!("failed to parse thread payload: {:?}", news_payload_thread); + } + } else { + info!("received thread update but couldn't find news_payload_thread"); } - } else { - info!("failed to parse thread payload: {:?}", threads_in_group); + } } else { info!("received data message but couldn't find payload"); @@ -151,7 +163,7 @@ pub async fn subchat( error!("threads subscription complete: {}.\nRestarting it", text); } _ => { - info!("received message with unknown type: {}", text); + warn!("received message with unknown type: {}", text); } } } @@ -175,7 +187,7 @@ pub async fn subchat( return Err(format!("Thread error: {:?}", error)); } } - + let all_thread_messages = messages_req::get_thread_messages( &cmd_address_url, &api_key, &thread_id, 100 ).await?; diff --git a/refact-agent/engine/src/cloud/threads_processing.rs b/refact-agent/engine/src/cloud/threads_processing.rs index c85087cff..280daa458 100644 --- a/refact-agent/engine/src/cloud/threads_processing.rs +++ b/refact-agent/engine/src/cloud/threads_processing.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::SystemTime; use indexmap::IndexMap; use tokio::sync::RwLock as ARwLock; use tokio::sync::Mutex as AMutex; @@ -10,7 +11,8 @@ use crate::basic_utils::generate_random_hash; use crate::call_validation::{ChatContent, ChatMessage, ChatToolCall, ContextEnum, ContextFile}; use crate::cloud::messages_req::ThreadMessage; use crate::cloud::threads_req::{lock_thread, Thread}; -use crate::cloud::threads_sub::{BasicStuff, ThreadPayload}; +use crate::cloud::threads_sub::{BasicStuff, ThreadMessagePayload, ThreadPayload}; +use crate::git::checkpoints::create_workspace_checkpoint; use crate::global_context::GlobalContext; use crate::scratchpads::scratchpad_utils::max_tokens_for_rag_chat_by_tools; use crate::tools::tools_description::{MatchConfirmDeny, MatchConfirmDenyResult, Tool}; @@ -337,7 +339,6 @@ pub async fn process_thread_event( basic_info: BasicStuff, cmd_address_url: String, api_key: String, - app_searchable_id: String, located_fgroup_id: String, ) -> Result<(), String> { if thread_payload.ft_need_tool_calls == -1 @@ -345,17 +346,6 @@ pub async fn process_thread_event( || !thread_payload.ft_locked_by.is_empty() { return Ok(()); } - if let Some(ft_app_searchable) = thread_payload.ft_app_searchable.clone() { - if ft_app_searchable != app_searchable_id { - info!("thread `{}` has different `app_searchable` id, skipping it: {} != {}", - thread_payload.ft_id, app_searchable_id, ft_app_searchable - ); - return Ok(()); - } - } else { - info!("thread `{}` doesn't have the `app_searchable` id, skipping it", thread_payload.ft_id); - return Ok(()); - } if let Some(error) = thread_payload.ft_error.as_ref() { info!("thread `{}` has the error: `{}`. Skipping it", thread_payload.ft_id, error); return Ok(()); @@ -386,6 +376,42 @@ pub async fn process_thread_event( process_result } +pub async fn process_thread_message_event( + gcx: Arc>, + mut thread_message_payload: ThreadMessagePayload, + cmd_address_url: String, + api_key: String, +) -> Result<(), String> { + let old_message_cutoff = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() - 300; + if thread_message_payload.ftm_role != "user" || thread_message_payload.ftm_created_ts < old_message_cutoff as f64 { + return Ok(()); + } + if thread_message_payload.ftm_app_specific.as_ref().is_some_and(|a| a.get("checkpoints").is_some()) { + return Ok(()); + } + + let (checkpoints, _) = create_workspace_checkpoint(gcx.clone(), None, &thread_message_payload.ftm_belongs_to_ft_id).await?; + + let mut app_specific = thread_message_payload.ftm_app_specific.as_ref() + .and_then(|v| v.as_object().cloned()) + .unwrap_or_else(|| serde_json::Map::new()); + app_specific.insert("checkpoints".to_string(), serde_json::json!(checkpoints)); + thread_message_payload.ftm_app_specific = Some(Value::Object(app_specific.clone())); + + crate::cloud::messages_req::thread_message_patch_app_specific( + &cmd_address_url, + &api_key, + &thread_message_payload.ftm_belongs_to_ft_id, + thread_message_payload.ftm_alt, + thread_message_payload.ftm_num, + Value::Object(app_specific), + ).await?; + + tracing::info!("Created and stored checkpoints: {:#?}", checkpoints); + + Ok(()) +} + async fn process_locked_thread( gcx: Arc>, thread_payload: &ThreadPayload, diff --git a/refact-agent/engine/src/cloud/threads_req.rs b/refact-agent/engine/src/cloud/threads_req.rs index efa7b9e02..d7b1c18fb 100644 --- a/refact-agent/engine/src/cloud/threads_req.rs +++ b/refact-agent/engine/src/cloud/threads_req.rs @@ -32,10 +32,9 @@ pub async fn create_thread( ft_app_searchable: &str, ft_app_specific: Value, ft_toolset: Option>, - parent_ft_id: Option, ) -> Result { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let mutation = r#" mutation CreateThread($input: FThreadInput!) { thread_create(input: $input) { @@ -64,8 +63,8 @@ pub async fn create_thread( Some(toolset) => serde_json::to_string(&toolset).map_err(|e| format!("Failed to serialize toolset: {}", e))?, None => "null".to_string(), }; - - let mut input = json!({ + + let input = json!({ "owner_shared": false, "located_fgroup_id": located_fgroup_id, "ft_fexp_id": ft_fexp_id, @@ -76,10 +75,6 @@ pub async fn create_thread( "ft_app_specific": serde_json::to_string(&ft_app_specific).unwrap(), }); - if let Some(parent_id) = parent_ft_id { - input["parent_ft_id"] = json!(parent_id); - } - let config = GraphQLRequestConfig { address: cmd_address_url.to_string(), api_key: api_key.to_string(), @@ -104,7 +99,7 @@ pub async fn get_thread( thread_id: &str, ) -> Result { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let query = r#" query GetThread($id: String!) { thread_get(id: $id) { @@ -151,12 +146,18 @@ pub async fn get_threads_app_captured( located_fgroup_id: &str, ft_app_searchable: &str, ft_app_capture: &str, + tool_call_id: Option<&str>, ) -> Result, String> { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let query = r#" - query GetThread($located_fgroup_id: String!, $ft_app_capture: String!, $ft_app_searchable: String!) { - threads_app_captured(located_fgroup_id: $located_fgroup_id, ft_app_capture: $ft_app_capture, ft_app_searchable: $ft_app_searchable) { + query GetThread($located_fgroup_id: String!, $ft_app_capture: String!, $ft_app_searchable: String!, $ft_app_specific_filters: [FTAppSpecificFilter!]) { + threads_app_captured( + located_fgroup_id: $located_fgroup_id, + ft_app_capture: $ft_app_capture, + ft_app_searchable: $ft_app_searchable, + ft_app_specific_filters: $ft_app_specific_filters + ) { owner_fuser_id owner_shared located_fgroup_id @@ -187,7 +188,11 @@ pub async fn get_threads_app_captured( let variables = json!({ "located_fgroup_id": located_fgroup_id, "ft_app_capture": ft_app_capture, - "ft_app_searchable": ft_app_searchable + "ft_app_searchable": ft_app_searchable, + "ft_app_specific_filters": match tool_call_id { + Some(id) => vec![json!({ "path": "tool_call_id", "equals": id })], + None => Vec::new(), + }, }); tracing::info!("get_threads_app_captured: address={}, located_fgroup_id={}, ft_app_capture={}, ft_app_searchable={}", config.address, located_fgroup_id, ft_app_capture, ft_app_searchable @@ -209,7 +214,7 @@ pub async fn set_thread_toolset( ft_toolset: Vec ) -> Result, String> { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let mutation = r#" mutation UpdateThread($thread_id: String!, $patch: FThreadPatch!) { thread_patch(id: $thread_id, patch: $patch) { @@ -217,7 +222,7 @@ pub async fn set_thread_toolset( } } "#; - + let variables = json!({ "thread_id": thread_id, "patch": { @@ -258,12 +263,12 @@ pub async fn lock_thread( hash: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let worker_name = format!("refact-lsp:{hash}"); let query = r#" mutation AdvanceLock($ft_id: String!, $worker_name: String!) { thread_lock(ft_id: $ft_id, worker_name: $worker_name) - } + } "#; let config = GraphQLRequestConfig { @@ -273,7 +278,7 @@ pub async fn lock_thread( }; let variables = json!({ - "ft_id": thread_id, + "ft_id": thread_id, "worker_name": worker_name }); @@ -303,7 +308,7 @@ pub async fn unlock_thread( hash: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let worker_name = format!("refact-lsp:{hash}"); let query = r#" mutation AdvanceUnlock($ft_id: String!, $worker_name: String!) { @@ -318,7 +323,7 @@ pub async fn unlock_thread( }; let variables = json!({ - "ft_id": thread_id, + "ft_id": thread_id, "worker_name": worker_name }); @@ -348,7 +353,7 @@ pub async fn set_error_thread( error: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_no_result, GraphQLRequestConfig}; - + let mutation = r#" mutation SetThreadError($thread_id: String!, $patch: FThreadPatch!) { thread_patch(id: $thread_id, patch: $patch) { @@ -356,7 +361,7 @@ pub async fn set_error_thread( } } "#; - + let variables = json!({ "thread_id": thread_id, "patch": { @@ -369,7 +374,7 @@ pub async fn set_error_thread( api_key: api_key.to_string(), ..Default::default() }; - + tracing::info!("unlock_thread: address={}, thread_id={}, ft_error={}", config.address, thread_id, error ); @@ -390,16 +395,16 @@ pub async fn set_thread_confirmation_request( confirmation_request: Value, ) -> Result { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let mutation = r#" mutation SetThreadConfirmationRequest($ft_id: String!, $confirmation_request: String!) { thread_set_confirmation_request(ft_id: $ft_id, confirmation_request: $confirmation_request) } "#; - + let confirmation_request_str = serde_json::to_string(&confirmation_request) .map_err(|e| format!("Failed to serialize confirmation request: {}", e))?; - + let variables = json!({ "ft_id": thread_id, "confirmation_request": confirmation_request_str diff --git a/refact-agent/engine/src/cloud/threads_sub.rs b/refact-agent/engine/src/cloud/threads_sub.rs index f9aed471d..8a1248d3b 100644 --- a/refact-agent/engine/src/cloud/threads_sub.rs +++ b/refact-agent/engine/src/cloud/threads_sub.rs @@ -1,4 +1,4 @@ -use crate::global_context::GlobalContext; +use crate::global_context::{GlobalContext, APP_CAPTURE_ID}; use futures::{SinkExt, StreamExt}; use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig, graphql_error_to_string}; use serde::{Deserialize, Serialize}; @@ -31,19 +31,45 @@ pub struct ThreadPayload { pub ft_confirmation_response: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ThreadMessagePayload { + pub ftm_belongs_to_ft_id: String, + pub ftm_alt: i64, + pub ftm_num: i64, + pub ftm_role: String, + pub ftm_app_specific: Option, + pub ftm_created_ts: f64, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct BasicStuff { pub fuser_id: String, pub workspaces: Vec, } -// XXX use xxx_subs::filter for ft_app_capture const THREADS_SUBSCRIPTION_QUERY: &str = r#" - subscription ThreadsPageSubs($located_fgroup_id: String!) { - threads_in_group(located_fgroup_id: $located_fgroup_id) { + subscription ThreadsAndCallsSubs( + $fgroup_id: String, + $marketable_name: String!, + $ft_app_searchable: String!, + $inprocess_tool_names: [String!]!, + $max_threads: Int!, + ) { + bot_threads_and_calls_subs( + fgroup_id: $fgroup_id, + marketable_name: $marketable_name, + marketable_version: "", + ft_app_searchable: $ft_app_searchable, + inprocess_tool_names: $inprocess_tool_names, + max_threads: $max_threads, + want_personas: false, + want_threads: true, + want_messages: true, + ) { news_action + news_about news_payload_id - news_payload { + news_payload_thread { owner_fuser_id ft_id ft_error @@ -57,6 +83,14 @@ const THREADS_SUBSCRIPTION_QUERY: &str = r#" ft_app_capture ft_app_specific } + news_payload_thread_message { + ftm_belongs_to_ft_id + ftm_num + ftm_alt + ftm_role + ftm_app_specific + ftm_created_ts + } } } "#; @@ -86,11 +120,13 @@ pub async fn watch_threads_subscription(gcx: Arc>) { continue; }; + let app_searchable_id = gcx.read().await.app_searchable_id.clone(); + info!( - "starting subscription for threads_in_group with fgroup_id=\"{}\"", - located_fgroup_id + "starting subscription for bot_threads_and_calls_subs with fgroup_id=\"{}\" and app_searchable_id=\"{}\"", + located_fgroup_id, app_searchable_id ); - let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => { @@ -129,6 +165,7 @@ pub async fn initialize_connection( cmd_address_url: &str, api_key: &str, located_fgroup_id: &str, + app_searchable_id: &str, ) -> Result< futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< @@ -208,7 +245,11 @@ pub async fn initialize_connection( "payload": { "query": THREADS_SUBSCRIPTION_QUERY, "variables": { - "located_fgroup_id": located_fgroup_id + "fgroup_id": located_fgroup_id, + "marketable_name": APP_CAPTURE_ID, + "ft_app_searchable": app_searchable_id, + "inprocess_tool_names": [], + "max_threads": 100, } } }); @@ -231,7 +272,6 @@ async fn actual_subscription_loop( located_fgroup_id: &str, ) -> Result<(), String> { info!("cloud threads subscription started, waiting for events..."); - let app_searchable_id = gcx.read().await.app_searchable_id.clone(); let basic_info = get_basic_info(cmd_address_url, api_key).await?; while let Some(msg) = connection.next().await { if gcx.clone().read().await.shutdown_flag.load(Ordering::SeqCst) { @@ -255,27 +295,55 @@ async fn actual_subscription_loop( "data" => { if let Some(payload) = response["payload"].as_object() { let data = &payload["data"]; - let threads_in_group = &data["threads_in_group"]; - let news_action = threads_in_group["news_action"].as_str().unwrap_or(""); + let threads_and_calls_subs = &data["bot_threads_and_calls_subs"]; + let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or(""); + let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or(""); + if news_action != "INSERT" && news_action != "UPDATE" { continue; } - if let Ok(payload) = serde_json::from_value::(threads_in_group["news_payload"].clone()) { - let gcx_clone = gcx.clone(); - let payload_clone = payload.clone(); - let basic_info_clone = basic_info.clone(); - let cmd_address_url_clone = cmd_address_url.to_string(); - let api_key_clone = api_key.to_string(); - let app_searchable_id_clone = app_searchable_id.clone(); - let located_fgroup_id_clone = located_fgroup_id.to_string(); - tokio::spawn(async move { - crate::cloud::threads_processing::process_thread_event( - gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, app_searchable_id_clone, located_fgroup_id_clone - ).await - }); - } else { - info!("failed to parse thread payload: {:?}", threads_in_group); + + if news_about == "flexus_thread" { + if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread.clone())) { + let gcx_clone = gcx.clone(); + let payload_clone = payload.clone(); + let basic_info_clone = basic_info.clone(); + let cmd_address_url_clone = cmd_address_url.to_string(); + let api_key_clone = api_key.to_string(); + let located_fgroup_id_clone = located_fgroup_id.to_string(); + tokio::spawn(async move { + crate::cloud::threads_processing::process_thread_event( + gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, located_fgroup_id_clone + ).await + }); + } else { + info!("failed to parse thread payload: {:?}", news_payload_thread); + } + } else { + info!("received thread update but couldn't find news_payload_thread"); + } } + + if news_about == "flexus_thread_message" { + if let Some(news_payload_thread_message) = threads_and_calls_subs["news_payload_thread_message"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread_message.clone())) { + let gcx_clone = gcx.clone(); + let cmd_address_url_clone = cmd_address_url.to_string(); + let api_key_clone = api_key.to_string(); + tokio::spawn(async move { + crate::cloud::threads_processing::process_thread_message_event( + gcx_clone, payload, cmd_address_url_clone, api_key_clone + ).await + }); + } else { + info!("failed to parse thread message payload: {:?}", news_payload_thread_message); + } + } else { + info!("received thread message update but couldn't find news_payload_thread_message"); + } + } + } else { info!("received data message but couldn't find payload"); } diff --git a/refact-agent/engine/src/global_context.rs b/refact-agent/engine/src/global_context.rs index 425aa1091..69feeca1c 100644 --- a/refact-agent/engine/src/global_context.rs +++ b/refact-agent/engine/src/global_context.rs @@ -23,6 +23,7 @@ use crate::integrations::sessions::IntegrationSession; use crate::privacy::PrivacySettings; use crate::background_tasks::BackgroundTasksHolder; +pub const APP_CAPTURE_ID: &str = "refact-agent"; #[derive(Debug, StructOpt, Clone)] pub struct CommandLine { @@ -198,7 +199,7 @@ pub async fn migrate_to_config_folder( pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { use std::process::Command; use rand::Rng; - + // Try multiple methods to get a unique machine identifier on macOS let machine_id = { // First attempt: Use system_profiler to get hardware UUID (most reliable) @@ -217,13 +218,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(uuid) = hardware_uuid { if !uuid.trim().is_empty() { return uuid; } } - + // Second attempt: Try to get the serial number let serial_number = Command::new("system_profiler") .args(&["SPHardwareDataType"]) @@ -239,13 +240,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(serial) = serial_number { if !serial.trim().is_empty() { return serial; } } - + // Third attempt: Try to get the MAC address using ifconfig let mac_address = Command::new("ifconfig") .args(&["en0"]) @@ -261,13 +262,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().replace(":", "")) }) }); - + if let Some(mac) = mac_address { if !mac.trim().is_empty() && mac != "000000000000" { return mac; } } - + // Final fallback: Generate a random ID and store it persistently // This is just a temporary solution in case all other methods fail let mut rng = rand::thread_rng(); diff --git a/refact-agent/engine/src/http/routers/v1/workspace.rs b/refact-agent/engine/src/http/routers/v1/workspace.rs index d145b5832..61f8d4690 100644 --- a/refact-agent/engine/src/http/routers/v1/workspace.rs +++ b/refact-agent/engine/src/http/routers/v1/workspace.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::RwLock as ARwLock; use tracing::info; use crate::custom_error::ScratchError; -use crate::global_context::GlobalContext; +use crate::global_context::{APP_CAPTURE_ID, GlobalContext}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -25,7 +25,7 @@ pub async fn handle_v1_set_active_group_id( info!("set active group id to {}", post.group_id); gcx.write().await.active_group_id = Some(post.group_id); crate::cloud::threads_sub::trigger_threads_subscription_restart(gcx.clone()).await; - + Ok(Response::builder().status(StatusCode::OK).body(Body::from( serde_json::to_string(&serde_json::json!({ "success": true })).unwrap() )).unwrap()) @@ -37,6 +37,6 @@ pub async fn handle_v1_get_app_searchable_id( _body_bytes: hyper::body::Bytes, ) -> Result, ScratchError> { Ok(Response::builder().status(StatusCode::OK).body(Body::from( - serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id })).unwrap() + serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id, "app_capture_id": APP_CAPTURE_ID })).unwrap() )).unwrap()) } diff --git a/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts b/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts index bad437512..36e695661 100644 --- a/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts +++ b/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts @@ -73,6 +73,7 @@ async function fetchAppSearchableId(apiKey: string, port: number) { type GetAppSearchableIdResponse = { app_searchable_id: string; + app_capture_id: string; }; function isGetAppSearchableResponse( @@ -81,6 +82,7 @@ function isGetAppSearchableResponse( if (!response) return false; if (typeof response !== "object") return false; if (!("app_searchable_id" in response)) return false; + if (!("app_capture_id" in response)) return false; return typeof response.app_searchable_id === "string"; } @@ -211,6 +213,7 @@ export const graphqlQueriesAndMutations = createApi({ located_fgroup_id: workspace, owner_shared: false, ft_app_searchable: appIdQuery.data?.app_searchable_id, + ft_app_capture: appIdQuery.data?.app_capture_id, ft_toolset: JSON.stringify(args.tools), }; const threadQuery = await client.mutation< @@ -315,6 +318,7 @@ export const graphqlQueriesAndMutations = createApi({ located_fgroup_id: workspace, owner_shared: false, ft_app_searchable: appIdQuery.data?.app_searchable_id, + ft_app_capture: appIdQuery.data?.app_capture_id, ft_toolset: JSON.stringify(args.tools), }; const threadQuery = await client.mutation<