Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cmd/go9p-deviceconnect/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,38 @@ func TestGo9pDeviceConnect_Discover(t *testing.T) {
if !strings.Contains(s, "robot-001") || !strings.Contains(s, "sensor-001") {
t.Fatalf("unexpected discover output: %q", s)
}

// Exercise cmdCall end-to-end. The demo backend's echo returns the
// payload as-is. Run it twice to confirm the CLI is idempotent across
// invocations (each cmdCall mounts and unmounts its own connection).
out.Reset()
errb.Reset()
code = cmdCall(&out, &errb, []string{
"-addr", addr,
"-id", "robot-001",
"-fn", "echo",
"-payload", "hello",
})
if code != 0 {
t.Fatalf("call code=%d stderr=%q", code, errb.String())
}
if got := out.String(); got != "hello" {
t.Fatalf("call output=%q want %q", got, "hello")
}

out.Reset()
errb.Reset()
code = cmdCall(&out, &errb, []string{
"-addr", addr,
"-id", "robot-001",
"-fn", "echo",
"-payload", "again",
})
if code != 0 {
t.Fatalf("second call code=%d stderr=%q", code, errb.String())
}
if got := out.String(); got != "again" {
t.Fatalf("second call output=%q want %q", got, "again")
}
}

17 changes: 17 additions & 0 deletions p/srv/examples/deviceconnect/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ Supported commands:

On failure, `ctl` returns a 9P error (Rerror). This is the “standard” mechanism.

## Call instance lifecycle

Reading `clone` allocates a numbered subdirectory (e.g. `functions/echo/7/`).
That directory's lifetime is tied to the **9P connection** that allocated
it: when the connection drops — clean unmount, client crash, or network
loss — the example's server wrapper (`dcSrv`) reaps every call instance
attached to that connection via the framework's `ConnClosed` hook.

This mirrors the canonical Plan 9 idiom: `/net/tcp` connections, `/srv`
posts, and per-process namespaces all evaporate with the connection that
created them. It also means:

- The CLI does not need to write any explicit "close" command after a call.
- A client that crashes mid-call does not leak call directories — the server
still sees the connection drop, so cleanup still runs.
- Long-lived servers do not accumulate dead call directories.

### `devices/by-id/<device-id>/functions/<fn>/<id>/data`

- **read/write**:
Expand Down
60 changes: 58 additions & 2 deletions p/srv/examples/deviceconnect/deviceconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ type funcCloneFile struct {
backend Backend
deviceID string
fn string
fs *DCFS // back-pointer for per-conn registration of allocated dirs
}

func (f *funcCloneFile) Read(fid *srv.FFid, buf []byte, offset uint64) (int, error) {
Expand Down Expand Up @@ -380,6 +381,13 @@ func (f *funcCloneFile) Read(fid *srv.FFid, buf []byte, offset uint64) (int, err
instDir.Remove()
return 0, &p.Error{Err: "buffer too small", Errornum: 0}
}
// Register the dir against the connection that allocated it. When that
// connection drops, ConnClosed reaps anything still attached. This is the
// canonical 9P lifecycle (cf. /net/tcp, where connection state vanishes
// with the connection that created it).
if f.fs != nil && fid != nil && fid.Fid != nil {
f.fs.registerCallDir(fid.Fid.Fconn, instDir)
}
copy(buf, out)
return len(out), nil
}
Expand All @@ -404,6 +412,51 @@ type DCFS struct {

// Per-device-value event cancels (best-effort cleanup on refresh).
valueEventCancels map[string]context.CancelFunc

// Per-connection registry of call instance dirs allocated via clone.
// On ConnClosed we remove everything still attached to the dropped
// connection, so a client crash or unmount auto-reaps its workspace.
connMu sync.Mutex
connDirs map[*srv.Conn][]*srv.File
}

// registerCallDir records dir as owned by the 9P connection c, so that
// releaseConn can remove it when c goes away.
//
// A nil c or a nil dir is ignored. The registry map is created on first use,
// so a DCFS value that was not produced by buildDeviceConnectFS does not
// panic on a nil-map write here.
func (fs *DCFS) registerCallDir(c *srv.Conn, dir *srv.File) {
	if c == nil || dir == nil {
		return
	}
	fs.connMu.Lock()
	defer fs.connMu.Unlock()
	if fs.connDirs == nil {
		fs.connDirs = make(map[*srv.Conn][]*srv.File)
	}
	fs.connDirs[c] = append(fs.connDirs[c], dir)
}

// releaseConn drops the registry entry for connection c and calls Remove on
// every call instance dir that was registered to it.
//
// The entry is detached while connMu is held; the Remove calls happen after
// the lock has been released. Calling releaseConn again for the same
// connection finds no entry and removes nothing. Per the original author's
// note, srv.File.Remove is itself idempotent (Fremoved flag), so a dir that
// was already torn down by another path (e.g. a refresh that rebuilt its
// parent device) is skipped silently.
func (fs *DCFS) releaseConn(c *srv.Conn) {
	owned := func() []*srv.File {
		fs.connMu.Lock()
		defer fs.connMu.Unlock()
		list := fs.connDirs[c]
		delete(fs.connDirs, c)
		return list
	}()
	for i := range owned {
		owned[i].Remove()
	}
}

