Skip to content

feat(cli): add provider-agnostic channel bridge core#109

Draft
cdenneen wants to merge 1 commit intohappier-dev:devfrom
cdenneen:feat/channel-bridge-core
Draft

feat(cli): add provider-agnostic channel bridge core#109
cdenneen wants to merge 1 commit intohappier-dev:devfrom
cdenneen:feat/channel-bridge-core

Conversation

@cdenneen
Copy link

@cdenneen cdenneen commented Mar 4, 2026

Summary

This PR introduces a provider-agnostic channel bridge core for bi-directional session messaging.

It adds the reusable bridge worker and adapter contract, but intentionally does not include any provider-
specific adapter wiring (Telegram/Discord/etc). This keeps the core isolated and reviewable.

What’s Included

  • New core bridge worker:
    • apps/cli/src/channels/core/channelBridgeWorker.ts
  • Core contracts and runtime behavior:
    • channel adapter interface (pullInboundMessages, sendMessage, optional stop)
    • conversation/session binding model
    • in-memory binding store
    • bridge tick execution loop
  • Command routing in core flow:
    • /sessions
    • /attach <session-id-or-prefix>
    • /session
    • /detach
  • Message forwarding behavior:
    • inbound channel messages -> mapped Happier session
    • agent messages -> mapped channel conversation
    • per-binding cursor advancement (lastForwardedSeq)

Tests

  • Added focused tests for core behavior:
    • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • Covered scenarios:
    • attach + inbound forwarding
    • sessions listing + detach flow
    • outbound agent forwarding + cursor advancement

Out of Scope (Intentional)

  • Provider adapter implementation (Telegram, Discord, Slack, WhatsApp, etc)
  • Daemon wiring / runtime bootstrap for any provider
  • Bridge CLI config surfaces
  • Webhook/polling provider transport details

These are added in the follow-up stacked PR.

Summary by CodeRabbit

  • New Features

    • Provider-agnostic channel bridge: attach/detach sessions, list sessions, send/receive messages via slash commands; in-memory binding store with per-binding cursors, inbound deduplication, monotonic cursor advancement, partial-failure tolerance, startup hints, and graceful/idempotent start/stop.
  • Tests

    • Large test suite covering attach/sessions/detach flows, command handling, bidirectional forwarding, deduplication, adapter failure resilience, authorization and identity warnings, cursor persistence, and lifecycle behavior.
  • Documentation

    • Added channel-bridge overview documenting runtime model, adapter contract, and tick-loop behavior.

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a new provider-agnostic Channel Bridge worker and an extensive test suite plus docs. The worker maps external channel conversations to CLI sessions, handles slash commands, forwards inbound messages to sessions, relays agent replies to channels, persists cursors, deduplicates inbound messages, and runs via a tick-driven lifecycle with graceful shutdown.

Changes

Cohort / File(s) Summary
Channel Bridge Worker
apps/cli/src/channels/core/channelBridgeWorker.ts
New implementation: types/interfaces (conversation refs, inbound messages, adapters, bindings, deps, store), in-memory binding store, inbound deduper, slash-command parsing/handlers, core tick logic (executeChannelBridgeTick), start/stop/trigger worker lifecycle, cursor persistence, deduplication, warning hooks, and adapter lifecycle handling.
Worker Tests
apps/cli/src/channels/core/channelBridgeWorker.test.ts
Large, exhaustive test suite and in-test harnesses: adapter & deps mocks, in-memory binding store usage, deferred/wait helpers. Covers attach/sessions/detach flows, command auth/handling, forwarding user→session and agent→channel messages, cursor advancement, identity mismatches, adapter failure resilience, inbound dedup across ticks, startup behavior, and robust/idempotent shutdown.
Documentation
docs/channel-bridge.md, docs/README.md
Adds channel-bridge documentation and README index entry describing architecture, adapter contract, responsibilities (inbound reception, command parsing, binding management, routing, cursor tracking, forwarding), tick-loop behavior, and scope/next steps.

Sequence Diagram(s)

