diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index ffd908e9a..a1ceb4fbd 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -434,6 +434,25 @@ fn default_output_format_for_channel(channel_type: &str) -> OutputFormat { } } +async fn send_immediate_ack_if_needed( + adapter: &dyn ChannelAdapter, + user: &ChannelUser, + channel_type: &str, + thread_id: Option<&str>, + output_format: OutputFormat, +) { + if channel_type == "dingtalk_stream" && thread_id.is_some() { + send_response( + adapter, + user, + "Received, please wait.".to_string(), + thread_id, + output_format, + ) + .await; + } +} + /// Send a lifecycle reaction (best-effort, non-blocking for supported adapters). /// /// Silently ignores errors — reactions are non-critical UX polish. @@ -548,7 +567,7 @@ async fn dispatch_message( .as_ref() .map(|o| o.lifecycle_reactions) .unwrap_or(true); - let thread_id = if threading_enabled { + let thread_id = if threading_enabled || ct_str == "dingtalk_stream" { message.thread_id.as_deref() } else { None @@ -620,6 +639,8 @@ async fn dispatch_message( return; } + send_immediate_ack_if_needed(adapter, &message.sender, ct_str, thread_id, output_format).await; + // For images: download, base64 encode, and send as multimodal content blocks if let ChannelContent::Image { ref url, diff --git a/crates/openfang-channels/src/dingtalk_stream.rs b/crates/openfang-channels/src/dingtalk_stream.rs index dd854b431..f7a4ac0be 100644 --- a/crates/openfang-channels/src/dingtalk_stream.rs +++ b/crates/openfang-channels/src/dingtalk_stream.rs @@ -8,7 +8,8 @@ //! 1. POST /v1.0/oauth2/accessToken → get access token //! 2. POST /v1.0/gateway/connections/open → get WebSocket URL //! 3. Connect via WebSocket, handle ping/pong and EVENT messages -//! 4. Outbound: POST /v1.0/robot/oToMessages/batchSend +//! 4. Outbound replies: POST to callback `sessionWebhook` +//! 5. Proactive outbound: POST /v1.0/robot/oToMessages/batchSend use crate::types::{ split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, @@ -148,6 +149,55 @@ impl DingTalkStreamAdapter { } Ok(()) } + + async fn send_to_session( + &self, + session_webhook: &str, + content: ChannelContent, + mention_user_id: Option<&str>, + ) -> Result<(), Box> { + if session_webhook.is_empty() { + return Err("DingTalk Stream: missing session webhook".into()); + } + + let text = match &content { + ChannelContent::Text(t) => t.as_str(), + _ => "(unsupported)", + }; + let chunks = split_message(text, MAX_MESSAGE_LEN); + + for (idx, chunk) in chunks.iter().enumerate() { + let body = session_text_body( + match &content { + ChannelContent::Text(_) => chunk, + _ => "(unsupported content type)", + }, + if idx == 0 { mention_user_id } else { None }, + ); + + let resp = self.client.post(session_webhook).json(&body).send().await?; + if !resp.status().is_success() { + let status = resp.status(); + let err_body = resp.text().await.unwrap_or_default(); + return Err(format!("DingTalk session send error {status}: {err_body}").into()); + } + + let result: serde_json::Value = resp.json().await?; + if result["errcode"].as_i64().unwrap_or(0) != 0 { + return Err(format!( + "DingTalk session send error: {}", + result["errmsg"].as_str().unwrap_or("unknown") + ) + .into()); + } + + if chunks.len() > 1 { + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + + Ok(()) + } } #[async_trait] @@ -272,6 +322,16 @@ impl ChannelAdapter for DingTalkStreamAdapter { self.send_to_ids(&[uid.as_str()], content).await } + async fn send_in_thread( + &self, + user: &ChannelUser, + content: ChannelContent, + thread_id: &str, + ) -> Result<(), Box> { + self.send_to_session(thread_id, content, Some(user.platform_id.as_str())) + .await + } + async fn send_typing(&self, _user: &ChannelUser) -> Result<(), Box> { Ok(()) } @@ -454,7 +514,11 @@ struct CallbackPayload { conversation_id: String, #[serde(rename = "conversationType", default)] conversation_type: String, - #[serde(rename = "messageId", default)] + #[serde(rename = "sessionWebhook", default)] + session_webhook: String, + #[serde(rename = "isInAtList", default)] + is_in_at_list: bool, + #[serde(rename = "msgId", alias = "messageId", default)] message_id: String, } @@ -463,6 +527,89 @@ struct TextContent { content: String, } +fn session_text_body(content: &str, mention_user_id: Option<&str>) -> serde_json::Value { + let mut body = serde_json::json!({ + "msgtype": "text", + "text": { "content": content }, + }); + + if let Some(user_id) = mention_user_id.filter(|id| !id.is_empty()) { + body["at"] = serde_json::json!({ + "atUserIds": [user_id], + }); + } + + body +} + +fn callback_to_message(cb: CallbackPayload) -> Option { + if cb.msg_type != "text" { + return None; + } + + let text = cb.text?.content.trim().to_string(); + if text.is_empty() { + return None; + } + + let content = if text.starts_with('/') { + let parts: Vec<&str> = text.splitn(2, ' ').collect(); + let cmd = parts[0].trim_start_matches('/'); + let args: Vec = parts + .get(1) + .map(|a| a.split_whitespace().map(String::from).collect()) + .unwrap_or_default(); + ChannelContent::Command { + name: cmd.to_string(), + args, + } + } else { + ChannelContent::Text(text) + }; + + let sender_user_id = if cb.sender_staff_id.is_empty() { + cb.sender_id + } else { + cb.sender_staff_id + }; + + let is_group = cb.conversation_type == "2"; + let thread_id = if cb.session_webhook.is_empty() { + None + } else { + Some(cb.session_webhook.clone()) + }; + + let mut metadata = HashMap::new(); + metadata.insert( + "conversation_id".to_string(), + serde_json::Value::String(cb.conversation_id), + ); + metadata.insert( + "sender_user_id".to_string(), + serde_json::Value::String(sender_user_id.clone()), + ); + if is_group && cb.is_in_at_list { + metadata.insert("was_mentioned".to_string(), serde_json::Value::Bool(true)); + } + + Some(ChannelMessage { + channel: ChannelType::Custom("dingtalk_stream".to_string()), + platform_message_id: cb.message_id, + sender: ChannelUser { + platform_id: sender_user_id, + display_name: cb.sender_nick, + openfang_user: None, + }, + content, + target_agent: None, + timestamp: Utc::now(), + is_group, + thread_id, + metadata, + }) +} + async fn handle_frame(text: &str, sink: &mut S, tx: &mpsc::Sender) where S: SinkExt + Unpin, @@ -493,57 +640,9 @@ where }); if let Some(cb) = cb { - if cb.msg_type == "text" { - if let Some(ref tc) = cb.text { - let trimmed = tc.content.trim().to_string(); - if !trimmed.is_empty() { - let content = if trimmed.starts_with('/') { - let parts: Vec<&str> = trimmed.splitn(2, ' ').collect(); - let cmd = parts[0].trim_start_matches('/'); - let args: Vec = parts - .get(1) - .map(|a| a.split_whitespace().map(String::from).collect()) - .unwrap_or_default(); - ChannelContent::Command { - name: cmd.to_string(), - args, - } - } else { - ChannelContent::Text(trimmed) - }; - - let mut meta = HashMap::new(); - meta.insert( - "conversation_id".to_string(), - serde_json::Value::String(cb.conversation_id), - ); - - let uid = if cb.sender_staff_id.is_empty() { - cb.sender_id - } else { - cb.sender_staff_id - }; - - let msg = ChannelMessage { - channel: ChannelType::Custom("dingtalk_stream".to_string()), - platform_message_id: cb.message_id, - sender: ChannelUser { - platform_id: uid, - display_name: cb.sender_nick, - openfang_user: None, - }, - content, - target_agent: None, - timestamp: Utc::now(), - is_group: cb.conversation_type == "2", - thread_id: None, - metadata: meta, - }; - - if tx.send(msg).await.is_err() { - error!("DingTalk Stream: channel receiver dropped"); - } - } + if let Some(msg) = callback_to_message(cb) { + if tx.send(msg).await.is_err() { + error!("DingTalk Stream: channel receiver dropped"); } } } @@ -597,4 +696,81 @@ mod tests { assert_eq!(v["code"], 200); assert_eq!(v["headers"]["messageId"], "msg1"); } + + #[test] + fn session_text_body_with_mention() { + let body = session_text_body("Received, please wait.", Some("staff123")); + assert_eq!(body["msgtype"], "text"); + assert_eq!(body["text"]["content"], "Received, please wait."); + assert_eq!(body["at"]["atUserIds"][0], "staff123"); + } + + #[test] + fn session_text_body_without_mention() { + let body = session_text_body("hello", None); + assert_eq!(body["msgtype"], "text"); + assert_eq!(body["text"]["content"], "hello"); + assert!(body.get("at").is_none()); + } + + #[test] + fn callback_to_message_group_reply_uses_session_webhook() { + let msg = callback_to_message(CallbackPayload { + msg_type: "text".to_string(), + text: Some(TextContent { + content: "@bot hello".to_string(), + }), + sender_staff_id: "staff123".to_string(), + sender_id: "union123".to_string(), + sender_nick: "Alice".to_string(), + conversation_id: "cid123".to_string(), + conversation_type: "2".to_string(), + session_webhook: "https://example.com/session".to_string(), + is_in_at_list: true, + message_id: "msg123".to_string(), + }) + .unwrap(); + + assert_eq!(msg.sender.platform_id, "staff123"); + assert_eq!( + msg.thread_id.as_deref(), + Some("https://example.com/session") + ); + assert!(msg.is_group); + assert_eq!( + msg.metadata.get("sender_user_id").and_then(|v| v.as_str()), + Some("staff123") + ); + assert_eq!( + msg.metadata.get("was_mentioned").and_then(|v| v.as_bool()), + Some(true) + ); + } + + #[test] + fn callback_to_message_dm_falls_back_to_sender_id() { + let msg = callback_to_message(CallbackPayload { + msg_type: "text".to_string(), + text: Some(TextContent { + content: "hello".to_string(), + }), + sender_staff_id: String::new(), + sender_id: "union123".to_string(), + sender_nick: "Bob".to_string(), + conversation_id: "cid456".to_string(), + conversation_type: "1".to_string(), + session_webhook: "https://example.com/session-dm".to_string(), + is_in_at_list: false, + message_id: "msg456".to_string(), + }) + .unwrap(); + + assert_eq!(msg.sender.platform_id, "union123"); + assert_eq!( + msg.thread_id.as_deref(), + Some("https://example.com/session-dm") + ); + assert!(!msg.is_group); + assert!(!msg.metadata.contains_key("was_mentioned")); + } }