Skip to content
14 changes: 10 additions & 4 deletions refact-agent/engine/src/cloud/experts_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ pub async fn expert_choice_consequences(
let query = r#"
query GetExpertModel($fexp_id: String!, $inside_fgroup_id: String!) {
expert_choice_consequences(fexp_id: $fexp_id, inside_fgroup_id: $inside_fgroup_id) {
provm_name
models {
provm_name
}
}
}
"#;
Expand All @@ -129,7 +131,11 @@ pub async fn expert_choice_consequences(
});

info!("expert_choice_consequences: address={}, fexp_id={}, inside_fgroup_id={}", config.address, fexp_id, fgroup_id);
let result: Vec<ModelInfo> = execute_graphql(
#[derive(Deserialize, Debug)]
struct Consequences {
models: Vec<ModelInfo>,
}
let result: Consequences = execute_graphql(
config,
query,
variables,
Expand All @@ -138,9 +144,9 @@ pub async fn expert_choice_consequences(
.await
.map_err(|e| e.to_string())?;

if result.is_empty() {
if result.models.is_empty() {
return Err(format!("No models found for the expert with name {}", fexp_id));
}

Ok(result[0].provm_name.clone())
Ok(result.models[0].provm_name.clone())
}
45 changes: 45 additions & 0 deletions refact-agent/engine/src/cloud/messages_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,48 @@ pub async fn get_tool_names_from_openai_format(
}
Ok(tool_names)
}

pub async fn thread_message_patch_app_specific(
cmd_address_url: &str,
api_key: &str,
ftm_belongs_to_ft_id: &str,
ftm_alt: i64,
ftm_num: i64,
ftm_app_specific: Value,
) -> Result<(), String> {
let mutation = r#"
mutation ThreadMessagePatch($input: FThreadMessagePatch!) {
thread_message_patch(input: $input)
}
"#;

let variables = json!({
"input": {
"ftm_belongs_to_ft_id": ftm_belongs_to_ft_id,
"ftm_alt": ftm_alt,
"ftm_num": ftm_num,
"ftm_app_specific": serde_json::to_string(&ftm_app_specific).unwrap()
}
});

let config = GraphQLRequestConfig {
address: cmd_address_url.to_string(),
api_key: api_key.to_string(),
user_agent: Some("refact-lsp".to_string()),
additional_headers: None,
};

tracing::info!(
"thread_message_patch_app_specific: address={}, ftm_belongs_to_ft_id={}, ftm_alt={}, ftm_num={}",
config.address, ftm_belongs_to_ft_id, ftm_alt, ftm_num
);

execute_graphql_no_result(
config,
mutation,
variables,
"thread_message_update"
)
.await
.map_err(graphql_error_to_string)
}
58 changes: 35 additions & 23 deletions refact-agent/engine/src/cloud/subchat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use std::sync::Arc;
use std::sync::atomic::Ordering;
use futures::StreamExt;
use crate::at_commands::at_commands::AtCommandsContext;
use crate::global_context::APP_CAPTURE_ID;
use tokio::sync::Mutex as AMutex;
use crate::call_validation::{ChatMessage, ReasoningEffort};
use crate::cloud::{threads_req, messages_req};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, info};
use tracing::{error, info, warn};
use crate::cloud::threads_sub::{initialize_connection, ThreadPayload};

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -63,15 +64,16 @@ pub async fn subchat(
ccx.lock().await.chat_id.clone(),
)
};

let model_name = crate::cloud::experts_req::expert_choice_consequences(&cmd_address_url, &api_key, ft_fexp_id, &located_fgroup_id).await?;
let preferences = build_preferences(&model_name, temperature, max_new_tokens, 1, reasoning_effort);
let existing_threads = crate::cloud::threads_req::get_threads_app_captured(
&cmd_address_url,
&api_key,
&located_fgroup_id,
&app_searchable_id,
tool_call_id
APP_CAPTURE_ID,
Some(tool_call_id),
).await?;
let thread = if !existing_threads.is_empty() {
info!("There are already existing threads for this tool_id: {:?}", existing_threads);
Expand All @@ -83,14 +85,13 @@ pub async fn subchat(
&located_fgroup_id,
ft_fexp_id,
&format!("subchat_{}", ft_fexp_id),
&tool_call_id,
APP_CAPTURE_ID,
&app_searchable_id,
serde_json::json!({
"tool_call_id": tool_call_id,
"ft_fexp_id": ft_fexp_id,
}),
"tool_call_id": tool_call_id,
"ft_fexp_id": ft_fexp_id,
}),
None,
Some(parent_thread_id)
).await?;
let thread_messages = messages_req::convert_messages_to_thread_messages(
messages, 100, 100, 1, &thread.ft_id, Some(preferences)
Expand All @@ -100,9 +101,9 @@ pub async fn subchat(
).await?;
thread
};

