From 4600c6eca5007b91b45492aa54c44763dc5651a8 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 05:27:09 -0500 Subject: [PATCH 01/12] docs(examples): add verified pubsub fan-out delivery walkthrough --- docs/public/cli-reference.md | 5 ++ examples/README.md | 28 +++---- examples/pubsub-fanout-recipient-delivery.sh | 78 ++++++++++++++++++++ 3 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 examples/pubsub-fanout-recipient-delivery.sh diff --git a/docs/public/cli-reference.md b/docs/public/cli-reference.md index 18e7507..5d99133 100644 --- a/docs/public/cli-reference.md +++ b/docs/public/cli-reference.md @@ -336,6 +336,11 @@ Output: The `delivered_count` indicates how many subscribers received the message. If no subscribers are registered for the topic, the count is 0 and no messages are created. +**Coordination tip (multi-agent teams):** +- Use `subscribe` + `publish` for routing/fan-out. +- Before sending a follow-up response in collaborative channels, run a poll/read step first so you do not post a duplicate parallel reply. +- See runnable example: `examples/pubsub-fanout-recipient-delivery.sh`. + --- ### acomm history diff --git a/examples/README.md b/examples/README.md index 04fefa5..049afa2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,35 +1,31 @@ # AgenticComm Examples -Runnable examples demonstrating the AgenticComm Python SDK. +Runnable, no-cloud examples for common coordination patterns. ## Prerequisites ```bash -pip install agentic-comm -cargo install agentic-comm +acomm --version ``` +The examples below use only the `acomm` CLI and a temporary local `.acomm` store. + ## Examples | File | Description | |------|-------------| -| `01_basic_messaging.py` | Simplest possible example. Create a store, create a channel, send and receive messages. | -| `02_multi_agent.py` | Multi-agent coordination. Multiple agents sending tasks and status updates through shared channels. | -| `03_task_queue.py` | Task queue pattern. One agent posts work items, another agent picks them up and reports completion. | -| `04_code_review.py` | Code review handoff. Developer agent requests review, reviewer agent provides feedback through channels. | -| `05_broadcast.py` | Broadcasting pattern. System-wide announcements sent to all channels simultaneously. | -| `06_search_history.py` | Search and history. Finding past messages and reviewing channel history for context. | +| `pubsub-fanout-recipient-delivery.sh` | Verifies `subscribe + publish` fan-out and recipient-scoped delivery (`receive --recipient`). | ## Running ```bash -# All examples — no API key needed -python examples/01_basic_messaging.py -python examples/02_multi_agent.py -python examples/03_task_queue.py -python examples/04_code_review.py -python examples/05_broadcast.py -python examples/06_search_history.py +bash examples/pubsub-fanout-recipient-delivery.sh +``` + +Optional: pass a store path to keep artifacts for inspection. + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh ./scratch/pubsub-demo.acomm ``` ## MCP Server diff --git a/examples/pubsub-fanout-recipient-delivery.sh b/examples/pubsub-fanout-recipient-delivery.sh new file mode 100644 index 0000000..e449736 --- /dev/null +++ b/examples/pubsub-fanout-recipient-delivery.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Demonstrates deterministic pub/sub coordination: +# 1) two subscribers register to one topic +# 2) one publish fans out to both +# 3) each recipient reads only their own delivery entry + +if ! command -v acomm >/dev/null 2>&1; then + echo "error: acomm CLI is required but was not found in PATH" >&2 + exit 1 +fi + +STORE_INPUT="${1:-}" +TOPIC="${TOPIC:-updates}" +SUBSCRIBER_A="${SUBSCRIBER_A:-meera}" +SUBSCRIBER_B="${SUBSCRIBER_B:-ishika}" +SENDER="${SENDER:-ci-agent}" +CONTENT="${CONTENT:-hello-topic}" +CHANNEL_ID=1 + +cleanup_store=0 +if [[ -n "$STORE_INPUT" ]]; then + STORE="$STORE_INPUT" +else + STORE="$(mktemp -t agentic-comm-pubsub-XXXXXX.acomm)" + cleanup_store=1 +fi + +cleanup() { + if [[ "$cleanup_store" -eq 1 ]]; then + rm -f "$STORE" + fi +} +trap cleanup EXIT + +rm -f "$STORE" +acomm init --json "$STORE" >/dev/null + +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_A" >/dev/null +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_B" >/dev/null + +publish_json="$(acomm publish --file "$STORE" --json --sender "$SENDER" "$TOPIC" "$CONTENT")" +if ! grep -q '"delivered_count":[[:space:]]*2' <<<"$publish_json"; then + echo "error: expected delivered_count=2, got:" >&2 + echo "$publish_json" >&2 + exit 1 +fi + +recv_a="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_A" "$CHANNEL_ID")" +recv_b="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_B" "$CHANNEL_ID")" + +if ! grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_a"; then + echo "error: expected recipient $SUBSCRIBER_A in receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if ! grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_b"; then + echo "error: expected recipient $SUBSCRIBER_B in receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_a"; then + echo "error: cross-delivery detected in $SUBSCRIBER_A receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_b"; then + echo "error: cross-delivery detected in $SUBSCRIBER_B receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +echo "PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output." +echo "store: $STORE" From e4778d786a51e3a91f46b6b888606355279da93b Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 05:44:32 -0500 Subject: [PATCH 02/12] docs(cli): clarify send/receive stream vs recipient-scoped delivery --- docs/public/cli-reference.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/public/cli-reference.md b/docs/public/cli-reference.md index 5d99133..fc43849 100644 --- a/docs/public/cli-reference.md +++ b/docs/public/cli-reference.md @@ -98,6 +98,11 @@ Options: --file Path to .acomm store file ``` +**Behavior note:** +- `acomm send` writes to the channel stream. +- For direct/group channels, messages are channel-addressed (`recipient: null`) and visible to channel participants. +- `--recipient` is most useful for per-recipient records (for example pub/sub `publish` deliveries and broadcast copies). + **Example: Receive all messages from channel 1** ```bash From a664257aa667a5b34378c322933386ea69aae24b Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 06:05:38 -0500 Subject: [PATCH 03/12] docs: add multi-agent coordination guide Covers verified patterns from live multi-session testing: - channel semantics: direct/group (recipient:null) vs pub/sub fan-out - cross-session delivery: store-level, confirmed with live message IDs - three-layer awareness model: store delivery, human-terminal watcher, agent-session explicit polling - poll-before-respond discipline for preventing parallel-reply blindspots - watcher channel scope and dynamic channel resolution - summary table of patterns and routing behavior All commands and behaviors verified against live triveni.acomm store. --- docs/public/multi-agent-coordination.md | 163 ++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 docs/public/multi-agent-coordination.md diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md new file mode 100644 index 0000000..9c23ed2 --- /dev/null +++ b/docs/public/multi-agent-coordination.md @@ -0,0 +1,163 @@ +--- +status: stable +--- + +# Multi-Agent Coordination with AgenticComm + +Practical patterns for coordinating multiple AI agents through a shared AgenticComm store. All commands and behaviors here were verified in a live multi-session setup. + +--- + +## Channel Semantics + +AgenticComm supports two distinct communication patterns. Choose the right one for your coordination need. + +### Direct / Group channels — shared stream + +`acomm message send` writes one message to the channel stream. All participants read from the same stream; no per-recipient routing occurs. + +```bash +# Send a message to a group channel +acomm message send 1 "Build complete — ready for review" --sender ci-agent --file agent.acomm --json +# → { "channel_id": 1, "message_id": 42, "status": "sent" } + +# Any participant can read it +acomm message list 1 --file agent.acomm --json +# → [ { "id": 42, "sender": "ci-agent", "recipient": null, "content": "Build complete..." } ] +``` + +`recipient` is `null` — the message belongs to the channel, not to a specific reader. + +### Pub/sub — fan-out with per-subscriber delivery records + +`subscribe` + `publish` creates one delivery record per registered subscriber. Each subscriber reads only their own entry. + +```bash +# Register subscribers +acomm subscribe updates meera --file agent.acomm +acomm subscribe updates ishika --file agent.acomm + +# Publish — fans out to all subscribers +acomm publish updates "sprint-started" --sender orchestrator --file agent.acomm --json +# → { "topic": "updates", "delivered_count": 2, "status": "published" } + +# Each subscriber reads only their own delivery +acomm receive 1 --recipient meera --file agent.acomm --json +# → [ { "recipient": "meera", "content": "sprint-started", ... } ] + +acomm receive 1 --recipient ishika --file agent.acomm --json +# → [ { "recipient": "ishika", "content": "sprint-started", ... } ] +``` + +`delivered_count: 2` confirms both subscribers received the message. Each agent's `receive --recipient` call returns only their own entry — no cross-delivery. + +--- + +## Cross-Session Delivery + +Messages written by one agent session are immediately readable by another session on the same store. This was confirmed in live testing: + +| Message | Sender | Session | Lamport | Timestamp | +|---------|--------|---------|---------|-----------| +| `real-ishika-probe:...` | ishika | Session A | 40 | 10:28:00 | +| `real-meera-ack:...` | meera | Session B | 41 | 10:30:15 | + +Sessions A and B are independent Claude Code processes with a ~2-minute gap between sends. Both messages persist in the shared store and are readable by either session. + +**What "cross-session delivery" means here:** the `.acomm` store is a local file. Any session with a path to that file can read and write it. There is no network hop; delivery latency is local I/O. + +--- + +## Awareness Model + +Cross-session delivery (store level) works, but agents becoming *aware* of new messages requires an additional step. There are three distinct layers: + +### Layer 1 — Store delivery ✓ + +`acomm message send` → message persisted in `.acomm`. Verifiable with `message list`. Works automatically. + +### Layer 2 — Human-terminal awareness (optional watcher) + +`acomm-notify.ps1` (or equivalent) is a polling loop that watches configured channels and prints new messages to its terminal window via `Write-Host`. The human watching that window sees near-real-time alerts. + +**Important:** the watcher must be: +1. Explicitly started (it does not auto-launch) +2. Configured to watch the channels where messages are expected + +```powershell +# Start watcher for specific channels +.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 +``` + +### Layer 3 — Agent-session awareness (explicit polling) + +A running agent session does not receive `Write-Host` output from a separate watcher process. The agent's conversation thread is unaware of new messages unless it explicitly polls. + +**Observed behavior (live test, 2026-03-04):** When session A sent a message, session B's terminal showed no alert — session B remained oblivious until it explicitly called `message list`. + +The reliable agent-side pattern is disciplined polling at the start of each interaction turn: + +```bash +# At the start of each turn, check for new messages since last read +acomm message list 1 --file agent.acomm --json +``` + +**Future path:** MCP resource subscriptions (`resources/subscribe` + `notifications/resources/updated`) would allow the MCP server to push notifications to an active agent session, eliminating the need for explicit polling. This is not yet implemented in agentchattr or Claude Code. + +--- + +## Poll-Before-Respond Discipline + +When multiple agents share a channel, they poll independently and cannot see each other's in-flight responses before their own turn completes. This creates a parallel-reply blindspot. + +**The problem:** Agent A and Agent B both see a question at lamport T. Both compose answers independently. Both post. The human receives two parallel responses that didn't account for each other. + +**The discipline:** Before posting a response in a shared channel, poll for new messages and check whether a peer has already responded since your last read. + +```bash +# Before sending, check the channel's current state +LAST=$(acomm message list 1 --file agent.acomm --json | python3 -c \ + "import sys,json; msgs=json.load(sys.stdin); print(msgs[-1]['id'] if msgs else 0)") + +# Only post if no peer has answered since your last check +if [ "$LAST" -le "$YOUR_LAST_KNOWN_ID" ]; then + acomm message send 1 "My response..." --sender agent-a --file agent.acomm +fi +``` + +The `delivered_count` from `publish` provides a similar gate for pub/sub workflows — if `delivered_count` is 0, no subscribers are registered and the message would be undelivered. + +--- + +## Watcher Channel Scope + +The watcher only surfaces messages from channels in its watch list. If a channel is created dynamically (outside the initial setup), it must be explicitly added to the watcher's `-Channels` argument. + +```powershell +# Watcher that covers both standard and dynamic channels +.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate','trine-debate','my-new-channel' +``` + +Channel name resolution depends on the channel being registered in the store. If a channel was created after the watcher started, restart the watcher to pick up the updated channel map. + +--- + +## Runnable Example + +See `examples/pubsub-fanout-recipient-delivery.sh` for a self-contained, verifiable demonstration of the pub/sub fan-out and recipient-scoped delivery pattern: + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh +# PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output. +``` + +--- + +## Summary + +| Pattern | Use when | Recipient routing | +|---------|----------|-------------------| +| `message send` / `list` | All participants need the same message | `recipient: null` (channel stream) | +| `subscribe` + `publish` + `receive --recipient` | Each agent needs their own delivery record | `recipient: some(name)` per subscriber | +| Poll-before-respond | Preventing parallel duplicate replies | — discipline, not a command | +| Watcher loop | Human-terminal alerting | — optional, not agent-native | From 3c3a488026fc0ecf92d4cec512fd3fd62a762cfa Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:03:10 -0500 Subject: [PATCH 04/12] docs(coordination): add watcher lifecycle wrapper usage --- docs/public/multi-agent-coordination.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 9c23ed2..b9b36be 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -89,6 +89,23 @@ Cross-session delivery (store level) works, but agents becoming *aware* of new m .\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 ``` +#### Watcher lifecycle wrappers (always-on operation) + +In Triveni-style operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: + +```powershell +# Start background notifier process (writes pid metadata) +.\start-acomm-notifier.ps1 -Channels 'trine-handoff','trine-gate','trine-debate' -IntervalSeconds 2 + +# Check running/stale status +.\status-acomm-notifier.ps1 -Json + +# Stop notifier process and clear pid metadata +.\stop-acomm-notifier.ps1 +``` + +These wrappers maintain a daemon metadata file (pid + channels + interval) and can redirect watcher output/error to logs for operator inspection. + ### Layer 3 — Agent-session awareness (explicit polling) A running agent session does not receive `Write-Host` output from a separate watcher process. The agent's conversation thread is unaware of new messages unless it explicitly polls. From 3ce337dd8e836b93b094b7821ab587fa7ad1d099 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:34:01 -0500 Subject: [PATCH 05/12] =?UTF-8?q?docs(coordination):=20correct=20Layer=203?= =?UTF-8?q?=20future=20path=20=E2=80=94=20achievable=20today=20via=20plugi?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous text stated MCP resources/subscribe was the only path to auto-injection. Corrected: the UserPromptSubmit hook pattern (Mira/MemSearch) achieves the same outcome today without any MCP protocol changes. A proper acomm plugin can be built using this pattern. MCP resources/subscribe remains aspirational for true mid-turn push. --- docs/public/multi-agent-coordination.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index b9b36be..38f4b75 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -119,7 +119,7 @@ The reliable agent-side pattern is disciplined polling at the start of each inte acomm message list 1 --file agent.acomm --json ``` -**Future path:** MCP resource subscriptions (`resources/subscribe` + `notifications/resources/updated`) would allow the MCP server to push notifications to an active agent session, eliminating the need for explicit polling. This is not yet implemented in agentchattr or Claude Code. +**Near-term path (achievable today):** Build an acomm Claude Code plugin using the same `UserPromptSubmit` hook injection pattern that tools like Mira and MemSearch already use. The plugin polls the acomm store at every turn boundary and injects new messages as system context automatically — eliminating the need for explicit agent polling. This requires building the plugin, not waiting for any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. --- From dcc7b0e5643de19678e58d2a315f470b8cca6b18 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:51:07 -0500 Subject: [PATCH 06/12] =?UTF-8?q?docs(coordination):=20simplify=20Layer=20?= =?UTF-8?q?3=20path=20=E2=80=94=20hook=20only,=20not=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A UserPromptSubmit hook script outputting acomm deltas is sufficient. No MCP plugin architecture needed. Peer messages become ambient context, same mechanism as task list / code snippet injection already in use. --- docs/public/multi-agent-coordination.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 38f4b75..c1b1862 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -119,7 +119,7 @@ The reliable agent-side pattern is disciplined polling at the start of each inte acomm message list 1 --file agent.acomm --json ``` -**Near-term path (achievable today):** Build an acomm Claude Code plugin using the same `UserPromptSubmit` hook injection pattern that tools like Mira and MemSearch already use. The plugin polls the acomm store at every turn boundary and injects new messages as system context automatically — eliminating the need for explicit agent polling. This requires building the plugin, not waiting for any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. +**Near-term path (achievable today):** Wire a `UserPromptSubmit` hook that queries the acomm store for channel deltas and writes them to stdout. Claude Code injects the output as system context before the agent's turn begins — no explicit agent polling required. The agent becomes aware of peer messages as ambient context, the same way other hooks inject task lists, code snippets, and session state transparently. This requires only a hook script, not any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. --- From 2febd46d4b194a2a655ae142c0ec5e984fa72639 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:24:16 -0500 Subject: [PATCH 07/12] docs(coordination): remove Triveni-specific watcher references --- docs/public/multi-agent-coordination.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index c1b1862..8fe0089 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -86,22 +86,22 @@ Cross-session delivery (store level) works, but agents becoming *aware* of new m ```powershell # Start watcher for specific channels -.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 +.\acomm-notify.ps1 -Channels 'handoff','gate' -IntervalSeconds 2 ``` #### Watcher lifecycle wrappers (always-on operation) -In Triveni-style operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: +In operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: ```powershell # Start background notifier process (writes pid metadata) -.\start-acomm-notifier.ps1 -Channels 'trine-handoff','trine-gate','trine-debate' -IntervalSeconds 2 +.\start-notifier.ps1 -Channels 'handoff','gate','debate' -IntervalSeconds 2 # Check running/stale status -.\status-acomm-notifier.ps1 -Json +.\status-notifier.ps1 -Json # Stop notifier process and clear pid metadata -.\stop-acomm-notifier.ps1 +.\stop-notifier.ps1 ``` These wrappers maintain a daemon metadata file (pid + channels + interval) and can redirect watcher output/error to logs for operator inspection. @@ -152,7 +152,7 @@ The watcher only surfaces messages from channels in its watch list. If a channel ```powershell # Watcher that covers both standard and dynamic channels -.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate','trine-debate','my-new-channel' +.\acomm-notify.ps1 -Channels 'handoff','gate','debate','my-new-channel' ``` Channel name resolution depends on the channel being registered in the store. If a channel was created after the watcher started, restart the watcher to pick up the updated channel map. From 1892fbb72a3f000c4ed4723fb249f37e5368cef6 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:34:59 -0500 Subject: [PATCH 08/12] docs(coordination): remove agent-specific names and add hook pattern to summary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace meera/ishika subscriber/recipient examples with generic agent-a/agent-b - Replace real probe message content in cross-session table with generic placeholders - Add UserPromptSubmit hook row to summary table — the ambient context injection pattern --- docs/public/multi-agent-coordination.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 8fe0089..6f0b03f 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -34,19 +34,19 @@ acomm message list 1 --file agent.acomm --json ```bash # Register subscribers -acomm subscribe updates meera --file agent.acomm -acomm subscribe updates ishika --file agent.acomm +acomm subscribe updates agent-a --file agent.acomm +acomm subscribe updates agent-b --file agent.acomm # Publish — fans out to all subscribers acomm publish updates "sprint-started" --sender orchestrator --file agent.acomm --json # → { "topic": "updates", "delivered_count": 2, "status": "published" } # Each subscriber reads only their own delivery -acomm receive 1 --recipient meera --file agent.acomm --json -# → [ { "recipient": "meera", "content": "sprint-started", ... } ] +acomm receive 1 --recipient agent-a --file agent.acomm --json +# → [ { "recipient": "agent-a", "content": "sprint-started", ... } ] -acomm receive 1 --recipient ishika --file agent.acomm --json -# → [ { "recipient": "ishika", "content": "sprint-started", ... } ] +acomm receive 1 --recipient agent-b --file agent.acomm --json +# → [ { "recipient": "agent-b", "content": "sprint-started", ... } ] ``` `delivered_count: 2` confirms both subscribers received the message. Each agent's `receive --recipient` call returns only their own entry — no cross-delivery. @@ -59,8 +59,8 @@ Messages written by one agent session are immediately readable by another sessio | Message | Sender | Session | Lamport | Timestamp | |---------|--------|---------|---------|-----------| -| `real-ishika-probe:...` | ishika | Session A | 40 | 10:28:00 | -| `real-meera-ack:...` | meera | Session B | 41 | 10:30:15 | +| `probe:session-a:...` | agent-a | Session A | 40 | 10:28:00 | +| `ack:session-b:...` | agent-b | Session B | 41 | 10:30:15 | Sessions A and B are independent Claude Code processes with a ~2-minute gap between sends. Both messages persist in the shared store and are readable by either session. @@ -178,3 +178,4 @@ bash examples/pubsub-fanout-recipient-delivery.sh | `subscribe` + `publish` + `receive --recipient` | Each agent needs their own delivery record | `recipient: some(name)` per subscriber | | Poll-before-respond | Preventing parallel duplicate replies | — discipline, not a command | | Watcher loop | Human-terminal alerting | — optional, not agent-native | +| `UserPromptSubmit` hook | Auto-injecting acomm deltas as ambient context at every agent turn start | — no manual poll required | From 229a283664b038de5431b54efba110d1e741f737 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:41:56 -0500 Subject: [PATCH 09/12] docs(example): use generic subscriber defaults --- examples/pubsub-fanout-recipient-delivery.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pubsub-fanout-recipient-delivery.sh b/examples/pubsub-fanout-recipient-delivery.sh index e449736..d99c2a1 100644 --- a/examples/pubsub-fanout-recipient-delivery.sh +++ b/examples/pubsub-fanout-recipient-delivery.sh @@ -13,8 +13,8 @@ fi STORE_INPUT="${1:-}" TOPIC="${TOPIC:-updates}" -SUBSCRIBER_A="${SUBSCRIBER_A:-meera}" -SUBSCRIBER_B="${SUBSCRIBER_B:-ishika}" +SUBSCRIBER_A="${SUBSCRIBER_A:-agent-a}" +SUBSCRIBER_B="${SUBSCRIBER_B:-agent-b}" SENDER="${SENDER:-ci-agent}" CONTENT="${CONTENT:-hello-topic}" CHANNEL_ID=1 From 73ac5268dcbf49e91cb5d26300eff417b1b9d860 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:39:31 -0500 Subject: [PATCH 10/12] feat(acomm): add one-shot chat poll/send subcommands --- crates/agentic-comm-cli/src/main.rs | 88 +++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index 059260d..a2a46f9 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -99,6 +99,11 @@ enum Commands { #[command(subcommand)] action: RecvAction, }, + /// Chat subcommands (poll, send) + Chat { + #[command(subcommand)] + action: ChatAction, + }, /// Query subcommands (messages, channels, relationships, echoes, conversations) Query { #[command(subcommand)] @@ -390,6 +395,38 @@ enum RecvAction { }, } +// --------------------------------------------------------------------------- +// Chat subcommands +// --------------------------------------------------------------------------- + +#[derive(Subcommand)] +enum ChatAction { + /// Poll messages from a channel in one shot + Poll { + /// Channel ID + #[arg(long)] + channel: u64, + /// Optional Unix timestamp (seconds) lower bound + #[arg(long)] + since: Option, + /// Maximum messages to return + #[arg(long, default_value = "50")] + limit: usize, + }, + /// Send one message to a channel + Send { + /// Channel ID + #[arg(long)] + channel: u64, + /// Message payload/content + #[arg(long)] + message: String, + /// Sender identifier + #[arg(long)] + sender: String, + }, +} + // --------------------------------------------------------------------------- // Query subcommands // --------------------------------------------------------------------------- @@ -1441,6 +1478,57 @@ fn main() { } }, + // ----------------------------------------------------------------- + // Chat subcommands (one-shot poll/send) + // ----------------------------------------------------------------- + Commands::Chat { action } => match action { + ChatAction::Poll { + channel, + since, + limit, + } => { + let mut store = load_or_create(&store_path); + let since_dt = since.and_then(|ts| chrono::DateTime::::from_timestamp(ts, 0)); + match store.receive_messages(channel, None, since_dt) { + Ok(mut msgs) => { + msgs.truncate(limit); + output(&serde_json::to_value(&msgs).unwrap(), json_mode); + } + Err(e) => { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } + } + ChatAction::Send { + channel, + message, + sender, + } => { + let mut store = load_or_create(&store_path); + match store.send_message(channel, &sender, &message, MessageType::Text) { + Ok(msg) => { + output( + &serde_json::json!({ + "status": "sent", + "message_id": msg.id, + "channel_id": msg.channel_id, + "timestamp": msg.timestamp.to_rfc3339(), + }), + json_mode, + ); + if let Err(e) = store.save(&store_path) { + eprintln!("Warning: failed to save store: {e}"); + } + } + Err(e) => { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } + } + }, + // ----------------------------------------------------------------- // Query subcommands // ----------------------------------------------------------------- From cc0f8120fbf639423208b151f56f87e3b7667c38 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Fri, 6 Mar 2026 03:38:08 -0500 Subject: [PATCH 11/12] agentic-comm(cli): implement daemon tcp relay --- Cargo.lock | 1 + crates/agentic-comm-cli/Cargo.toml | 1 + crates/agentic-comm-cli/src/main.rs | 242 +++++++++++++++++++++++++--- 3 files changed, 219 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0888e70..5e0a4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,7 @@ dependencies = [ "chrono", "clap", "serde_json", + "tokio", ] [[package]] diff --git a/crates/agentic-comm-cli/Cargo.toml b/crates/agentic-comm-cli/Cargo.toml index 20772af..2e19082 100644 --- a/crates/agentic-comm-cli/Cargo.toml +++ b/crates/agentic-comm-cli/Cargo.toml @@ -16,3 +16,4 @@ agentic-comm = { workspace = true } clap = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } +tokio = { workspace = true } diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index a2a46f9..8707c57 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -1,6 +1,9 @@ //! CLI for agentic-comm: agent communication, channels, pub/sub. -use std::path::PathBuf; +use std::{ + path::{Path, PathBuf}, + time::{Duration, SystemTime}, +}; use agentic_comm::{ AuditEntry, AuditEventType, ChannelConfig, ChannelType, CollectiveDecisionMode, CommStore, @@ -8,6 +11,12 @@ use agentic_comm::{ HiveRole, MessageFilter, MessageType, TemporalTarget, WorkspaceRole, }; use clap::{Parser, Subcommand}; +use tokio::{ + io::AsyncWriteExt, + net::TcpListener, + sync::broadcast, + time::{self, MissedTickBehavior}, +}; /// Default store file path. fn default_store_path() -> PathBuf { @@ -1155,6 +1164,105 @@ fn output(value: &serde_json::Value, json_mode: bool) { } } +struct PidFileGuard { + path: PathBuf, +} + +impl PidFileGuard { + fn new(path: PathBuf) -> Self { + Self { path } + } +} + +impl Drop for PidFileGuard { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } +} + +fn store_mtime(path: &Path) -> Option { + std::fs::metadata(path).ok()?.modified().ok() +} + +fn current_max_message_id(path: &Path) -> u64 { + if !path.exists() { + return 0; + } + + CommStore::load(path) + .map(|store| store.messages.keys().copied().max().unwrap_or(0)) + .unwrap_or(0) +} + +fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> Vec { + if !path.exists() { + return Vec::new(); + } + + let store = match CommStore::load(path) { + Ok(store) => store, + Err(e) => { + eprintln!("Warning: daemon could not load store {}: {e}", path.display()); + return Vec::new(); + } + }; + + let mut messages: Vec<_> = store.messages.values().cloned().collect(); + messages.sort_by_key(|msg| msg.id); + + let mut payloads = Vec::new(); + for msg in messages { + if msg.id <= *last_seen_message_id { + continue; + } + + let payload = serde_json::json!({ + "sender": msg.sender, + "text": msg.content, + "timestamp": msg.timestamp.to_rfc3339(), + "channel": store + .get_channel(msg.channel_id) + .map(|channel| channel.name) + .unwrap_or_else(|| msg.channel_id.to_string()), + "lamport": msg.comm_timestamp.lamport, + }); + + match serde_json::to_string(&payload) { + Ok(json) => payloads.push(json), + Err(e) => eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id), + } + + *last_seen_message_id = msg.id; + } + + payloads +} + +fn pid_is_alive(pid: u32) -> bool { + #[cfg(windows)] + { + std::process::Command::new("tasklist") + .args(["/FI", &format!("PID eq {pid}"), "/FO", "CSV", "/NH"]) + .output() + .map(|output| { + let stdout = String::from_utf8_lossy(&output.stdout); + stdout.contains(&format!("\"{pid}\"")) + }) + .unwrap_or(false) + } + + #[cfg(not(windows))] + { + std::process::Command::new("kill") + .args(["-0", &pid.to_string()]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .map(|status| status.success()) + .unwrap_or(false) + } +} + fn main() { let cli = Cli::parse(); let store_path = resolve_store_path(cli.file); @@ -2738,35 +2846,130 @@ fn main() { .parent() .unwrap_or_else(|| std::path::Path::new(".")); let pid_path = data_dir.join("acomm.pid"); + let stop_path = data_dir.join("acomm.stop"); let pid = std::process::id(); // Write PID file if let Err(e) = std::fs::write(&pid_path, pid.to_string()) { eprintln!("Warning: could not write PID file: {e}"); } + let _pid_guard = PidFileGuard::new(pid_path.clone()); + let _ = std::fs::remove_file(&stop_path); - output( - &serde_json::json!({ - "status": "started", - "pid": pid, - "port": port, - "data": data_path.display().to_string(), - "pid_file": pid_path.display().to_string(), - "note": "Daemon stub — exiting immediately (real daemon would loop)", - }), - json_mode, - ); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap_or_else(|e| { + eprintln!("Error: could not create tokio runtime: {e}"); + std::process::exit(1); + }); + + let daemon_result = runtime.block_on(async { + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; + let (sender, _) = broadcast::channel::(256); + let mut ticker = time::interval(Duration::from_secs(1)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut shutdown = Box::pin(tokio::signal::ctrl_c()); + let mut last_seen_message_id = current_max_message_id(&data_path); + let mut last_mtime = store_mtime(&data_path); + + output( + &serde_json::json!({ + "status": "listening", + "pid": pid, + "port": port, + "data": data_path.display().to_string(), + "pid_file": pid_path.display().to_string(), + }), + json_mode, + ); + + loop { + tokio::select! { + _ = &mut shutdown => { + break; + } + _ = ticker.tick() => { + if stop_path.exists() { + let _ = std::fs::remove_file(&stop_path); + break; + } + + let current_mtime = store_mtime(&data_path); + if current_mtime != last_mtime { + last_mtime = current_mtime; + for message in collect_new_daemon_messages(&data_path, &mut last_seen_message_id) { + let _ = sender.send(message); + } + } + } + accept_result = listener.accept() => { + let (mut stream, _) = accept_result?; + let mut receiver = sender.subscribe(); + tokio::spawn(async move { + while let Ok(message) = receiver.recv().await { + if stream.write_all(message.as_bytes()).await.is_err() { + break; + } + if stream.write_all(b"\n").await.is_err() { + break; + } + } + }); + } + } + } + + Ok::<(), Box>(()) + }); + + if let Err(e) = daemon_result { + eprintln!("Error: daemon failed: {e}"); + std::process::exit(1); + } } DaemonAction::Stop => { let data_dir = store_path .parent() .unwrap_or_else(|| std::path::Path::new(".")); let pid_path = data_dir.join("acomm.pid"); + let stop_path = data_dir.join("acomm.stop"); if pid_path.exists() { match std::fs::read_to_string(&pid_path) { Ok(pid_str) => { let pid_str = pid_str.trim(); + let pid = pid_str.parse::().ok(); + if let Err(e) = std::fs::write(&stop_path, "stop\n") { + eprintln!("Warning: could not write stop file: {e}"); + } + + for _ in 0..10 { + if !pid_path.exists() { + output( + &serde_json::json!({ + "status": "stopped", + "pid": pid_str, + }), + json_mode, + ); + return; + } + std::thread::sleep(Duration::from_millis(500)); + } + + if let Some(pid) = pid { + #[cfg(windows)] + let _ = std::process::Command::new("taskkill") + .args(["/PID", &pid.to_string(), "/T", "/F"]) + .status(); + + #[cfg(not(windows))] + let _ = std::process::Command::new("kill") + .arg(pid.to_string()) + .status(); + } + output( &serde_json::json!({ "status": "stopping", @@ -2774,9 +2977,6 @@ fn main() { }), json_mode, ); - if let Err(e) = std::fs::remove_file(&pid_path) { - eprintln!("Warning: could not remove PID file: {e}"); - } } Err(e) => { eprintln!("Error reading PID file: {e}"); @@ -2957,16 +3157,7 @@ fn main() { let pid_alive = pid_str .parse::() .ok() - .map(|pid| { - // Check process existence via kill -0 on Unix - std::process::Command::new("kill") - .args(["-0", &pid.to_string()]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .status() - .map(|s| s.success()) - .unwrap_or(false) - }) + .map(pid_is_alive) .unwrap_or(false); // Read PID file modification time as a proxy for start time @@ -3395,3 +3586,4 @@ fn main() { } } } + From 49e3b72ffd2f8e0935afe78c51f270c6b2e94e9a Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Sat, 7 Mar 2026 01:59:45 -0500 Subject: [PATCH 12/12] fix(cli): seed daemon relay backlog on connect --- crates/agentic-comm-cli/src/main.rs | 70 ++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index 8707c57..eed8793 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -1216,18 +1216,7 @@ fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> V continue; } - let payload = serde_json::json!({ - "sender": msg.sender, - "text": msg.content, - "timestamp": msg.timestamp.to_rfc3339(), - "channel": store - .get_channel(msg.channel_id) - .map(|channel| channel.name) - .unwrap_or_else(|| msg.channel_id.to_string()), - "lamport": msg.comm_timestamp.lamport, - }); - - match serde_json::to_string(&payload) { + match serialize_daemon_message(&store, &msg) { Ok(json) => payloads.push(json), Err(e) => eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id), } @@ -1238,6 +1227,54 @@ fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> V payloads } +fn collect_recent_daemon_messages(path: &Path, limit: usize) -> Vec { + if !path.exists() { + return Vec::new(); + } + + let store = match CommStore::load(path) { + Ok(store) => store, + Err(e) => { + eprintln!("Warning: daemon could not load store {}: {e}", path.display()); + return Vec::new(); + } + }; + + let mut messages: Vec<_> = store.messages.values().cloned().collect(); + messages.sort_by_key(|msg| msg.id); + + let start = messages.len().saturating_sub(limit); + messages + .into_iter() + .skip(start) + .filter_map(|msg| match serialize_daemon_message(&store, &msg) { + Ok(json) => Some(json), + Err(e) => { + eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id); + None + } + }) + .collect() +} + +fn serialize_daemon_message( + store: &CommStore, + msg: &agentic_comm::Message, +) -> Result { + let payload = serde_json::json!({ + "sender": msg.sender, + "text": msg.content, + "timestamp": msg.timestamp.to_rfc3339(), + "channel": store + .get_channel(msg.channel_id) + .map(|channel| channel.name) + .unwrap_or_else(|| msg.channel_id.to_string()), + "lamport": msg.comm_timestamp.lamport, + }); + + serde_json::to_string(&payload) +} + fn pid_is_alive(pid: u32) -> bool { #[cfg(windows)] { @@ -2906,7 +2943,16 @@ fn main() { accept_result = listener.accept() => { let (mut stream, _) = accept_result?; let mut receiver = sender.subscribe(); + let recent_messages = collect_recent_daemon_messages(&data_path, 50); tokio::spawn(async move { + for message in recent_messages { + if stream.write_all(message.as_bytes()).await.is_err() { + return; + } + if stream.write_all(b"\n").await.is_err() { + return; + } + } while let Ok(message) = receiver.recv().await { if stream.write_all(message.as_bytes()).await.is_err() { break;