diff --git a/Cargo.lock b/Cargo.lock index 0888e70..5e0a4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,7 @@ dependencies = [ "chrono", "clap", "serde_json", + "tokio", ] [[package]] diff --git a/crates/agentic-comm-cli/Cargo.toml b/crates/agentic-comm-cli/Cargo.toml index 20772af..2e19082 100644 --- a/crates/agentic-comm-cli/Cargo.toml +++ b/crates/agentic-comm-cli/Cargo.toml @@ -16,3 +16,4 @@ agentic-comm = { workspace = true } clap = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } +tokio = { workspace = true } diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index 059260d..eed8793 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -1,6 +1,9 @@ //! CLI for agentic-comm: agent communication, channels, pub/sub. -use std::path::PathBuf; +use std::{ + path::{Path, PathBuf}, + time::{Duration, SystemTime}, +}; use agentic_comm::{ AuditEntry, AuditEventType, ChannelConfig, ChannelType, CollectiveDecisionMode, CommStore, @@ -8,6 +11,12 @@ use agentic_comm::{ HiveRole, MessageFilter, MessageType, TemporalTarget, WorkspaceRole, }; use clap::{Parser, Subcommand}; +use tokio::{ + io::AsyncWriteExt, + net::TcpListener, + sync::broadcast, + time::{self, MissedTickBehavior}, +}; /// Default store file path. 
fn default_store_path() -> PathBuf { @@ -99,6 +108,11 @@ enum Commands { #[command(subcommand)] action: RecvAction, }, + /// Chat subcommands (poll, send) + Chat { + #[command(subcommand)] + action: ChatAction, + }, /// Query subcommands (messages, channels, relationships, echoes, conversations) Query { #[command(subcommand)] @@ -390,6 +404,38 @@ enum RecvAction { }, } +// --------------------------------------------------------------------------- +// Chat subcommands +// --------------------------------------------------------------------------- + +#[derive(Subcommand)] +enum ChatAction { + /// Poll messages from a channel in one shot + Poll { + /// Channel ID + #[arg(long)] + channel: u64, + /// Optional Unix timestamp (seconds) lower bound + #[arg(long)] + since: Option<i64>, + /// Maximum messages to return + #[arg(long, default_value = "50")] + limit: usize, + }, + /// Send one message to a channel + Send { + /// Channel ID + #[arg(long)] + channel: u64, + /// Message payload/content + #[arg(long)] + message: String, + /// Sender identifier + #[arg(long)] + sender: String, + }, +} + // --------------------------------------------------------------------------- // Query subcommands // --------------------------------------------------------------------------- @@ -1118,6 +1164,142 @@ fn output(value: &serde_json::Value, json_mode: bool) { } } +struct PidFileGuard { + path: PathBuf, +} + +impl PidFileGuard { + fn new(path: PathBuf) -> Self { + Self { path } + } +} + +impl Drop for PidFileGuard { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } +} + +fn store_mtime(path: &Path) -> Option<SystemTime> { + std::fs::metadata(path).ok()?.modified().ok() +} + +fn current_max_message_id(path: &Path) -> u64 { + if !path.exists() { + return 0; + } + + CommStore::load(path) + .map(|store| store.messages.keys().copied().max().unwrap_or(0)) + .unwrap_or(0) +} + +fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> Vec<String> { + if !path.exists() { + 
return Vec::new(); + } + + let store = match CommStore::load(path) { + Ok(store) => store, + Err(e) => { + eprintln!("Warning: daemon could not load store {}: {e}", path.display()); + return Vec::new(); + } + }; + + let mut messages: Vec<_> = store.messages.values().cloned().collect(); + messages.sort_by_key(|msg| msg.id); + + let mut payloads = Vec::new(); + for msg in messages { + if msg.id <= *last_seen_message_id { + continue; + } + + match serialize_daemon_message(&store, &msg) { + Ok(json) => payloads.push(json), + Err(e) => eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id), + } + + *last_seen_message_id = msg.id; + } + + payloads +} + +fn collect_recent_daemon_messages(path: &Path, limit: usize) -> Vec<String> { + if !path.exists() { + return Vec::new(); + } + + let store = match CommStore::load(path) { + Ok(store) => store, + Err(e) => { + eprintln!("Warning: daemon could not load store {}: {e}", path.display()); + return Vec::new(); + } + }; + + let mut messages: Vec<_> = store.messages.values().cloned().collect(); + messages.sort_by_key(|msg| msg.id); + + let start = messages.len().saturating_sub(limit); + messages + .into_iter() + .skip(start) + .filter_map(|msg| match serialize_daemon_message(&store, &msg) { + Ok(json) => Some(json), + Err(e) => { + eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id); + None + } + }) + .collect() +} + +fn serialize_daemon_message( + store: &CommStore, + msg: &agentic_comm::Message, +) -> Result<String, serde_json::Error> { + let payload = serde_json::json!({ + "sender": msg.sender, + "text": msg.content, + "timestamp": msg.timestamp.to_rfc3339(), + "channel": store + .get_channel(msg.channel_id) + .map(|channel| channel.name) + .unwrap_or_else(|| msg.channel_id.to_string()), + "lamport": msg.comm_timestamp.lamport, + }); + + serde_json::to_string(&payload) +} + +fn pid_is_alive(pid: u32) -> bool { + #[cfg(windows)] + { + std::process::Command::new("tasklist") + .args(["/FI", &format!("PID eq {pid}"), "/FO", 
"CSV", "/NH"]) + .output() + .map(|output| { + let stdout = String::from_utf8_lossy(&output.stdout); + stdout.contains(&format!("\"{pid}\"")) + }) + .unwrap_or(false) + } + + #[cfg(not(windows))] + { + std::process::Command::new("kill") + .args(["-0", &pid.to_string()]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .map(|status| status.success()) + .unwrap_or(false) + } +} + fn main() { let cli = Cli::parse(); let store_path = resolve_store_path(cli.file); @@ -1441,6 +1623,57 @@ fn main() { } }, + // ----------------------------------------------------------------- + // Chat subcommands (one-shot poll/send) + // ----------------------------------------------------------------- + Commands::Chat { action } => match action { + ChatAction::Poll { + channel, + since, + limit, + } => { + let mut store = load_or_create(&store_path); + let since_dt = since.and_then(|ts| chrono::DateTime::<chrono::Utc>::from_timestamp(ts, 0)); + match store.receive_messages(channel, None, since_dt) { + Ok(mut msgs) => { + msgs.truncate(limit); + output(&serde_json::to_value(&msgs).unwrap(), json_mode); + } + Err(e) => { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } + } + ChatAction::Send { + channel, + message, + sender, + } => { + let mut store = load_or_create(&store_path); + match store.send_message(channel, &sender, &message, MessageType::Text) { + Ok(msg) => { + output( + &serde_json::json!({ + "status": "sent", + "message_id": msg.id, + "channel_id": msg.channel_id, + "timestamp": msg.timestamp.to_rfc3339(), + }), + json_mode, + ); + if let Err(e) = store.save(&store_path) { + eprintln!("Warning: failed to save store: {e}"); + } + } + Err(e) => { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } + } + }, + // ----------------------------------------------------------------- // Query subcommands // ----------------------------------------------------------------- @@ -2650,35 +2883,139 @@ fn main() { .parent() .unwrap_or_else(|| 
std::path::Path::new(".")); let pid_path = data_dir.join("acomm.pid"); + let stop_path = data_dir.join("acomm.stop"); let pid = std::process::id(); // Write PID file if let Err(e) = std::fs::write(&pid_path, pid.to_string()) { eprintln!("Warning: could not write PID file: {e}"); } + let _pid_guard = PidFileGuard::new(pid_path.clone()); + let _ = std::fs::remove_file(&stop_path); - output( - &serde_json::json!({ - "status": "started", - "pid": pid, - "port": port, - "data": data_path.display().to_string(), - "pid_file": pid_path.display().to_string(), - "note": "Daemon stub — exiting immediately (real daemon would loop)", - }), - json_mode, - ); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap_or_else(|e| { + eprintln!("Error: could not create tokio runtime: {e}"); + std::process::exit(1); + }); + + let daemon_result = runtime.block_on(async { + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; + let (sender, _) = broadcast::channel::<String>(256); + let mut ticker = time::interval(Duration::from_secs(1)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut shutdown = Box::pin(tokio::signal::ctrl_c()); + let mut last_seen_message_id = current_max_message_id(&data_path); + let mut last_mtime = store_mtime(&data_path); + + output( + &serde_json::json!({ + "status": "listening", + "pid": pid, + "port": port, + "data": data_path.display().to_string(), + "pid_file": pid_path.display().to_string(), + }), + json_mode, + ); + + loop { + tokio::select! 
{ + _ = &mut shutdown => { + break; + } + _ = ticker.tick() => { + if stop_path.exists() { + let _ = std::fs::remove_file(&stop_path); + break; + } + + let current_mtime = store_mtime(&data_path); + if current_mtime != last_mtime { + last_mtime = current_mtime; + for message in collect_new_daemon_messages(&data_path, &mut last_seen_message_id) { + let _ = sender.send(message); + } + } + } + accept_result = listener.accept() => { + let (mut stream, _) = accept_result?; + let mut receiver = sender.subscribe(); + let recent_messages = collect_recent_daemon_messages(&data_path, 50); + tokio::spawn(async move { + for message in recent_messages { + if stream.write_all(message.as_bytes()).await.is_err() { + return; + } + if stream.write_all(b"\n").await.is_err() { + return; + } + } + while let Ok(message) = receiver.recv().await { + if stream.write_all(message.as_bytes()).await.is_err() { + break; + } + if stream.write_all(b"\n").await.is_err() { + break; + } + } + }); + } + } + } + + Ok::<(), Box<dyn std::error::Error>>(()) + }); + + if let Err(e) = daemon_result { + eprintln!("Error: daemon failed: {e}"); + std::process::exit(1); + } } DaemonAction::Stop => { let data_dir = store_path .parent() .unwrap_or_else(|| std::path::Path::new(".")); let pid_path = data_dir.join("acomm.pid"); + let stop_path = data_dir.join("acomm.stop"); if pid_path.exists() { match std::fs::read_to_string(&pid_path) { Ok(pid_str) => { let pid_str = pid_str.trim(); + let pid = pid_str.parse::<u32>().ok(); + if let Err(e) = std::fs::write(&stop_path, "stop\n") { + eprintln!("Warning: could not write stop file: {e}"); + } + + for _ in 0..10 { + if !pid_path.exists() { + output( + &serde_json::json!({ + "status": "stopped", + "pid": pid_str, + }), + json_mode, + ); + return; + } + std::thread::sleep(Duration::from_millis(500)); + } + + if let Some(pid) = pid { + #[cfg(windows)] + let _ = std::process::Command::new("taskkill") + .args(["/PID", &pid.to_string(), "/T", "/F"]) + .status(); + + #[cfg(not(windows))] + let _ = 
std::process::Command::new("kill") + .arg(pid.to_string()) + .status(); + } + output( &serde_json::json!({ "status": "stopping", @@ -2686,9 +3023,6 @@ fn main() { }), json_mode, ); - if let Err(e) = std::fs::remove_file(&pid_path) { - eprintln!("Warning: could not remove PID file: {e}"); - } } Err(e) => { eprintln!("Error reading PID file: {e}"); @@ -2869,16 +3203,7 @@ fn main() { let pid_alive = pid_str .parse::<u32>() .ok() - .map(|pid| { - // Check process existence via kill -0 on Unix - std::process::Command::new("kill") - .args(["-0", &pid.to_string()]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .status() - .map(|s| s.success()) - .unwrap_or(false) - }) + .map(pid_is_alive) .unwrap_or(false); // Read PID file modification time as a proxy for start time @@ -3307,3 +3632,4 @@ fn main() { } } } + diff --git a/docs/public/cli-reference.md b/docs/public/cli-reference.md index 18e7507..fc43849 100644 --- a/docs/public/cli-reference.md +++ b/docs/public/cli-reference.md @@ -98,6 +98,11 @@ Options: --file Path to .acomm store file ``` +**Behavior note:** +- `acomm send` writes to the channel stream. +- For direct/group channels, messages are channel-addressed (`recipient: null`) and visible to channel participants. +- `--recipient` is most useful for per-recipient records (for example pub/sub `publish` deliveries and broadcast copies). + **Example: Receive all messages from channel 1** ```bash @@ -336,6 +341,11 @@ Output: The `delivered_count` indicates how many subscribers received the message. If no subscribers are registered for the topic, the count is 0 and no messages are created. +**Coordination tip (multi-agent teams):** +- Use `subscribe` + `publish` for routing/fan-out. +- Before sending a follow-up response in collaborative channels, run a poll/read step first so you do not post a duplicate parallel reply. +- See runnable example: `examples/pubsub-fanout-recipient-delivery.sh`. 
+ --- ### acomm history diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md new file mode 100644 index 0000000..6f0b03f --- /dev/null +++ b/docs/public/multi-agent-coordination.md @@ -0,0 +1,181 @@ +--- +status: stable +--- + +# Multi-Agent Coordination with AgenticComm + +Practical patterns for coordinating multiple AI agents through a shared AgenticComm store. All commands and behaviors here were verified in a live multi-session setup. + +--- + +## Channel Semantics + +AgenticComm supports two distinct communication patterns. Choose the right one for your coordination need. + +### Direct / Group channels — shared stream + +`acomm message send` writes one message to the channel stream. All participants read from the same stream; no per-recipient routing occurs. + +```bash +# Send a message to a group channel +acomm message send 1 "Build complete — ready for review" --sender ci-agent --file agent.acomm --json +# → { "channel_id": 1, "message_id": 42, "status": "sent" } + +# Any participant can read it +acomm message list 1 --file agent.acomm --json +# → [ { "id": 42, "sender": "ci-agent", "recipient": null, "content": "Build complete..." } ] +``` + +`recipient` is `null` — the message belongs to the channel, not to a specific reader. + +### Pub/sub — fan-out with per-subscriber delivery records + +`subscribe` + `publish` creates one delivery record per registered subscriber. Each subscriber reads only their own entry. 
+ +```bash +# Register subscribers +acomm subscribe updates agent-a --file agent.acomm +acomm subscribe updates agent-b --file agent.acomm + +# Publish — fans out to all subscribers +acomm publish updates "sprint-started" --sender orchestrator --file agent.acomm --json +# → { "topic": "updates", "delivered_count": 2, "status": "published" } + +# Each subscriber reads only their own delivery +acomm receive 1 --recipient agent-a --file agent.acomm --json +# → [ { "recipient": "agent-a", "content": "sprint-started", ... } ] + +acomm receive 1 --recipient agent-b --file agent.acomm --json +# → [ { "recipient": "agent-b", "content": "sprint-started", ... } ] +``` + +`delivered_count: 2` confirms both subscribers received the message. Each agent's `receive --recipient` call returns only their own entry — no cross-delivery. + +--- + +## Cross-Session Delivery + +Messages written by one agent session are immediately readable by another session on the same store. This was confirmed in live testing: + +| Message | Sender | Session | Lamport | Timestamp | +|---------|--------|---------|---------|-----------| +| `probe:session-a:...` | agent-a | Session A | 40 | 10:28:00 | +| `ack:session-b:...` | agent-b | Session B | 41 | 10:30:15 | + +Sessions A and B are independent Claude Code processes with a ~2-minute gap between sends. Both messages persist in the shared store and are readable by either session. + +**What "cross-session delivery" means here:** the `.acomm` store is a local file. Any session with a path to that file can read and write it. There is no network hop; delivery latency is local I/O. + +--- + +## Awareness Model + +Cross-session delivery (store level) works, but agents becoming *aware* of new messages requires an additional step. There are three distinct layers: + +### Layer 1 — Store delivery ✓ + +`acomm message send` → message persisted in `.acomm`. Verifiable with `message list`. Works automatically. 
+ +### Layer 2 — Human-terminal awareness (optional watcher) + +`acomm-notify.ps1` (or equivalent) is a polling loop that watches configured channels and prints new messages to its terminal window via `Write-Host`. The human watching that window sees near-real-time alerts. + +**Important:** the watcher must be: +1. Explicitly started (it does not auto-launch) +2. Configured to watch the channels where messages are expected + +```powershell +# Start watcher for specific channels +.\acomm-notify.ps1 -Channels 'handoff','gate' -IntervalSeconds 2 +``` + +#### Watcher lifecycle wrappers (always-on operation) + +In operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: + +```powershell +# Start background notifier process (writes pid metadata) +.\start-notifier.ps1 -Channels 'handoff','gate','debate' -IntervalSeconds 2 + +# Check running/stale status +.\status-notifier.ps1 -Json + +# Stop notifier process and clear pid metadata +.\stop-notifier.ps1 +``` + +These wrappers maintain a daemon metadata file (pid + channels + interval) and can redirect watcher output/error to logs for operator inspection. + +### Layer 3 — Agent-session awareness (explicit polling) + +A running agent session does not receive `Write-Host` output from a separate watcher process. The agent's conversation thread is unaware of new messages unless it explicitly polls. + +**Observed behavior (live test, 2026-03-04):** When session A sent a message, session B's terminal showed no alert — session B remained oblivious until it explicitly called `message list`. + +The reliable agent-side pattern is disciplined polling at the start of each interaction turn: + +```bash +# At the start of each turn, check for new messages since last read +acomm message list 1 --file agent.acomm --json +``` + +**Near-term path (achievable today):** Wire a `UserPromptSubmit` hook that queries the acomm store for channel deltas and writes them to stdout. 
Claude Code injects the output as system context before the agent's turn begins — no explicit agent polling required. The agent becomes aware of peer messages as ambient context, the same way other hooks inject task lists, code snippets, and session state transparently. This requires only a hook script, not any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. + +--- + +## Poll-Before-Respond Discipline + +When multiple agents share a channel, they poll independently and cannot see each other's in-flight responses before their own turn completes. This creates a parallel-reply blindspot. + +**The problem:** Agent A and Agent B both see a question at lamport T. Both compose answers independently. Both post. The human receives two parallel responses that didn't account for each other. + +**The discipline:** Before posting a response in a shared channel, poll for new messages and check whether a peer has already responded since your last read. + +```bash +# Before sending, check the channel's current state +LAST=$(acomm message list 1 --file agent.acomm --json | python3 -c \ + "import sys,json; msgs=json.load(sys.stdin); print(msgs[-1]['id'] if msgs else 0)") + +# Only post if no peer has answered since your last check +if [ "$LAST" -le "$YOUR_LAST_KNOWN_ID" ]; then + acomm message send 1 "My response..." --sender agent-a --file agent.acomm +fi +``` + +The `delivered_count` from `publish` provides a similar gate for pub/sub workflows — if `delivered_count` is 0, no subscribers are registered and the message would be undelivered. + +--- + +## Watcher Channel Scope + +The watcher only surfaces messages from channels in its watch list. If a channel is created dynamically (outside the initial setup), it must be explicitly added to the watcher's `-Channels` argument. 
+ +```powershell +# Watcher that covers both standard and dynamic channels +.\acomm-notify.ps1 -Channels 'handoff','gate','debate','my-new-channel' +``` + +Channel name resolution depends on the channel being registered in the store. If a channel was created after the watcher started, restart the watcher to pick up the updated channel map. + +--- + +## Runnable Example + +See `examples/pubsub-fanout-recipient-delivery.sh` for a self-contained, verifiable demonstration of the pub/sub fan-out and recipient-scoped delivery pattern: + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh +# PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output. +``` + +--- + +## Summary + +| Pattern | Use when | Recipient routing | +|---------|----------|-------------------| +| `message send` / `list` | All participants need the same message | `recipient: null` (channel stream) | +| `subscribe` + `publish` + `receive --recipient` | Each agent needs their own delivery record | `recipient: some(name)` per subscriber | +| Poll-before-respond | Preventing parallel duplicate replies | — discipline, not a command | +| Watcher loop | Human-terminal alerting | — optional, not agent-native | +| `UserPromptSubmit` hook | Auto-injecting acomm deltas as ambient context at every agent turn start | — no manual poll required | diff --git a/examples/README.md b/examples/README.md index 04fefa5..049afa2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,35 +1,31 @@ # AgenticComm Examples -Runnable examples demonstrating the AgenticComm Python SDK. +Runnable, no-cloud examples for common coordination patterns. ## Prerequisites ```bash -pip install agentic-comm -cargo install agentic-comm +acomm --version ``` +The examples below use only the `acomm` CLI and a temporary local `.acomm` store. + ## Examples | File | Description | |------|-------------| -| `01_basic_messaging.py` | Simplest possible example. 
Create a store, create a channel, send and receive messages. | -| `02_multi_agent.py` | Multi-agent coordination. Multiple agents sending tasks and status updates through shared channels. | -| `03_task_queue.py` | Task queue pattern. One agent posts work items, another agent picks them up and reports completion. | -| `04_code_review.py` | Code review handoff. Developer agent requests review, reviewer agent provides feedback through channels. | -| `05_broadcast.py` | Broadcasting pattern. System-wide announcements sent to all channels simultaneously. | -| `06_search_history.py` | Search and history. Finding past messages and reviewing channel history for context. | +| `pubsub-fanout-recipient-delivery.sh` | Verifies `subscribe + publish` fan-out and recipient-scoped delivery (`receive --recipient`). | ## Running ```bash -# All examples — no API key needed -python examples/01_basic_messaging.py -python examples/02_multi_agent.py -python examples/03_task_queue.py -python examples/04_code_review.py -python examples/05_broadcast.py -python examples/06_search_history.py +bash examples/pubsub-fanout-recipient-delivery.sh +``` + +Optional: pass a store path to keep artifacts for inspection. + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh ./scratch/pubsub-demo.acomm ``` ## MCP Server diff --git a/examples/pubsub-fanout-recipient-delivery.sh b/examples/pubsub-fanout-recipient-delivery.sh new file mode 100644 index 0000000..d99c2a1 --- /dev/null +++ b/examples/pubsub-fanout-recipient-delivery.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Demonstrates deterministic pub/sub coordination: +# 1) two subscribers register to one topic +# 2) one publish fans out to both +# 3) each recipient reads only their own delivery entry + +if ! 
command -v acomm >/dev/null 2>&1; then + echo "error: acomm CLI is required but was not found in PATH" >&2 + exit 1 +fi + +STORE_INPUT="${1:-}" +TOPIC="${TOPIC:-updates}" +SUBSCRIBER_A="${SUBSCRIBER_A:-agent-a}" +SUBSCRIBER_B="${SUBSCRIBER_B:-agent-b}" +SENDER="${SENDER:-ci-agent}" +CONTENT="${CONTENT:-hello-topic}" +CHANNEL_ID=1 + +cleanup_store=0 +if [[ -n "$STORE_INPUT" ]]; then + STORE="$STORE_INPUT" +else + STORE="$(mktemp -t agentic-comm-pubsub-XXXXXX.acomm)" + cleanup_store=1 +fi + +cleanup() { + if [[ "$cleanup_store" -eq 1 ]]; then + rm -f "$STORE" + fi +} +trap cleanup EXIT + +rm -f "$STORE" +acomm init --json "$STORE" >/dev/null + +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_A" >/dev/null +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_B" >/dev/null + +publish_json="$(acomm publish --file "$STORE" --json --sender "$SENDER" "$TOPIC" "$CONTENT")" +if ! grep -q '"delivered_count":[[:space:]]*2' <<<"$publish_json"; then + echo "error: expected delivered_count=2, got:" >&2 + echo "$publish_json" >&2 + exit 1 +fi + +recv_a="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_A" "$CHANNEL_ID")" +recv_b="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_B" "$CHANNEL_ID")" + +if ! grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_a"; then + echo "error: expected recipient $SUBSCRIBER_A in receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if ! 
grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_b"; then + echo "error: expected recipient $SUBSCRIBER_B in receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_a"; then + echo "error: cross-delivery detected in $SUBSCRIBER_A receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_b"; then + echo "error: cross-delivery detected in $SUBSCRIBER_B receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +echo "PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output." +echo "store: $STORE"