let thread_id = thread.ft_id.clone();
let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id).await;
let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id, &app_searchable_id).await;
let mut connection = match connection_result {
Ok(conn) => conn,
Err(err) => return Err(format!("Failed to initialize WebSocket connection: {}", err)),
Expand All @@ -125,20 +126,31 @@ pub async fn subchat(
"data" => {
if let Some(payload) = response["payload"].as_object() {
let data = &payload["data"];
let threads_in_group = &data["threads_in_group"];
let news_action = threads_in_group["news_action"].as_str().unwrap_or("");
if news_action != "INSERT" && news_action != "UPDATE" {
continue;
}
if let Ok(payload) = serde_json::from_value::<ThreadPayload>(threads_in_group["news_payload"].clone()) {
if payload.ft_id != thread_id {
let threads_and_calls_subs = &data["bot_threads_and_calls_subs"];
let news_action = threads_and_calls_subs["news_action"].as_str().unwrap_or("");
let news_about = threads_and_calls_subs["news_about"].as_str().unwrap_or("");

if news_about == "flexus_thread" {

if news_action != "INSERT" && news_action != "UPDATE" {
continue;
}
if payload.ft_error.is_some() {
break;

if let Some(news_payload_thread) = threads_and_calls_subs["news_payload_thread"].as_object() {
if let Ok(payload) = serde_json::from_value::<ThreadPayload>(serde_json::Value::Object(news_payload_thread.clone())) {
if payload.ft_id != thread_id {
continue;
}
if payload.ft_error.is_some() {
break;
}
} else {
info!("failed to parse thread payload: {:?}", news_payload_thread);
}
} else {
info!("received thread update but couldn't find news_payload_thread");
}
} else {
info!("failed to parse thread payload: {:?}", threads_in_group);

}
} else {
info!("received data message but couldn't find payload");
Expand All @@ -151,7 +163,7 @@ pub async fn subchat(
error!("threads subscription complete: {}.\nRestarting it", text);
}
_ => {
info!("received message with unknown type: {}", text);
warn!("received message with unknown type: {}", text);
}
}
}
Expand All @@ -175,7 +187,7 @@ pub async fn subchat(
return Err(format!("Thread error: {:?}", error));
}
}

let all_thread_messages = messages_req::get_thread_messages(
&cmd_address_url, &api_key, &thread_id, 100
).await?;
Expand Down
52 changes: 39 additions & 13 deletions refact-agent/engine/src/cloud/threads_processing.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;
use indexmap::IndexMap;
use tokio::sync::RwLock as ARwLock;
use tokio::sync::Mutex as AMutex;
Expand All @@ -10,7 +11,8 @@ use crate::basic_utils::generate_random_hash;
use crate::call_validation::{ChatContent, ChatMessage, ChatToolCall, ContextEnum, ContextFile};
use crate::cloud::messages_req::ThreadMessage;
use crate::cloud::threads_req::{lock_thread, Thread};
use crate::cloud::threads_sub::{BasicStuff, ThreadPayload};
use crate::cloud::threads_sub::{BasicStuff, ThreadMessagePayload, ThreadPayload};
use crate::git::checkpoints::create_workspace_checkpoint;
use crate::global_context::GlobalContext;
use crate::scratchpads::scratchpad_utils::max_tokens_for_rag_chat_by_tools;
use crate::tools::tools_description::{MatchConfirmDeny, MatchConfirmDenyResult, Tool};
Expand Down Expand Up @@ -337,25 +339,13 @@ pub async fn process_thread_event(
basic_info: BasicStuff,
cmd_address_url: String,
api_key: String,
app_searchable_id: String,
located_fgroup_id: String,
) -> Result<(), String> {
if thread_payload.ft_need_tool_calls == -1
|| thread_payload.owner_fuser_id != basic_info.fuser_id
|| !thread_payload.ft_locked_by.is_empty() {
return Ok(());
}
if let Some(ft_app_searchable) = thread_payload.ft_app_searchable.clone() {
if ft_app_searchable != app_searchable_id {
info!("thread `{}` has different `app_searchable` id, skipping it: {} != {}",
thread_payload.ft_id, app_searchable_id, ft_app_searchable
);
return Ok(());
}
} else {
info!("thread `{}` doesn't have the `app_searchable` id, skipping it", thread_payload.ft_id);
return Ok(());
}
if let Some(error) = thread_payload.ft_error.as_ref() {
info!("thread `{}` has the error: `{}`. Skipping it", thread_payload.ft_id, error);
return Ok(());
Expand Down Expand Up @@ -386,6 +376,42 @@ pub async fn process_thread_event(
process_result
}

pub async fn process_thread_message_event(
gcx: Arc<ARwLock<GlobalContext>>,
mut thread_message_payload: ThreadMessagePayload,
cmd_address_url: String,
api_key: String,
) -> Result<(), String> {
let old_message_cutoff = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() - 300;
if thread_message_payload.ftm_role != "user" || thread_message_payload.ftm_created_ts < old_message_cutoff as f64 {
return Ok(());
}
if thread_message_payload.ftm_app_specific.as_ref().is_some_and(|a| a.get("checkpoints").is_some()) {
return Ok(());
}

let (checkpoints, _) = create_workspace_checkpoint(gcx.clone(), None, &thread_message_payload.ftm_belongs_to_ft_id).await?;

let mut app_specific = thread_message_payload.ftm_app_specific.as_ref()
.and_then(|v| v.as_object().cloned())
.unwrap_or_else(|| serde_json::Map::new());
app_specific.insert("checkpoints".to_string(), serde_json::json!(checkpoints));
thread_message_payload.ftm_app_specific = Some(Value::Object(app_specific.clone()));

crate::cloud::messages_req::thread_message_patch_app_specific(
&cmd_address_url,
&api_key,
&thread_message_payload.ftm_belongs_to_ft_id,
thread_message_payload.ftm_alt,
thread_message_payload.ftm_num,
Value::Object(app_specific),
).await?;

tracing::info!("Created and stored checkpoints: {:#?}", checkpoints);

Ok(())
}

async fn process_locked_thread(
gcx: Arc<ARwLock<GlobalContext>>,
thread_payload: &ThreadPayload,
Expand Down
Loading
Loading