Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 127 additions & 14 deletions cmd/marvel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"github.com/arcavenae/marvel/internal/api"
"github.com/arcavenae/marvel/internal/config"
"github.com/arcavenae/marvel/internal/daemon"
"github.com/arcavenae/marvel/internal/events"
"github.com/arcavenae/marvel/internal/keys"
"github.com/arcavenae/marvel/internal/paths"
"github.com/arcavenae/marvel/internal/rlog"
"github.com/arcavenae/marvel/internal/upgrade"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -111,6 +113,7 @@ func main() {
root.AddCommand(keysCmd())
root.AddCommand(configCmd())
root.AddCommand(stopCmd())
root.AddCommand(eventsCmd())

if err := root.Execute(); err != nil {
os.Exit(1)
Expand All @@ -122,6 +125,9 @@ func daemonCmd() *cobra.Command {
var listenSocket string
var logFilePath string
var pidFilePath string
var logMaxSizeMiB int
var logMaxFiles int
var logMaxTotalMiB int

layout, _ := paths.Default()
defaultLog := ""
Expand Down Expand Up @@ -173,14 +179,14 @@ Examples:
}
var logCloser io.Closer
if logFilePath != "" {
closer, err := openLogFile(logFilePath)
closer, err := openRotatingLog(logFilePath, logMaxSizeMiB, logMaxFiles, logMaxTotalMiB)
if err != nil {
return err
}
logCloser = closer
defer func() { _ = logCloser.Close() }()
if f, ok := closer.(io.Writer); ok {
writers = append(writers, f)
if w, ok := closer.(io.Writer); ok {
writers = append(writers, w)
}
}
log.SetOutput(io.MultiWriter(writers...))
Expand Down Expand Up @@ -222,6 +228,15 @@ Examples:
"tee daemon stderr to this file (empty string disables)")
cmd.Flags().StringVar(&pidFilePath, "pidfile", defaultPid,
"write pid to this file on start, remove on stop (empty string disables)")
// Log rotation / retention — bounds disk usage for --log-file.
// Motivated by desk Pi headroom (aae-orc-k0t, Skippy session-025/026).
// Zero for any of these disables the corresponding limit.
cmd.Flags().IntVar(&logMaxSizeMiB, "log-max-size", 10,
"rotate --log-file when it exceeds this size in MiB (0 disables rotation)")
cmd.Flags().IntVar(&logMaxFiles, "log-max-files", 5,
"keep at most N gzipped archives of --log-file (0 keeps all)")
cmd.Flags().IntVar(&logMaxTotalMiB, "log-max-total", 0,
"cap total disk usage across --log-file and archives in MiB (0 disables)")

cmd.AddCommand(daemonLogsCmd())
return cmd
Expand Down Expand Up @@ -270,12 +285,107 @@ Examples:
return cmd
}

// openLogFile opens (appending) a log file at the given path. The
// caller is responsible for wiring it into log.SetOutput — this
// function only handles directory creation and permission enforcement
// so the log-file setup stays composable with the ring buffer / stderr
// tee decision in daemonCmd's RunE.
func openLogFile(path string) (io.Closer, error) {
// eventsCmd prints the daemon's structured event ring — the
// marvel-native equivalent of `kubectl get events`. Complements
// `marvel daemon logs` (raw stderr stream) with queryable,
// severity-tagged history scoped to workspaces, teams, roles, and
// sessions.
func eventsCmd() *cobra.Command {
var n int
var workspace, team, role, session, kind string
var warningsOnly bool
cmd := &cobra.Command{
Use: "events",
Short: "List recent session/team state-transition events",
Long: `Fetch the daemon's structured event ring and print matching events.

Events are emitted from session.Manager (session created, deleted,
crashed) and team.Controller (restart, crashloop-backoff, saturation,
shift started/completed, health failed). Each event has a timestamp,
kind, severity (info or warning), and session coordinates.

Examples:
marvel events # last 100 events
marvel events -n 500 # last 500 events
marvel events --workspace demo # filter by workspace
marvel events --session util/shell-g1-0 # filter by session key
marvel events --kind session.crashed # only crashes
marvel events --warnings # only warning-severity events
marvel --cluster desk events # remote daemon via mrvl://`,
RunE: func(cmd *cobra.Command, args []string) error {
params := map[string]any{"n": n}
if workspace != "" {
params["workspace"] = workspace
}
if team != "" {
params["team"] = team
}
if role != "" {
params["role"] = role
}
if session != "" {
params["session"] = session
}
if kind != "" {
params["kind"] = kind
}
if warningsOnly {
params["min_severity"] = "warning"
}
raw, _ := json.Marshal(params)
resp, err := send(daemon.Request{Method: "events", Params: raw})
if err != nil {
return err
}
if resp.Error != "" {
return fmt.Errorf("%s", resp.Error)
}
var result struct {
Events []events.Event `json:"events"`
}
if err := json.Unmarshal(resp.Result, &result); err != nil {
return fmt.Errorf("parse events: %w", err)
}
if len(result.Events) == 0 {
fmt.Println("no events")
return nil
}
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
_, _ = fmt.Fprintln(tw, "TIME\tSEV\tKIND\tSESSION\tMESSAGE")
for _, ev := range result.Events {
sev := string(ev.Severity)
if sev == "" {
sev = "info"
}
sessRef := ev.Session
if sessRef == "" && ev.Team != "" {
sessRef = ev.Workspace + "/" + ev.Team
}
_, _ = fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n",
ev.Timestamp.Format("15:04:05"), sev, ev.Kind, sessRef, ev.Message)
}
return tw.Flush()
},
}
cmd.Flags().IntVarP(&n, "lines", "n", 100, "number of events to return (0 = all buffered)")
cmd.Flags().StringVar(&workspace, "workspace", "", "filter by workspace")
cmd.Flags().StringVar(&team, "team", "", "filter by team")
cmd.Flags().StringVar(&role, "role", "", "filter by role")
cmd.Flags().StringVar(&session, "session", "", "filter by session key (workspace/name)")
cmd.Flags().StringVar(&kind, "kind", "", "filter by event kind (e.g. session.crashed, health.failed)")
cmd.Flags().BoolVar(&warningsOnly, "warnings", false, "show only warning-severity events")
return cmd
}

// openRotatingLog opens the daemon's on-disk log file with the rlog
// rotating writer. The three size/count ceilings are all opt-out (zero
// means "no limit") so existing `marvel daemon --log-file X` invocations
// keep the default raspi-friendly caps: 10 MiB per file, 5 archives.
//
// The caller is responsible for wiring the returned WriteCloser into
// log.SetOutput — this helper only handles directory creation,
// permission enforcement, and Options construction.
func openRotatingLog(path string, maxSizeMiB, maxFiles, maxTotalMiB int) (io.Closer, error) {
layout, _ := paths.Default()
if path == layout.DaemonLog() {
if err := layout.EnsureLogDir(); err != nil {
Expand All @@ -286,13 +396,16 @@ func openLogFile(path string) (io.Closer, error) {
return nil, fmt.Errorf("create log dir: %w", err)
}
}
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, paths.ModeAuthorized)
w, err := rlog.Open(path, rlog.Options{
MaxFileBytes: int64(maxSizeMiB) * 1024 * 1024,
MaxFiles: maxFiles,
MaxTotalBytes: int64(maxTotalMiB) * 1024 * 1024,
Mode: paths.ModeAuthorized,
})
if err != nil {
return nil, fmt.Errorf("open log file %s: %w", path, err)
return nil, fmt.Errorf("open rotating log %s: %w", path, err)
}
// Enforce the mode in case the file pre-existed with different perms.
_ = os.Chmod(path, paths.ModeAuthorized)
return f, nil
return w, nil
}

func workCmd() *cobra.Command {
Expand Down
57 changes: 57 additions & 0 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/crypto/ssh/agent"

"github.com/arcavenae/marvel/internal/api"
"github.com/arcavenae/marvel/internal/events"
"github.com/arcavenae/marvel/internal/knownhosts"
"github.com/arcavenae/marvel/internal/logbuf"
"github.com/arcavenae/marvel/internal/paths"
Expand Down Expand Up @@ -93,6 +94,10 @@ type Daemon struct {

// In-memory ring of the most recent log lines. Always non-nil.
logs *logbuf.Buffer

// In-memory ring of structured state-transition events.
// Always non-nil.
events *events.Ring
}

// Options configures optional daemon behavior. Zero value disables
Expand All @@ -106,6 +111,10 @@ type Options struct {
// tees its log stream through. When nil, New allocates one at
// DefaultLogBufferLines. Tests may pre-allocate to inspect.
LogBuffer *logbuf.Buffer
// Events, when non-nil, is the structured event ring. When nil,
// New allocates one at events.DefaultCapacity. Tests may pre-
// allocate to inspect emitted events.
Events *events.Ring
}

// New creates a new daemon with default options.
Expand All @@ -129,13 +138,21 @@ func NewWithOptions(opts Options) (*Daemon, error) {
buf = logbuf.New(DefaultLogBufferLines)
}

evRing := opts.Events
if evRing == nil {
evRing = events.NewRing(events.DefaultCapacity)
}
sessMgr.Events = evRing
teamCtrl.Events = evRing

return &Daemon{
store: store,
sessMgr: sessMgr,
teamCtrl: teamCtrl,
driver: driver,
pidFile: opts.PidFile,
logs: buf,
events: evRing,
}, nil
}

Expand Down Expand Up @@ -356,6 +373,8 @@ func (d *Daemon) dispatch(req Request) Response {
return d.handleStop()
case "logs":
return d.handleLogs(req.Params)
case "events":
return d.handleEvents(req.Params)
default:
return Response{Error: fmt.Sprintf("unknown method: %s", req.Method)}
}
Expand Down Expand Up @@ -388,6 +407,44 @@ func (d *Daemon) handleLogs(params json.RawMessage) Response {
return Response{Result: data}
}

// Events params — filtered tail of the daemon's structured event ring.
type eventsParams struct {
N int `json:"n"` // number of events; <=0 returns the whole ring
Workspace string `json:"workspace,omitempty"`
Team string `json:"team,omitempty"`
Role string `json:"role,omitempty"`
Session string `json:"session,omitempty"`
Kind string `json:"kind,omitempty"`
MinSeverity string `json:"min_severity,omitempty"` // "" or "warning"
}

type eventsResult struct {
Events []events.Event `json:"events"`
}

func (d *Daemon) handleEvents(params json.RawMessage) Response {
var p eventsParams
if len(params) > 0 {
if err := json.Unmarshal(params, &p); err != nil {
return Response{Error: fmt.Sprintf("bad params: %v", err)}
}
}
f := events.Filter{
Workspace: p.Workspace,
Team: p.Team,
Role: p.Role,
Session: p.Session,
Kind: events.Kind(p.Kind),
MinSeverity: events.Severity(p.MinSeverity),
}
snap := d.events.Snapshot(f, p.N)
data, err := json.Marshal(eventsResult{Events: snap})
if err != nil {
return Response{Error: fmt.Sprintf("marshal events: %v", err)}
}
return Response{Result: data}
}

// Apply params
type applyParams struct {
ManifestData []byte `json:"manifest_data"`
Expand Down
Loading
Loading