Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
140 commits
Select commit Hold shift + click to select a range
d10e9b5
fix(stream): preserve pipe channels and tcp auth metadata
Apr 4, 2026
0c17b05
fix(stream): add transport heartbeats and deadlines
Apr 4, 2026
3643665
refactor(stream): align adapters with AX conventions
Apr 4, 2026
5931bae
fix(stream): tighten hub and tcp lifecycle
Apr 4, 2026
77b6c51
fix(stream): close remaining RFC runtime gaps
Apr 4, 2026
a68c3e5
feat(stream): implement hub broker loop
Apr 4, 2026
3f55d1f
fix(stream): tighten hub and tcp edge cases
Apr 4, 2026
5505529
feat(stream): implement redis and zmq transports
Apr 4, 2026
14a6a6e
fix(hub): dispatch callbacks through hub queue
Apr 4, 2026
5f957f4
feat(zmq): add handshake auth for receivers
Apr 4, 2026
c39ea6e
feat(adapter): support direct HTTP handlers
Apr 4, 2026
22c9c9b
feat(ws): add legacy compatibility aliases
Apr 4, 2026
26eddad
refactor(adapter): use semantic receiver names
Apr 4, 2026
0782aee
feat(stream): add bridge delivery hooks
Apr 4, 2026
fceec98
fix(stream): forward pipe broadcasts
Apr 4, 2026
c4e50e2
feat(ws): restore legacy compatibility exports
Apr 4, 2026
21bfd41
feat(stream): add explicit subscription errors
Apr 4, 2026
6f803f8
fix(redis): enforce bridge startup lifecycle
Apr 4, 2026
8759314
fix(sse): default zero-value heartbeat config
Apr 4, 2026
9127d43
feat(ws): add channel handler
Apr 4, 2026
0cb8f0f
fix(tcp): preserve first payload without auth handshake
Apr 4, 2026
a1935bb
Implement inbound transport dispatch semantics
Apr 4, 2026
caa3380
fix(stream): honour peer transport shutdown
Apr 4, 2026
a15dd0e
refactor(stream): use semantic receiver names
Apr 4, 2026
ce4a7c6
refactor(stream): add AX-oriented usage examples
Apr 4, 2026
f5962ad
refactor(stream): align pipe fallback with broadcast semantics
Apr 4, 2026
daf343e
chore(stream): refine AX-facing APIs
Apr 4, 2026
1aace93
refactor(stream): align public docs with AX examples
Apr 4, 2026
d716256
fix(stream): harden auth and tcp first-frame handling
Apr 4, 2026
6e29d4a
fix(stream): generate proper peer UUIDs
Apr 4, 2026
10d54fb
fix(stream): keep overflow deliveries on hub loop
Apr 4, 2026
bb44ee5
fix(stream): re-export frame aliases
Apr 4, 2026
bdfa4db
fix(ws): add legacy compatibility facade
Apr 4, 2026
752a96e
fix(pipe): publish wildcard frames in generic fallback
Apr 4, 2026
e1abc27
docs(stream): apply AX examples to transport APIs
Apr 4, 2026
353f534
docs(stream): improve peer usage examples
Apr 4, 2026
45daa23
fix(zmq): enforce handshake size limit
Apr 4, 2026
b4aad18
fix(tcp): guarantee full-frame writes
Apr 4, 2026
7c94303
docs(stream): tighten AX-facing comments
Apr 4, 2026
b779ac9
feat(sse): add auth failure callback
Apr 4, 2026
1ce9483
docs(stream): add example-led adapter comments
Apr 4, 2026
3b55ed8
style(stream): clarify AX examples
Apr 4, 2026
41f62bb
refactor(stream): ax polish auth and docs
Apr 4, 2026
80c343f
refactor(stream): clarify auth adapter names
Apr 4, 2026
47a1b90
fix(stream): clone piped frames at bridge boundary
Apr 4, 2026
f49b9f2
docs(stream): align public comments with AX
Apr 4, 2026
e184c19
chore(stream): clarify auth result naming
Apr 4, 2026
70d5809
Add executable AX usage examples
Apr 4, 2026
089554a
style(stream): align public comments with AX
Apr 4, 2026
5bdeb4e
feat(stream): add predictable subscribe API
Apr 4, 2026
0ac22ad
docs(stream): make auth examples more concrete
Apr 4, 2026
c29ebf5
AX document stream message types
Apr 4, 2026
0179542
style(stream): align pipe naming with AX
Apr 4, 2026
a04e403
style(stream): align public comments with AX
Apr 4, 2026
6327cc7
style(stream): refine AX-facing examples
Apr 4, 2026
cdfaff7
style(stream): align public comments with AX
Apr 4, 2026
1f428f6
fix(stream): make pipe stops idempotent
Apr 4, 2026
df12f2c
style(stream): sharpen auth example
Apr 4, 2026
61dc2d8
Restore ws hub compatibility handlers
Apr 4, 2026
505ff76
refactor(stream): clarify hub internals
Apr 4, 2026
5925a8e
feat(stream): enforce channel authorisation at adapter edges
Apr 4, 2026
f859919
fix(tcp): handle nil dial context
Apr 4, 2026
4b4f49b
docs(stream): align public comments with AX
Apr 4, 2026
fa5e377
docs(stream): add usage examples for sentinel errors
Apr 4, 2026
709a1df
style(stream): clarify hub publish queue naming
Apr 4, 2026
ba8841d
style(stream): align message docs with AX
Apr 4, 2026
b02dd12
style(stream): align public comments with AX
Apr 4, 2026
3ae382c
Add TCP reconnect state tracking
Apr 5, 2026
a5f5919
style(stream): align config comments with AX
Apr 5, 2026
b193fca
style(stream): improve AX naming in pipe and message examples
Apr 5, 2026
14a33c2
docs(stream): confirm RFC parity
Apr 5, 2026
0a520e7
style(stream): align auth examples with AX
Apr 5, 2026
8ae82bc
style(stream): sharpen AX-facing public comments
Apr 5, 2026
3639063
style(stream): refine stats example
Apr 5, 2026
77663fd
style(stream): sharpen hub config AX comments
Apr 5, 2026
a565420
style(stream): sharpen AX comments
Apr 5, 2026
77c29e6
style(stream): add connection state stringer
Apr 5, 2026
237069c
style(stream): sharpen package comments for AX
Apr 5, 2026
5edc894
style(stream): sharpen hub comments for AX
Apr 5, 2026
4b70283
style(stream): tighten message type examples for AX
Apr 5, 2026
6381c9d
style(stream): sharpen AX public comments
Apr 5, 2026
93d5163
docs(ax): tighten API examples and names
Apr 5, 2026
8b0b796
docs(ax): sharpen adapter usage examples
Apr 5, 2026
626c2a7
style(stream): sharpen public examples
Apr 5, 2026
c351b66
AX polish transport examples
Apr 5, 2026
7abea98
fix(stream): close transports on context cancellation
Apr 5, 2026
8ef64e4
refactor(stream): name queue defaults
Apr 5, 2026
80d2660
style(stream): sharpen hub AX comments
Apr 5, 2026
8477c6b
feat(tcp): add client handshake support
Apr 5, 2026
ec3a152
style(stream): sharpen hub AX comments
Apr 5, 2026
3b25afc
style(ax): sharpen public API usage comments
Apr 5, 2026
b98af2b
style(ax): add composition usage examples
Apr 5, 2026
7469899
style(stream): sort peer iteration deterministically
Apr 5, 2026
954e692
feat(stream): add hub running accessor
Apr 5, 2026
9590535
style(stream): sharpen peer usage comments
Apr 5, 2026
cd7ef3d
Register ZMQ peers with the hub
Apr 5, 2026
0d3cfa1
docs(stream): align public examples with AX
Apr 5, 2026
a20177c
docs(stream): add RFC spec mirrors and AX comment polish
Apr 5, 2026
fad1ecb
docs(ax): add Codex conventions mirror
Apr 5, 2026
5a92e11
fix(transport): serialize reconnecting sends
Apr 5, 2026
e3f2a5b
style(stream): rename peer send queue field
Apr 5, 2026
7ebc142
style(stream): sharpen stream interface usage comments
Apr 5, 2026
3507a74
fix(hub): deliver peer-originated frames to sender
Apr 5, 2026
8d35382
fix(stream): reject adapters when hub is not running
Apr 5, 2026
438f81f
style(hub): sharpen public API examples
Apr 5, 2026
0ed78ee
style(ax): add concrete stream usage examples
Apr 5, 2026
9fe77e6
refactor(stream): clarify core names
Apr 5, 2026
01ecd9c
style(hub): sharpen running example comment
Apr 5, 2026
d68eec7
refactor(stream): rename peer internals for clarity
Apr 5, 2026
f9f66fd
style(ax): sharpen stream examples
Apr 5, 2026
120e388
chore(ax): replace mu with mutex
Apr 5, 2026
13fc477
ax(stream): add enum stringers for logging
Apr 5, 2026
ce6e043
docs(ax): align public comments with AX examples
Apr 5, 2026
a427005
style(ax): sharpen package examples
Apr 5, 2026
52382dc
style(ax): sharpen package docs
Apr 5, 2026
b5415b3
fix(hub): lock peer subscription state
Apr 5, 2026
79347bf
style(ax): sharpen package examples
Apr 5, 2026
4520fc0
fix(tcp): allow large unauthenticated initial frames
Apr 5, 2026
a13339e
fix(stream): count handler-only channels in stats
Apr 5, 2026
c9d74c0
style(ax): sharpen package docs
Apr 5, 2026
e9a66d6
style(ax): sharpen stream examples
Apr 5, 2026
ab8c74b
fix(ws): re-export channel authoriser alias
Apr 5, 2026
87a65d4
style(ax): sharpen stream examples
Apr 5, 2026
29832b4
fix(adapters): harden stream framing
Apr 5, 2026
df1c26e
docs(stream): improve auth examples
Apr 5, 2026
69b7ca3
style(ax): sharpen message envelope example
Apr 5, 2026
cf3be2c
style(ax): sharpen legacy ws compatibility example
Apr 5, 2026
7cf55bb
style(ax): default peer claims to empty map
Apr 5, 2026
37f5560
fix(auth): initialize successful claims maps
Apr 5, 2026
5306423
fix(adapter): register peers after transport setup
Apr 5, 2026
f29a9cb
style(stream): align peer internals with AX naming
Apr 5, 2026
e63b04e
refactor(tcp): clarify framing helper names
Apr 5, 2026
8a4cd2f
fix(redis): use bridge source ids directly
Apr 5, 2026
135249d
style(ax): align examples with core print helper
Apr 5, 2026
2c15a1f
style(ax): sharpen auth result example comments
Apr 5, 2026
e2f66c1
style(ax): sharpen pipe examples
Apr 5, 2026
fbf95ae
feat(ax): add comprehensive tests and usage-example comments
Snider Apr 5, 2026
8f52f29
fix(ax): replace banned imports and add usage-example comments
Snider Apr 5, 2026
0f88bbe
feat(ax-10): bring go-stream to v0.8.0-alpha.1 + CLI test scaffold
Snider Apr 24, 2026
56dd1b1
refactor(core): full v0.9.0 compliance against core/go reference
Snider Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CODEX.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# CODEX.md — go-stream