// dcSrv wraps srv.Fsrv so the deviceconnect example can hook 9P connection
// lifecycle events. The default srv.Fsrv has no opinion on connection close;
// here we use it to run releaseConn for the closing connection.
//
// The embedded *srv.Fsrv supplies the rest of the server's method set; dcSrv
// itself only adds ConnOpened and ConnClosed.
type dcSrv struct {
*srv.Fsrv
// fs is the filesystem whose per-connection call dir registry is
// released when a connection closes.
fs *DCFS
}

// ConnOpened does nothing: dcSrv keeps no state when a connection is opened.
// NOTE(review): presumably required alongside ConnClosed by the framework's
// connection-hook interface — confirm against the srv package.
func (s *dcSrv) ConnOpened(_ *srv.Conn) {
}

// ConnClosed hands the closing connection c to DCFS.releaseConn, which
// removes every call instance dir still registered to it.
func (s *dcSrv) ConnClosed(c *srv.Conn) {
	owner := s.fs
	owner.releaseConn(c)
}

func buildDeviceConnectFS(backend Backend) (*DCFS, error) {
Expand All @@ -413,6 +466,7 @@ func buildDeviceConnectFS(backend Backend) (*DCFS, error) {
deviceDirs: make(map[string]*srv.File),
eventCancels: make(map[string]context.CancelFunc),
valueEventCancels: make(map[string]context.CancelFunc),
connDirs: make(map[*srv.Conn][]*srv.File),
}

fs.root = new(srv.File)
Expand Down Expand Up @@ -671,6 +725,7 @@ func (fs *DCFS) addDeviceLocked(d Device) error {
backend: fs.backend,
deviceID: d.ID,
fn: fn.Name,
fs: fs,
}
if err := cl.Add(fnDir, "clone", fs.user, nil, 0o444, cl); err != nil {
return err
Expand Down Expand Up @@ -789,9 +844,10 @@ func main() {
if *debug {
s.Debuglevel = 1
}
s.Start(s)
ds := &dcSrv{Fsrv: s, fs: fs}
ds.Start(ds)

if err := s.StartNetListener("tcp", *addr); err != nil {
if err := ds.StartNetListener("tcp", *addr); err != nil {
log.Fatalf("listen: %v", err)
}
}
77 changes: 76 additions & 1 deletion p/srv/examples/deviceconnect/deviceconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/lionkov/go9p/p"
"github.com/lionkov/go9p/p/clnt"
Expand Down Expand Up @@ -115,7 +116,8 @@ func startDeviceConnectServer(t *testing.T, backend Backend) (addr string, stop
}
s := srv.NewFileSrv(fs.root)
s.Dotu = true
s.Start(s)
ds := &dcSrv{Fsrv: s, fs: fs}
ds.Start(ds)

ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down Expand Up @@ -432,6 +434,79 @@ func TestDeviceConnect_FunctionStream(t *testing.T) {
}
}

// Connection-scoped cleanup: dirs allocated by a connection are reaped when
// that connection drops.
func TestDeviceConnect_DisconnectCleansUpCalls(t *testing.T) {
backend := &fakeBackend{
devices: []Device{
{
ID: "robot-001",
Type: "robot",
Meta: "id=robot-001 type=robot",
Status: "ok",
Functions: []Function{
{Name: "echo", About: "Echo.", Schema: "bytes"},
},
},
},
}
addr, stop := startDeviceConnectServer(t, backend)
defer stop()

// Connection A: allocate several call dirs and never close them.
cA, cleanA := mountClient(t, addr)
const N = 5
var ids []string
for i := 0; i < N; i++ {
cl, err := cA.FOpen("/devices/by-id/robot-001/functions/echo/clone", p.OREAD)
if err != nil {
t.Fatalf("clone %d: %v", i, err)
}
idb, err := io.ReadAll(cl)
_ = cl.Close()
if err != nil {
t.Fatalf("read clone %d: %v", i, err)
}
ids = append(ids, strings.TrimSpace(string(idb)))
}

// Sanity: every dir is reachable while A is connected.
for _, id := range ids {
f, err := cA.FOpen("/devices/by-id/robot-001/functions/echo/"+id+"/data", p.OREAD)
if err != nil {
t.Fatalf("dir %q should be reachable while A is connected: %v", id, err)
}
_ = f.Close()
}

// Drop A entirely. ConnClosed must reap everything A allocated.
cleanA()

// Give the server a brief moment to run its ConnClosed hook (it runs
// inline on the conn's goroutine, but the close itself races against
// the client returning from Unmount).
deadline := time.Now().Add(2 * time.Second)
cB, cleanB := mountClient(t, addr)
defer cleanB()
for {
stillThere := 0
for _, id := range ids {
f, err := cB.FOpen("/devices/by-id/robot-001/functions/echo/"+id+"/data", p.OREAD)
if err == nil {
stillThere++
_ = f.Close()
}
}
if stillThere == 0 {
break
}
if time.Now().After(deadline) {
t.Fatalf("after A disconnected, %d/%d dirs still reachable from B — ConnClosed did not reap", stillThere, N)
}
time.Sleep(20 * time.Millisecond)
}
}

func TestDeviceConnect_DeviceEventsReplay(t *testing.T) {
src := make(chan Event, 8)
backend := &fakeBackend{
Expand Down
Loading