diff --git a/cmd/go9p-deviceconnect/cli_test.go b/cmd/go9p-deviceconnect/cli_test.go
index fc24310..5921131 100644
--- a/cmd/go9p-deviceconnect/cli_test.go
+++ b/cmd/go9p-deviceconnect/cli_test.go
@@ -65,5 +65,38 @@ func TestGo9pDeviceConnect_Discover(t *testing.T) {
 
 	if !strings.Contains(s, "robot-001") || !strings.Contains(s, "sensor-001") {
 		t.Fatalf("unexpected discover output: %q", s)
 	}
+
+	// Exercise cmdCall end-to-end. The demo backend's echo returns the
+	// payload as-is. Run it twice to confirm the CLI is stateless across
+	// invocations (each cmdCall mounts and unmounts its own connection).
+	out.Reset()
+	errb.Reset()
+	code = cmdCall(&out, &errb, []string{
+		"-addr", addr,
+		"-id", "robot-001",
+		"-fn", "echo",
+		"-payload", "hello",
+	})
+	if code != 0 {
+		t.Fatalf("call code=%d stderr=%q", code, errb.String())
+	}
+	if got := out.String(); got != "hello" {
+		t.Fatalf("call output=%q want %q", got, "hello")
+	}
+
+	out.Reset()
+	errb.Reset()
+	code = cmdCall(&out, &errb, []string{
+		"-addr", addr,
+		"-id", "robot-001",
+		"-fn", "echo",
+		"-payload", "again",
+	})
+	if code != 0 {
+		t.Fatalf("second call code=%d stderr=%q", code, errb.String())
+	}
+	if got := out.String(); got != "again" {
+		t.Fatalf("second call output=%q want %q", got, "again")
+	}
 }
diff --git a/p/srv/examples/deviceconnect/DESIGN.md b/p/srv/examples/deviceconnect/DESIGN.md
index 3e3298c..138701d 100644
--- a/p/srv/examples/deviceconnect/DESIGN.md
+++ b/p/srv/examples/deviceconnect/DESIGN.md
@@ -118,6 +118,23 @@ Supported commands:
 
 On failure, `ctl` returns a 9P error (Rerror). This is the “standard” mechanism.
 
+## Call instance lifecycle
+
+Reading `clone` allocates a numbered subdirectory (e.g. `functions/echo/7/`).
+That directory's lifetime is tied to the **9P connection** that allocated
+it: when the connection drops — clean unmount, client crash, or network
+loss — the example's server wrapper (`dcSrv`) reaps every call instance
+attached to that connection via the framework's `ConnClosed` hook.
+
+This mirrors the canonical Plan 9 idiom: `/net/tcp` connection directories,
+`/srv` posts, and per-process namespaces all die with the connection or
+process that backs them. It also means:
+
+- The CLI does not need to write any explicit "close" command after a call.
+- A client that crashes mid-call doesn't leak — the connection still drops
+  and cleanup still runs.
+- Long-lived servers do not accumulate dead call directories.
+
 ### `devices/by-id/<id>/functions/<fn>/<n>/data`
 
 - **read/write**:
diff --git a/p/srv/examples/deviceconnect/deviceconnect.go b/p/srv/examples/deviceconnect/deviceconnect.go
index 27aba76..fa47fee 100644
--- a/p/srv/examples/deviceconnect/deviceconnect.go
+++ b/p/srv/examples/deviceconnect/deviceconnect.go
@@ -334,6 +334,7 @@ type funcCloneFile struct {
 	backend  Backend
 	deviceID string
 	fn       string
+	fs       *DCFS // back-pointer for per-conn registration of allocated dirs
 }
 
 func (f *funcCloneFile) Read(fid *srv.FFid, buf []byte, offset uint64) (int, error) {
@@ -380,6 +381,16 @@ func (f *funcCloneFile) Read(fid *srv.FFid, buf []byte, offset uint64) (int, err
 		instDir.Remove()
 		return 0, &p.Error{Err: "buffer too small", Errornum: 0}
 	}
+	// Register the dir against the connection that allocated it. When that
+	// connection drops, ConnClosed reaps anything still attached. This is the
+	// canonical 9P lifecycle (cf. /net/tcp, where connection state vanishes
+	// with the connection that created it).
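+	// Note: registration happens before the Read reply is sent, so even a
+	// client that vanishes right after allocating leaves the dir tracked
+	// where releaseConn can find it.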
+	if f.fs != nil && fid != nil && fid.Fid != nil {
+		f.fs.registerCallDir(fid.Fid.Fconn, instDir)
+	}
 	copy(buf, out)
 	return len(out), nil
 }
@@ -404,6 +412,51 @@ type DCFS struct {
 
 	// Per-device-value event cancels (best-effort cleanup on refresh).
 	valueEventCancels map[string]context.CancelFunc
+
+	// Per-connection registry of call instance dirs allocated via clone.
+	// On ConnClosed we remove everything still attached to the dropped
+	// connection, so a client crash or unmount auto-reaps its workspace.
+	connMu   sync.Mutex
+	connDirs map[*srv.Conn][]*srv.File
+}
+
+// registerCallDir associates a freshly allocated call instance dir with the
+// 9P connection that allocated it, so it can be reaped on disconnect.
+func (fs *DCFS) registerCallDir(c *srv.Conn, dir *srv.File) {
+	if c == nil || dir == nil {
+		return
+	}
+	fs.connMu.Lock()
+	fs.connDirs[c] = append(fs.connDirs[c], dir)
+	fs.connMu.Unlock()
+}
+
+// releaseConn removes every call instance dir registered to the given
+// connection. Safe to call multiple times: srv.File.Remove is idempotent
+// via the Fremoved flag, so a dir torn down by another path (e.g. a
+// refresh that rebuilt its parent device) is skipped silently.
+func (fs *DCFS) releaseConn(c *srv.Conn) {
+	fs.connMu.Lock()
+	dirs := fs.connDirs[c]
+	delete(fs.connDirs, c)
+	fs.connMu.Unlock()
+	for _, d := range dirs {
+		d.Remove()
+	}
+}
+
+// dcSrv wraps srv.Fsrv so the deviceconnect example can hook 9P connection
+// lifecycle events. The default srv.Fsrv has no opinion on connection close;
+// here we use the hook to run releaseConn for the closing connection.
+type dcSrv struct {
+	*srv.Fsrv
+	fs *DCFS
+}
+
+func (s *dcSrv) ConnOpened(*srv.Conn) {} // no per-connection setup needed
+
+func (s *dcSrv) ConnClosed(c *srv.Conn) {
+	s.fs.releaseConn(c)
 }
 
 func buildDeviceConnectFS(backend Backend) (*DCFS, error) {
@@ -413,6 +466,7 @@
 		deviceDirs:        make(map[string]*srv.File),
 		eventCancels:      make(map[string]context.CancelFunc),
 		valueEventCancels: make(map[string]context.CancelFunc),
+		connDirs:          make(map[*srv.Conn][]*srv.File),
 	}
 
 	fs.root = new(srv.File)
@@ -671,6 +725,7 @@ func (fs *DCFS) addDeviceLocked(d Device) error {
 			backend:  fs.backend,
 			deviceID: d.ID,
 			fn:       fn.Name,
+			fs:       fs,
 		}
 		if err := cl.Add(fnDir, "clone", fs.user, nil, 0o444, cl); err != nil {
 			return err
 		}
@@ -789,9 +844,10 @@ func main() {
 	if *debug {
 		s.Debuglevel = 1
 	}
-	s.Start(s)
+	ds := &dcSrv{Fsrv: s, fs: fs}
+	ds.Start(ds)
 
-	if err := s.StartNetListener("tcp", *addr); err != nil {
+	if err := ds.StartNetListener("tcp", *addr); err != nil {
 		log.Fatalf("listen: %v", err)
 	}
 }
diff --git a/p/srv/examples/deviceconnect/deviceconnect_test.go b/p/srv/examples/deviceconnect/deviceconnect_test.go
index e0e0d6b..4b79fe7 100644
--- a/p/srv/examples/deviceconnect/deviceconnect_test.go
+++ b/p/srv/examples/deviceconnect/deviceconnect_test.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/lionkov/go9p/p"
 	"github.com/lionkov/go9p/p/clnt"
 	"github.com/lionkov/go9p/p/srv"
 )
@@ -115,7 +116,8 @@ func startDeviceConnectServer(t *testing.T, backend Backend) (addr string, stop
 	}
 	s := srv.NewFileSrv(fs.root)
 	s.Dotu = true
-	s.Start(s)
+	ds := &dcSrv{Fsrv: s, fs: fs}
+	ds.Start(ds)
 
 	ln, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
@@ -432,6 +434,82 @@
 	}
 }
 
+// Connection-scoped cleanup: dirs allocated by a connection are reaped when
+// that connection drops.
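+// Strategy: connection A allocates N instance dirs and never closes them;
+// a fresh connection B then polls until none of the allocated ids resolve,
+// proving the reap ran on the server rather than in the client.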
+func TestDeviceConnect_DisconnectCleansUpCalls(t *testing.T) {
+	backend := &fakeBackend{
+		devices: []Device{
+			{
+				ID:     "robot-001",
+				Type:   "robot",
+				Meta:   "id=robot-001 type=robot",
+				Status: "ok",
+				Functions: []Function{
+					{Name: "echo", About: "Echo.", Schema: "bytes"},
+				},
+			},
+		},
+	}
+	addr, stop := startDeviceConnectServer(t, backend)
+	defer stop()
+
+	// Connection A: allocate several call dirs and never close them.
+	cA, cleanA := mountClient(t, addr)
+	const N = 5
+	var ids []string
+	for i := 0; i < N; i++ {
+		cl, err := cA.FOpen("/devices/by-id/robot-001/functions/echo/clone", p.OREAD)
+		if err != nil {
+			t.Fatalf("clone %d: %v", i, err)
+		}
+		idb, err := io.ReadAll(cl)
+		_ = cl.Close()
+		if err != nil {
+			t.Fatalf("read clone %d: %v", i, err)
+		}
+		ids = append(ids, strings.TrimSpace(string(idb)))
+	}
+
+	// Sanity: every dir is reachable while A is connected.
+	for _, id := range ids {
+		f, err := cA.FOpen("/devices/by-id/robot-001/functions/echo/"+id+"/data", p.OREAD)
+		if err != nil {
+			t.Fatalf("dir %q should be reachable while A is connected: %v", id, err)
+		}
+		_ = f.Close()
+	}
+
+	// Drop A entirely. ConnClosed must reap everything A allocated.
+	cleanA()
+
+	// Give the server a brief moment to run its ConnClosed hook (it runs
+	// inline on the conn's goroutine, but the close itself races against
+	// the client returning from Unmount).
+	deadline := time.Now().Add(2 * time.Second)
+	cB, cleanB := mountClient(t, addr)
+	defer cleanB()
+	for {
+		stillThere := 0
+		for _, id := range ids {
+			f, err := cB.FOpen("/devices/by-id/robot-001/functions/echo/"+id+"/data", p.OREAD)
+			if err == nil {
+				stillThere++
+				_ = f.Close()
+			}
+		}
+		if stillThere == 0 {
+			break
+		}
+		if time.Now().After(deadline) {
+			t.Fatalf("after A disconnected, %d/%d dirs still reachable from B — ConnClosed did not reap", stillThere, N)
+		}
+		time.Sleep(20 * time.Millisecond)
+	}
+}
+
 func TestDeviceConnect_DeviceEventsReplay(t *testing.T) {
 	src := make(chan Event, 8)
 	backend := &fakeBackend{