This repository keeps its working conventions in [CLAUDE.md](/workspace/CLAUDE.md).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Check if CLAUDE.md exists at the repository root or elsewhere

# Check for CLAUDE.md at repository root
if [ -f "CLAUDE.md" ]; then
  echo "✓ CLAUDE.md exists at repository root"
  echo "Relative path should be: CLAUDE.md"
elif [ -f "workspace/CLAUDE.md" ]; then
  echo "✓ CLAUDE.md exists at workspace/CLAUDE.md"
  echo "Relative path should be: workspace/CLAUDE.md"
else
  # Search for CLAUDE.md anywhere in the repository
  fd -t f "^CLAUDE\.md$" -x echo "Found at: {}"
fi

Repository: dAppCore/go-stream

Length of output: 135


Change the path reference to CLAUDE.md to use a relative path instead of the absolute /workspace/ path.

The absolute path /workspace/CLAUDE.md is environment-specific and will break for users cloning the repository. CLAUDE.md exists at the repository root and should be referenced relatively.

Suggested fix
-This repository keeps its working conventions in [CLAUDE.md](/workspace/CLAUDE.md).
+This repository keeps its working conventions in [CLAUDE.md](CLAUDE.md).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
This repository keeps its working conventions in [CLAUDE.md](/workspace/CLAUDE.md).
This repository keeps its working conventions in [CLAUDE.md](CLAUDE.md).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@CODEX.md` at line 3, Update the path reference in CODEX.md that currently
points to the absolute "/workspace/CLAUDE.md" and change it to a
repository-relative reference (e.g., "CLAUDE.md" or "./CLAUDE.md") so the link
works for users who clone the repo; locate the string "/workspace/CLAUDE.md" in
CODEX.md and replace it with the relative path.


Read these two documents before changing code:

```text
docs/RFC.md — go-stream implementation spec
docs/RFC-025-AGENT-EXPERIENCE.md — AX design principles
```

Key conventions:

- Use `core.E(scope, message, cause)` for errors.
- Keep comments as concrete usage examples.
- Prefer predictable names over shorthand.
- Preserve the transport-agnostic public API and the `ws` compatibility surface.

Commit convention:

```text
type(scope): description

