Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,25 @@ fn default_output_format_for_channel(channel_type: &str) -> OutputFormat {
}
}

async fn send_immediate_ack_if_needed(
adapter: &dyn ChannelAdapter,
user: &ChannelUser,
channel_type: &str,
thread_id: Option<&str>,
output_format: OutputFormat,
) {
if channel_type == "dingtalk_stream" && thread_id.is_some() {
send_response(
adapter,
user,
"Received, please wait.".to_string(),
thread_id,
output_format,
)
.await;
}
}

/// Send a lifecycle reaction (best-effort, non-blocking for supported adapters).
///
/// Silently ignores errors — reactions are non-critical UX polish.
Expand Down Expand Up @@ -548,7 +567,7 @@ async fn dispatch_message(
.as_ref()
.map(|o| o.lifecycle_reactions)
.unwrap_or(true);
let thread_id = if threading_enabled {
let thread_id = if threading_enabled || ct_str == "dingtalk_stream" {
message.thread_id.as_deref()
} else {
None
Expand Down Expand Up @@ -620,6 +639,8 @@ async fn dispatch_message(
return;
}

send_immediate_ack_if_needed(adapter, &message.sender, ct_str, thread_id, output_format).await;

// For images: download, base64 encode, and send as multimodal content blocks
if let ChannelContent::Image {
ref url,
Expand Down
282 changes: 229 additions & 53 deletions crates/openfang-channels/src/dingtalk_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
//! 1. POST /v1.0/oauth2/accessToken → get access token
//! 2. POST /v1.0/gateway/connections/open → get WebSocket URL
//! 3. Connect via WebSocket, handle ping/pong and EVENT messages
//! 4. Outbound: POST /v1.0/robot/oToMessages/batchSend
//! 4. Outbound replies: POST to callback `sessionWebhook`
//! 5. Proactive outbound: POST /v1.0/robot/oToMessages/batchSend

use crate::types::{
split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser,
Expand Down Expand Up @@ -148,6 +149,55 @@ impl DingTalkStreamAdapter {
}
Ok(())
}

async fn send_to_session(
&self,
session_webhook: &str,
content: ChannelContent,
mention_user_id: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
if session_webhook.is_empty() {
return Err("DingTalk Stream: missing session webhook".into());
}

let text = match &content {
ChannelContent::Text(t) => t.as_str(),
_ => "(unsupported)",
};
let chunks = split_message(text, MAX_MESSAGE_LEN);

for (idx, chunk) in chunks.iter().enumerate() {
let body = session_text_body(
match &content {
ChannelContent::Text(_) => chunk,
_ => "(unsupported content type)",
},
if idx == 0 { mention_user_id } else { None },
);

let resp = self.client.post(session_webhook).json(&body).send().await?;
if !resp.status().is_success() {
let status = resp.status();
let err_body = resp.text().await.unwrap_or_default();
return Err(format!("DingTalk session send error {status}: {err_body}").into());
}

let result: serde_json::Value = resp.json().await?;
if result["errcode"].as_i64().unwrap_or(0) != 0 {
return Err(format!(
"DingTalk session send error: {}",
result["errmsg"].as_str().unwrap_or("unknown")
)
.into());
}

if chunks.len() > 1 {
tokio::time::sleep(Duration::from_millis(200)).await;
}
}

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -272,6 +322,16 @@ impl ChannelAdapter for DingTalkStreamAdapter {
self.send_to_ids(&[uid.as_str()], content).await
}

async fn send_in_thread(
&self,
user: &ChannelUser,
content: ChannelContent,
thread_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
self.send_to_session(thread_id, content, Some(user.platform_id.as_str()))
.await
}

async fn send_typing(&self, _user: &ChannelUser) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
Expand Down Expand Up @@ -454,7 +514,11 @@ struct CallbackPayload {
conversation_id: String,
#[serde(rename = "conversationType", default)]
conversation_type: String,
#[serde(rename = "messageId", default)]
#[serde(rename = "sessionWebhook", default)]
session_webhook: String,
#[serde(rename = "isInAtList", default)]
is_in_at_list: bool,
#[serde(rename = "msgId", alias = "messageId", default)]
message_id: String,
}

Expand All @@ -463,6 +527,89 @@ struct TextContent {
content: String,
}

fn session_text_body(content: &str, mention_user_id: Option<&str>) -> serde_json::Value {
let mut body = serde_json::json!({
"msgtype": "text",
"text": { "content": content },
});

if let Some(user_id) = mention_user_id.filter(|id| !id.is_empty()) {
body["at"] = serde_json::json!({
"atUserIds": [user_id],
});
}

body
}

fn callback_to_message(cb: CallbackPayload) -> Option<ChannelMessage> {
if cb.msg_type != "text" {
return None;
}

let text = cb.text?.content.trim().to_string();
if text.is_empty() {
return None;
}

let content = if text.starts_with('/') {
let parts: Vec<&str> = text.splitn(2, ' ').collect();
let cmd = parts[0].trim_start_matches('/');
let args: Vec<String> = parts
.get(1)
.map(|a| a.split_whitespace().map(String::from).collect())
.unwrap_or_default();
ChannelContent::Command {
name: cmd.to_string(),
args,
}
} else {
ChannelContent::Text(text)
};

let sender_user_id = if cb.sender_staff_id.is_empty() {
cb.sender_id
} else {
cb.sender_staff_id
};

let is_group = cb.conversation_type == "2";
let thread_id = if cb.session_webhook.is_empty() {
None
} else {
Some(cb.session_webhook.clone())
};

let mut metadata = HashMap::new();
metadata.insert(
"conversation_id".to_string(),
serde_json::Value::String(cb.conversation_id),
);
metadata.insert(
"sender_user_id".to_string(),
serde_json::Value::String(sender_user_id.clone()),
);
if is_group && cb.is_in_at_list {
metadata.insert("was_mentioned".to_string(), serde_json::Value::Bool(true));
}

Some(ChannelMessage {
channel: ChannelType::Custom("dingtalk_stream".to_string()),
platform_message_id: cb.message_id,
sender: ChannelUser {
platform_id: sender_user_id,
display_name: cb.sender_nick,
openfang_user: None,
},
content,
target_agent: None,
timestamp: Utc::now(),
is_group,
thread_id,
metadata,
})
}

async fn handle_frame<S>(text: &str, sink: &mut S, tx: &mpsc::Sender<ChannelMessage>)
where
S: SinkExt<Message> + Unpin,
Expand Down Expand Up @@ -493,57 +640,9 @@ where
});

if let Some(cb) = cb {
if cb.msg_type == "text" {
if let Some(ref tc) = cb.text {
let trimmed = tc.content.trim().to_string();
if !trimmed.is_empty() {
let content = if trimmed.starts_with('/') {
let parts: Vec<&str> = trimmed.splitn(2, ' ').collect();
let cmd = parts[0].trim_start_matches('/');
let args: Vec<String> = parts
.get(1)
.map(|a| a.split_whitespace().map(String::from).collect())
.unwrap_or_default();
ChannelContent::Command {
name: cmd.to_string(),
args,
}
} else {
ChannelContent::Text(trimmed)
};

let mut meta = HashMap::new();
meta.insert(
"conversation_id".to_string(),
serde_json::Value::String(cb.conversation_id),
);

let uid = if cb.sender_staff_id.is_empty() {
cb.sender_id
} else {
cb.sender_staff_id
};

let msg = ChannelMessage {
channel: ChannelType::Custom("dingtalk_stream".to_string()),
platform_message_id: cb.message_id,
sender: ChannelUser {
platform_id: uid,
display_name: cb.sender_nick,
openfang_user: None,
},
content,
target_agent: None,
timestamp: Utc::now(),
is_group: cb.conversation_type == "2",
thread_id: None,
metadata: meta,
};

if tx.send(msg).await.is_err() {
error!("DingTalk Stream: channel receiver dropped");
}
}
if let Some(msg) = callback_to_message(cb) {
if tx.send(msg).await.is_err() {
error!("DingTalk Stream: channel receiver dropped");
}
}
}
Expand Down Expand Up @@ -597,4 +696,81 @@ mod tests {
assert_eq!(v["code"], 200);
assert_eq!(v["headers"]["messageId"], "msg1");
}

#[test]
fn session_text_body_with_mention() {
let body = session_text_body("Received, please wait.", Some("staff123"));
assert_eq!(body["msgtype"], "text");
assert_eq!(body["text"]["content"], "Received, please wait.");
assert_eq!(body["at"]["atUserIds"][0], "staff123");
}

#[test]
fn session_text_body_without_mention() {
let body = session_text_body("hello", None);
assert_eq!(body["msgtype"], "text");
assert_eq!(body["text"]["content"], "hello");
assert!(body.get("at").is_none());
}

#[test]
fn callback_to_message_group_reply_uses_session_webhook() {
let msg = callback_to_message(CallbackPayload {
msg_type: "text".to_string(),
text: Some(TextContent {
content: "@bot hello".to_string(),
}),
sender_staff_id: "staff123".to_string(),
sender_id: "union123".to_string(),
sender_nick: "Alice".to_string(),
conversation_id: "cid123".to_string(),
conversation_type: "2".to_string(),
session_webhook: "https://example.com/session".to_string(),
is_in_at_list: true,
message_id: "msg123".to_string(),
})
.unwrap();

assert_eq!(msg.sender.platform_id, "staff123");
assert_eq!(
msg.thread_id.as_deref(),
Some("https://example.com/session")
);
assert!(msg.is_group);
assert_eq!(
msg.metadata.get("sender_user_id").and_then(|v| v.as_str()),
Some("staff123")
);
assert_eq!(
msg.metadata.get("was_mentioned").and_then(|v| v.as_bool()),
Some(true)
);
}

#[test]
fn callback_to_message_dm_falls_back_to_sender_id() {
let msg = callback_to_message(CallbackPayload {
msg_type: "text".to_string(),
text: Some(TextContent {
content: "hello".to_string(),
}),
sender_staff_id: String::new(),
sender_id: "union123".to_string(),
sender_nick: "Bob".to_string(),
conversation_id: "cid456".to_string(),
conversation_type: "1".to_string(),
session_webhook: "https://example.com/session-dm".to_string(),
is_in_at_list: false,
message_id: "msg456".to_string(),
})
.unwrap();

assert_eq!(msg.sender.platform_id, "union123");
assert_eq!(
msg.thread_id.as_deref(),
Some("https://example.com/session-dm")
);
assert!(!msg.is_group);
assert!(!msg.metadata.contains_key("was_mentioned"));
}
}