sequenceDiagram
    participant Adapter as Channel Adapter
    participant Worker as Channel Bridge Worker
    participant Store as Binding Store
    participant Deps as CLI Session Deps

    loop Periodic Tick
        Adapter->>Worker: pullInboundMessages()
        Worker->>Worker: parseSlashCommand / classify message
        alt Slash command
            Worker->>Deps: resolveSessionIdOrPrefix / listSessions
            Worker->>Store: upsertBinding / removeBinding / listBindings
            Worker->>Adapter: sendMessage (UI hint)
        else User message (bound)
            Worker->>Store: getBinding(ref)
            Worker->>Deps: sendUserMessageToSession(sessionId, message)
        else User message (unbound)
            Worker->>Adapter: sendMessage (no-binding hint)
        end

        Worker->>Store: listBindings()
        loop For each binding
            Worker->>Deps: resolveLatestSessionSeq(sessionId)
            Worker->>Deps: fetchAgentMessagesAfterSeq(sessionId, afterSeq)
            Deps-->>Worker: agent messages
            Worker->>Adapter: sendMessage (agent reply)
            Worker->>Store: updateLastForwardedSeq(ref, newSeq)
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(cli): add provider-agnostic channel bridge core' accurately and concisely describes the main feature being added—a provider-agnostic channel bridge core for the CLI.
Description check ✅ Passed The description comprehensively covers the template requirements: Summary (what the PR changes), What's Included (detailed scope), Tests (test coverage), and Out of Scope sections. It explains the problem, solution, and intentional exclusions clearly.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 4, 2026

Greptile Summary

This PR introduces the provider-agnostic channel bridge core — a single-file worker (channelBridgeWorker.ts) plus an extensive test suite — that maps external channel conversations to Happier sessions via a pluggable adapter contract. The implementation is carefully factored: command handling, inbound deduplication, outbound cursor advancement, and shutdown sequencing are all cleanly separated. It has gone through multiple prior review rounds and the critical issues (stop/tick races, cursor re-delivery, separator collisions, duplicate adapters, orphaned bindings, etc.) have all been addressed.

Two minor observability gaps remain:

  • The warning emitted when sendUserMessageToSession fails (line 578) lacks session and conversation identifiers, making it harder to diagnose in multi-binding deployments. The symmetric outbound warning was already updated to include this context.
  • The orphaned-binding warning (no adapter registered for a stored binding) fires on every tick with no throttling, which can produce high-frequency log noise in production when an adapter is removed while bindings remain in the store.

Confidence Score: 4/5

  • Safe to merge; no logic errors or regressions found. The two remaining findings are style-level observability improvements.
  • The core implementation is solid: type contracts are clear, shutdown sequencing is correct, cursor advancement is incremental and monotonic, deduplication is properly isolated per worker, and the test suite is comprehensive. The two verified comments are style-level observability improvements (missing context in one warning message, unbounded warning frequency for orphaned bindings), neither of which affects correctness or safety.
  • apps/cli/src/channels/core/channelBridgeWorker.ts — two minor warning-message improvements noted; no correctness issues.

Sequence Diagram

sequenceDiagram
    participant A as ChannelAdapter
    participant W as ChannelBridgeWorker
    participant S as ChannelBindingStore
    participant D as ChannelBridgeDeps

    W->>A: pullInboundMessages()
    A-->>W: messages

    loop per inbound message
        W->>W: deduper.isDuplicate(message)
        alt slash command
            W->>D: authorizeCommand(commandName, actor)
            W->>S: getBinding / upsertBinding / removeBinding
            W->>A: sendMessage(reply)
        else regular text with binding
            W->>S: getBinding(ref)
            W->>D: sendUserMessageToSession(sessionId, text)
        else regular text without binding
            W->>A: sendMessage("No session attached")
        end
    end

    W->>S: listBindings()
    S-->>W: bindings

    loop per binding
        W->>D: fetchAgentMessagesAfterSeq(sessionId, cursor)
        D-->>W: rows sorted by seq
        loop per row
            W->>A: sendMessage(text)
            W->>S: updateLastForwardedSeq(binding, seq)
        end
    end
Loading

Last reviewed commit: 9e9b0d8

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
apps/cli/src/channels/core/channelBridgeWorker.test.ts (1)