Co-Authored-By: Virgil <virgil@lethean.io>
```
197 changes: 197 additions & 0 deletions adapter/redis/ax7_more_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// SPDX-License-Identifier: EUPL-1.2

package redis

import (
"github.com/alicebob/miniredis/v2"

core "dappco.re/go"
"dappco.re/go/stream"
)

func ax7StartedBridge(t *core.T) (*Bridge, core.CancelFunc) {
redisServer := miniredis.RunT(t)
hub := stream.NewHub()
ctx, cancel := core.WithCancel(core.Background())
go hub.Run(ctx)

bridge, err := NewBridge(hub, Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)
go func() {
if err := bridge.Start(ctx); err != nil {
t.Errorf("Start() error = %v", err)
}
}()
core.Sleep(100 * core.Millisecond)
return bridge, cancel
}

func TestAX7_NewBridge_Good(t *core.T) {
redisServer := miniredis.RunT(t)
hub := stream.NewHub()

bridge, err := NewBridge(hub, Config{Addr: redisServer.Addr()})
core.AssertNoError(t, err)
core.AssertEqual(t, hub, bridge.hub)
core.AssertEqual(t, "stream", bridge.config.Prefix)
}

func TestAX7_NewBridge_Bad(t *core.T) {
redisServer := miniredis.RunT(t)

bridge, err := NewBridge(nil, Config{Addr: redisServer.Addr()})
core.AssertError(t, err)
core.AssertNil(t, bridge)
}

func TestAX7_NewBridge_Ugly(t *core.T) {
redisServer := miniredis.RunT(t)
hub := stream.NewHub()

left, err := NewBridge(hub, Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)
right, err := NewBridge(hub, Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)
core.AssertNotEqual(t, left.SourceID(), right.SourceID())
}

func TestAX7_Bridge_SourceID_Good(t *core.T) {
redisServer := miniredis.RunT(t)
bridge, err := NewBridge(stream.NewHub(), Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)

core.AssertNotEmpty(t, bridge.SourceID())
core.AssertEqual(t, 36, core.RuneCount(bridge.SourceID()))
}

func TestAX7_Bridge_SourceID_Bad(t *core.T) {
var bridge *Bridge

core.AssertEqual(t, "", bridge.SourceID())
core.AssertNil(t, bridge)
}

func TestAX7_Bridge_SourceID_Ugly(t *core.T) {
redisServer := miniredis.RunT(t)
bridge, err := NewBridge(stream.NewHub(), Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)

sourceID := bridge.SourceID()
core.AssertEqual(t, sourceID, bridge.SourceID())
core.AssertNotEmpty(t, sourceID)
}

func TestAX7_Bridge_Start_Good(t *core.T) {
bridge, cancel := ax7StartedBridge(t)
defer cancel()

bridge.mutex.RLock()
running := bridge.running
bridge.mutex.RUnlock()
core.AssertTrue(t, running)
}

func TestAX7_Bridge_Start_Bad(t *core.T) {
var bridge *Bridge

err := bridge.Start(core.Background())
core.AssertError(t, err)
core.AssertContains(t, err.Error(), "nil bridge")
}

func TestAX7_Bridge_Start_Ugly(t *core.T) {
bridge, cancel := ax7StartedBridge(t)
defer cancel()

err := bridge.Start(core.Background())
core.AssertNoError(t, err)
core.AssertNotEmpty(t, bridge.SourceID())
}

func TestAX7_Bridge_Stop_Good(t *core.T) {
bridge, cancel := ax7StartedBridge(t)
defer cancel()

core.AssertNoError(t, bridge.Stop())
core.Sleep(50 * core.Millisecond)
bridge.mutex.RLock()
running := bridge.running
bridge.mutex.RUnlock()
core.AssertFalse(t, running)
}

func TestAX7_Bridge_Stop_Bad(t *core.T) {
var bridge *Bridge

core.AssertNoError(t, bridge.Stop())
core.AssertNil(t, bridge)
}

func TestAX7_Bridge_Stop_Ugly(t *core.T) {
redisServer := miniredis.RunT(t)
bridge, err := NewBridge(stream.NewHub(), Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)

core.AssertNoError(t, bridge.Stop())
core.AssertNotEmpty(t, bridge.SourceID())
}

func TestAX7_Bridge_PublishToChannel_Good(t *core.T) {
redisServer := miniredis.RunT(t)
hub1 := stream.NewHub()
hub2 := stream.NewHub()
ctx, cancel := core.WithCancel(core.Background())
defer cancel()
go hub1.Run(ctx)
go hub2.Run(ctx)
bridge1, err := NewBridge(hub1, Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)
bridge2, err := NewBridge(hub2, Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)
go func() { core.AssertNoError(t, bridge1.Start(ctx)) }()
go func() { core.AssertNoError(t, bridge2.Start(ctx)) }()
core.Sleep(100 * core.Millisecond)

received := make(chan []byte, 1)
stop := hub2.Subscribe("block", func(frame []byte) { received <- append([]byte(nil), frame...) })
defer stop()
core.AssertNoError(t, bridge1.PublishToChannel("block", []byte("template")))
core.AssertEqual(t, "template", string(<-received))
}

func TestAX7_Bridge_PublishToChannel_Bad(t *core.T) {
redisServer := miniredis.RunT(t)
bridge, err := NewBridge(stream.NewHub(), Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)

err = bridge.PublishToChannel("block", []byte("template"))
core.AssertError(t, err)
core.AssertContains(t, err.Error(), "not started")
}

func TestAX7_Bridge_PublishToChannel_Ugly(t *core.T) {
bridge, cancel := ax7StartedBridge(t)
defer cancel()

err := bridge.PublishToChannel("", []byte("template"))
core.AssertError(t, err)
core.AssertContains(t, err.Error(), "empty channel")
}

func TestAX7_Bridge_PublishBroadcast_Bad(t *core.T) {
redisServer := miniredis.RunT(t)
bridge, err := NewBridge(stream.NewHub(), Config{Addr: redisServer.Addr(), Prefix: "pool"})
core.RequireNoError(t, err)

err = bridge.PublishBroadcast([]byte("shutdown"))
core.AssertError(t, err)
core.AssertContains(t, err.Error(), "not started")
}

func TestAX7_Bridge_PublishBroadcast_Ugly(t *core.T) {
var bridge *Bridge

err := bridge.PublishBroadcast([]byte("shutdown"))
core.AssertError(t, err)
core.AssertContains(t, err.Error(), "nil bridge")
}
33 changes: 33 additions & 0 deletions adapter/redis/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-License-Identifier: EUPL-1.2

package redis_test

import (
"context"

"dappco.re/go/stream"
"dappco.re/go/stream/adapter/redis"
)

func ExampleNewBridge() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hub := stream.NewHub()
go hub.Run(ctx)

bridge, err := redis.NewBridge(hub, redis.Config{
Addr: "127.0.0.1:6379",
Prefix: "pool",
})
if err != nil {
return
}
defer bridge.Stop()

go func() {
_ = bridge.Start(ctx)
}()

_ = bridge.PublishToChannel("block", []byte("template"))
}
Loading