From 2908ef55078d97010e1cdf7ffa6022d840513274 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 24 Jul 2025 15:06:39 +0200 Subject: [PATCH 1/9] fix: app searchable filter should be done in back end --- refact-agent/engine/src/cloud/threads_processing.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/refact-agent/engine/src/cloud/threads_processing.rs b/refact-agent/engine/src/cloud/threads_processing.rs index c85087cff..f792e1b3a 100644 --- a/refact-agent/engine/src/cloud/threads_processing.rs +++ b/refact-agent/engine/src/cloud/threads_processing.rs @@ -337,7 +337,6 @@ pub async fn process_thread_event( basic_info: BasicStuff, cmd_address_url: String, api_key: String, - app_searchable_id: String, located_fgroup_id: String, ) -> Result<(), String> { if thread_payload.ft_need_tool_calls == -1 @@ -345,17 +344,6 @@ pub async fn process_thread_event( || !thread_payload.ft_locked_by.is_empty() { return Ok(()); } - if let Some(ft_app_searchable) = thread_payload.ft_app_searchable.clone() { - if ft_app_searchable != app_searchable_id { - info!("thread `{}` has different `app_searchable` id, skipping it: {} != {}", - thread_payload.ft_id, app_searchable_id, ft_app_searchable - ); - return Ok(()); - } - } else { - info!("thread `{}` doesn't have the `app_searchable` id, skipping it", thread_payload.ft_id); - return Ok(()); - } if let Some(error) = thread_payload.ft_error.as_ref() { info!("thread `{}` has the error: `{}`. Skipping it", thread_payload.ft_id, error); return Ok(()); From 2b6d2a20917906a8330d9193e85faeff0451216d Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 24 Jul 2025 16:24:25 +0200 Subject: [PATCH 2/9] fix: add app capture id --- refact-agent/engine/src/global_context.rs | 18 +++++++++++------- .../engine/src/http/routers/v1/workspace.rs | 6 +++--- .../services/graphql/queriesAndMutationsApi.ts | 4 ++++ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/refact-agent/engine/src/global_context.rs b/refact-agent/engine/src/global_context.rs index 425aa1091..9a7e5cc68 100644 --- a/refact-agent/engine/src/global_context.rs +++ b/refact-agent/engine/src/global_context.rs @@ -198,7 +198,7 @@ pub async fn migrate_to_config_folder( pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { use std::process::Command; use rand::Rng; - + // Try multiple methods to get a unique machine identifier on macOS let machine_id = { // First attempt: Use system_profiler to get hardware UUID (most reliable) @@ -217,13 +217,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(uuid) = hardware_uuid { if !uuid.trim().is_empty() { return uuid; } } - + // Second attempt: Try to get the serial number let serial_number = Command::new("system_profiler") .args(&["SPHardwareDataType"]) @@ -239,13 +239,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(serial) = serial_number { if !serial.trim().is_empty() { return serial; } } - + // Third attempt: Try to get the MAC address using ifconfig let mac_address = Command::new("ifconfig") .args(&["en0"]) @@ -261,13 +261,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().replace(":", "")) }) }); - + if let Some(mac) = mac_address { if !mac.trim().is_empty() && mac != "000000000000" { return mac; } } - + // Final fallback: Generate a random ID and store it persistently // This is just a temporary solution in case all other methods fail let mut rng = rand::thread_rng(); @@ -319,6 +319,10 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { format!("{}-{}", machine_guid, folders) } +pub fn get_app_capture_id() -> String { + format!("refact-lsp:{}", crate::version::build::PKG_VERSION) +} + pub async fn try_load_caps_quickly_if_not_present( gcx: Arc>, max_age_seconds: u64, diff --git a/refact-agent/engine/src/http/routers/v1/workspace.rs b/refact-agent/engine/src/http/routers/v1/workspace.rs index d145b5832..85798fa23 100644 --- a/refact-agent/engine/src/http/routers/v1/workspace.rs +++ b/refact-agent/engine/src/http/routers/v1/workspace.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::RwLock as ARwLock; use tracing::info; use crate::custom_error::ScratchError; -use crate::global_context::GlobalContext; +use crate::global_context::{get_app_capture_id, GlobalContext}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -25,7 +25,7 @@ pub async fn handle_v1_set_active_group_id( info!("set active group id to {}", post.group_id); gcx.write().await.active_group_id = Some(post.group_id); crate::cloud::threads_sub::trigger_threads_subscription_restart(gcx.clone()).await; - + Ok(Response::builder().status(StatusCode::OK).body(Body::from( serde_json::to_string(&serde_json::json!({ "success": true })).unwrap() )).unwrap()) @@ -37,6 +37,6 @@ pub async fn handle_v1_get_app_searchable_id( _body_bytes: hyper::body::Bytes, ) -> Result, ScratchError> { Ok(Response::builder().status(StatusCode::OK).body(Body::from( - serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id })).unwrap() + serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id, "app_capture_id": get_app_capture_id() })).unwrap() )).unwrap()) } diff --git a/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts b/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts index bad437512..36e695661 100644 --- a/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts +++ b/refact-agent/gui/src/services/graphql/queriesAndMutationsApi.ts @@ -73,6 +73,7 @@ async function fetchAppSearchableId(apiKey: string, port: number) { type GetAppSearchableIdResponse = { app_searchable_id: string; + app_capture_id: string; }; function isGetAppSearchableResponse( @@ -81,6 +82,7 @@ function isGetAppSearchableResponse( if (!response) return false; if (typeof response !== "object") return false; if (!("app_searchable_id" in response)) return false; + if (!("app_capture_id" in response)) return false; return typeof response.app_searchable_id === "string"; } @@ -211,6 +213,7 @@ export const graphqlQueriesAndMutations = createApi({ located_fgroup_id: workspace, owner_shared: false, ft_app_searchable: appIdQuery.data?.app_searchable_id, + ft_app_capture: appIdQuery.data?.app_capture_id, ft_toolset: JSON.stringify(args.tools), }; const threadQuery = await client.mutation< @@ -315,6 +318,7 @@ export const graphqlQueriesAndMutations = createApi({ located_fgroup_id: workspace, owner_shared: false, ft_app_searchable: appIdQuery.data?.app_searchable_id, + ft_app_capture: appIdQuery.data?.app_capture_id, ft_toolset: JSON.stringify(args.tools), }; const threadQuery = await client.mutation< From 4a8b3b5c3f4f72028d845d26e95d722c10b7eadd Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 24 Jul 2025 17:28:34 +0200 Subject: [PATCH 3/9] fix: rm version of app capture --- refact-agent/engine/src/global_context.rs | 5 +---- refact-agent/engine/src/http/routers/v1/workspace.rs | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/refact-agent/engine/src/global_context.rs b/refact-agent/engine/src/global_context.rs index 9a7e5cc68..69feeca1c 100644 --- a/refact-agent/engine/src/global_context.rs +++ b/refact-agent/engine/src/global_context.rs @@ -23,6 +23,7 @@ use crate::integrations::sessions::IntegrationSession; use crate::privacy::PrivacySettings; use crate::background_tasks::BackgroundTasksHolder; +pub const APP_CAPTURE_ID: &str = "refact-agent"; #[derive(Debug, StructOpt, Clone)] pub struct CommandLine { @@ -319,10 +320,6 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { format!("{}-{}", machine_guid, folders) } -pub fn get_app_capture_id() -> String { - format!("refact-lsp:{}", crate::version::build::PKG_VERSION) -} - pub async fn try_load_caps_quickly_if_not_present( gcx: Arc>, max_age_seconds: u64, diff --git a/refact-agent/engine/src/http/routers/v1/workspace.rs b/refact-agent/engine/src/http/routers/v1/workspace.rs index 85798fa23..61f8d4690 100644 --- a/refact-agent/engine/src/http/routers/v1/workspace.rs +++ b/refact-agent/engine/src/http/routers/v1/workspace.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::RwLock as ARwLock; use tracing::info; use crate::custom_error::ScratchError; -use crate::global_context::{get_app_capture_id, GlobalContext}; +use crate::global_context::{APP_CAPTURE_ID, GlobalContext}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -37,6 +37,6 @@ pub async fn handle_v1_get_app_searchable_id( _body_bytes: hyper::body::Bytes, ) -> Result, ScratchError> { Ok(Response::builder().status(StatusCode::OK).body(Body::from( - serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id, "app_capture_id": get_app_capture_id() })).unwrap() + serde_json::to_string(&serde_json::json!({ "app_searchable_id": gcx.read().await.app_searchable_id, "app_capture_id": APP_CAPTURE_ID })).unwrap() )).unwrap()) } From 0d2e766006061f025900b8beed2f44cc05613d8c Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 24 Jul 2025 17:31:32 +0200 Subject: [PATCH 4/9] feat: use bot thread subs instead of threads in group --- refact-agent/engine/src/cloud/subchat.rs | 45 +++++---- refact-agent/engine/src/cloud/threads_sub.rs | 97 ++++++++++++++------ 2 files changed, 95 insertions(+), 47 deletions(-) diff --git a/refact-agent/engine/src/cloud/subchat.rs b/refact-agent/engine/src/cloud/subchat.rs index 9981f35f7..774271737 100644 --- a/refact-agent/engine/src/cloud/subchat.rs +++ b/refact-agent/engine/src/cloud/subchat.rs @@ -8,7 +8,7 @@ use crate::cloud::{threads_req, messages_req}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio_tungstenite::tungstenite::Message; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::cloud::threads_sub::{initialize_connection, ThreadPayload}; #[derive(Serialize, Deserialize, Debug)] @@ -63,7 +63,7 @@ pub async fn subchat( ccx.lock().await.chat_id.clone(), ) }; - + let model_name = crate::cloud::experts_req::expert_choice_consequences(&cmd_address_url, &api_key, ft_fexp_id, &located_fgroup_id).await?; let preferences = build_preferences(&model_name, temperature, max_new_tokens, 1, reasoning_effort); let existing_threads = crate::cloud::threads_req::get_threads_app_captured( @@ -100,9 +100,9 @@ pub async fn subchat( ).await?; thread }; - + let thread_id = thread.ft_id.clone(); - let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => return Err(format!("Failed to initialize WebSocket connection: {}", err)), @@ -125,20 +125,31 @@ pub async fn subchat( "data" => { if let Some(payload) = response["payload"].as_object() { let data = &payload["data"]; - let threads_in_group = &data["threads_in_group"]; - let news_action = threads_in_group["news_action"].as_str().unwrap_or(""); - if news_action != "INSERT" && news_action != "UPDATE" { - continue; - } - if let Ok(payload) = serde_json::from_value::(threads_in_group["news_payload"].clone()) { - if payload.ft_id != thread_id { + let threads_and_calls_subs = &data["bot_threads_and_calls_subs"]; + let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or(""); + let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or(""); + + if news_about == "flexus_thread" { + + if news_action != "INSERT" && news_action != "UPDATE" { continue; } - if payload.ft_error.is_some() { - break; + + if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread.clone())) { + if payload.ft_id != thread_id { + continue; + } + if payload.ft_error.is_some() { + break; + } + } else { + info!("failed to parse thread payload: {:?}", news_payload_thread); + } + } else { + info!("received thread update but couldn't find news_payload_thread"); } - } else { - info!("failed to parse thread payload: {:?}", threads_in_group); + } } else { info!("received data message but couldn't find payload"); @@ -151,7 +162,7 @@ pub async fn subchat( error!("threads subscription complete: {}.\nRestarting it", text); } _ => { - info!("received message with unknown type: {}", text); + warn!("received message with unknown type: {}", text); } } } @@ -175,7 +186,7 @@ pub async fn subchat( return Err(format!("Thread error: {:?}", error)); } } - + let all_thread_messages = messages_req::get_thread_messages( &cmd_address_url, &api_key, &thread_id, 100 ).await?; diff --git a/refact-agent/engine/src/cloud/threads_sub.rs b/refact-agent/engine/src/cloud/threads_sub.rs index f9aed471d..12ac3835d 100644 --- a/refact-agent/engine/src/cloud/threads_sub.rs +++ b/refact-agent/engine/src/cloud/threads_sub.rs @@ -1,4 +1,4 @@ -use crate::global_context::GlobalContext; +use crate::global_context::{GlobalContext, APP_CAPTURE_ID}; use futures::{SinkExt, StreamExt}; use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig, graphql_error_to_string}; use serde::{Deserialize, Serialize}; @@ -37,13 +37,32 @@ pub struct BasicStuff { pub workspaces: Vec, } -// XXX use xxx_subs::filter for ft_app_capture const THREADS_SUBSCRIPTION_QUERY: &str = r#" - subscription ThreadsPageSubs($located_fgroup_id: String!) { - threads_in_group(located_fgroup_id: $located_fgroup_id) { + subscription ThreadsAndCallsSubs( + $fgroup_id: String, + $marketable_name: String!, + $ft_app_searchable: String!, + $inprocess_tool_names: [String!]!, + $max_threads: Int!, + $want_personas: Boolean!, + $want_threads: Boolean!, + $want_messages: Boolean!, + ) { + bot_threads_and_calls_subs( + fgroup_id: $fgroup_id, + marketable_name: $marketable_name, + marketable_version: "", + ft_app_searchable: $ft_app_searchable, + inprocess_tool_names: $inprocess_tool_names, + max_threads: $max_threads, + want_personas: $want_personas, + want_threads: $want_threads, + want_messages: $want_messages, + ) { news_action + news_about news_payload_id - news_payload { + news_payload_thread { owner_fuser_id ft_id ft_error @@ -86,11 +105,13 @@ pub async fn watch_threads_subscription(gcx: Arc>) { continue; }; + let app_searchable_id = gcx.read().await.app_searchable_id.clone(); + info!( - "starting subscription for threads_in_group with fgroup_id=\"{}\"", - located_fgroup_id + "starting subscription for bot_threads_and_calls_subs with fgroup_id=\"{}\" and app_searchable_id=\"{}\"", + located_fgroup_id, app_searchable_id ); - let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => { @@ -129,6 +150,7 @@ pub async fn initialize_connection( cmd_address_url: &str, api_key: &str, located_fgroup_id: &str, + app_searchable_id: &str, ) -> Result< futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< @@ -208,7 +230,14 @@ pub async fn initialize_connection( "payload": { "query": THREADS_SUBSCRIPTION_QUERY, "variables": { - "located_fgroup_id": located_fgroup_id + "fgroup_id": located_fgroup_id, + "marketable_name": APP_CAPTURE_ID, + "ft_app_searchable": app_searchable_id, + "inprocess_tool_names": [], + "max_threads": 100, + "want_personas": false, + "want_threads": true, + "want_messages": false, } } }); @@ -231,7 +260,6 @@ async fn actual_subscription_loop( located_fgroup_id: &str, ) -> Result<(), String> { info!("cloud threads subscription started, waiting for events..."); - let app_searchable_id = gcx.read().await.app_searchable_id.clone(); let basic_info = get_basic_info(cmd_address_url, api_key).await?; while let Some(msg) = connection.next().await { if gcx.clone().read().await.shutdown_flag.load(Ordering::SeqCst) { @@ -255,26 +283,35 @@ async fn actual_subscription_loop( "data" => { if let Some(payload) = response["payload"].as_object() { let data = &payload["data"]; - let threads_in_group = &data["threads_in_group"]; - let news_action = threads_in_group["news_action"].as_str().unwrap_or(""); - if news_action != "INSERT" && news_action != "UPDATE" { - continue; - } - if let Ok(payload) = serde_json::from_value::(threads_in_group["news_payload"].clone()) { - let gcx_clone = gcx.clone(); - let payload_clone = payload.clone(); - let basic_info_clone = basic_info.clone(); - let cmd_address_url_clone = cmd_address_url.to_string(); - let api_key_clone = api_key.to_string(); - let app_searchable_id_clone = app_searchable_id.clone(); - let located_fgroup_id_clone = located_fgroup_id.to_string(); - tokio::spawn(async move { - crate::cloud::threads_processing::process_thread_event( - gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, app_searchable_id_clone, located_fgroup_id_clone - ).await - }); - } else { - info!("failed to parse thread payload: {:?}", threads_in_group); + let threads_and_calls_subs = &data["bot_threads_and_calls_subs"]; + let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or(""); + let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or(""); + + if news_about == "flexus_thread" { + + if news_action != "INSERT" && news_action != "UPDATE" { + continue; + } + + if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread.clone())) { + let gcx_clone = gcx.clone(); + let payload_clone = payload.clone(); + let basic_info_clone = basic_info.clone(); + let cmd_address_url_clone = cmd_address_url.to_string(); + let api_key_clone = api_key.to_string(); + let located_fgroup_id_clone = located_fgroup_id.to_string(); + tokio::spawn(async move { + crate::cloud::threads_processing::process_thread_event( + gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, located_fgroup_id_clone + ).await + }); + } else { + info!("failed to parse thread payload: {:?}", news_payload_thread); + } + } else { + info!("received thread update but couldn't find news_payload_thread"); + } } } else { info!("received data message but couldn't find payload"); From 97b623ea530bdf0c0fb0fa715a70f23ef3929b3a Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Thu, 24 Jul 2025 18:28:04 +0200 Subject: [PATCH 5/9] feat: listen for messages and create checkpoints for new user messages without checkpoints --- .../engine/src/cloud/threads_processing.rs | 32 +++++++++- refact-agent/engine/src/cloud/threads_sub.rs | 61 ++++++++++++++----- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/refact-agent/engine/src/cloud/threads_processing.rs b/refact-agent/engine/src/cloud/threads_processing.rs index f792e1b3a..93445cd96 100644 --- a/refact-agent/engine/src/cloud/threads_processing.rs +++ b/refact-agent/engine/src/cloud/threads_processing.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::SystemTime; use indexmap::IndexMap; use tokio::sync::RwLock as ARwLock; use tokio::sync::Mutex as AMutex; @@ -10,7 +11,8 @@ use crate::basic_utils::generate_random_hash; use crate::call_validation::{ChatContent, ChatMessage, ChatToolCall, ContextEnum, ContextFile}; use crate::cloud::messages_req::ThreadMessage; use crate::cloud::threads_req::{lock_thread, Thread}; -use crate::cloud::threads_sub::{BasicStuff, ThreadPayload}; +use crate::cloud::threads_sub::{BasicStuff, ThreadMessagePayload, ThreadPayload}; +use crate::git::checkpoints::create_workspace_checkpoint; use crate::global_context::GlobalContext; use crate::scratchpads::scratchpad_utils::max_tokens_for_rag_chat_by_tools; use crate::tools::tools_description::{MatchConfirmDeny, MatchConfirmDenyResult, Tool}; @@ -374,6 +376,34 @@ pub async fn process_thread_event( process_result } +pub async fn process_thread_message_event( + gcx: Arc>, + mut thread_message_payload: ThreadMessagePayload, + basic_info: BasicStuff, + cmd_address_url: String, + api_key: String, + located_fgroup_id: String, +) -> Result<(), String> { + let old_message_cutoff = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() - 300; + if thread_message_payload.ftm_role != "user" || thread_message_payload.ftm_created_ts < old_message_cutoff as f64 { + return Ok(()); + } + if thread_message_payload.ftm_app_specific.as_ref().is_some_and(|a| a.get("checkpoints").is_some()) { + return Ok(()); + } + let (checkpoints, _) = create_workspace_checkpoint(gcx.clone(), None, &thread_message_payload.ftm_belongs_to_ft_id).await?; + + let mut app_specific = thread_message_payload.ftm_app_specific.as_ref() + .and_then(|v| v.as_object().cloned()) + .unwrap_or_else(|| serde_json::Map::new()); + app_specific.insert("checkpoints".to_string(), serde_json::json!(checkpoints)); + thread_message_payload.ftm_app_specific = Some(Value::Object(app_specific)); + + tracing::info!("Created checkpoints: {:#?}", checkpoints); + + Ok(()) +} + async fn process_locked_thread( gcx: Arc>, thread_payload: &ThreadPayload, diff --git a/refact-agent/engine/src/cloud/threads_sub.rs b/refact-agent/engine/src/cloud/threads_sub.rs index 12ac3835d..5a3dd3d1c 100644 --- a/refact-agent/engine/src/cloud/threads_sub.rs +++ b/refact-agent/engine/src/cloud/threads_sub.rs @@ -31,6 +31,16 @@ pub struct ThreadPayload { pub ft_confirmation_response: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ThreadMessagePayload { + pub ftm_belongs_to_ft_id: String, + pub ftm_alt: i64, + pub ftm_num: i64, + pub ftm_role: String, + pub ftm_app_specific: Option, + pub ftm_created_ts: f64, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct BasicStuff { pub fuser_id: String, @@ -44,9 +54,6 @@ const THREADS_SUBSCRIPTION_QUERY: &str = r#" $ft_app_searchable: String!, $inprocess_tool_names: [String!]!, $max_threads: Int!, - $want_personas: Boolean!, - $want_threads: Boolean!, - $want_messages: Boolean!, ) { bot_threads_and_calls_subs( fgroup_id: $fgroup_id, @@ -55,9 +62,9 @@ const THREADS_SUBSCRIPTION_QUERY: &str = r#" ft_app_searchable: $ft_app_searchable, inprocess_tool_names: $inprocess_tool_names, max_threads: $max_threads, - want_personas: $want_personas, - want_threads: $want_threads, - want_messages: $want_messages, + want_personas: false, + want_threads: true, + want_messages: true, ) { news_action news_about @@ -76,6 +83,14 @@ const THREADS_SUBSCRIPTION_QUERY: &str = r#" ft_app_capture ft_app_specific } + news_payload_thread_message { + ftm_belongs_to_ft_id + ftm_num + ftm_alt + ftm_role + ftm_app_specific + ftm_created_ts + } } } "#; @@ -235,9 +250,6 @@ pub async fn initialize_connection( "ft_app_searchable": app_searchable_id, "inprocess_tool_names": [], "max_threads": 100, - "want_personas": false, - "want_threads": true, - "want_messages": false, } } }); @@ -287,12 +299,11 @@ async fn actual_subscription_loop( let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or(""); let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or(""); - if news_about == "flexus_thread" { - - if news_action != "INSERT" && news_action != "UPDATE" { - continue; - } + if news_action != "INSERT" && news_action != "UPDATE" { + continue; + } + if news_about == "flexus_thread" { if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() { if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread.clone())) { let gcx_clone = gcx.clone(); @@ -313,6 +324,28 @@ async fn actual_subscription_loop( info!("received thread update but couldn't find news_payload_thread"); } } + + if news_about == "flexus_thread_message" { + if let Some(news_payload_thread_message) = threads_and_calls_subs["news_payload_thread_message"].as_object() { + if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread_message.clone())) { + let gcx_clone = gcx.clone(); + let basic_info_clone = basic_info.clone(); + let cmd_address_url_clone = cmd_address_url.to_string(); + let api_key_clone = api_key.to_string(); + let located_fgroup_id_clone = located_fgroup_id.to_string(); + tokio::spawn(async move { + crate::cloud::threads_processing::process_thread_message_event( + gcx_clone, payload, basic_info_clone, cmd_address_url_clone, api_key_clone, located_fgroup_id_clone + ).await + }); + } else { + info!("failed to parse thread message payload: {:?}", news_payload_thread_message); + } + } else { + info!("received thread message update but couldn't find news_payload_thread_message"); + } + } + } else { info!("received data message but couldn't find payload"); } From 74b05f1f9357109bd1ff99664ff5baa11b4f6a2f Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Fri, 25 Jul 2025 11:30:33 +0200 Subject: [PATCH 6/9] feat: add method to update messages app specific --- refact-agent/engine/src/cloud/messages_req.rs | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/refact-agent/engine/src/cloud/messages_req.rs b/refact-agent/engine/src/cloud/messages_req.rs index 73eab1629..cd2c28159 100644 --- a/refact-agent/engine/src/cloud/messages_req.rs +++ b/refact-agent/engine/src/cloud/messages_req.rs @@ -303,3 +303,48 @@ pub async fn get_tool_names_from_openai_format( } Ok(tool_names) } + +pub async fn thread_message_patch_app_specific( + cmd_address_url: &str, + api_key: &str, + ftm_belongs_to_ft_id: &str, + ftm_alt: i64, + ftm_num: i64, + ftm_app_specific: Value, +) -> Result<(), String> { + let mutation = r#" + mutation ThreadMessagePatch($input: FThreadMessagePatch!) { + thread_message_patch(input: $input) + } + "#; + + let variables = json!({ + "input": { + "ftm_belongs_to_ft_id": ftm_belongs_to_ft_id, + "ftm_alt": ftm_alt, + "ftm_num": ftm_num, + "ftm_app_specific": serde_json::to_string(&ftm_app_specific).unwrap() + } + }); + + let config = GraphQLRequestConfig { + address: cmd_address_url.to_string(), + api_key: api_key.to_string(), + user_agent: Some("refact-lsp".to_string()), + additional_headers: None, + }; + + tracing::info!( + "thread_message_patch_app_specific: address={}, ftm_belongs_to_ft_id={}, ftm_alt={}, ftm_num={}", + config.address, ftm_belongs_to_ft_id, ftm_alt, ftm_num + ); + + execute_graphql_no_result( + config, + mutation, + variables, + "thread_message_update" + ) + .await + .map_err(graphql_error_to_string) +} From bbfb434552a738720418ca7b605ebf73bbdfd378 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Fri, 25 Jul 2025 11:30:58 +0200 Subject: [PATCH 7/9] fix: update messages after checkpoints update --- .../engine/src/cloud/threads_processing.rs | 16 ++++++++++++---- refact-agent/engine/src/cloud/threads_sub.rs | 4 +--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/refact-agent/engine/src/cloud/threads_processing.rs b/refact-agent/engine/src/cloud/threads_processing.rs index 93445cd96..280daa458 100644 --- a/refact-agent/engine/src/cloud/threads_processing.rs +++ b/refact-agent/engine/src/cloud/threads_processing.rs @@ -379,10 +379,8 @@ pub async fn process_thread_event( pub async fn process_thread_message_event( gcx: Arc>, mut thread_message_payload: ThreadMessagePayload, - basic_info: BasicStuff, cmd_address_url: String, api_key: String, - located_fgroup_id: String, ) -> Result<(), String> { let old_message_cutoff = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() - 300; if thread_message_payload.ftm_role != "user" || thread_message_payload.ftm_created_ts < old_message_cutoff as f64 { @@ -391,15 +389,25 @@ pub async fn process_thread_message_event( if thread_message_payload.ftm_app_specific.as_ref().is_some_and(|a| a.get("checkpoints").is_some()) { return Ok(()); } + let (checkpoints, _) = create_workspace_checkpoint(gcx.clone(), None, &thread_message_payload.ftm_belongs_to_ft_id).await?; let mut app_specific = thread_message_payload.ftm_app_specific.as_ref() .and_then(|v| v.as_object().cloned()) .unwrap_or_else(|| serde_json::Map::new()); app_specific.insert("checkpoints".to_string(), serde_json::json!(checkpoints)); - thread_message_payload.ftm_app_specific = Some(Value::Object(app_specific)); + thread_message_payload.ftm_app_specific = Some(Value::Object(app_specific.clone())); + + crate::cloud::messages_req::thread_message_patch_app_specific( + &cmd_address_url, + &api_key, + &thread_message_payload.ftm_belongs_to_ft_id, + thread_message_payload.ftm_alt, + thread_message_payload.ftm_num, + Value::Object(app_specific), + ).await?; - tracing::info!("Created checkpoints: {:#?}", checkpoints); + tracing::info!("Created and stored checkpoints: {:#?}", checkpoints); Ok(()) } diff --git a/refact-agent/engine/src/cloud/threads_sub.rs b/refact-agent/engine/src/cloud/threads_sub.rs index 5a3dd3d1c..8a1248d3b 100644 --- a/refact-agent/engine/src/cloud/threads_sub.rs +++ b/refact-agent/engine/src/cloud/threads_sub.rs @@ -329,13 +329,11 @@ async fn actual_subscription_loop( if let Some(news_payload_thread_message) = threads_and_calls_subs["news_payload_thread_message"].as_object() { if let Ok(payload) = serde_json::from_value::(serde_json::Value::Object(news_payload_thread_message.clone())) { let gcx_clone = gcx.clone(); - let basic_info_clone = basic_info.clone(); let cmd_address_url_clone = cmd_address_url.to_string(); let api_key_clone = api_key.to_string(); - let located_fgroup_id_clone = located_fgroup_id.to_string(); tokio::spawn(async move { crate::cloud::threads_processing::process_thread_message_event( - gcx_clone, payload, basic_info_clone, cmd_address_url_clone, api_key_clone, located_fgroup_id_clone + gcx_clone, payload, cmd_address_url_clone, api_key_clone ).await }); } else { From c5ff41a1c49a36ca42fa9ce3c2bbf22e46c29995 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Fri, 25 Jul 2025 17:06:04 +0200 Subject: [PATCH 8/9] fix: expert choice consequences has models {} now in result --- refact-agent/engine/src/cloud/experts_req.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/refact-agent/engine/src/cloud/experts_req.rs b/refact-agent/engine/src/cloud/experts_req.rs index 4a698e393..222c98b51 100644 --- a/refact-agent/engine/src/cloud/experts_req.rs +++ b/refact-agent/engine/src/cloud/experts_req.rs @@ -112,7 +112,9 @@ pub async fn expert_choice_consequences( let query = r#" query GetExpertModel($fexp_id: String!, $inside_fgroup_id: String!) { expert_choice_consequences(fexp_id: $fexp_id, inside_fgroup_id: $inside_fgroup_id) { - provm_name + models { + provm_name + } } } "#; @@ -129,7 +131,11 @@ pub async fn expert_choice_consequences( }); info!("expert_choice_consequences: address={}, fexp_id={}, inside_fgroup_id={}", config.address, fexp_id, fgroup_id); - let result: Vec = execute_graphql( + #[derive(Deserialize, Debug)] + struct Consequences { + models: Vec, + } + let result: Consequences = execute_graphql( config, query, variables, @@ -138,9 +144,9 @@ pub async fn expert_choice_consequences( .await .map_err(|e| e.to_string())?; - if result.is_empty() { + if result.models.is_empty() { return Err(format!("No models found for the expert with name {}", fexp_id)); } - Ok(result[0].provm_name.clone()) + Ok(result.models[0].provm_name.clone()) } From dd0494e3fb9412a38eb1433c4bc6e7c3ab725070 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Fri, 25 Jul 2025 17:07:18 +0200 Subject: [PATCH 9/9] fix: subchat using same app capture, get threads for subchat filter by tool call id --- refact-agent/engine/src/cloud/subchat.rs | 13 ++--- refact-agent/engine/src/cloud/threads_req.rs | 57 +++++++++++--------- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/refact-agent/engine/src/cloud/subchat.rs b/refact-agent/engine/src/cloud/subchat.rs index 774271737..574f13f8f 100644 --- a/refact-agent/engine/src/cloud/subchat.rs +++ b/refact-agent/engine/src/cloud/subchat.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use futures::StreamExt; use crate::at_commands::at_commands::AtCommandsContext; +use crate::global_context::APP_CAPTURE_ID; use tokio::sync::Mutex as AMutex; use crate::call_validation::{ChatMessage, ReasoningEffort}; use crate::cloud::{threads_req, messages_req}; @@ -71,7 +72,8 @@ pub async fn subchat( &api_key, &located_fgroup_id, &app_searchable_id, - tool_call_id + APP_CAPTURE_ID, + Some(tool_call_id), ).await?; let thread = if !existing_threads.is_empty() { info!("There are already existing threads for this tool_id: {:?}", existing_threads); @@ -83,14 +85,13 @@ pub async fn subchat( &located_fgroup_id, ft_fexp_id, &format!("subchat_{}", ft_fexp_id), - &tool_call_id, + APP_CAPTURE_ID, &app_searchable_id, serde_json::json!({ - "tool_call_id": tool_call_id, - "ft_fexp_id": ft_fexp_id, - }), + "tool_call_id": tool_call_id, + "ft_fexp_id": ft_fexp_id, + }), None, - Some(parent_thread_id) ).await?; let thread_messages = messages_req::convert_messages_to_thread_messages( messages, 100, 100, 1, &thread.ft_id, Some(preferences) diff --git a/refact-agent/engine/src/cloud/threads_req.rs b/refact-agent/engine/src/cloud/threads_req.rs index efa7b9e02..d7b1c18fb 100644 --- a/refact-agent/engine/src/cloud/threads_req.rs +++ b/refact-agent/engine/src/cloud/threads_req.rs @@ -32,10 +32,9 @@ pub async fn create_thread( ft_app_searchable: &str, ft_app_specific: Value, ft_toolset: Option>, - parent_ft_id: Option, ) -> Result { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let mutation = r#" mutation CreateThread($input: FThreadInput!) { thread_create(input: $input) { @@ -64,8 +63,8 @@ pub async fn create_thread( Some(toolset) => serde_json::to_string(&toolset).map_err(|e| format!("Failed to serialize toolset: {}", e))?, None => "null".to_string(), }; - - let mut input = json!({ + + let input = json!({ "owner_shared": false, "located_fgroup_id": located_fgroup_id, "ft_fexp_id": ft_fexp_id, @@ -76,10 +75,6 @@ pub async fn create_thread( "ft_app_specific": serde_json::to_string(&ft_app_specific).unwrap(), }); - if let Some(parent_id) = parent_ft_id { - input["parent_ft_id"] = json!(parent_id); - } - let config = GraphQLRequestConfig { address: cmd_address_url.to_string(), api_key: api_key.to_string(), @@ -104,7 +99,7 @@ pub async fn get_thread( thread_id: &str, ) -> Result { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let query = r#" query GetThread($id: String!) { thread_get(id: $id) { @@ -151,12 +146,18 @@ pub async fn get_threads_app_captured( located_fgroup_id: &str, ft_app_searchable: &str, ft_app_capture: &str, + tool_call_id: Option<&str>, ) -> Result, String> { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let query = r#" - query GetThread($located_fgroup_id: String!, $ft_app_capture: String!, $ft_app_searchable: String!) { - threads_app_captured(located_fgroup_id: $located_fgroup_id, ft_app_capture: $ft_app_capture, ft_app_searchable: $ft_app_searchable) { + query GetThread($located_fgroup_id: String!, $ft_app_capture: String!, $ft_app_searchable: String!, $ft_app_specific_filters: [FTAppSpecificFilter!]) { + threads_app_captured( + located_fgroup_id: $located_fgroup_id, + ft_app_capture: $ft_app_capture, + ft_app_searchable: $ft_app_searchable, + ft_app_specific_filters: $ft_app_specific_filters + ) { owner_fuser_id owner_shared located_fgroup_id @@ -187,7 +188,11 @@ pub async fn get_threads_app_captured( let variables = json!({ "located_fgroup_id": located_fgroup_id, "ft_app_capture": ft_app_capture, - "ft_app_searchable": ft_app_searchable + "ft_app_searchable": ft_app_searchable, + "ft_app_specific_filters": match tool_call_id { + Some(id) => vec![json!({ "path": "tool_call_id", "equals": id })], + None => Vec::new(), + }, }); tracing::info!("get_threads_app_captured: address={}, located_fgroup_id={}, ft_app_capture={}, ft_app_searchable={}", config.address, located_fgroup_id, ft_app_capture, ft_app_searchable @@ -209,7 +214,7 @@ pub async fn set_thread_toolset( ft_toolset: Vec ) -> Result, String> { use crate::cloud::graphql_client::{execute_graphql, GraphQLRequestConfig}; - + let mutation = r#" mutation UpdateThread($thread_id: String!, $patch: FThreadPatch!) { thread_patch(id: $thread_id, patch: $patch) { @@ -217,7 +222,7 @@ pub async fn set_thread_toolset( } } "#; - + let variables = json!({ "thread_id": thread_id, "patch": { @@ -258,12 +263,12 @@ pub async fn lock_thread( hash: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let worker_name = format!("refact-lsp:{hash}"); let query = r#" mutation AdvanceLock($ft_id: String!, $worker_name: String!) { thread_lock(ft_id: $ft_id, worker_name: $worker_name) - } + } "#; let config = GraphQLRequestConfig { @@ -273,7 +278,7 @@ pub async fn lock_thread( }; let variables = json!({ - "ft_id": thread_id, + "ft_id": thread_id, "worker_name": worker_name }); @@ -303,7 +308,7 @@ pub async fn unlock_thread( hash: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let worker_name = format!("refact-lsp:{hash}"); let query = r#" mutation AdvanceUnlock($ft_id: String!, $worker_name: String!) { @@ -318,7 +323,7 @@ pub async fn unlock_thread( }; let variables = json!({ - "ft_id": thread_id, + "ft_id": thread_id, "worker_name": worker_name }); @@ -348,7 +353,7 @@ pub async fn set_error_thread( error: &str, ) -> Result<(), String> { use crate::cloud::graphql_client::{execute_graphql_no_result, GraphQLRequestConfig}; - + let mutation = r#" mutation SetThreadError($thread_id: String!, $patch: FThreadPatch!) { thread_patch(id: $thread_id, patch: $patch) { @@ -356,7 +361,7 @@ pub async fn set_error_thread( } } "#; - + let variables = json!({ "thread_id": thread_id, "patch": { @@ -369,7 +374,7 @@ pub async fn set_error_thread( api_key: api_key.to_string(), ..Default::default() }; - + tracing::info!("unlock_thread: address={}, thread_id={}, ft_error={}", config.address, thread_id, error ); @@ -390,16 +395,16 @@ pub async fn set_thread_confirmation_request( confirmation_request: Value, ) -> Result { use crate::cloud::graphql_client::{execute_graphql_bool_result, GraphQLRequestConfig}; - + let mutation = r#" mutation SetThreadConfirmationRequest($ft_id: String!, $confirmation_request: String!) { thread_set_confirmation_request(ft_id: $ft_id, confirmation_request: $confirmation_request) } "#; - + let confirmation_request_str = serde_json::to_string(&confirmation_request) .map_err(|e| format!("Failed to serialize confirmation request: {}", e))?; - + let variables = json!({ "ft_id": thread_id, "confirmation_request": confirmation_request_str