Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
config.toml
*.swp
.DS_Store
.env
.kiro/
9 changes: 8 additions & 1 deletion charts/openab/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ data:
{{- if $cfg.discord.trustedBotIds }}
trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }}
{{- end }}
{{- /* allowUserMessages: controls whether the bot requires @mention in threads (Discord) */ -}}
{{- if $cfg.discord.allowUserMessages }}
{{- if not (has $cfg.discord.allowUserMessages (list "involved" "mentions")) }}
{{- fail (printf "agents.%s.discord.allowUserMessages must be one of: involved, mentions — got: %s" $name $cfg.discord.allowUserMessages) }}
Comment thread
thepagent marked this conversation as resolved.
{{- end }}
allow_user_messages = {{ $cfg.discord.allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}}
{{- end }}
{{- end }}

{{- if and ($cfg.slack).enabled }}
Expand Down Expand Up @@ -73,7 +80,7 @@ data:
{{- if not (has ($cfg.slack).allowUserMessages (list "involved" "mentions")) }}
{{- fail (printf "agents.%s.slack.allowUserMessages must be one of: involved, mentions — got: %s" $name ($cfg.slack).allowUserMessages) }}
{{- end }}
allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }}
allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}}
{{- end }}
{{- end }}

Expand Down
125 changes: 65 additions & 60 deletions src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::Result;
use async_trait::async_trait;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::error;

use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
Expand Down Expand Up @@ -41,6 +40,10 @@ pub struct SenderContext {
pub display_name: String,
pub channel: String,
pub channel_id: String,
/// Thread identifier, if the message is inside a thread.
/// Slack: thread_ts. Discord: None (threads are separate channels).
#[serde(skip_serializing_if = "Option::is_none")]
pub thread_id: Option<String>,
pub is_bot: bool,
}

Expand All @@ -57,9 +60,6 @@ pub trait ChatAdapter: Send + Sync + 'static {
/// Send a new message, returns a reference to the sent message.
async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result<MessageRef>;

/// Edit an existing message in-place.
async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()>;

/// Create a thread from a trigger message, returns the thread channel ref.
async fn create_thread(
&self,
Expand All @@ -73,6 +73,17 @@ pub trait ChatAdapter: Send + Sync + 'static {

/// Remove a reaction/emoji from a message.
async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>;

/// Edit an existing message in-place (for streaming updates).
/// Default: unsupported (send-once only).
async fn edit_message(&self, _msg: &MessageRef, _content: &str) -> Result<()> {
Err(anyhow::anyhow!("edit_message not supported"))
}

/// Whether this adapter should use streaming edit (true) or send-once (false).
Comment thread
thepagent marked this conversation as resolved.
fn use_streaming(&self) -> bool {
false
}
}

// --- AdapterRouter ---
Expand Down Expand Up @@ -130,8 +141,6 @@ impl AdapterRouter {
}
}

let thinking_msg = adapter.send_message(thread_channel, "...").await?;