41-203: Add lifecycle coverage for startChannelBridgeWorker (trigger and stop).

executeChannelBridgeTick is covered well, but this PR also introduces a public worker lifecycle API. Add tests for initial trigger behavior, stop idempotency, and adapter stop() propagation.

As per coding guidelines: "Write unit tests for all public functions and components".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.test.ts` around lines 41 -
203, Add unit tests for the new public lifecycle API startChannelBridgeWorker:
write a test that calls startChannelBridgeWorker(...) then calls trigger() once
to ensure the worker runs its first tick (you can assert by spying on
executeChannelBridgeTick or deps called), a test that calls stop() twice to
verify idempotency, and a test that ensures adapter.stop() is called when stop()
is invoked (spy on the adapter harness' stop method). Use the existing
createAdapterHarness(), in-memory store, and deps mocks from the file to locate
where to hook spies and assertions; reference startChannelBridgeWorker, trigger,
stop, and adapter.stop to find the right symbols to exercise and assert.
apps/cli/src/channels/core/channelBridgeWorker.ts (1)

1-368: Add module and exported API JSDoc comments.

This file defines public bridge contracts and complex flow control, but there’s no file header/API JSDoc. Please document responsibilities, expected adapter behavior, and cursor semantics to reduce integration errors.

As per coding guidelines: "Include comprehensive JSDoc comments as file header comments explaining file responsibilities" and "Add JSDoc comments for public APIs and complex logic".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts` around lines 1 - 368, Add
a file-level JSDoc header describing the module responsibility (bridging channel
adapters to sessions, command handling, and agent->channel cursor semantics) and
add JSDoc comments for each public exported type/function: ChannelBridgeAdapter
(describe pullInboundMessages/sendMessage/optional stop contract and expected
error handling), ChannelBindingStore and ChannelSessionBinding (explain
lastForwardedSeq meaning and that updateLastForwardedSeq stores the highest seq
forwarded), ResolveSessionIdResult, createInMemoryChannelBindingStore (note
now() usage and normalization of seqs), executeChannelBridgeTick (document flow:
inbound pull -> commands -> forward user messages -> fetchAgentMessagesAfterSeq
where afterSeq is exclusive and returned messages should have seq > afterSeq),
and startChannelBridgeWorker (document tickMs bounds, SingleFlightIntervalLoop
usage and trigger/stop behavior); ensure comments mention expected behavior of
deps methods (resolveLatestSessionSeq, fetchAgentMessagesAfterSeq) and
error/warning callbacks so integrators know contracts and cursor semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.test.ts`:
- Around line 46-58: The test currently uses vi.fn() mocks for ChannelBridgeDeps
(listSessions, resolveSessionIdOrPrefix, fetchAgentMessagesAfterSeq, etc.);
replace these mocked spies with lightweight real/test implementations of the
ChannelBridgeDeps interface (for example in-memory or fixture implementations
that implement listSessions, resolveSessionIdOrPrefix, sendUserMessageToSession,
resolveLatestSessionSeq, fetchAgentMessagesAfterSeq, onWarning) so the test
exercises actual behavior instead of mocks, and update assertions to verify real
outcomes; additionally add one or more tests that call the public
startChannelBridgeWorker function (using the same real/test ChannelBridgeDeps)
to cover its lifecycle and observable side-effects (session resolution, message
fetching/sending, and warnings) rather than only testing internal helpers.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 247-253: The handler currently trusts event.providerId from
inbound payload which can allow spoofing; normalize and override provider
identity with the adapter's trusted providerId before passing to command
handling and forwarding. Update the call to handleCommand in channelBridgeWorker
to pass event.payload/providerId (or any field named providerId) set to
adapter.providerId, and ensure any later forwarding logic in the same function
(the block around lines 257-279) also replaces uses of event.providerId with
adapter.providerId so routing/binding uses adapter.providerId throughout;
specifically adjust where handleCommand is invoked and where outbound send/bind
logic reads event.providerId.
- Around line 206-216: The current logic uses
deps.resolveLatestSessionSeq(...).catch(() => 0) which can persist a binding
with lastForwardedSeq=0 and replay full history; change the flow in the attach
path so you do not upsertBinding or confirm the attach when seq resolution
fails: call resolveLatestSessionSeq(resolved.sessionId) and if it throws or
returns an invalid cursor, send a failure reply via replyToConversation(adapter,
ref, ...) and return false (or abort) instead of calling store.upsertBinding;
only persist the binding and send the success message after a successful seq
resolution. Ensure you update the code paths around resolveLatestSessionSeq,
store.upsertBinding, and replyToConversation so the binding is only created when
a valid latestSeq is obtained.
- Around line 242-255: The current loop over params.adapters will abort the
whole tick if adapter.pullInboundMessages() or handleCommand() throws; wrap the
call to adapter.pullInboundMessages() in a try/catch so a failing adapter is
logged and the loop continues to the next adapter, and inside the inbound-events
loop wrap handleCommand(...) (and parseSlashCommand if you prefer) in a
try/catch so an error for one event is logged and skipped without stopping
processing of remaining events/adapters; reference the existing symbols:
params.adapters, adapter.pullInboundMessages(), parseSlashCommand(),
handleCommand({ command, event, adapter, store: params.store, deps: params.deps
}) and use the existing logger (e.g. params.deps.logger.error) or console.error
to record the adapter/event and error details.

---

Nitpick comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.test.ts`:
- Around line 41-203: Add unit tests for the new public lifecycle API
startChannelBridgeWorker: write a test that calls startChannelBridgeWorker(...)
then calls trigger() once to ensure the worker runs its first tick (you can
assert by spying on executeChannelBridgeTick or deps called), a test that calls
stop() twice to verify idempotency, and a test that ensures adapter.stop() is
called when stop() is invoked (spy on the adapter harness' stop method). Use the
existing createAdapterHarness(), in-memory store, and deps mocks from the file
to locate where to hook spies and assertions; reference
startChannelBridgeWorker, trigger, stop, and adapter.stop to find the right
symbols to exercise and assert.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 1-368: Add a file-level JSDoc header describing the module
responsibility (bridging channel adapters to sessions, command handling, and
agent->channel cursor semantics) and add JSDoc comments for each public exported
type/function: ChannelBridgeAdapter (describe
pullInboundMessages/sendMessage/optional stop contract and expected error
handling), ChannelBindingStore and ChannelSessionBinding (explain
lastForwardedSeq meaning and that updateLastForwardedSeq stores the highest seq
forwarded), ResolveSessionIdResult, createInMemoryChannelBindingStore (note
now() usage and normalization of seqs), executeChannelBridgeTick (document flow:
inbound pull -> commands -> forward user messages -> fetchAgentMessagesAfterSeq
where afterSeq is exclusive and returned messages should have seq > afterSeq),
and startChannelBridgeWorker (document tickMs bounds, SingleFlightIntervalLoop
usage and trigger/stop behavior); ensure comments mention expected behavior of
deps methods (resolveLatestSessionSeq, fetchAgentMessagesAfterSeq) and
error/warning callbacks so integrators know contracts and cursor semantics.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 135e570b-02cd-4832-9f9d-772aa7c7e57f

📥 Commits

Reviewing files that changed from the base of the PR and between f22f06a and cc1706f.

📒 Files selected for processing (2)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • apps/cli/src/channels/core/channelBridgeWorker.ts

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai please re-review this PR now that all bot threads have explicit resolution notes and follow-up hardening is complete.

@greptile-apps please run a fresh pass as well.

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

@cdenneen: Understood! I'll kick off a fresh full review pass of PR #109 now that the previous threads have been addressed and the follow-up hardening is in place.

✅ Actions performed

Full review triggered.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 565-570: The current Promise.all over params.adapters can abort if
any adapter.stop() throws; update the shutdown mapping in channelBridgeWorker.ts
to wrap each adapter.stop() call in a try/catch so every adapter.stop() is
attempted even if one fails, log the specific error (including adapter
identifier if available) inside the catch, and return a resolved value so
Promise.all waits for all wrappered stops to complete rather than rejecting on
the first error.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 05bcf020-8e3c-440d-9b84-d2c5072a6b1c

📥 Commits

Reviewing files that changed from the base of the PR and between f22f06a and c8e1706.

📒 Files selected for processing (2)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • apps/cli/src/channels/core/channelBridgeWorker.ts

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

Follow-up clarity pass — here are exact fix locations for the resolved bot items (commit c8e1706f):

I added this map so reviewers can verify each previously-resolved thread against concrete code anchors quickly.

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 4, 2026
@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai @greptile-apps follow-up fixes were pushed (commit 3a5d30a) for the latest unresolved threads; please re-review.

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

@cdenneen: Acknowledged! I'll kick off a fresh full review pass of PR #109 to cover the follow-up fixes pushed in commit 3a5d30a2.

✅ Actions performed

Full review triggered.

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 4, 2026
@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai @greptile-apps re-review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

@cdenneen: Acknowledged! Kicking off a fresh full review of PR #109 to cover the latest follow-up fixes.

✅ Actions performed

Full review triggered.

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

cdenneen added a commit to cdenneen/happier-dev that referenced this pull request Mar 4, 2026
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.test.ts`:
- Around line 700-711: Wrap each test that starts a channel worker with a
try/finally so worker.stop() always runs: for tests that call
startChannelBridgeWorker(...) and then await waitFor(...) / assert on
harness.sent, move assertions into the try block and call await worker.stop() in
the finally block to ensure timers and in-flight work are cleaned up (apply the
same pattern to other tests that startChannelBridgeWorker, reference the worker
variable and its stop() method in the finally).
- Around line 67-73: The test's dedupe verification is passing prematurely
because after calling worker.trigger() the first tick already makes
sentToSession.length === 1 true; update the assertions in createAdapterHarness
tests (and the other occurrences at the noted sites) to deterministically wait
for the second tick before asserting dedupe behavior: after the first
worker.trigger() await the next processing cycle (e.g., call worker.trigger() a
second time or await a Promise that resolves when sentToSession.length >= 2) and
then assert that the duplicate message was ignored (sentToSession length remains
1) or that a dedupe-specific indicator was emitted; reference the harness
functions createAdapterHarness, pushInbound, sent (or sentToSession) and
worker.trigger() when updating each failing assertion to ensure the test
verifies the second tick outcome.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b7334bcb-a690-4b9f-8698-5f4f931f4799

📥 Commits

Reviewing files that changed from the base of the PR and between c8e1706 and 51bb9ea.

📒 Files selected for processing (2)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • apps/cli/src/channels/core/channelBridgeWorker.ts
✅ Files skipped from review due to trivial changes (1)
  • apps/cli/src/channels/core/channelBridgeWorker.ts

cdenneen added a commit to cdenneen/happier-dev that referenced this pull request Mar 4, 2026
@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

Addressed remaining feedback in commit 7fb5b65 (test determinism + lifecycle cleanup guards in channel bridge worker tests).

@coderabbitai review
@greptile review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

@cdenneen: Acknowledged! Kicking off a fresh full review of PR #109 to cover the test determinism and lifecycle cleanup guard fixes pushed in commit 7fb5b653.

✅ Actions performed

Full review triggered.

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

@cdenneen: Acknowledged! Kicking off a fresh full review of PR #109 to cover the test determinism and lifecycle cleanup guard fixes in commit 7fb5b653.

✅ Actions performed

Full review triggered.

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai review

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 4, 2026
@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai review

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 4, 2026
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
apps/cli/src/channels/core/channelBridgeWorker.ts (1)

210-230: ⚠️ Potential issue | 🟠 Major

Normalize in-memory cursor values with toNonNegativeInt to prevent NaN cursor corruption.

Line 218 and Line 229 can still persist/propagate non-finite cursor values (NaN), which breaks monotonic cursor semantics and downstream afterSeq comparisons.

🛠️ Proposed fix
     upsertBinding: async (binding) => {
       const key = bindingKey(binding);
       const existing = byKey.get(key);
+      const normalizedLastForwardedSeq = toNonNegativeInt(binding.lastForwardedSeq) ?? 0;
       const next: ChannelSessionBinding = {
         providerId: binding.providerId,
         conversationId: binding.conversationId,
         threadId: binding.threadId,
         sessionId: binding.sessionId,
-        lastForwardedSeq: Math.max(0, Math.trunc(binding.lastForwardedSeq)),
+        lastForwardedSeq: normalizedLastForwardedSeq,
         createdAtMs: existing?.createdAtMs ?? now(),
         updatedAtMs: now(),
       };
       byKey.set(key, next);
       return next;
     },
     updateLastForwardedSeq: async (ref, seq) => {
       const key = bindingKey(ref);
       const existing = byKey.get(key);
       if (!existing) return;
-      const nextSeq = Math.max(existing.lastForwardedSeq, Math.max(0, Math.trunc(seq)));
+      const parsedSeq = toNonNegativeInt(seq);
+      if (parsedSeq === null) return;
+      const nextSeq = Math.max(existing.lastForwardedSeq, parsedSeq);
       byKey.set(key, {
         ...existing,
         lastForwardedSeq: nextSeq,
         updatedAtMs: now(),
       });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts` around lines 210 - 230,
The upsertBinding and updateLastForwardedSeq implementations can accept
NaN/Infinity and corrupt in-memory cursors; replace the Math.max(0,
Math.trunc(...)) sanitization with a call to toNonNegativeInt for both
binding.lastForwardedSeq (in upsertBinding when building next.lastForwardedSeq)
and seq and existing.lastForwardedSeq (in updateLastForwardedSeq when computing
nextSeq) so you always store a finite non-negative integer; update uses of
existing?.lastForwardedSeq and computed nextSeq to pass through toNonNegativeInt
before persisting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 210-230: The upsertBinding and updateLastForwardedSeq
implementations can accept NaN/Infinity and corrupt in-memory cursors; replace
the Math.max(0, Math.trunc(...)) sanitization with a call to toNonNegativeInt
for both binding.lastForwardedSeq (in upsertBinding when building
next.lastForwardedSeq) and seq and existing.lastForwardedSeq (in
updateLastForwardedSeq when computing nextSeq) so you always store a finite
non-negative integer; update uses of existing?.lastForwardedSeq and computed
nextSeq to pass through toNonNegativeInt before persisting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 4947242d-a9cc-4087-903d-96d0f652adb2

📥 Commits

Reviewing files that changed from the base of the PR and between 4159227 and a83c471.

📒 Files selected for processing (2)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • apps/cli/src/channels/core/channelBridgeWorker.ts

@cdenneen cdenneen marked this pull request as draft March 4, 2026 22:47
@cdenneen cdenneen force-pushed the feat/channel-bridge-core branch from a83c471 to 6994e4a Compare March 4, 2026 22:49
@cdenneen cdenneen force-pushed the feat/channel-bridge-core branch from 6994e4a to 7339299 Compare March 4, 2026 22:49
coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 4, 2026
@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai review

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@coderabbitai review

@cdenneen
Copy link
Author

cdenneen commented Mar 4, 2026

@greptile review

@coderabbitai
Copy link

coderabbitai bot commented Mar 4, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

threadId: event.threadId,
});
} catch (error) {
params.deps.onWarning?.('Failed to forward channel message into session', error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inbound failure warning lacks identifying context.

'Failed to forward channel message into session' is emitted with no mention of which session, provider, or conversation was affected. In a multi-binding deployment this warning is hard to correlate to a specific conversation. The outbound counterpart at line 648 includes session=, provider=, and conversation= identifiers — the same treatment would make this warning equally actionable:

Suggested change
params.deps.onWarning?.('Failed to forward channel message into session', error);
params.deps.onWarning?.(`Failed to forward channel message into session ${binding.sessionId} (provider=${event.providerId} conversation=${event.conversationId})`, error);

Comment on lines +595 to +597
params.deps.onWarning?.(
`No adapter registered for binding providerId=${binding.providerId} conversationId=${binding.conversationId}; skipping outbound forwarding`,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Orphaned-binding warning fires on every tick without throttling.

When a binding exists for a provider with no registered adapter, this warning fires on every tick (default 2,500 ms). With even a handful of orphaned bindings — a natural operational state after an adapter is removed while its bindings remain in the store — the warning stream becomes a continuous flood with no signal about whether the situation is new or ongoing.

Consider tracking which orphaned bindings have already been warned within the current worker run (e.g. a Set maintained alongside the deduper in startChannelBridgeWorker and threaded into executeChannelBridgeTick), or documenting this as known behaviour so operators know to filter repeated messages.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
apps/cli/src/channels/core/channelBridgeWorker.ts (1)

210-230: ⚠️ Potential issue | 🟠 Major

Prevent NaN cursor persistence in in-memory binding store.

Line 218 and Line 229 can still persist NaN (Math.trunc(NaN) => NaN), which breaks cursor monotonicity and downstream afterSeq comparisons. Use toNonNegativeInt(...) here like the outbound path does.

🛠️ Proposed fix
     upsertBinding: async (binding) => {
       const key = bindingKey(binding);
       const existing = byKey.get(key);
+      const normalizedLastForwardedSeq = toNonNegativeInt(binding.lastForwardedSeq) ?? 0;
       const next: ChannelSessionBinding = {
         providerId: binding.providerId,
         conversationId: binding.conversationId,
         threadId: binding.threadId,
         sessionId: binding.sessionId,
-        lastForwardedSeq: Math.max(0, Math.trunc(binding.lastForwardedSeq)),
+        lastForwardedSeq: normalizedLastForwardedSeq,
         createdAtMs: existing?.createdAtMs ?? now(),
         updatedAtMs: now(),
       };
       byKey.set(key, next);
       return next;
     },
     updateLastForwardedSeq: async (ref, seq) => {
       const key = bindingKey(ref);
       const existing = byKey.get(key);
       if (!existing) return;
-      const nextSeq = Math.max(existing.lastForwardedSeq, Math.max(0, Math.trunc(seq)));
+      const parsedSeq = toNonNegativeInt(seq);
+      if (parsedSeq === null) return;
+      const nextSeq = Math.max(existing.lastForwardedSeq, parsedSeq);
       byKey.set(key, {
         ...existing,
         lastForwardedSeq: nextSeq,
         updatedAtMs: now(),
       });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts` around lines 210 - 230,
The in-memory binding store can persist NaN for lastForwardedSeq because
Math.trunc(NaN) yields NaN; update the upsertBinding and updateLastForwardedSeq
logic to sanitize seqs using the existing toNonNegativeInt helper (instead of
Math.trunc/Math.max alone) when constructing ChannelSessionBinding and computing
nextSeq; locate the upsertBinding and updateLastForwardedSeq functions (use
bindingKey and ChannelSessionBinding identifiers) and replace the
Math.trunc(...) / Math.max(...) usage with toNonNegativeInt(...) so
lastForwardedSeq is always a non-negative integer.
🧹 Nitpick comments (1)
apps/cli/src/channels/core/channelBridgeWorker.ts (1)

390-413: Handle /attach resolver exceptions with an explicit user-facing reply.

If deps.resolveSessionIdOrPrefix(...) throws, the outer catch logs and drops the event flow, but the user gets no command-level failure message. Add a local try/catch in /attach to fail gracefully.

🛠️ Proposed fix
-    const resolved = await deps.resolveSessionIdOrPrefix(idOrPrefix);
+    let resolved: ResolveSessionIdResult;
+    try {
+      resolved = await deps.resolveSessionIdOrPrefix(idOrPrefix);
+    } catch (error) {
+      deps.onWarning?.('Failed to resolve session id/prefix for attach', error);
+      await replyToConversation(
+        adapter,
+        ref,
+        'Unable to resolve that session right now. Please try again.',
+      );
+      return true;
+    }

As per coding guidelines, "Use proper error handling with try/catch blocks or Promise.catch()" and "Use graceful error handling with proper error messages, try-catch blocks with specific error logging".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts` around lines 390 - 413,
The call to deps.resolveSessionIdOrPrefix(...) inside the /attach handler can
throw and currently bubbles out; wrap that call in a local try/catch inside the
/attach command flow (around the call that sets const resolved) so any thrown
exception is caught, logged (use the existing logger), and a user-facing reply
is sent via replyToConversation(adapter, ref, 'Failed to resolve session: <short
message>'). Keep the existing branch logic for resolved.ok inside the try block
and on catch return true after sending the error reply so the command fails
gracefully.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 728-736: The current shutdown logic builds adaptersToStop while
deduplicating by providerId (seenProviderIds) which prevents calling stop() on
multiple distinct ChannelBridgeAdapter instances that share the same providerId;
remove the providerId dedupe and instead push every adapter from params.adapters
(or dedupe by adapter instance identity if necessary) so each
ChannelBridgeAdapter instance gets its stop() invoked during shutdown; update
the loop that populates adaptersToStop (the code referencing params.adapters,
seenProviderIds, and adaptersToStop) to ensure stop() is called for every
adapter instance.

---

Duplicate comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 210-230: The in-memory binding store can persist NaN for
lastForwardedSeq because Math.trunc(NaN) yields NaN; update the upsertBinding
and updateLastForwardedSeq logic to sanitize seqs using the existing
toNonNegativeInt helper (instead of Math.trunc/Math.max alone) when constructing
ChannelSessionBinding and computing nextSeq; locate the upsertBinding and
updateLastForwardedSeq functions (use bindingKey and ChannelSessionBinding
identifiers) and replace the Math.trunc(...) / Math.max(...) usage with
toNonNegativeInt(...) so lastForwardedSeq is always a non-negative integer.

---

Nitpick comments:
In `@apps/cli/src/channels/core/channelBridgeWorker.ts`:
- Around line 390-413: The call to deps.resolveSessionIdOrPrefix(...) inside the
/attach handler can throw and currently bubbles out; wrap that call in a local
try/catch inside the /attach command flow (around the call that sets const
resolved) so any thrown exception is caught, logged (use the existing logger),
and a user-facing reply is sent via replyToConversation(adapter, ref, 'Failed to
resolve session: <short message>'). Keep the existing branch logic for
resolved.ok inside the try block and on catch return true after sending the
error reply so the command fails gracefully.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 5b2f9650-9b00-41c2-a582-67d717047ce3

📥 Commits

Reviewing files that changed from the base of the PR and between a83c471 and 9e9b0d8.

📒 Files selected for processing (4)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
  • apps/cli/src/channels/core/channelBridgeWorker.ts
  • docs/README.md
  • docs/channel-bridge.md
✅ Files skipped from review due to trivial changes (1)
  • apps/cli/src/channels/core/channelBridgeWorker.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • docs/README.md
  • docs/channel-bridge.md

Comment on lines +728 to +736
const adaptersToStop: ChannelBridgeAdapter[] = [];
const seenProviderIds = new Set<string>();
for (const adapter of params.adapters) {
if (seenProviderIds.has(adapter.providerId)) {
continue;
}
seenProviderIds.add(adapter.providerId);
adaptersToStop.push(adapter);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stop all unique adapter instances during shutdown (not just one per providerId).

Line 731 dedupes by providerId, which can skip stop() for distinct adapter instances that share the same provider id, leaving resources dangling.

🛠️ Proposed fix
-        const adaptersToStop: ChannelBridgeAdapter[] = [];
-        const seenProviderIds = new Set<string>();
+        const adaptersToStop: ChannelBridgeAdapter[] = [];
+        const seenAdapters = new Set<ChannelBridgeAdapter>();
         for (const adapter of params.adapters) {
-          if (seenProviderIds.has(adapter.providerId)) {
+          if (seenAdapters.has(adapter)) {
             continue;
           }
-          seenProviderIds.add(adapter.providerId);
+          seenAdapters.add(adapter);
           adaptersToStop.push(adapter);
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cli/src/channels/core/channelBridgeWorker.ts` around lines 728 - 736,
The current shutdown logic builds adaptersToStop while deduplicating by
providerId (seenProviderIds) which prevents calling stop() on multiple distinct
ChannelBridgeAdapter instances that share the same providerId; remove the
providerId dedupe and instead push every adapter from params.adapters (or dedupe
by adapter instance identity if necessary) so each ChannelBridgeAdapter instance
gets its stop() invoked during shutdown; update the loop that populates
adaptersToStop (the code referencing params.adapters, seenProviderIds, and
adaptersToStop) to ensure stop() is called for every adapter instance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant