From 25f5b2028900832f13220670fbbf784367696bcd Mon Sep 17 00:00:00 2001 From: Michael Pursifull Date: Sat, 18 Apr 2026 19:17:41 -0500 Subject: [PATCH 1/5] feat(rlog): size-rotating, gzip-compressing log writer with retention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New internal/rlog package — io.WriteCloser that rotates the active log file by size, gzips the rolled file, enforces a count + total-bytes retention ceiling (oldest archives deleted first), and exposes a stub Shipper hook so operators can wire up remote offload later without touching the writer. Motivated by desk Pi disk-quota headroom: the daemon's --log-file tee (~/.marvel/log/daemon.log) currently grows unbounded. A single misbehaving reconciler can fill the filesystem overnight, especially while the RRD-style dedup idea (aae-orc-4wz) is still just an idea. Shipper stays as an interface with a NoopShipper impl; the real transports (scp, s3, rsync, http POST) are a separate probe when the offload-to-central-logger direction gets prioritized. Refs: aae-orc-k0t, Skippy session-025/026 raspi feedback. --- internal/rlog/rlog.go | 346 +++++++++++++++++++++++++++++++++++++ internal/rlog/rlog_test.go | 257 +++++++++++++++++++++++++++ 2 files changed, 603 insertions(+) create mode 100644 internal/rlog/rlog.go create mode 100644 internal/rlog/rlog_test.go diff --git a/internal/rlog/rlog.go b/internal/rlog/rlog.go new file mode 100644 index 0000000..888b8c4 --- /dev/null +++ b/internal/rlog/rlog.go @@ -0,0 +1,346 @@ +// Package rlog provides a size-rotating, compressing log writer for the +// marvel daemon's on-disk log file. Designed for low-quota environments +// like the desk Pi — bounds total disk usage, gzip-compresses rolled +// files, and surfaces a Shipper hook so operators can offload archives +// elsewhere when they care to. +// +// Usage: +// +// w, err := rlog.Open(path, rlog.Options{ +// MaxFileBytes: 10 * 1024 * 1024, // 10 MiB +// MaxFiles: 5, // keep 5 gzipped archives +// }) +// if err != nil { ... } +// defer w.Close() +// log.SetOutput(w) +// +// The writer is safe for concurrent Write calls. +package rlog + +import ( + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" +) + +// Options configures a rotating Writer. Zero values mean "no limit" for +// size-bounded fields, and "no retention" for count-bounded fields — +// pick sane values explicitly at the call site. +type Options struct { + // MaxFileBytes is the size threshold at which the active log file is + // rotated. Zero disables size-based rotation (file grows unbounded). + MaxFileBytes int64 + + // MaxFiles is the number of gzipped rotated files to keep alongside + // the active one. Zero keeps all of them (no count-based deletion). + MaxFiles int + + // MaxTotalBytes is the disk-usage ceiling across the active file plus + // all rotated archives. When exceeded, the oldest archives are + // deleted until the total is under the ceiling again. Zero disables. + MaxTotalBytes int64 + + // Mode is the file mode used when creating the active log file. Zero + // falls back to 0600. + Mode os.FileMode + + // Shipper, when non-nil, is invoked with the path of each newly + // rotated (and gzipped) archive before retention runs. Shipper + // errors are logged via the writer's ErrorLog but do not block + // rotation — archives are deleted on retention even if Ship failed. + Shipper Shipper + + // ErrorLog is where the writer reports background failures + // (shipping, retention). When nil, errors are written to stderr. + ErrorLog func(format string, args ...any) + + // now is an injection point for tests; nil means time.Now().UTC(). + now func() time.Time +} + +// Shipper offloads a rotated archive to some external destination. +// Implementations might upload to S3, scp to a central logger, or copy +// to a mounted filesystem. The stub [NoopShipper] does nothing. +// +// Ship is called synchronously from the writer's rotation path. Keep +// implementations fast or run the actual transfer asynchronously and +// return nil immediately. +type Shipper interface { + Ship(archivePath string) error +} + +// NoopShipper implements Shipper and does nothing. Use this to reserve +// the hook without yet deciding where archives go. +type NoopShipper struct{} + +// Ship satisfies the Shipper interface. +func (NoopShipper) Ship(string) error { return nil } + +// Writer is a size-rotating, gzip-compressing io.WriteCloser. Safe for +// concurrent Write calls. +type Writer struct { + path string + opts Options + + mu sync.Mutex + file *os.File + written int64 // bytes written to the active file since open +} + +// Open creates (or appends to) a rotating log at path with the given +// options. Existing content in the file is preserved and its size is +// consulted so Open after a restart doesn't reset the rotation counter. +func Open(path string, opts Options) (*Writer, error) { + if opts.Mode == 0 { + opts.Mode = 0o600 + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return nil, fmt.Errorf("create log dir: %w", err) + } + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, opts.Mode) + if err != nil { + return nil, fmt.Errorf("open log %s: %w", path, err) + } + _ = os.Chmod(path, opts.Mode) + info, err := f.Stat() + if err != nil { + _ = f.Close() + return nil, fmt.Errorf("stat log %s: %w", path, err) + } + return &Writer{ + path: path, + opts: opts, + file: f, + written: info.Size(), + }, nil +} + +// Write satisfies io.Writer. Rotation is decided *after* a write, so a +// single line never gets split across files — the one that trips the +// size threshold lands in the old file, then rotation happens before +// the next Write. +func (w *Writer) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.file == nil { + return 0, os.ErrClosed + } + n, err := w.file.Write(p) + w.written += int64(n) + if err != nil { + return n, err + } + if w.opts.MaxFileBytes > 0 && w.written >= w.opts.MaxFileBytes { + if rerr := w.rotateLocked(); rerr != nil { + w.logErr("rotate %s: %v", w.path, rerr) + } + } + return n, nil +} + +// Close closes the active file. The writer is unusable afterward. +func (w *Writer) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { + return nil + } + err := w.file.Close() + w.file = nil + return err +} + +// Rotate forces a rotation even if the size threshold hasn't been +// reached. Useful for scheduled cuts (e.g., daily) or tests. +func (w *Writer) Rotate() error { + w.mu.Lock() + defer w.mu.Unlock() + return w.rotateLocked() +} + +func (w *Writer) rotateLocked() error { + if w.file == nil { + return os.ErrClosed + } + if err := w.file.Close(); err != nil { + return fmt.Errorf("close active: %w", err) + } + w.file = nil + + archivePath := "" + // Only bother rotating a non-empty file — rotating an empty file + // creates a zero-byte archive every time Rotate is called on a + // freshly-opened writer, which is noise. + if info, err := os.Stat(w.path); err == nil && info.Size() > 0 { + ts := w.now().Format("20060102T150405Z") + archivePath = fmt.Sprintf("%s.%s.gz", w.path, ts) + if err := compressAndReplace(w.path, archivePath); err != nil { + return fmt.Errorf("compress: %w", err) + } + } + + f, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, w.opts.Mode) + if err != nil { + return fmt.Errorf("reopen active: %w", err) + } + _ = os.Chmod(w.path, w.opts.Mode) + w.file = f + w.written = 0 + + if archivePath != "" && w.opts.Shipper != nil { + if err := w.opts.Shipper.Ship(archivePath); err != nil { + w.logErr("ship %s: %v", archivePath, err) + } + } + + if err := w.enforceRetentionLocked(); err != nil { + w.logErr("retention: %v", err) + } + return nil +} + +// compressAndReplace gzips src into dst, then removes src. Uses a +// temporary file to avoid leaving a half-written archive if the process +// dies mid-compression. +func compressAndReplace(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer func() { _ = in.Close() }() + + tmp, err := os.CreateTemp(filepath.Dir(dst), filepath.Base(dst)+".tmp-*") + if err != nil { + return err + } + tmpPath := tmp.Name() + // Best-effort cleanup if something goes wrong before the rename. + cleanup := true + defer func() { + if cleanup { + _ = os.Remove(tmpPath) + } + }() + + gz := gzip.NewWriter(tmp) + if _, err := io.Copy(gz, in); err != nil { + _ = gz.Close() + _ = tmp.Close() + return err + } + if err := gz.Close(); err != nil { + _ = tmp.Close() + return err + } + if err := tmp.Close(); err != nil { + return err + } + if err := os.Rename(tmpPath, dst); err != nil { + return err + } + cleanup = false + return os.Remove(src) +} + +// enforceRetentionLocked applies MaxFiles and MaxTotalBytes caps, +// deleting the oldest archives (by embedded timestamp) first. Caller +// must hold w.mu. +func (w *Writer) enforceRetentionLocked() error { + if w.opts.MaxFiles <= 0 && w.opts.MaxTotalBytes <= 0 { + return nil + } + archives, err := listArchives(w.path) + if err != nil { + return err + } + // archives is sorted oldest-first (ascending timestamp). + if w.opts.MaxFiles > 0 { + for len(archives) > w.opts.MaxFiles { + if err := os.Remove(archives[0].path); err != nil { + return fmt.Errorf("delete %s: %w", archives[0].path, err) + } + archives = archives[1:] + } + } + if w.opts.MaxTotalBytes > 0 { + // Include the active file in the total. + activeSize := int64(0) + if info, err := os.Stat(w.path); err == nil { + activeSize = info.Size() + } + total := activeSize + for _, a := range archives { + total += a.size + } + for total > w.opts.MaxTotalBytes && len(archives) > 0 { + if err := os.Remove(archives[0].path); err != nil { + return fmt.Errorf("delete %s: %w", archives[0].path, err) + } + total -= archives[0].size + archives = archives[1:] + } + } + return nil +} + +type archiveInfo struct { + path string + size int64 +} + +// listArchives returns all rotated archives for basePath, sorted by the +// timestamp embedded in the filename (oldest first). +func listArchives(basePath string) ([]archiveInfo, error) { + dir := filepath.Dir(basePath) + prefix := filepath.Base(basePath) + "." + entries, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + var archives []archiveInfo + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if !strings.HasPrefix(name, prefix) || !strings.HasSuffix(name, ".gz") { + continue + } + info, err := e.Info() + if err != nil { + continue + } + archives = append(archives, archiveInfo{ + path: filepath.Join(dir, name), + size: info.Size(), + }) + } + // Filename embeds a lexicographically-sortable timestamp, so string + // sort == chronological sort. + sort.Slice(archives, func(i, j int) bool { + return archives[i].path < archives[j].path + }) + return archives, nil +} + +func (w *Writer) now() time.Time { + if w.opts.now != nil { + return w.opts.now() + } + return time.Now().UTC() +} + +func (w *Writer) logErr(format string, args ...any) { + if w.opts.ErrorLog != nil { + w.opts.ErrorLog(format, args...) + return + } + fmt.Fprintf(os.Stderr, "rlog: "+format+"\n", args...) +} diff --git a/internal/rlog/rlog_test.go b/internal/rlog/rlog_test.go new file mode 100644 index 0000000..29caf1e --- /dev/null +++ b/internal/rlog/rlog_test.go @@ -0,0 +1,257 @@ +package rlog + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" +) + +// newTestClock returns an Options.now that advances one second per call — +// rotated filenames embed timestamps, so tests need distinct ones to +// keep lexicographic ordering meaningful and avoid clobbering archives +// created in the same wall-clock second. +func newTestClock(start time.Time) func() time.Time { + mu := sync.Mutex{} + t := start + return func() time.Time { + mu.Lock() + defer mu.Unlock() + out := t + t = t.Add(time.Second) + return out + } +} + +func TestWriterRotatesAtMaxSize(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + w, err := Open(path, Options{ + MaxFileBytes: 64, + MaxFiles: 10, + now: newTestClock(time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)), + }) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + + // Two writes each larger than the threshold — each should trigger + // rotation after it lands. + for i := 0; i < 2; i++ { + payload := fmt.Sprintf("line-%d %s\n", i, strings.Repeat("x", 80)) + if _, err := w.Write([]byte(payload)); err != nil { + t.Fatalf("write %d: %v", i, err) + } + } + + archives, err := listArchives(path) + if err != nil { + t.Fatalf("listArchives: %v", err) + } + if len(archives) != 2 { + t.Fatalf("expected 2 gzipped archives, got %d", len(archives)) + } + + // First archive should decompress to line-0. + checkArchiveContents(t, archives[0].path, "line-0") + checkArchiveContents(t, archives[1].path, "line-1") +} + +func TestWriterRetentionByCount(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + w, err := Open(path, Options{ + MaxFileBytes: 1, // rotate every write + MaxFiles: 2, + now: newTestClock(time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)), + }) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + + for i := 0; i < 5; i++ { + if _, err := fmt.Fprintf(w, "line-%d\n", i); err != nil { + t.Fatalf("write %d: %v", i, err) + } + } + + archives, err := listArchives(path) + if err != nil { + t.Fatalf("listArchives: %v", err) + } + if len(archives) != 2 { + t.Fatalf("expected 2 archives after retention, got %d", len(archives)) + } + // The two archives kept must be the newest — oldest-first delete + // means survivors are the last two. + checkArchiveContents(t, archives[0].path, "line-3") + checkArchiveContents(t, archives[1].path, "line-4") +} + +func TestWriterRetentionByTotalBytes(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + // Each line ~12 bytes uncompressed; gzip with tiny input is ~30-40 + // bytes wire size. Ceiling of 100 bytes across archives + active = + // retains only the most recent 1-2. + w, err := Open(path, Options{ + MaxFileBytes: 1, + MaxFiles: 100, // don't let MaxFiles kick in + MaxTotalBytes: 100, + now: newTestClock(time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)), + }) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + + for i := 0; i < 10; i++ { + if _, err := fmt.Fprintf(w, "line-%d\n", i); err != nil { + t.Fatalf("write %d: %v", i, err) + } + } + + archives, err := listArchives(path) + if err != nil { + t.Fatalf("listArchives: %v", err) + } + activeSize := int64(0) + if info, err := os.Stat(path); err == nil { + activeSize = info.Size() + } + total := activeSize + for _, a := range archives { + total += a.size + } + if total > 100 { + t.Fatalf("total bytes %d exceeds cap 100 (archives=%d)", total, len(archives)) + } +} + +func TestWriterShipperCalledOnRotate(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + shipped := []string{} + var mu sync.Mutex + w, err := Open(path, Options{ + MaxFileBytes: 1, + MaxFiles: 10, + Shipper: shipperFunc(func(archive string) error { + mu.Lock() + defer mu.Unlock() + shipped = append(shipped, archive) + return nil + }), + now: newTestClock(time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC)), + }) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + + for i := 0; i < 3; i++ { + if _, err := fmt.Fprintf(w, "line-%d\n", i); err != nil { + t.Fatalf("write %d: %v", i, err) + } + } + + mu.Lock() + defer mu.Unlock() + if len(shipped) != 3 { + t.Fatalf("expected Shipper called 3 times, got %d", len(shipped)) + } + for _, s := range shipped { + if !strings.HasSuffix(s, ".gz") { + t.Fatalf("shipped path doesn't end .gz: %s", s) + } + } +} + +func TestWriterEmptyRotateNoArchive(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + w, err := Open(path, Options{ + MaxFileBytes: 100, + MaxFiles: 10, + }) + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + + // Force a rotate on an empty file — should not produce an archive. + if err := w.Rotate(); err != nil { + t.Fatalf("Rotate empty: %v", err) + } + archives, err := listArchives(path) + if err != nil { + t.Fatalf("listArchives: %v", err) + } + if len(archives) != 0 { + t.Fatalf("empty rotate should not create archive, got %d", len(archives)) + } +} + +func TestWriterAppendPreservesOnReopen(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.log") + + w, err := Open(path, Options{MaxFileBytes: 0}) + if err != nil { + t.Fatalf("Open: %v", err) + } + if _, err := w.Write([]byte("first\n")); err != nil { + t.Fatal(err) + } + _ = w.Close() + + w2, err := Open(path, Options{MaxFileBytes: 0}) + if err != nil { + t.Fatalf("reopen: %v", err) + } + if _, err := w2.Write([]byte("second\n")); err != nil { + t.Fatal(err) + } + _ = w2.Close() + + b, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if string(b) != "first\nsecond\n" { + t.Fatalf("expected both lines preserved, got %q", string(b)) + } +} + +type shipperFunc func(string) error + +func (f shipperFunc) Ship(p string) error { return f(p) } + +func checkArchiveContents(t *testing.T, archivePath, wantSubstring string) { + t.Helper() + f, err := os.Open(archivePath) + if err != nil { + t.Fatalf("open %s: %v", archivePath, err) + } + defer func() { _ = f.Close() }() + gz, err := gzip.NewReader(f) + if err != nil { + t.Fatalf("gzip reader %s: %v", archivePath, err) + } + defer func() { _ = gz.Close() }() + var buf bytes.Buffer + if _, err := io.Copy(&buf, gz); err != nil { + t.Fatalf("decompress %s: %v", archivePath, err) + } + if !strings.Contains(buf.String(), wantSubstring) { + t.Fatalf("archive %s: expected to contain %q, got %q", archivePath, wantSubstring, buf.String()) + } +} From f14fdb1ecc9d3fc9ae64799b33486e39422b15a2 Mon Sep 17 00:00:00 2001 From: Michael Pursifull Date: Sat, 18 Apr 2026 19:23:34 -0500 Subject: [PATCH 2/5] feat(daemon): rotate --log-file via rlog, new --log-max-* flags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the unbounded os.OpenFile append with rlog.Writer. Defaults set raspi-friendly caps: - --log-max-size 10 (MiB; rotate at 10 MiB) - --log-max-files 5 (keep 5 gzipped archives) - --log-max-total 0 (total-bytes cap disabled by default) Zero in any slot disables that specific limit. Backward-compatible for the common path — `marvel daemon --log-file X` now rotates into `X..gz` archives instead of growing forever. Refs: aae-orc-k0t, Skippy session-025/026 raspi feedback. --- cmd/marvel/main.go | 47 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/cmd/marvel/main.go b/cmd/marvel/main.go index 76fca0e..40156b2 100644 --- a/cmd/marvel/main.go +++ b/cmd/marvel/main.go @@ -22,6 +22,7 @@ import ( "github.com/arcavenae/marvel/internal/daemon" "github.com/arcavenae/marvel/internal/keys" "github.com/arcavenae/marvel/internal/paths" + "github.com/arcavenae/marvel/internal/rlog" "github.com/arcavenae/marvel/internal/upgrade" "github.com/spf13/cobra" ) @@ -122,6 +123,9 @@ func daemonCmd() *cobra.Command { var listenSocket string var logFilePath string var pidFilePath string + var logMaxSizeMiB int + var logMaxFiles int + var logMaxTotalMiB int layout, _ := paths.Default() defaultLog := "" @@ -173,14 +177,14 @@ Examples: } var logCloser io.Closer if logFilePath != "" { - closer, err := openLogFile(logFilePath) + closer, err := openRotatingLog(logFilePath, logMaxSizeMiB, logMaxFiles, logMaxTotalMiB) if err != nil { return err } logCloser = closer defer func() { _ = logCloser.Close() }() - if f, ok := closer.(io.Writer); ok { - writers = append(writers, f) + if w, ok := closer.(io.Writer); ok { + writers = append(writers, w) } } log.SetOutput(io.MultiWriter(writers...)) @@ -222,6 +226,15 @@ Examples: "tee daemon stderr to this file (empty string disables)") cmd.Flags().StringVar(&pidFilePath, "pidfile", defaultPid, "write pid to this file on start, remove on stop (empty string disables)") + // Log rotation / retention — bounds disk usage for --log-file. + // Motivated by desk Pi headroom (aae-orc-k0t, Skippy session-025/026). + // Zero for any of these disables the corresponding limit. + cmd.Flags().IntVar(&logMaxSizeMiB, "log-max-size", 10, + "rotate --log-file when it exceeds this size in MiB (0 disables rotation)") + cmd.Flags().IntVar(&logMaxFiles, "log-max-files", 5, + "keep at most N gzipped archives of --log-file (0 keeps all)") + cmd.Flags().IntVar(&logMaxTotalMiB, "log-max-total", 0, + "cap total disk usage across --log-file and archives in MiB (0 disables)") cmd.AddCommand(daemonLogsCmd()) return cmd @@ -270,12 +283,15 @@ Examples: return cmd } -// openLogFile opens (appending) a log file at the given path. The -// caller is responsible for wiring it into log.SetOutput — this -// function only handles directory creation and permission enforcement -// so the log-file setup stays composable with the ring buffer / stderr -// tee decision in daemonCmd's RunE. -func openLogFile(path string) (io.Closer, error) { +// openRotatingLog opens the daemon's on-disk log file with the rlog +// rotating writer. The three size/count ceilings are all opt-out (zero +// means "no limit") so existing `marvel daemon --log-file X` invocations +// keep the default raspi-friendly caps: 10 MiB per file, 5 archives. +// +// The caller is responsible for wiring the returned WriteCloser into +// log.SetOutput — this helper only handles directory creation, +// permission enforcement, and Options construction. +func openRotatingLog(path string, maxSizeMiB, maxFiles, maxTotalMiB int) (io.Closer, error) { layout, _ := paths.Default() if path == layout.DaemonLog() { if err := layout.EnsureLogDir(); err != nil { @@ -286,13 +302,16 @@ func openLogFile(path string) (io.Closer, error) { return nil, fmt.Errorf("create log dir: %w", err) } } - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, paths.ModeAuthorized) + w, err := rlog.Open(path, rlog.Options{ + MaxFileBytes: int64(maxSizeMiB) * 1024 * 1024, + MaxFiles: maxFiles, + MaxTotalBytes: int64(maxTotalMiB) * 1024 * 1024, + Mode: paths.ModeAuthorized, + }) if err != nil { - return nil, fmt.Errorf("open log file %s: %w", path, err) + return nil, fmt.Errorf("open rotating log %s: %w", path, err) } - // Enforce the mode in case the file pre-existed with different perms. - _ = os.Chmod(path, paths.ModeAuthorized) - return f, nil + return w, nil } func workCmd() *cobra.Command { From d9baac7bbe2d15bf6c6e319b0ff0a69ccf34e00b Mon Sep 17 00:00:00 2001 From: Michael Pursifull Date: Sat, 18 Apr 2026 19:29:07 -0500 Subject: [PATCH 3/5] feat(events): bounded in-memory ring for state-transition events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New internal/events package — structured events (SessionCreated, SessionCrashed, SessionRestarted, HealthCheckFailed, ShiftStarted, etc.) stored in a bounded ring, filterable by workspace/team/role/ session/kind/severity. Complements internal/logbuf (raw daemon stderr stream) with queryable, structured history — marvel's equivalent of 'kubectl get events'. Producers (session.Manager, team.Controller) will be wired in the next commit. Emit() is nil-safe so a producer instantiated without a ring still runs without panic. Refs: aae-orc-k0t --- internal/events/events.go | 213 +++++++++++++++++++++++++++++++++ internal/events/events_test.go | 130 ++++++++++++++++++++ 2 files changed, 343 insertions(+) create mode 100644 internal/events/events.go create mode 100644 internal/events/events_test.go diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..3b47c2a --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,213 @@ +// Package events provides a bounded in-memory ring of structured +// state-transition events — marvel's equivalent of `kubectl get events`. +// Complements internal/logbuf (raw daemon stderr stream) with +// queryable, filterable history keyed to sessions, teams, and +// workspaces. +// +// The ring is the primary data structure. An Emitter interface lets +// producers (session.Manager, team.Controller) emit without coupling +// to the ring type; tests can inject a DiscardEmitter. +package events + +import ( + "sync" + "time" +) + +// Kind identifies a class of event. These are stable string tags — +// clients can filter on them, dashboards can group on them. +type Kind string + +// Canonical event kinds. New producers should add entries here rather +// than inventing string literals at call sites. +const ( + KindSessionCreated Kind = "session.created" + KindSessionDeleted Kind = "session.deleted" + KindSessionCrashed Kind = "session.crashed" + KindSessionRestarted Kind = "session.restarted" + KindSessionFailed Kind = "session.failed" + KindHealthCheckFailed Kind = "health.failed" + KindCrashLoopBackoff Kind = "health.crashloop-backoff" + KindShiftStarted Kind = "team.shift-started" + KindShiftCompleted Kind = "team.shift-completed" + KindRoleSaturated Kind = "role.saturated" +) + +// Severity mirrors the kubernetes Warning/Normal distinction. Lets +// operators filter `marvel events --severity warning` for the things +// that need attention. +type Severity string + +const ( + SeverityInfo Severity = "info" + SeverityWarning Severity = "warning" +) + +// Event is one structured state-transition record. +type Event struct { + Timestamp time.Time `json:"ts"` + Kind Kind `json:"kind"` + Severity Severity `json:"severity"` + Workspace string `json:"workspace,omitempty"` + Team string `json:"team,omitempty"` + Role string `json:"role,omitempty"` + Session string `json:"session,omitempty"` + // Message is a short human-readable description. Keep it one + // line — operators scan dozens of these at a time. + Message string `json:"message"` +} + +// Emitter is what producers call to record an event. Nil is a safe +// value — callers use [Emit] which no-ops on a nil emitter. +type Emitter interface { + Emit(Event) +} + +// Emit is the producer-side sugar that handles nil emitters. Every +// caller in session.Manager / team.Controller goes through this so +// adding a new emission site is always safe regardless of whether +// the daemon wired the ring. +func Emit(e Emitter, ev Event) { + if e == nil { + return + } + if ev.Timestamp.IsZero() { + ev.Timestamp = time.Now().UTC() + } + if ev.Severity == "" { + ev.Severity = SeverityInfo + } + e.Emit(ev) +} + +// Discard is an Emitter that drops events. Useful in tests that don't +// care to assert on the event stream. +type Discard struct{} + +// Emit satisfies Emitter. +func (Discard) Emit(Event) {} + +// Ring is a bounded in-memory event buffer. Safe for concurrent +// Emit / Snapshot calls. +type Ring struct { + mu sync.Mutex + capacity int + buf []Event + head int // index of the oldest event when len(buf) == capacity + full bool +} + +// DefaultCapacity is the ring size used when NewRing is called with +// a zero or negative capacity. Sized to cover a couple of hours of +// typical cluster activity without getting close to daemon RSS +// concerns — events are tiny compared to the log ring. +const DefaultCapacity = 2000 + +// NewRing returns a fresh ring with the given capacity. Zero or +// negative falls back to DefaultCapacity. +func NewRing(capacity int) *Ring { + if capacity <= 0 { + capacity = DefaultCapacity + } + return &Ring{ + capacity: capacity, + buf: make([]Event, 0, capacity), + } +} + +// Emit satisfies Emitter. Appends to the tail, overwriting the head +// when full. +func (r *Ring) Emit(ev Event) { + r.mu.Lock() + defer r.mu.Unlock() + if ev.Timestamp.IsZero() { + ev.Timestamp = time.Now().UTC() + } + if ev.Severity == "" { + ev.Severity = SeverityInfo + } + if !r.full { + r.buf = append(r.buf, ev) + if len(r.buf) == r.capacity { + r.full = true + } + return + } + r.buf[r.head] = ev + r.head = (r.head + 1) % r.capacity +} + +// Filter selects events to include in Snapshot results. Empty fields +// match anything; set fields must match exactly. For MinSeverity, +// SeverityWarning matches only warnings; the zero value matches all. +type Filter struct { + Workspace string + Team string + Role string + Session string + Kind Kind + MinSeverity Severity +} + +// Snapshot returns up to `n` most recent events matching f, oldest-first +// (so the tail of the slice is the newest event). n<=0 returns all +// matching events. +func (r *Ring) Snapshot(f Filter, n int) []Event { + r.mu.Lock() + defer r.mu.Unlock() + + ordered := r.orderedLocked() + var out []Event + for _, ev := range ordered { + if !matches(ev, f) { + continue + } + out = append(out, ev) + } + if n > 0 && len(out) > n { + out = out[len(out)-n:] + } + return out +} + +// Len returns the number of events currently stored. +func (r *Ring) Len() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.buf) +} + +// orderedLocked returns events oldest-first. Caller holds r.mu. +func (r *Ring) orderedLocked() []Event { + if !r.full { + out := make([]Event, len(r.buf)) + copy(out, r.buf) + return out + } + out := make([]Event, r.capacity) + copy(out, r.buf[r.head:]) + copy(out[r.capacity-r.head:], r.buf[:r.head]) + return out +} + +func matches(ev Event, f Filter) bool { + if f.Workspace != "" && f.Workspace != ev.Workspace { + return false + } + if f.Team != "" && f.Team != ev.Team { + return false + } + if f.Role != "" && f.Role != ev.Role { + return false + } + if f.Session != "" && f.Session != ev.Session { + return false + } + if f.Kind != "" && f.Kind != ev.Kind { + return false + } + if f.MinSeverity == SeverityWarning && ev.Severity != SeverityWarning { + return false + } + return true +} diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 0000000..4e3240f --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,130 @@ +package events + +import ( + "sync" + "testing" + "time" +) + +func TestRingAppendsUntilFull(t *testing.T) { + r := NewRing(3) + for i := 0; i < 3; i++ { + r.Emit(Event{Kind: KindSessionCreated, Session: "s", Message: string(rune('a' + i))}) + } + if got := r.Len(); got != 3 { + t.Fatalf("Len=%d, want 3", got) + } + snap := r.Snapshot(Filter{}, 0) + if len(snap) != 3 { + t.Fatalf("Snapshot len=%d, want 3", len(snap)) + } + if snap[0].Message != "a" || snap[2].Message != "c" { + t.Fatalf("expected a..c, got %q..%q", snap[0].Message, snap[2].Message) + } +} + +func TestRingOverwritesOldestWhenFull(t *testing.T) { + r := NewRing(3) + for i := 0; i < 5; i++ { + r.Emit(Event{Kind: KindSessionCreated, Message: string(rune('a' + i))}) + } + if got := r.Len(); got != 3 { + t.Fatalf("Len=%d, want 3 (capacity)", got) + } + snap := r.Snapshot(Filter{}, 0) + // After 5 emits into capacity-3, survivors are c, d, e (oldest first). + if len(snap) != 3 { + t.Fatalf("Snapshot len=%d, want 3", len(snap)) + } + if snap[0].Message != "c" || snap[1].Message != "d" || snap[2].Message != "e" { + t.Fatalf("expected c,d,e — got %q,%q,%q", snap[0].Message, snap[1].Message, snap[2].Message) + } +} + +func TestRingSnapshotLimitN(t *testing.T) { + r := NewRing(10) + for i := 0; i < 10; i++ { + r.Emit(Event{Kind: KindSessionCreated, Message: string(rune('a' + i))}) + } + snap := r.Snapshot(Filter{}, 3) + if len(snap) != 3 { + t.Fatalf("Snapshot(n=3) len=%d, want 3", len(snap)) + } + // Tail should be the newest 3: h, i, j. + if snap[0].Message != "h" || snap[2].Message != "j" { + t.Fatalf("expected h..j, got %q..%q", snap[0].Message, snap[2].Message) + } +} + +func TestRingFilterBySession(t *testing.T) { + r := NewRing(10) + r.Emit(Event{Kind: KindSessionCreated, Session: "foo/a", Message: "hit"}) + r.Emit(Event{Kind: KindSessionCreated, Session: "foo/b", Message: "miss"}) + r.Emit(Event{Kind: KindSessionCrashed, Session: "foo/a", Message: "hit"}) + + snap := r.Snapshot(Filter{Session: "foo/a"}, 0) + if len(snap) != 2 { + t.Fatalf("expected 2 events for foo/a, got %d", len(snap)) + } + for _, ev := range snap { + if ev.Message != "hit" { + t.Fatalf("wrong event in filtered snapshot: %+v", ev) + } + } +} + +func TestRingFilterBySeverity(t *testing.T) { + r := NewRing(10) + r.Emit(Event{Kind: KindSessionCreated, Severity: SeverityInfo, Message: "info"}) + r.Emit(Event{Kind: KindSessionCrashed, Severity: SeverityWarning, Message: "warn"}) + + snap := r.Snapshot(Filter{MinSeverity: SeverityWarning}, 0) + if len(snap) != 1 { + t.Fatalf("expected 1 warning event, got %d", len(snap)) + } + if snap[0].Message != "warn" { + t.Fatalf("filtered event wrong: %+v", snap[0]) + } +} + +func TestEmitNilEmitterNoPanic(t *testing.T) { + // The zero-emission sugar has to be nil-safe — producers in + // session.Manager / team.Controller receive the ring through + // dependency injection and may legitimately have no emitter. + Emit(nil, Event{Kind: KindSessionCreated}) +} + +func TestEmitStampsAndSeverityDefault(t *testing.T) { + r := NewRing(1) + Emit(r, Event{Kind: KindSessionCreated}) + snap := r.Snapshot(Filter{}, 0) + if len(snap) != 1 { + t.Fatalf("expected 1 event, got %d", len(snap)) + } + if snap[0].Timestamp.IsZero() { + t.Error("expected Emit to stamp Timestamp") + } + if snap[0].Severity != SeverityInfo { + t.Errorf("expected default Severity=info, got %q", snap[0].Severity) + } +} + +func TestRingConcurrentEmit(t *testing.T) { + // Smoke test — must not deadlock or race (the -race build catches + // the latter). Goal is just concurrent access, not a specific count. + r := NewRing(1000) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 200; j++ { + r.Emit(Event{Kind: KindSessionCreated, Timestamp: time.Now(), Message: "x"}) + } + }() + } + wg.Wait() + if r.Len() == 0 { + t.Fatal("expected some events after concurrent emit") + } +} From 0db964fa29caf55817f57f0aaffbc1624bf8066c Mon Sep 17 00:00:00 2001 From: Michael Pursifull Date: Sat, 18 Apr 2026 19:32:30 -0500 Subject: [PATCH 4/5] feat(events): emit from session.Manager + team.Controller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire events.Emitter through both producers. Nil-safe (events.Emit no-ops on a nil Emitter) so tests that don't inject a ring stay quiet and existing callers that haven't been updated still compile. Emission sites: - Manager.Create → session.created - Manager.Delete → session.deleted - Manager.ReapDead → session.crashed (warning) - Controller.restartSession: - role.saturated (warning) when MaxRestarts reached - health.crashloop-backoff (warning) on first backoff tick - session.restarted (warning) on actual restart - Controller.evaluateHealth → health.failed (warning) on first failure-threshold breach - Controller.InitiateShift → team.shift-started - Controller.reconcileShift completion → team.shift-completed Refs: aae-orc-k0t --- internal/session/manager.go | 32 +++++++++++++++++++++ internal/team/controller.go | 57 ++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/internal/session/manager.go b/internal/session/manager.go index 3da0925..a2fd36b 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -9,6 +9,7 @@ import ( "time" "github.com/arcavenae/marvel/internal/api" + "github.com/arcavenae/marvel/internal/events" "github.com/arcavenae/marvel/internal/runtime" "github.com/arcavenae/marvel/internal/tmux" ) @@ -19,6 +20,11 @@ type Manager struct { driver *tmux.Driver adapters *runtime.Registry SocketPath string + // Events receives structured state-transition events. Nil is safe + // (all emission sites use events.Emit which no-ops on nil) so tests + // and callers that don't care about the event stream don't need to + // wire a ring. + Events events.Emitter } // NewManager creates a session manager with the default runtime adapter registry. @@ -102,6 +108,14 @@ func (m *Manager) Create(sess *api.Session) error { sess.PaneID = paneID sess.State = api.SessionRunning log.Printf("session %s running in pane %s", sess.Key(), paneID) + events.Emit(m.Events, events.Event{ + Kind: events.KindSessionCreated, + Workspace: sess.Workspace, + Team: sess.Team, + Role: sess.Role, + Session: sess.Key(), + Message: fmt.Sprintf("pane %s", paneID), + }) return nil } @@ -183,6 +197,14 @@ func (m *Manager) Delete(key string) error { } log.Printf("session %s deleted", key) + events.Emit(m.Events, events.Event{ + Kind: events.KindSessionDeleted, + Workspace: sess.Workspace, + Team: sess.Team, + Role: sess.Role, + Session: sess.Key(), + Message: "session deleted", + }) return nil } @@ -224,6 +246,7 @@ func (m *Manager) ReapDead() []ReapedSession { } if !m.driver.HasPane(sess.PaneID) { log.Printf("session %s: pane %s gone, marking crashed", sess.Key(), sess.PaneID) + lostPane := sess.PaneID m.clearStaleCrashed(sessions, sess.Workspace, sess.Team, sess.Role, sess.Key()) sess.State = api.SessionCrashed sess.PaneID = "" @@ -233,6 +256,15 @@ func (m *Manager) ReapDead() []ReapedSession { Team: sess.Team, Role: sess.Role, }) + events.Emit(m.Events, events.Event{ + Kind: events.KindSessionCrashed, + Severity: events.SeverityWarning, + Workspace: sess.Workspace, + Team: sess.Team, + Role: sess.Role, + Session: sess.Key(), + Message: fmt.Sprintf("pane %s gone", lostPane), + }) } } return reaped diff --git a/internal/team/controller.go b/internal/team/controller.go index c16d050..9f77d53 100644 --- a/internal/team/controller.go +++ b/internal/team/controller.go @@ -12,6 +12,7 @@ import ( "time" "github.com/arcavenae/marvel/internal/api" + "github.com/arcavenae/marvel/internal/events" "github.com/arcavenae/marvel/internal/session" ) @@ -20,7 +21,9 @@ type Controller struct { store *api.Store sessMgr *session.Manager SocketPath string - mu sync.Mutex + // Events receives structured state-transition events. Nil is safe. + Events events.Emitter + mu sync.Mutex // roleHealth tracks per-role crash-loop state: restart count and // next-allowed-restart deadline. Keyed by workspace/team/role so @@ -319,6 +322,17 @@ func (c *Controller) evaluateHealth() { } if sess.FailureCount >= role.HealthCheck.FailureThreshold { + if sess.HealthState != api.HealthUnhealthy { + events.Emit(c.Events, events.Event{ + Kind: events.KindHealthCheckFailed, + Severity: events.SeverityWarning, + Workspace: sess.Workspace, + Team: sess.Team, + Role: sess.Role, + Session: sess.Key(), + Message: fmt.Sprintf("heartbeat stale %d/%d failures", sess.FailureCount, role.HealthCheck.FailureThreshold), + }) + } sess.HealthState = api.HealthUnhealthy c.applyRestartPolicy(sess, t, role) } else { @@ -368,6 +382,17 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro // Checked before saturation so we don't clobber a CrashLoopBackOff // marker with Failed on the tick that hits MaxRestarts. if now.Before(rh.BackoffUntil) { + if sess.State != api.SessionCrashLoopBackOff { + events.Emit(c.Events, events.Event{ + Kind: events.KindCrashLoopBackoff, + Severity: events.SeverityWarning, + Workspace: t.Workspace, + Team: t.Name, + Role: role.Name, + Session: sess.Key(), + Message: fmt.Sprintf("cooling down, backoff until %s", rh.BackoffUntil.Format(time.RFC3339)), + }) + } sess.State = api.SessionCrashLoopBackOff return } @@ -380,6 +405,15 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro if sess.State != api.SessionFailed { log.Printf("health: session %s: role %s hit max_restarts=%d, not restarting", sess.Key(), roleKey, role.MaxRestarts) + events.Emit(c.Events, events.Event{ + Kind: events.KindRoleSaturated, + Severity: events.SeverityWarning, + Workspace: t.Workspace, + Team: t.Name, + Role: role.Name, + Session: sess.Key(), + Message: fmt.Sprintf("max_restarts=%d reached", role.MaxRestarts), + }) } sess.State = api.SessionFailed return @@ -387,6 +421,15 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro log.Printf("health: restarting session %s (role %s restart #%d, next backoff=%s)", sess.Key(), roleKey, rh.RestartCount, time.Until(rh.BackoffUntil)) + events.Emit(c.Events, events.Event{ + Kind: events.KindSessionRestarted, + Severity: events.SeverityWarning, + Workspace: t.Workspace, + Team: t.Name, + Role: role.Name, + Session: sess.Key(), + Message: fmt.Sprintf("restart #%d, next backoff %s", rh.RestartCount, time.Until(rh.BackoffUntil)), + }) sess.RestartCount++ sess.State = api.SessionFailed if err := c.sessMgr.Delete(sess.Key()); err != nil { @@ -441,6 +484,12 @@ func (c *Controller) InitiateShift(teamKey, role string) error { } log.Printf("shift: initiated for %s gen %d→%d roles=%v", teamKey, oldGen, t.Generation, roles) + events.Emit(c.Events, events.Event{ + Kind: events.KindShiftStarted, + Workspace: t.Workspace, + Team: t.Name, + Message: fmt.Sprintf("gen %d→%d roles=%v", oldGen, t.Generation, roles), + }) return nil } @@ -466,6 +515,12 @@ func (c *Controller) reconcileShift(t *api.Team) { if t.Shift.RoleIndex >= len(t.Shift.Roles) { // All roles shifted — complete. log.Printf("shift: complete for %s/%s", t.Workspace, t.Name) + events.Emit(c.Events, events.Event{ + Kind: events.KindShiftCompleted, + Workspace: t.Workspace, + Team: t.Name, + Message: fmt.Sprintf("gen %d active", t.Generation), + }) t.Shift = api.ShiftState{} return } From a18106563489fbb22f8e42dbd63164f744b0c658 Mon Sep 17 00:00:00 2001 From: Michael Pursifull Date: Sat, 18 Apr 2026 19:39:37 -0500 Subject: [PATCH 5/5] feat(events): daemon ring + events RPC + 'marvel events' CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daemon owns an events.Ring (default 2000-event capacity), wires it to session.Manager and team.Controller at construction. New 'events' RPC method returns a filtered snapshot; new top-level 'marvel events' command prints a tabulated view with filters for workspace, team, role, session key, kind, and warnings-only. Usage: marvel events # last 100 events marvel events -n 500 marvel events --session demo/shell-g1-0 marvel events --kind session.crashed marvel events --warnings marvel --cluster desk events # remote via mrvl:// Complements 'marvel daemon logs' (raw stderr stream) with structured, queryable state-transition history — marvel's 'kubectl get events'. Refs: aae-orc-k0t --- cmd/marvel/main.go | 94 +++++++++++++++++++++++++++++++++++++++ internal/daemon/daemon.go | 57 ++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/cmd/marvel/main.go b/cmd/marvel/main.go index 40156b2..d24a67c 100644 --- a/cmd/marvel/main.go +++ b/cmd/marvel/main.go @@ -20,6 +20,7 @@ import ( "github.com/arcavenae/marvel/internal/api" "github.com/arcavenae/marvel/internal/config" "github.com/arcavenae/marvel/internal/daemon" + "github.com/arcavenae/marvel/internal/events" "github.com/arcavenae/marvel/internal/keys" "github.com/arcavenae/marvel/internal/paths" "github.com/arcavenae/marvel/internal/rlog" @@ -112,6 +113,7 @@ func main() { root.AddCommand(keysCmd()) root.AddCommand(configCmd()) root.AddCommand(stopCmd()) + root.AddCommand(eventsCmd()) if err := root.Execute(); err != nil { os.Exit(1) @@ -283,6 +285,98 @@ Examples: return cmd } +// eventsCmd prints the daemon's structured event ring — the +// marvel-native equivalent of `kubectl get events`. Complements +// `marvel daemon logs` (raw stderr stream) with queryable, +// severity-tagged history scoped to workspaces, teams, roles, and +// sessions. +func eventsCmd() *cobra.Command { + var n int + var workspace, team, role, session, kind string + var warningsOnly bool + cmd := &cobra.Command{ + Use: "events", + Short: "List recent session/team state-transition events", + Long: `Fetch the daemon's structured event ring and print matching events. + +Events are emitted from session.Manager (session created, deleted, +crashed) and team.Controller (restart, crashloop-backoff, saturation, +shift started/completed, health failed). Each event has a timestamp, +kind, severity (info or warning), and session coordinates. + +Examples: + marvel events # last 100 events + marvel events -n 500 # last 500 events + marvel events --workspace demo # filter by workspace + marvel events --session util/shell-g1-0 # filter by session key + marvel events --kind session.crashed # only crashes + marvel events --warnings # only warning-severity events + marvel --cluster desk events # remote daemon via mrvl://`, + RunE: func(cmd *cobra.Command, args []string) error { + params := map[string]any{"n": n} + if workspace != "" { + params["workspace"] = workspace + } + if team != "" { + params["team"] = team + } + if role != "" { + params["role"] = role + } + if session != "" { + params["session"] = session + } + if kind != "" { + params["kind"] = kind + } + if warningsOnly { + params["min_severity"] = "warning" + } + raw, _ := json.Marshal(params) + resp, err := send(daemon.Request{Method: "events", Params: raw}) + if err != nil { + return err + } + if resp.Error != "" { + return fmt.Errorf("%s", resp.Error) + } + var result struct { + Events []events.Event `json:"events"` + } + if err := json.Unmarshal(resp.Result, &result); err != nil { + return fmt.Errorf("parse events: %w", err) + } + if len(result.Events) == 0 { + fmt.Println("no events") + return nil + } + tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + _, _ = fmt.Fprintln(tw, "TIME\tSEV\tKIND\tSESSION\tMESSAGE") + for _, ev := range result.Events { + sev := string(ev.Severity) + if sev == "" { + sev = "info" + } + sessRef := ev.Session + if sessRef == "" && ev.Team != "" { + sessRef = ev.Workspace + "/" + ev.Team + } + _, _ = fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", + ev.Timestamp.Format("15:04:05"), sev, ev.Kind, sessRef, ev.Message) + } + return tw.Flush() + }, + } + cmd.Flags().IntVarP(&n, "lines", "n", 100, "number of events to return (0 = all buffered)") + cmd.Flags().StringVar(&workspace, "workspace", "", "filter by workspace") + cmd.Flags().StringVar(&team, "team", "", "filter by team") + cmd.Flags().StringVar(&role, "role", "", "filter by role") + cmd.Flags().StringVar(&session, "session", "", "filter by session key (workspace/name)") + cmd.Flags().StringVar(&kind, "kind", "", "filter by event kind (e.g. session.crashed, health.failed)") + cmd.Flags().BoolVar(&warningsOnly, "warnings", false, "show only warning-severity events") + return cmd +} + // openRotatingLog opens the daemon's on-disk log file with the rlog // rotating writer. The three size/count ceilings are all opt-out (zero // means "no limit") so existing `marvel daemon --log-file X` invocations diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 957f0aa..4264402 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -23,6 +23,7 @@ import ( "golang.org/x/crypto/ssh/agent" "github.com/arcavenae/marvel/internal/api" + "github.com/arcavenae/marvel/internal/events" "github.com/arcavenae/marvel/internal/knownhosts" "github.com/arcavenae/marvel/internal/logbuf" "github.com/arcavenae/marvel/internal/paths" @@ -93,6 +94,10 @@ type Daemon struct { // In-memory ring of the most recent log lines. Always non-nil. logs *logbuf.Buffer + + // In-memory ring of structured state-transition events. + // Always non-nil. + events *events.Ring } // Options configures optional daemon behavior. Zero value disables @@ -106,6 +111,10 @@ type Options struct { // tees its log stream through. When nil, New allocates one at // DefaultLogBufferLines. Tests may pre-allocate to inspect. LogBuffer *logbuf.Buffer + // Events, when non-nil, is the structured event ring. When nil, + // New allocates one at events.DefaultCapacity. Tests may pre- + // allocate to inspect emitted events. + Events *events.Ring } // New creates a new daemon with default options. @@ -129,6 +138,13 @@ func NewWithOptions(opts Options) (*Daemon, error) { buf = logbuf.New(DefaultLogBufferLines) } + evRing := opts.Events + if evRing == nil { + evRing = events.NewRing(events.DefaultCapacity) + } + sessMgr.Events = evRing + teamCtrl.Events = evRing + return &Daemon{ store: store, sessMgr: sessMgr, @@ -136,6 +152,7 @@ func NewWithOptions(opts Options) (*Daemon, error) { driver: driver, pidFile: opts.PidFile, logs: buf, + events: evRing, }, nil } @@ -356,6 +373,8 @@ func (d *Daemon) dispatch(req Request) Response { return d.handleStop() case "logs": return d.handleLogs(req.Params) + case "events": + return d.handleEvents(req.Params) default: return Response{Error: fmt.Sprintf("unknown method: %s", req.Method)} } @@ -388,6 +407,44 @@ func (d *Daemon) handleLogs(params json.RawMessage) Response { return Response{Result: data} } +// Events params — filtered tail of the daemon's structured event ring. +type eventsParams struct { + N int `json:"n"` // number of events; <=0 returns the whole ring + Workspace string `json:"workspace,omitempty"` + Team string `json:"team,omitempty"` + Role string `json:"role,omitempty"` + Session string `json:"session,omitempty"` + Kind string `json:"kind,omitempty"` + MinSeverity string `json:"min_severity,omitempty"` // "" or "warning" +} + +type eventsResult struct { + Events []events.Event `json:"events"` +} + +func (d *Daemon) handleEvents(params json.RawMessage) Response { + var p eventsParams + if len(params) > 0 { + if err := json.Unmarshal(params, &p); err != nil { + return Response{Error: fmt.Sprintf("bad params: %v", err)} + } + } + f := events.Filter{ + Workspace: p.Workspace, + Team: p.Team, + Role: p.Role, + Session: p.Session, + Kind: events.Kind(p.Kind), + MinSeverity: events.Severity(p.MinSeverity), + } + snap := d.events.Snapshot(f, p.N) + data, err := json.Marshal(eventsResult{Events: snap}) + if err != nil { + return Response{Error: fmt.Sprintf("marshal events: %v", err)} + } + return Response{Result: data} +} + // Apply params type applyParams struct { ManifestData []byte `json:"manifest_data"`