let thread_key = format!(
"{}:{}",
adapter.platform(),
Expand All @@ -144,7 +153,7 @@ impl AdapterRouter {
if let Err(e) = self.pool.get_or_create(&thread_key).await {
let msg = format_user_error(&e.to_string());
let _ = adapter
.edit_message(&thinking_msg, &format!("⚠️ {msg}"))
.send_message(thread_channel, &format!("⚠️ {msg}"))
.await;
error!("pool error: {e}");
return Err(e);
Expand All @@ -165,7 +174,6 @@ impl AdapterRouter {
&thread_key,
content_blocks,
thread_channel,
&thinking_msg,
reactions.clone(),
)
.await;
Expand All @@ -190,7 +198,7 @@ impl AdapterRouter {

if let Err(ref e) = result {
let _ = adapter
.edit_message(&thinking_msg, &format!("⚠️ {e}"))
.send_message(thread_channel, &format!("⚠️ {e}"))
.await;
}

Expand All @@ -203,13 +211,12 @@ impl AdapterRouter {
thread_key: &str,
content_blocks: Vec<ContentBlock>,
thread_channel: &ChannelRef,
thinking_msg: &MessageRef,
reactions: Arc<StatusReactionController>,
) -> Result<()> {
let adapter = adapter.clone();
let thread_channel = thread_channel.clone();
let msg_ref = thinking_msg.clone();
let message_limit = adapter.message_limit();
let streaming = adapter.use_streaming();

self.pool
.with_connection(thread_key, |conn| {
Expand All @@ -221,57 +228,51 @@ impl AdapterRouter {
let (mut rx, _) = conn.session_prompt(content_blocks).await?;
reactions.set_thinking().await;

let initial = if reset {
"⚠️ _Session expired, starting fresh..._\n\n...".to_string()
} else {
"...".to_string()
};
let (buf_tx, buf_rx) = watch::channel(initial);

let mut text_buf = String::new();
let mut tool_lines: Vec<ToolEntry> = Vec::new();

if reset {
text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n");
}

// Spawn edit-streaming task — only edits the single message, never sends new ones.
// Long content is truncated during streaming; final multi-message split happens after.
let streaming_limit = message_limit.saturating_sub(100);
let edit_handle = {
let adapter = adapter.clone();
let msg_ref = msg_ref.clone();
let mut buf_rx = buf_rx.clone();
// Streaming edit: send placeholder, spawn edit loop
let (buf_tx, placeholder_msg) = if streaming {
let initial = if reset {
"⚠️ _Session expired, starting fresh..._\n\n…".to_string()
} else {
"…".to_string()
};
let msg = adapter.send_message(&thread_channel, &initial).await?;
let (tx, rx) = tokio::sync::watch::channel(initial);
let edit_adapter = adapter.clone();
let edit_msg = msg.clone();
Comment thread
thepagent marked this conversation as resolved.
let limit = message_limit;
let mut buf_rx = rx;
tokio::spawn(async move {
let mut last_content = String::new();
let mut last = String::new();
loop {
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
if buf_rx.has_changed().unwrap_or(false) {
let content = buf_rx.borrow_and_update().clone();
if content != last_content {
let display = if content.chars().count() > streaming_limit {
// Tail-priority: keep the last N chars so user
// sees the most recent agent output
let total = content.chars().count();
let skip = total - streaming_limit;
let truncated: String = content.chars().skip(skip).collect();
format!("…(truncated)\n{truncated}")
if content != last {
let display = if content.chars().count() > limit - 100 {
format!("…{}", format::truncate_chars_tail(&content, limit - 100))
} else {
content.clone()
};
let _ = adapter.edit_message(&msg_ref, &display).await;
last_content = content;
let _ = edit_adapter.edit_message(&edit_msg, &display).await;
last = content;
}
}
if buf_rx.has_changed().is_err() {
break;
}
if buf_rx.has_changed().is_err() { break; }
}
})
});
(Some(tx), Some(msg))
} else {
(None, None)
};

// Process ACP notifications
let mut got_first_text = false;
let mut response_error: Option<String> = None;
while let Some(notification) = rx.recv().await {
if notification.id.is_some() {
Expand All @@ -284,12 +285,10 @@ impl AdapterRouter {
if let Some(event) = classify_notification(&notification) {
match event {
AcpEvent::Text(t) => {
if !got_first_text {
got_first_text = true;
}
text_buf.push_str(&t);
let _ =
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
if let Some(tx) = &buf_tx {
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
}
}
AcpEvent::Thinking => {
reactions.set_thinking().await;
Expand All @@ -307,8 +306,9 @@ impl AdapterRouter {
state: ToolState::Running,
});
}
let _ =
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
if let Some(tx) = &buf_tx {
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
}
}
AcpEvent::ToolDone { id, title, status } => {
reactions.set_thinking().await;
Expand All @@ -329,19 +329,20 @@ impl AdapterRouter {
state: new_state,
});
}
let _ =
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
if let Some(tx) = &buf_tx {
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
}
}
_ => {}
}
}
}

conn.prompt_done().await;
// Stop the edit loop
drop(buf_tx);
let _ = edit_handle.await;

// Final edit with complete content
// Build final content
let final_content = compose_display(&tool_lines, &text_buf, false);
let final_content = if final_content.is_empty() {
if let Some(err) = response_error {
Expand All @@ -356,14 +357,18 @@ impl AdapterRouter {
};

let chunks = format::split_message(&final_content, message_limit);
let mut current_msg = msg_ref;
for (i, chunk) in chunks.iter().enumerate() {
if i == 0 {
let _ = adapter.edit_message(&current_msg, chunk).await;
} else if let Ok(new_msg) =
adapter.send_message(&thread_channel, chunk).await
{
current_msg = new_msg;
if let Some(msg) = placeholder_msg {
// Streaming: edit first chunk into placeholder, send rest as new messages
if let Some(first) = chunks.first() {
let _ = adapter.edit_message(&msg, first).await;
}
for chunk in chunks.iter().skip(1) {
let _ = adapter.send_message(&thread_channel, chunk).await;
}
} else {
// Send-once: all chunks as new messages
for chunk in &chunks {
let _ = adapter.send_message(&thread_channel, chunk).await;
}
}

Expand Down
22 changes: 3 additions & 19 deletions src/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::format;
use crate::media;
use async_trait::async_trait;
use std::sync::LazyLock;
use serenity::builder::{CreateThread, EditMessage};
use serenity::builder::CreateThread;
use serenity::http::Http;
use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType};
use serenity::model::gateway::Ready;
Expand Down Expand Up @@ -54,19 +54,6 @@ impl ChatAdapter for DiscordAdapter {
})
}

async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> {
let ch_id: u64 = msg.channel.channel_id.parse()?;
let msg_id: u64 = msg.message_id.parse()?;
ChannelId::new(ch_id)
.edit_message(
&self.http,
MessageId::new(msg_id),
EditMessage::new().content(content),
)
.await?;
Ok(())
}

async fn create_thread(
&self,
channel: &ChannelRef,
Expand Down Expand Up @@ -216,11 +203,7 @@ impl EventHandler for Handler {
self.allowed_channels.is_empty() || self.allowed_channels.contains(&channel_id);

let is_mentioned = msg.mentions_user_id(bot_id)
|| msg.content.contains(&format!("<@{}>", bot_id))
|| msg
.mention_roles
.iter()
.any(|r| msg.content.contains(&format!("<@&{}>", r)));
|| msg.content.contains(&format!("<@{}>", bot_id));
Comment thread
thepagent marked this conversation as resolved.

// Bot message gating (from upstream #321)
if msg.author.bot {
Expand Down Expand Up @@ -361,6 +344,7 @@ impl EventHandler for Handler {
display_name: display_name.to_string(),
channel: "discord".into(),
channel_id: msg.channel_id.to_string(),
thread_id: None,
is_bot: msg.author.bot,
};

Expand Down
10 changes: 10 additions & 0 deletions src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,13 @@ pub fn shorten_thread_name(prompt: &str) -> String {
}
}


/// Truncate a string to at most `limit` Unicode characters, keeping the tail
/// (most recent output) for better streaming UX.
pub fn truncate_chars_tail(s: &str, limit: usize) -> String {
let total = s.chars().count();
if total <= limit {
return s.to_string();
}
s.chars().skip(total - limit).collect()
}
Loading
Loading