diff --git a/.claude/rules/go.md b/.claude/rules/go.md index 0348482..42d1a31 100644 --- a/.claude/rules/go.md +++ b/.claude/rules/go.md @@ -92,6 +92,55 @@ time.Now().UTC() // yes time.Now() // no ``` +**12. Never return internal pointers from a locked accessor** + +If a type owns state behind a mutex, its `Get*`/`List*` methods return +value copies (deep enough — clone nested slices/maps/pointers). Mutation +goes through methods on that owning type; those methods take the lock +and mutate in place. A returned `*T` into concurrently-mutated state is +a bug — the lock protects the map, not the values. + +```go +// WRONG — caller holds a pointer into the store's live object, +// mutates it after the lock is dropped, other callers see torn writes +func (s *Store) ListSessions() []*Session { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]*Session, 0, len(s.sessions)) + for _, sess := range s.sessions { + out = append(out, sess) // leaks internal pointer + } + return out +} + +// RIGHT — snapshots fully decoupled from internal state +func (s *Store) ListSessions() []Session { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]Session, 0, len(s.sessions)) + for _, sess := range s.sessions { + out = append(out, cloneSession(sess)) + } + return out +} + +// RIGHT — mutation goes through the owner under the lock +func (s *Store) UpdateSession(key string, fn func(*Session) error) error { + s.mu.Lock() + defer s.mu.Unlock() + sess, ok := s.sessions[key] + if !ok { + return fmt.Errorf("session %s: %w", key, ErrNotFound) + } + return fn(sess) +} +``` + +The Kubernetes lister/informer pattern is the canonical example: cached +reads give immutable snapshots, writes go through the API. `go test -race` +is the backstop; the type signature is the fence. See orc +finding-032-store-sync-contract-leak. + ## Error handling - Every exported function that can fail returns `error` last diff --git a/internal/api/manifest.go b/internal/api/manifest.go index 231d29b..eed14b7 100644 --- a/internal/api/manifest.go +++ b/internal/api/manifest.go @@ -267,10 +267,15 @@ func (m *Manifest) Apply(store *Store) error { Generation: 1, CreatedAt: now, } - // Update roles if team already exists. - existing, err := store.GetTeam(team.Key()) - if err == nil { - existing.Roles = roles + // Update roles if team already exists; route through the store + // lock so the mutation doesn't race concurrent readers. + if _, err := store.GetTeam(team.Key()); err == nil { + if err := store.UpdateTeam(team.Key(), func(live *Team) error { + live.Roles = roles + return nil + }); err != nil { + return fmt.Errorf("apply team %s: %w", mt.Name, err) + } } else { if err := store.CreateTeam(team); err != nil { return fmt.Errorf("apply team %s: %w", mt.Name, err) diff --git a/internal/api/store.go b/internal/api/store.go index fbd8d2e..0dc0897 100644 --- a/internal/api/store.go +++ b/internal/api/store.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "slices" "sync" "time" ) @@ -12,7 +13,11 @@ var ErrNotFound = fmt.Errorf("resource not found") // ErrAlreadyExists is returned when a resource already exists in the store. var ErrAlreadyExists = fmt.Errorf("resource already exists") -// Store holds all marvel resources in memory. +// Store holds all marvel resources in memory. The store is the +// synchronization boundary: all reads return value snapshots (decoupled +// from internal state), and all mutations go through Update* methods that +// take the write lock. Pointers to internal objects never escape the +// store. See orc finding-032. 
type Store struct { mu sync.RWMutex workspaces map[string]*Workspace @@ -31,34 +36,85 @@ func NewStore() *Store { } } +// --- clone helpers --- + +// Session/Workspace/Endpoint are either flat or contain a Runtime whose +// only aliasable field is Args. Team contains Roles (each with a +// HealthCheck pointer) and a Shift (with a Roles []string). Clone deeply +// enough that a snapshot is safe to mutate or marshal while the store +// continues to update the live objects. + +func cloneRuntime(r Runtime) Runtime { + out := r + if len(r.Args) > 0 { + out.Args = slices.Clone(r.Args) + } + return out +} + +func cloneSession(s *Session) Session { + out := *s + out.Runtime = cloneRuntime(s.Runtime) + return out +} + +func cloneRole(r Role) Role { + out := r + out.Runtime = cloneRuntime(r.Runtime) + if r.HealthCheck != nil { + hc := *r.HealthCheck + out.HealthCheck = &hc + } + return out +} + +func cloneTeam(t *Team) Team { + out := *t + if len(t.Roles) > 0 { + out.Roles = make([]Role, len(t.Roles)) + for i, r := range t.Roles { + out.Roles[i] = cloneRole(r) + } + } + if len(t.Shift.Roles) > 0 { + out.Shift.Roles = slices.Clone(t.Shift.Roles) + } + return out +} + // Workspace operations +// CreateWorkspace clones the input into the store. The caller's pointer +// is not aliased with store state. func (s *Store) CreateWorkspace(w *Workspace) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.workspaces[w.Key()]; ok { return fmt.Errorf("workspace %s: %w", w.Key(), ErrAlreadyExists) } - s.workspaces[w.Key()] = w + c := *w + s.workspaces[w.Key()] = &c return nil } -func (s *Store) GetWorkspace(name string) (*Workspace, error) { +// GetWorkspace returns a snapshot of the named workspace. +func (s *Store) GetWorkspace(name string) (Workspace, error) { s.mu.RLock() defer s.mu.RUnlock() w, ok := s.workspaces[name] if !ok { - return nil, fmt.Errorf("workspace %s: %w", name, ErrNotFound) + return Workspace{}, fmt.Errorf("workspace %s: %w", name, ErrNotFound) } - return w, nil + return *w, nil } -func (s *Store) ListWorkspaces() []*Workspace { +// ListWorkspaces returns snapshots of all workspaces. +func (s *Store) ListWorkspaces() []Workspace { s.mu.RLock() defer s.mu.RUnlock() - result := make([]*Workspace, 0, len(s.workspaces)) + result := make([]Workspace, 0, len(s.workspaces)) for _, w := range s.workspaces { - result = append(result, w) + result = append(result, *w) } return result } @@ -75,67 +131,77 @@ func (s *Store) DeleteWorkspace(name string) error { // Session operations +// CreateSession clones the input into the store. The caller's pointer +// is not aliased with store state; further mutation of sess does not +// affect the store. Use UpdateSession to commit subsequent changes. func (s *Store) CreateSession(sess *Session) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.sessions[sess.Key()]; ok { return fmt.Errorf("session %s: %w", sess.Key(), ErrAlreadyExists) } - s.sessions[sess.Key()] = sess + c := cloneSession(sess) + s.sessions[sess.Key()] = &c return nil } -func (s *Store) GetSession(key string) (*Session, error) { +// GetSession returns a snapshot of the named session. +func (s *Store) GetSession(key string) (Session, error) { s.mu.RLock() defer s.mu.RUnlock() sess, ok := s.sessions[key] if !ok { - return nil, fmt.Errorf("session %s: %w", key, ErrNotFound) + return Session{}, fmt.Errorf("session %s: %w", key, ErrNotFound) } - return sess, nil + return cloneSession(sess), nil } -func (s *Store) ListSessions() []*Session { +// ListSessions returns snapshots of all sessions. 
+func (s *Store) ListSessions() []Session { s.mu.RLock() defer s.mu.RUnlock() - result := make([]*Session, 0, len(s.sessions)) + result := make([]Session, 0, len(s.sessions)) for _, sess := range s.sessions { - result = append(result, sess) + result = append(result, cloneSession(sess)) } return result } -func (s *Store) ListSessionsByTeam(workspace, team string) []*Session { +// ListSessionsByTeam returns snapshots of sessions in the given team. +func (s *Store) ListSessionsByTeam(workspace, team string) []Session { s.mu.RLock() defer s.mu.RUnlock() - var result []*Session + var result []Session for _, sess := range s.sessions { if sess.Workspace == workspace && sess.Team == team { - result = append(result, sess) + result = append(result, cloneSession(sess)) } } return result } -func (s *Store) ListSessionsByTeamRole(workspace, team, role string) []*Session { +// ListSessionsByTeamRole returns snapshots of sessions in the given team and role. +func (s *Store) ListSessionsByTeamRole(workspace, team, role string) []Session { s.mu.RLock() defer s.mu.RUnlock() - var result []*Session + var result []Session for _, sess := range s.sessions { if sess.Workspace == workspace && sess.Team == team && sess.Role == role { - result = append(result, sess) + result = append(result, cloneSession(sess)) } } return result } -func (s *Store) ListSessionsByTeamRoleGeneration(workspace, team, role string, generation int64) []*Session { +// ListSessionsByTeamRoleGeneration returns snapshots of sessions in the +// given team, role, and generation. +func (s *Store) ListSessionsByTeamRoleGeneration(workspace, team, role string, generation int64) []Session { s.mu.RLock() defer s.mu.RUnlock() - var result []*Session + var result []Session for _, sess := range s.sessions { if sess.Workspace == workspace && sess.Team == team && sess.Role == role && sess.Generation == generation { - result = append(result, sess) + result = append(result, cloneSession(sess)) } } return result @@ -151,34 +217,54 @@ func (s *Store) DeleteSession(key string) error { return nil } +// UpdateSession applies fn to the live session under the write lock. +// The pointer passed to fn is valid only for fn's execution — do not +// stash it. Returning an error from fn aborts the update (no state is +// rolled back; the caller is responsible for not making partial writes +// that don't make sense together). +func (s *Store) UpdateSession(key string, fn func(*Session) error) error { + s.mu.Lock() + defer s.mu.Unlock() + sess, ok := s.sessions[key] + if !ok { + return fmt.Errorf("session %s: %w", key, ErrNotFound) + } + return fn(sess) +} + // Team operations +// CreateTeam clones the input into the store. The caller's pointer is +// not aliased with store state. func (s *Store) CreateTeam(t *Team) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.teams[t.Key()]; ok { return fmt.Errorf("team %s: %w", t.Key(), ErrAlreadyExists) } - s.teams[t.Key()] = t + c := cloneTeam(t) + s.teams[t.Key()] = &c return nil } -func (s *Store) GetTeam(key string) (*Team, error) { +// GetTeam returns a snapshot of the named team. +func (s *Store) GetTeam(key string) (Team, error) { s.mu.RLock() defer s.mu.RUnlock() t, ok := s.teams[key] if !ok { - return nil, fmt.Errorf("team %s: %w", key, ErrNotFound) + return Team{}, fmt.Errorf("team %s: %w", key, ErrNotFound) } - return t, nil + return cloneTeam(t), nil } -func (s *Store) ListTeams() []*Team { +// ListTeams returns snapshots of all teams. 
+func (s *Store) ListTeams() []Team { s.mu.RLock() defer s.mu.RUnlock() - result := make([]*Team, 0, len(s.teams)) + result := make([]Team, 0, len(s.teams)) for _, t := range s.teams { - result = append(result, t) + result = append(result, cloneTeam(t)) } return result } @@ -193,34 +279,50 @@ func (s *Store) DeleteTeam(key string) error { return nil } +// UpdateTeam applies fn to the live team under the write lock. Same +// pointer-lifetime rules as UpdateSession. +func (s *Store) UpdateTeam(key string, fn func(*Team) error) error { + s.mu.Lock() + defer s.mu.Unlock() + t, ok := s.teams[key] + if !ok { + return fmt.Errorf("team %s: %w", key, ErrNotFound) + } + return fn(t) +} + // Endpoint operations +// CreateEndpoint clones the input into the store. func (s *Store) CreateEndpoint(e *Endpoint) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.endpoints[e.Key()]; ok { return fmt.Errorf("endpoint %s: %w", e.Key(), ErrAlreadyExists) } - s.endpoints[e.Key()] = e + c := *e + s.endpoints[e.Key()] = &c return nil } -func (s *Store) GetEndpoint(key string) (*Endpoint, error) { +// GetEndpoint returns a snapshot of the named endpoint. +func (s *Store) GetEndpoint(key string) (Endpoint, error) { s.mu.RLock() defer s.mu.RUnlock() e, ok := s.endpoints[key] if !ok { - return nil, fmt.Errorf("endpoint %s: %w", key, ErrNotFound) + return Endpoint{}, fmt.Errorf("endpoint %s: %w", key, ErrNotFound) } - return e, nil + return *e, nil } -func (s *Store) ListEndpoints() []*Endpoint { +// ListEndpoints returns snapshots of all endpoints. +func (s *Store) ListEndpoints() []Endpoint { s.mu.RLock() defer s.mu.RUnlock() - result := make([]*Endpoint, 0, len(s.endpoints)) + result := make([]Endpoint, 0, len(s.endpoints)) for _, e := range s.endpoints { - result = append(result, e) + result = append(result, *e) } return result } @@ -235,7 +337,9 @@ func (s *Store) DeleteEndpoint(key string) error { return nil } -// UpdateSessionHeartbeat updates a session's context pressure and heartbeat timestamp. +// UpdateSessionHeartbeat updates a session's context pressure and +// heartbeat timestamp. Kept as a convenience helper; equivalent to +// UpdateSession with the corresponding closure. func (s *Store) UpdateSessionHeartbeat(key string, contextPercent float64) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 4264402..3b28775 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -639,13 +639,22 @@ func (d *Daemon) handleScale(params json.RawMessage) Response { return Response{Error: fmt.Sprintf("role is required; available roles: %v", names)} } - found := false - for i := range t.Roles { - if t.Roles[i].Name == p.Role { - t.Roles[i].Replicas = p.Replicas - found = true - break + // Commit the replica change to the live team under the store lock. + // Pre-fix, this mutated a pointer returned by GetTeam — which used + // to alias store state. Now GetTeam returns a snapshot, so scaling + // must go through UpdateTeam. See orc finding-032. 
+ var found bool + if err := d.store.UpdateTeam(p.TeamKey, func(live *api.Team) error { + for i := range live.Roles { + if live.Roles[i].Name == p.Role { + live.Roles[i].Replicas = p.Replicas + found = true + return nil + } } + return nil + }); err != nil { + return Response{Error: err.Error()} } if !found { return Response{Error: fmt.Sprintf("role %s not found in team %s", p.Role, p.TeamKey)} diff --git a/internal/session/manager.go b/internal/session/manager.go index a2fd36b..58d15ba 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -105,6 +105,17 @@ func (m *Manager) Create(sess *api.Session) error { return fmt.Errorf("create pane for %s: %w", sess.Key(), err) } + // Commit PaneID + State=Running to the live session under the store + // lock. Also update the caller's *api.Session so returning pointers + // stay consistent for any downstream emission/logging that references + // fields on sess. Per orc finding-032, the Store is the sync boundary. + if err := m.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.PaneID = paneID + live.State = api.SessionRunning + return nil + }); err != nil { + return fmt.Errorf("update session %s post-create: %w", sess.Key(), err) + } sess.PaneID = paneID sess.State = api.SessionRunning log.Printf("session %s running in pane %s", sess.Key(), paneID) @@ -122,7 +133,10 @@ func (m *Manager) Create(sess *api.Session) error { // resolveRuntime uses the adapter registry when team/role context is available, // falling back to direct command construction for ad-hoc sessions. func (m *Manager) resolveRuntime(sess *api.Session) (string, map[string]string) { - // Look up team and role for full adapter context. + // Look up team and role for full adapter context. Store returns + // snapshots — taking addresses of these locals is safe because the + // adapter is read-only and the LaunchContext doesn't outlive this + // function. team, teamErr := m.store.GetTeam(fmt.Sprintf("%s/%s", sess.Workspace, sess.Team)) if teamErr != nil { // Ad-hoc session or team not found — use direct command. @@ -149,8 +163,8 @@ func (m *Manager) resolveRuntime(sess *api.Session) (string, map[string]string) result, err := adapter.Prepare(&runtime.LaunchContext{ Session: sess, Role: role, - Team: team, - Workspace: ws, + Team: &team, + Workspace: &ws, SocketPath: m.SocketPath, }) if err != nil { @@ -248,8 +262,14 @@ func (m *Manager) ReapDead() []ReapedSession { log.Printf("session %s: pane %s gone, marking crashed", sess.Key(), sess.PaneID) lostPane := sess.PaneID m.clearStaleCrashed(sessions, sess.Workspace, sess.Team, sess.Role, sess.Key()) - sess.State = api.SessionCrashed - sess.PaneID = "" + if err := m.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.State = api.SessionCrashed + live.PaneID = "" + return nil + }); err != nil { + log.Printf("warning: mark crashed %s: %v", sess.Key(), err) + continue + } reaped = append(reaped, ReapedSession{ Key: sess.Key(), Workspace: sess.Workspace, @@ -274,7 +294,7 @@ func (m *Manager) ReapDead() []ReapedSession { // excluding the session about to be marked Crashed. Caps the store at // one Crashed marker per role so the reap path can't accumulate ghosts // across a saturated role's many crashes. 
-func (m *Manager) clearStaleCrashed(snapshot []*api.Session, workspace, team, role, exceptKey string) { +func (m *Manager) clearStaleCrashed(snapshot []api.Session, workspace, team, role, exceptKey string) { for _, other := range snapshot { if other.State != api.SessionCrashed { continue diff --git a/internal/team/controller.go b/internal/team/controller.go index 9f77d53..eb332d0 100644 --- a/internal/team/controller.go +++ b/internal/team/controller.go @@ -130,8 +130,8 @@ func (c *Controller) ReconcileOnce() { c.evaluateHealth() teams := c.store.ListTeams() - for _, t := range teams { - c.reconcileTeam(t) + for i := range teams { + c.reconcileTeam(&teams[i]) } } @@ -263,12 +263,18 @@ func (c *Controller) reconcileRole(t *api.Team, role *api.Role) { // evaluateHealth checks heartbeat staleness for all sessions and applies // restart policies when failure thresholds are exceeded. +// +// Per orc finding-032, all mutation to session health state is routed +// through the Store via UpdateSession. The closure body holds the write +// lock, so the mutation is atomic relative to daemon reads and other +// writers. The outer loop works off value snapshots — safe to iterate +// while the closure mutates the live copy. func (c *Controller) evaluateHealth() { now := time.Now().UTC() sessions := c.store.ListSessions() - // Build a lookup cache: workspace/team → team (avoid repeated store access). - teamCache := make(map[string]*api.Team) + // Build a lookup cache: workspace/team → team (value snapshot). + teamCache := make(map[string]api.Team) for _, t := range c.store.ListTeams() { teamCache[t.Key()] = t } @@ -291,52 +297,78 @@ func (c *Controller) evaluateHealth() { break } } - if role == nil || role.HealthCheck == nil { - sess.HealthState = api.HealthUnknown - continue - } - if role.HealthCheck.Type != api.HealthCheckHeartbeat { - // process-alive is handled by ReapDead. If we're here, the pane exists. - sess.HealthState = api.HealthHealthy - sess.FailureCount = 0 - continue - } + var ( + transitionedToUnhealthy bool + shouldApplyRestart bool + ) + var updated api.Session + + err := c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + if role == nil || role.HealthCheck == nil { + live.HealthState = api.HealthUnknown + updated = *live + return nil + } + if role.HealthCheck.Type != api.HealthCheckHeartbeat { + // process-alive is handled by ReapDead. Pane exists → healthy. + live.HealthState = api.HealthHealthy + live.FailureCount = 0 + updated = *live + return nil + } - // Heartbeat staleness check. - sess.LastHealthCheck = now + // Heartbeat staleness check. + live.LastHealthCheck = now + + if live.LastHeartbeat.IsZero() { + // Grace period: allow timeout from creation for first heartbeat. + if now.Sub(live.CreatedAt) < role.HealthCheck.Timeout { + live.HealthState = api.HealthUnknown + updated = *live + return nil + } + live.FailureCount++ + } else if now.Sub(live.LastHeartbeat) > role.HealthCheck.Timeout { + live.FailureCount++ + } else { + live.FailureCount = 0 + live.HealthState = api.HealthHealthy + updated = *live + return nil + } - if sess.LastHeartbeat.IsZero() { - // Grace period: allow timeout from creation for first heartbeat. 
- if now.Sub(sess.CreatedAt) < role.HealthCheck.Timeout { - sess.HealthState = api.HealthUnknown - continue + if live.FailureCount >= role.HealthCheck.FailureThreshold { + if live.HealthState != api.HealthUnhealthy { + transitionedToUnhealthy = true + } + live.HealthState = api.HealthUnhealthy + shouldApplyRestart = true + } else { + live.HealthState = api.HealthUnhealthy } - sess.FailureCount++ - } else if now.Sub(sess.LastHeartbeat) > role.HealthCheck.Timeout { - sess.FailureCount++ - } else { - sess.FailureCount = 0 - sess.HealthState = api.HealthHealthy + updated = *live + return nil + }) + if err != nil { + // Session disappeared between snapshot and update — fine, + // another tick will handle whatever's next. continue } - if sess.FailureCount >= role.HealthCheck.FailureThreshold { - if sess.HealthState != api.HealthUnhealthy { - events.Emit(c.Events, events.Event{ - Kind: events.KindHealthCheckFailed, - Severity: events.SeverityWarning, - Workspace: sess.Workspace, - Team: sess.Team, - Role: sess.Role, - Session: sess.Key(), - Message: fmt.Sprintf("heartbeat stale %d/%d failures", sess.FailureCount, role.HealthCheck.FailureThreshold), - }) - } - sess.HealthState = api.HealthUnhealthy - c.applyRestartPolicy(sess, t, role) - } else { - sess.HealthState = api.HealthUnhealthy + if transitionedToUnhealthy { + events.Emit(c.Events, events.Event{ + Kind: events.KindHealthCheckFailed, + Severity: events.SeverityWarning, + Workspace: updated.Workspace, + Team: updated.Team, + Role: updated.Role, + Session: updated.Key(), + Message: fmt.Sprintf("heartbeat stale %d/%d failures", updated.FailureCount, role.HealthCheck.FailureThreshold), + }) + } + if shouldApplyRestart { + c.applyRestartPolicy(&updated, &t, role) } } } @@ -344,6 +376,10 @@ func (c *Controller) evaluateHealth() { func (c *Controller) applyRestartPolicy(sess *api.Session, t *api.Team, role *api.Role) { switch role.RestartPolicy { case api.RestartNever: + _ = c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.State = api.SessionFailed + return nil + }) sess.State = api.SessionFailed log.Printf("health: session %s failed (restart_policy=never, failures=%d)", sess.Key(), sess.FailureCount) @@ -351,6 +387,10 @@ func (c *Controller) applyRestartPolicy(sess *api.Session, t *api.Team, role *ap if sess.State == api.SessionFailed { c.restartSession(sess, t, role) } else { + _ = c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.State = api.SessionFailed + return nil + }) sess.State = api.SessionFailed } default: // RestartAlways @@ -393,6 +433,10 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro Message: fmt.Sprintf("cooling down, backoff until %s", rh.BackoffUntil.Format(time.RFC3339)), }) } + _ = c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.State = api.SessionCrashLoopBackOff + return nil + }) sess.State = api.SessionCrashLoopBackOff return } @@ -415,6 +459,10 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro Message: fmt.Sprintf("max_restarts=%d reached", role.MaxRestarts), }) } + _ = c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.State = api.SessionFailed + return nil + }) sess.State = api.SessionFailed return } @@ -430,6 +478,11 @@ func (c *Controller) restartSession(sess *api.Session, t *api.Team, role *api.Ro Session: sess.Key(), Message: fmt.Sprintf("restart #%d, next backoff %s", rh.RestartCount, time.Until(rh.BackoffUntil)), }) + _ = 
c.store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.RestartCount++ + live.State = api.SessionFailed + return nil + }) sess.RestartCount++ sess.State = api.SessionFailed if err := c.sessMgr.Delete(sess.Key()); err != nil { @@ -474,21 +527,32 @@ func (c *Controller) InitiateShift(teamKey, role string) error { } oldGen := t.Generation - t.Generation++ - t.Shift = api.ShiftState{ - Phase: api.ShiftLaunching, - OldGeneration: oldGen, - RoleIndex: 0, - Roles: roles, - StartedAt: time.Now().UTC(), + newGen := oldGen + 1 + if err := c.store.UpdateTeam(teamKey, func(live *api.Team) error { + // Re-check inside the lock — another caller could have started + // a shift between our snapshot and this mutation. + if live.Shift.Phase != api.ShiftNone { + return fmt.Errorf("team %s: shift already in progress (phase: %s)", teamKey, live.Shift.Phase) + } + live.Generation = newGen + live.Shift = api.ShiftState{ + Phase: api.ShiftLaunching, + OldGeneration: oldGen, + RoleIndex: 0, + Roles: roles, + StartedAt: time.Now().UTC(), + } + return nil + }); err != nil { + return err } - log.Printf("shift: initiated for %s gen %d→%d roles=%v", teamKey, oldGen, t.Generation, roles) + log.Printf("shift: initiated for %s gen %d→%d roles=%v", teamKey, oldGen, newGen, roles) events.Emit(c.Events, events.Event{ Kind: events.KindShiftStarted, Workspace: t.Workspace, Team: t.Name, - Message: fmt.Sprintf("gen %d→%d roles=%v", oldGen, t.Generation, roles), + Message: fmt.Sprintf("gen %d→%d roles=%v", oldGen, newGen, roles), }) return nil } @@ -521,6 +585,10 @@ func (c *Controller) reconcileShift(t *api.Team) { Team: t.Name, Message: fmt.Sprintf("gen %d active", t.Generation), }) + _ = c.store.UpdateTeam(t.Key(), func(live *api.Team) error { + live.Shift = api.ShiftState{} + return nil + }) t.Shift = api.ShiftState{} return } @@ -544,6 +612,10 @@ func (c *Controller) reconcileShift(t *api.Team) { } if role == nil { log.Printf("shift: role %s not found in team %s, skipping", shiftingRoleName, t.Key()) + _ = c.store.UpdateTeam(t.Key(), func(live *api.Team) error { + live.Shift.RoleIndex++ + return nil + }) t.Shift.RoleIndex++ return } @@ -585,6 +657,10 @@ func (c *Controller) shiftLaunch(t *api.Team, role *api.Role) { if c.allReady(newGen, role) { log.Printf("shift: %s/%s role %s — %d new sessions ready, draining old gen %d", t.Workspace, t.Name, role.Name, len(newGen), t.Shift.OldGeneration) + _ = c.store.UpdateTeam(t.Key(), func(live *api.Team) error { + live.Shift.Phase = api.ShiftDraining + return nil + }) t.Shift.Phase = api.ShiftDraining } else { log.Printf("shift: %s/%s role %s — %d sessions launched, waiting for readiness", @@ -596,7 +672,7 @@ func (c *Controller) shiftLaunch(t *api.Team, role *api.Role) { // allReady returns true if all sessions are ready to take over. // For roles without a healthcheck, pane existence (Running state) is sufficient. // For heartbeat-based checks, at least one heartbeat must have been received. -func (c *Controller) allReady(sessions []*api.Session, role *api.Role) bool { +func (c *Controller) allReady(sessions []api.Session, role *api.Role) bool { if role.HealthCheck == nil { // No healthcheck — running state is sufficient. for _, s := range sessions { @@ -623,6 +699,13 @@ func (c *Controller) shiftDrain(t *api.Team, role *api.Role) { if len(oldGen) == 0 { // All old-gen drained for this role — advance to next role. 
log.Printf("shift: %s/%s role %s — old gen drained", t.Workspace, t.Name, role.Name) + _ = c.store.UpdateTeam(t.Key(), func(live *api.Team) error { + live.Shift.RoleIndex++ + if live.Shift.RoleIndex < len(live.Shift.Roles) { + live.Shift.Phase = api.ShiftLaunching + } + return nil + }) t.Shift.RoleIndex++ if t.Shift.RoleIndex < len(t.Shift.Roles) { t.Shift.Phase = api.ShiftLaunching diff --git a/internal/team/controller_test.go b/internal/team/controller_test.go index 920e99d..597f0f7 100644 --- a/internal/team/controller_test.go +++ b/internal/team/controller_test.go @@ -38,7 +38,7 @@ func setup(t *testing.T) (*api.Store, *session.Manager, *Controller, func()) { return store, sessMgr, ctrl, cleanup } -func createTeamFixture(t *testing.T, store *api.Store, wsName, teamName string, roles []api.Role) *api.Team { +func createTeamFixture(t *testing.T, store *api.Store, wsName, teamName string, roles []api.Role) { t.Helper() ws := &api.Workspace{Name: wsName, CreatedAt: time.Now().UTC()} if err := store.CreateWorkspace(ws); err != nil { @@ -54,7 +54,6 @@ func createTeamFixture(t *testing.T, store *api.Store, wsName, teamName string, if err := store.CreateTeam(team); err != nil { t.Fatal(err) } - return team } func TestReconcileScaleUp(t *testing.T) { @@ -94,7 +93,7 @@ func TestReconcileScaleDown(t *testing.T) { store, _, ctrl, cleanup := setup(t) t.Cleanup(cleanup) - team := createTeamFixture(t, store, "test-scaledown", "agents", []api.Role{ + createTeamFixture(t, store, "test-scaledown", "agents", []api.Role{ {Name: "worker", Replicas: 3, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, }) @@ -103,7 +102,12 @@ func TestReconcileScaleDown(t *testing.T) { t.Fatal("expected 3 sessions after scale up") } - team.Roles[0].Replicas = 1 + if err := store.UpdateTeam("test-scaledown/agents", func(live *api.Team) error { + live.Roles[0].Replicas = 1 + return nil + }); err != nil { + t.Fatalf("scale down: %v", err) + } ctrl.ReconcileOnce() sessions := store.ListSessionsByTeamRole("test-scaledown", "agents", "worker") @@ -144,7 +148,7 @@ func TestReconcileMultipleRoles(t *testing.T) { store, _, ctrl, cleanup := setup(t) t.Cleanup(cleanup) - team := createTeamFixture(t, store, "test-multi", "squad", []api.Role{ + createTeamFixture(t, store, "test-multi", "squad", []api.Role{ {Name: "supervisor", Replicas: 1, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, {Name: "worker", Replicas: 3, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, }) @@ -166,7 +170,12 @@ func TestReconcileMultipleRoles(t *testing.T) { t.Fatalf("expected 4 total sessions, got %d", len(all)) } - team.Roles[1].Replicas = 1 + if err := store.UpdateTeam("test-multi/squad", func(live *api.Team) error { + live.Roles[1].Replicas = 1 + return nil + }); err != nil { + t.Fatalf("scale down worker: %v", err) + } ctrl.ReconcileOnce() workers = store.ListSessionsByTeamRole("test-multi", "squad", "worker") @@ -187,9 +196,10 @@ func TestShiftFullLifecycle(t *testing.T) { store, _, ctrl, cleanup := setup(t) t.Cleanup(cleanup) - team := createTeamFixture(t, store, "test-shift", "squad", []api.Role{ + createTeamFixture(t, store, "test-shift", "squad", []api.Role{ {Name: "worker", Replicas: 2, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, }) + teamKey := "test-shift/squad" // Initial reconcile creates gen-1 sessions. ctrl.ReconcileOnce() @@ -199,10 +209,11 @@ func TestShiftFullLifecycle(t *testing.T) { } // Initiate shift. 
- if err := ctrl.InitiateShift("test-shift/squad", ""); err != nil { + if err := ctrl.InitiateShift(teamKey, ""); err != nil { t.Fatalf("initiate shift: %v", err) } + team, _ := store.GetTeam(teamKey) if team.Generation != 2 { t.Fatalf("expected generation 2, got %d", team.Generation) } @@ -213,10 +224,12 @@ func TestShiftFullLifecycle(t *testing.T) { // Run reconcile ticks until shift completes. for i := 0; i < 20; i++ { ctrl.ReconcileOnce() + team, _ = store.GetTeam(teamKey) if team.Shift.Phase == api.ShiftNone { break } } + team, _ = store.GetTeam(teamKey) if team.Shift.Phase != api.ShiftNone { t.Fatalf("shift didn't complete after 20 ticks, phase: %s", team.Shift.Phase) } @@ -250,18 +263,20 @@ func TestShiftMultipleRoles(t *testing.T) { store, _, ctrl, cleanup := setup(t) t.Cleanup(cleanup) - team := createTeamFixture(t, store, "test-shift-multi", "squad", []api.Role{ + createTeamFixture(t, store, "test-shift-multi", "squad", []api.Role{ {Name: "supervisor", Replicas: 1, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, {Name: "worker", Replicas: 2, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, }) + teamKey := "test-shift-multi/squad" ctrl.ReconcileOnce() // Initiate shift — supervisor should shift last. - if err := ctrl.InitiateShift("test-shift-multi/squad", ""); err != nil { + if err := ctrl.InitiateShift(teamKey, ""); err != nil { t.Fatalf("initiate shift: %v", err) } + team, _ := store.GetTeam(teamKey) if team.Shift.Roles[0] != "worker" { t.Fatalf("expected worker to shift first, got %s", team.Shift.Roles[0]) } @@ -272,11 +287,13 @@ func TestShiftMultipleRoles(t *testing.T) { // Run reconcile ticks until shift completes. for i := 0; i < 20; i++ { ctrl.ReconcileOnce() + team, _ = store.GetTeam(teamKey) if team.Shift.Phase == api.ShiftNone { break } } + team, _ = store.GetTeam(teamKey) if team.Shift.Phase != api.ShiftNone { t.Fatalf("shift didn't complete after 20 ticks, phase: %s", team.Shift.Phase) } @@ -317,18 +334,20 @@ func TestShiftSingleRole(t *testing.T) { store, _, ctrl, cleanup := setup(t) t.Cleanup(cleanup) - team := createTeamFixture(t, store, "test-shift-single", "squad", []api.Role{ + createTeamFixture(t, store, "test-shift-single", "squad", []api.Role{ {Name: "supervisor", Replicas: 1, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, {Name: "worker", Replicas: 2, Runtime: api.Runtime{Name: "sleep", Command: "sleep", Args: []string{"300"}}}, }) + teamKey := "test-shift-single/squad" ctrl.ReconcileOnce() // Shift only workers. - if err := ctrl.InitiateShift("test-shift-single/squad", "worker"); err != nil { + if err := ctrl.InitiateShift(teamKey, "worker"); err != nil { t.Fatalf("initiate shift: %v", err) } + team, _ := store.GetTeam(teamKey) if len(team.Shift.Roles) != 1 { t.Fatalf("expected 1 role in shift, got %d", len(team.Shift.Roles)) } @@ -336,11 +355,13 @@ func TestShiftSingleRole(t *testing.T) { // Run ticks until complete. for i := 0; i < 20; i++ { ctrl.ReconcileOnce() + team, _ = store.GetTeam(teamKey) if team.Shift.Phase == api.ShiftNone { break } } + team, _ = store.GetTeam(teamKey) if team.Shift.Phase != api.ShiftNone { t.Fatalf("shift didn't complete") } @@ -380,9 +401,15 @@ func TestHealthEvalHeartbeatHealthy(t *testing.T) { t.Fatalf("expected 1 session, got %d", len(sessions)) } - // Simulate a fresh heartbeat. + // Simulate a fresh heartbeat (commit through the store so the + // controller sees the updated value on the next tick). 
sess := sessions[0] - sess.LastHeartbeat = time.Now().UTC() + if err := store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC() + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } ctrl.ReconcileOnce() @@ -418,13 +445,18 @@ func TestHealthEvalHeartbeatStale(t *testing.T) { // Set a stale heartbeat (well past timeout). sess := sessions[0] - sess.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + if err := store.UpdateSession(sess.Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } // First eval: failure count 1 (below threshold of 2). ctrl.ReconcileOnce() - sess, _ = store.GetSession(sess.Key()) - if sess == nil { - t.Fatal("session should still exist (restart_policy=never)") + sess, err := store.GetSession(sess.Key()) + if err != nil { + t.Fatalf("session should still exist (restart_policy=never): %v", err) } if sess.FailureCount != 1 { t.Fatalf("expected 1 failure after first eval, got %d", sess.FailureCount) @@ -432,9 +464,9 @@ func TestHealthEvalHeartbeatStale(t *testing.T) { // Second eval: failure count 2 (meets threshold). ctrl.ReconcileOnce() - sess, _ = store.GetSession(sess.Key()) - if sess == nil { - t.Fatal("session should still exist (restart_policy=never)") + sess, err = store.GetSession(sess.Key()) + if err != nil { + t.Fatalf("session should still exist (restart_policy=never): %v", err) } if sess.State != api.SessionFailed { t.Fatalf("expected failed state with restart_policy=never, got %s", sess.State) @@ -465,8 +497,8 @@ func TestHealthEvalNoConfig(t *testing.T) { ctrl.ReconcileOnce() ctrl.ReconcileOnce() - sess, _ := store.GetSession(sessions[0].Key()) - if sess == nil { + sess, err := store.GetSession(sessions[0].Key()) + if err != nil { t.Fatal("session should still exist (no healthcheck)") } if sess.HealthState != api.HealthUnknown { @@ -504,7 +536,12 @@ func TestHealthRestartAlways(t *testing.T) { origCreatedAt := sessions[0].CreatedAt // Set stale heartbeat. - sessions[0].LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + if err := store.UpdateSession(sessions[0].Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } // Tick 1: unhealthy → first restart is immediate (count goes 0→1), // session is deleted, but reconciler holds off on recreating because @@ -553,7 +590,13 @@ func TestHealthRestartBackoffHoldsReplacement(t *testing.T) { }) ctrl.ReconcileOnce() - store.ListSessionsByTeamRole("test-health-hold", "squad", "worker")[0].LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + staleKey := store.ListSessionsByTeamRole("test-health-hold", "squad", "worker")[0].Key() + if err := store.UpdateSession(staleKey, func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } ctrl.ReconcileOnce() // first restart triggered + session deleted // Several reconciler ticks while still inside backoff: actual=0, @@ -604,7 +647,12 @@ func TestHealthRestartBackoffSiblingMarked(t *testing.T) { } // Fail worker-0 → triggers first restart for the role. 
- workers[0].LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + if err := store.UpdateSession(workers[0].Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } ctrl.ReconcileOnce() // Now fail worker-1 while still inside the backoff window. @@ -613,7 +661,12 @@ func TestHealthRestartBackoffSiblingMarked(t *testing.T) { if len(workers) != 1 { t.Fatalf("expected 1 surviving worker during backoff, got %d", len(workers)) } - workers[0].LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + if err := store.UpdateSession(workers[0].Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("update heartbeat: %v", err) + } ctrl.ReconcileOnce() // Sibling must be alive, marked CrashLoopBackOff, and the role @@ -659,7 +712,12 @@ func TestHealthRestartMaxReached(t *testing.T) { if len(got) == 0 { t.Fatalf("iteration %d: expected a running session", i) } - got[0].LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + if err := store.UpdateSession(got[0].Key(), func(live *api.Session) error { + live.LastHeartbeat = time.Now().UTC().Add(-1 * time.Hour) + return nil + }); err != nil { + t.Fatalf("iteration %d: update heartbeat: %v", i, err) + } ctrl.ReconcileOnce() // fail + (maybe) restart clock.Advance(10 * time.Minute) }
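
A few supplementary sketches of the contract this diff establishes follow. None of this code is part of the change; all of it assumes it sits in package api alongside the store, and every helper or field value introduced here is illustrative only. First, the caller-facing pitfall the CreateSession/GetSession doc comments warn about: a snapshot compiles exactly like the live object, so mutating it is a silent no-op, and the Update* closure is the only commit path. `bumpFailureCount` is a hypothetical helper, not code from this change.

```go
package api

// bumpFailureCount is a hypothetical helper (not part of the diff). It
// shows that writing to a snapshot is a silent no-op, and that the
// closure form is how a change actually gets committed.
func bumpFailureCount(s *Store, key string) error {
	snap, err := s.GetSession(key)
	if err != nil {
		return err
	}
	snap.FailureCount++ // compiles, but only mutates the local copy

	// Commit path: the store takes the write lock and hands fn the live
	// object for the duration of the call.
	return s.UpdateSession(key, func(live *Session) error {
		live.FailureCount++
		return nil
	})
}
```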
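Second, the reason the clone helpers re-allocate nested slices instead of relying on a plain struct copy: the copy still shares the Roles backing array with the live object, so it would observe concurrent writes. A sketch as a test, using only types and helpers that appear in the diff:

```go
package api

import "testing"

// TestCloneTeamDecouplesRoles is a sketch (not part of the diff) of why
// cloneTeam re-allocates Roles rather than copying the struct shallowly.
func TestCloneTeamDecouplesRoles(t *testing.T) {
	team := &Team{Name: "squad", Roles: []Role{{Name: "worker", Replicas: 1}}}

	shallow := *team        // copies the Roles slice header, not its backing array
	deep := cloneTeam(team) // what GetTeam and ListTeams do internally

	team.Roles[0].Replicas = 5 // stand-in for a concurrent writer

	if shallow.Roles[0].Replicas != 5 {
		t.Fatal("shallow copy still aliases the live Roles slice")
	}
	if deep.Roles[0].Replicas != 1 {
		t.Fatal("cloneTeam snapshot should be decoupled from the live team")
	}
}
```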
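Finally, the `go test -race` backstop the new go.md rule mentions. A test along these lines passes against the snapshot-returning store; compiled against the old pointer-returning accessors, the same read/write pressure is the kind of thing the race detector flags. Iteration counts and field values are arbitrary.

```go
package api

import (
	"sync"
	"testing"
	"time"
)

// TestStoreSnapshotContractNoRace is a sketch (not part of the diff):
// snapshot readers and closure writers hammer the same session.
func TestStoreSnapshotContractNoRace(t *testing.T) {
	store := NewStore()
	sess := &Session{ // field values are illustrative
		Workspace: "ws", Team: "squad", Role: "worker",
		Generation: 1, State: SessionRunning, CreatedAt: time.Now().UTC(),
	}
	if err := store.CreateSession(sess); err != nil {
		t.Fatal(err)
	}
	key := sess.Key()

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(2)
		go func() { // reader: sees only value snapshots
			defer wg.Done()
			for j := 0; j < 500; j++ {
				for _, s := range store.ListSessions() {
					_ = s.FailureCount
				}
			}
		}()
		go func() { // writer: mutates the live object under the store lock
			defer wg.Done()
			for j := 0; j < 500; j++ {
				_ = store.UpdateSession(key, func(live *Session) error {
					live.FailureCount++
					live.LastHeartbeat = time.Now().UTC()
					return nil
				})
			}
		}()
	}
	wg.Wait()
}
```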