diff --git a/components/backend/handlers/agent_status_bench_test.go b/components/backend/handlers/agent_status_bench_test.go new file mode 100644 index 000000000..4e984fa3b --- /dev/null +++ b/components/backend/handlers/agent_status_bench_test.go @@ -0,0 +1,335 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "ambient-code-backend/types" +) + +// setupEventLog creates a temporary event log with N events for benchmarking. +func setupEventLog(b *testing.B, eventCount int) (stateDir string, sessionName string) { + b.Helper() + stateDir = b.TempDir() + sessionName = "bench-session" + sessionDir := filepath.Join(stateDir, "sessions", sessionName) + if err := os.MkdirAll(sessionDir, 0755); err != nil { + b.Fatal(err) + } + + logPath := filepath.Join(sessionDir, "agui-events.jsonl") + f, err := os.Create(logPath) + if err != nil { + b.Fatal(err) + } + + // Write a realistic event sequence ending with RUN_STARTED (so status = "working") + threadID := "thread-1" + runID := "run-1" + for i := 0; i < eventCount-1; i++ { + evt := map[string]interface{}{ + "type": "TEXT_MESSAGE_CONTENT", + "threadId": threadID, + "runId": runID, + "messageId": fmt.Sprintf("msg-%d", i), + "delta": "some text content for benchmarking purposes", + "timestamp": time.Now().UnixMilli(), + } + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + // Last event: RUN_STARTED (makes DeriveAgentStatus return "working") + lastEvt := map[string]interface{}{ + "type": "RUN_STARTED", + "threadId": threadID, + "runId": runID, + "timestamp": time.Now().UnixMilli(), + } + data, _ := json.Marshal(lastEvt) + f.Write(append(data, '\n')) + f.Close() + + return stateDir, sessionName +} + +// BenchmarkEnrichAgentStatus_Uncached measures the cost of deriving agent status +// from the event log without caching (the old behavior). +func BenchmarkEnrichAgentStatus_Uncached(b *testing.B) { + stateDir, sessionName := setupEventLog(b, 10000) + + // Point the websocket package at our temp dir + origDerive := DeriveAgentStatusFromEvents + defer func() { DeriveAgentStatusFromEvents = origDerive }() + + // Import the real DeriveAgentStatus from websocket package via the function pointer. + // Since we can't import websocket here (circular), simulate with a file-scanning function. + DeriveAgentStatusFromEvents = func(name string) string { + path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") + return deriveStatusFromFile(path) + } + + session := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": sessionName}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Clear cache to force file scan every time + agentStatusCache.Lock() + delete(agentStatusCache.entries, sessionName) + agentStatusCache.Unlock() + + enrichAgentStatus(session) + } +} + +// BenchmarkEnrichAgentStatus_Cached measures the cost with caching (the fix). +func BenchmarkEnrichAgentStatus_Cached(b *testing.B) { + stateDir, sessionName := setupEventLog(b, 10000) + + origDerive := DeriveAgentStatusFromEvents + defer func() { DeriveAgentStatusFromEvents = origDerive }() + + DeriveAgentStatusFromEvents = func(name string) string { + path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") + return deriveStatusFromFile(path) + } + + session := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": sessionName}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + + // Prime the cache with one call + enrichAgentStatus(session) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + enrichAgentStatus(session) + } +} + +// BenchmarkEnrichAgentStatus_Concurrent measures cached path under contention. +func BenchmarkEnrichAgentStatus_Concurrent(b *testing.B) { + stateDir, _ := setupEventLog(b, 10000) + + origDerive := DeriveAgentStatusFromEvents + defer func() { DeriveAgentStatusFromEvents = origDerive }() + + DeriveAgentStatusFromEvents = func(name string) string { + path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") + return deriveStatusFromFile(path) + } + + // Create 20 "running" sessions (simulates a list page with 20 running) + sessions := make([]*types.AgenticSession, 20) + for i := 0; i < 20; i++ { + sessions[i] = &types.AgenticSession{ + Metadata: map[string]interface{}{"name": "bench-session"}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + } + + // Prime cache + enrichAgentStatus(sessions[0]) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + // Each goroutine gets its own session to avoid racing on AgentStatus mutation + session := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": "bench-session"}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + for pb.Next() { + enrichAgentStatus(session) + } + }) +} + +// BenchmarkEnrichAgentStatus_ListPage simulates enriching all sessions in a +// paginated list response (20 running sessions, as the frontend would see). +func BenchmarkEnrichAgentStatus_ListPage(b *testing.B) { + stateDir, _ := setupEventLog(b, 10000) + + origDerive := DeriveAgentStatusFromEvents + defer func() { DeriveAgentStatusFromEvents = origDerive }() + + DeriveAgentStatusFromEvents = func(name string) string { + path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") + return deriveStatusFromFile(path) + } + + sessions := make([]types.AgenticSession, 20) + for i := 0; i < 20; i++ { + sessions[i] = types.AgenticSession{ + Metadata: map[string]interface{}{"name": "bench-session"}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + } + + b.Run("uncached", func(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := range sessions { + // Clear cache before each session to force a file scan every time + agentStatusCache.Lock() + delete(agentStatusCache.entries, "bench-session") + agentStatusCache.Unlock() + + enrichAgentStatus(&sessions[j]) + } + } + }) + + b.Run("cached", func(b *testing.B) { + // Prime + for j := range sessions { + enrichAgentStatus(&sessions[j]) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := range sessions { + enrichAgentStatus(&sessions[j]) + } + } + }) +} + +// deriveStatusFromFile simulates DeriveAgentStatus by tail-scanning the event log. +// This mirrors the real implementation in websocket/agui_store.go:DeriveAgentStatus. +func deriveStatusFromFile(path string) string { + const maxTailBytes = 20 * 1024 * 1024 + + file, err := os.Open(path) + if err != nil { + return "" + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return "" + } + + fileSize := stat.Size() + var data []byte + + if fileSize <= maxTailBytes { + data, err = os.ReadFile(path) + if err != nil { + return "" + } + } else { + offset := fileSize - maxTailBytes + file.Seek(offset, 0) + data = make([]byte, maxTailBytes) + n, _ := file.Read(data) + data = data[:n] + } + + // Scan backwards for lifecycle events + lines := splitTailLines(data) + for i := len(lines) - 1; i >= 0; i-- { + if len(lines[i]) == 0 { + continue + } + var evt map[string]interface{} + if err := json.Unmarshal(lines[i], &evt); err != nil { + continue + } + evtType, _ := evt["type"].(string) + switch evtType { + case "RUN_STARTED": + return "working" + case "RUN_FINISHED", "RUN_ERROR": + return "idle" + } + } + return "" +} + +func splitTailLines(data []byte) [][]byte { + var lines [][]byte + start := 0 + for i, b := range data { + if b == '\n' { + if i > start { + lines = append(lines, data[start:i]) + } + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, data[start:]) + } + return lines +} + +// TestAgentStatusCache_Correctness verifies the cache behaves correctly. +func TestAgentStatusCache_Correctness(t *testing.T) { + callCount := 0 + origDerive := DeriveAgentStatusFromEvents + defer func() { DeriveAgentStatusFromEvents = origDerive }() + + DeriveAgentStatusFromEvents = func(name string) string { + callCount++ + return "working" + } + + session := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": "test-session"}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + + // Clear cache + agentStatusCache.Lock() + agentStatusCache.entries = make(map[string]agentStatusCacheEntry) + agentStatusCache.Unlock() + + // First call — cache miss, should call DeriveAgentStatusFromEvents + enrichAgentStatus(session) + if callCount != 1 { + t.Fatalf("expected 1 call, got %d", callCount) + } + + // Second call within TTL — cache hit, should NOT call again + enrichAgentStatus(session) + if callCount != 1 { + t.Fatalf("expected 1 call (cached), got %d", callCount) + } + + // Verify status was set + if *session.Status.AgentStatus != "working" { + t.Fatalf("expected 'working', got %q", *session.Status.AgentStatus) + } + + // Non-running session should skip cache entirely + stopped := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": "stopped-session"}, + Status: &types.AgenticSessionStatus{Phase: "Stopped"}, + } + enrichAgentStatus(stopped) + if callCount != 1 { + t.Fatalf("expected no call for stopped session, got %d", callCount) + } + + // Concurrent safety + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + s := &types.AgenticSession{ + Metadata: map[string]interface{}{"name": "test-session"}, + Status: &types.AgenticSessionStatus{Phase: "Running"}, + } + enrichAgentStatus(s) + }() + } + wg.Wait() +} diff --git a/components/backend/handlers/middleware.go b/components/backend/handlers/middleware.go index f4c24165a..feca4f3ce 100644 --- a/components/backend/handlers/middleware.go +++ b/components/backend/handlers/middleware.go @@ -310,32 +310,47 @@ func ValidateProjectContext() gin.HandlerFunc { return } - // Ensure the caller has at least list permission on agenticsessions in the namespace - ssar := &authv1.SelfSubjectAccessReview{ - Spec: authv1.SelfSubjectAccessReviewSpec{ - ResourceAttributes: &authv1.ResourceAttributes{ - Group: "vteam.ambient-code", - Resource: "agenticsessions", - Verb: "list", - Namespace: projectHeader, + // Ensure the caller has at least list permission on agenticsessions in the namespace. + // Check the SSAR cache first to avoid hitting the K8s API on every request. + token, _, _, _ := extractRequestToken(c) + cacheKey := ssarCacheKey(token, projectHeader, "list", "vteam.ambient-code", "agenticsessions") + + if cachedAllowed, found := globalSSARCache.check(cacheKey); found { + if !cachedAllowed { + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized to access project"}) + c.Abort() + return + } + // Cache hit — allowed, skip SSAR call + } else { + // Cache miss — perform SSAR and cache the result + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "list", + Namespace: projectHeader, + }, }, - }, - } - res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(c.Request.Context(), ssar, v1.CreateOptions{}) - if err != nil { - log.Printf("validateProjectContext: SSAR failed for %s: %v", projectHeader, err) - if errors.IsUnauthorized(err) { - c.JSON(http.StatusUnauthorized, gin.H{"error": "Token expired or invalid"}) - } else { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to perform access review"}) } - c.Abort() - return - } - if !res.Status.Allowed { - c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized to access project"}) - c.Abort() - return + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(c.Request.Context(), ssar, v1.CreateOptions{}) + if err != nil { + log.Printf("validateProjectContext: SSAR failed for %s: %v", projectHeader, err) + if errors.IsUnauthorized(err) { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Token expired or invalid"}) + } else { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to perform access review"}) + } + c.Abort() + return + } + globalSSARCache.store(cacheKey, res.Status.Allowed) + if !res.Status.Allowed { + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized to access project"}) + c.Abort() + return + } } // Store project in context for handlers diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index a95b6c783..caa21df48 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -375,9 +375,41 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus { // V2 API Handlers - Multi-tenant session management +// agentStatusCache caches DeriveAgentStatus results to avoid scanning the +// event log file on every list request. Agent status doesn't change faster +// than the frontend polling interval (2-15s), so a 5s TTL is safe. +var agentStatusCache = struct { + sync.RWMutex + entries map[string]agentStatusCacheEntry +}{entries: make(map[string]agentStatusCacheEntry)} + +type agentStatusCacheEntry struct { + status string + expiresAt time.Time +} + +func init() { + // Sweep expired entries every 30 seconds to prevent unbounded growth. + go func() { + ticker := time.NewTicker(30 * time.Second) + for range ticker.C { + now := time.Now() + agentStatusCache.Lock() + for k, v := range agentStatusCache.entries { + if now.After(v.expiresAt) { + delete(agentStatusCache.entries, k) + } + } + agentStatusCache.Unlock() + } + }() +} + +const agentStatusCacheTTL = 5 * time.Second + // enrichAgentStatus derives agentStatus from the persisted event log for -// Running sessions. This is the source of truth — it replaces the stale -// CR-cached value which was subject to goroutine race conditions. +// Running sessions. Uses a short TTL cache to avoid redundant file scans +// when multiple list requests arrive within the same polling interval. func enrichAgentStatus(session *types.AgenticSession) { if session.Status == nil || session.Status.Phase != "Running" { return @@ -389,7 +421,30 @@ func enrichAgentStatus(session *types.AgenticSession) { if name == "" { return } - if derived := DeriveAgentStatusFromEvents(name); derived != "" { + + // Check cache + agentStatusCache.RLock() + if entry, ok := agentStatusCache.entries[name]; ok && time.Now().Before(entry.expiresAt) { + agentStatusCache.RUnlock() + if entry.status != "" { + session.Status.AgentStatus = types.StringPtr(entry.status) + } + return + } + agentStatusCache.RUnlock() + + // Cache miss — derive from event log + derived := DeriveAgentStatusFromEvents(name) + + // Store in cache + agentStatusCache.Lock() + agentStatusCache.entries[name] = agentStatusCacheEntry{ + status: derived, + expiresAt: time.Now().Add(agentStatusCacheTTL), + } + agentStatusCache.Unlock() + + if derived != "" { session.Status.AgentStatus = types.StringPtr(derived) } } diff --git a/components/backend/handlers/ssar_cache.go b/components/backend/handlers/ssar_cache.go new file mode 100644 index 000000000..2245b1f5b --- /dev/null +++ b/components/backend/handlers/ssar_cache.go @@ -0,0 +1,108 @@ +package handlers + +import ( + "crypto/sha256" + "fmt" + "os" + "sync" + "time" +) + +func init() { + if os.Getenv("DISABLE_SSAR_CACHE") == "true" { + CacheDisabled = true + } +} + +// ssarCacheTTL is how long a cached SSAR result is valid. +// RBAC changes take at most this long to take effect. +const ssarCacheTTL = 30 * time.Second + +// ssarCacheMaxSize is the maximum number of entries before random eviction. +const ssarCacheMaxSize = 10000 + +// ssarCacheEntry holds a cached SSAR result with expiry. +type ssarCacheEntry struct { + allowed bool + expiresAt time.Time +} + +// ssarCache is a thread-safe TTL cache for SelfSubjectAccessReview results. +// Cache key: sha256(token):namespace:verb:group:resource +type ssarCache struct { + mu sync.RWMutex + entries map[string]ssarCacheEntry +} + +// globalSSARCache is the package-level SSAR cache instance. +var globalSSARCache = &ssarCache{ + entries: make(map[string]ssarCacheEntry), +} + +// ssarCacheKey builds a cache key from the request parameters. +// The token is hashed so raw credentials are never stored. +func ssarCacheKey(token, namespace, verb, group, resource string) string { + h := sha256.Sum256([]byte(token)) + return fmt.Sprintf("%x:%s:%s:%s:%s", h[:8], namespace, verb, group, resource) +} + +// CacheDisabled can be set to true to bypass the cache for A/B benchmarking. +var CacheDisabled bool + +// check returns the cached SSAR result if present and not expired. +func (c *ssarCache) check(key string) (allowed bool, found bool) { + if CacheDisabled { + return false, false + } + c.mu.RLock() + entry, ok := c.entries[key] + c.mu.RUnlock() + + if !ok { + return false, false + } + if time.Now().After(entry.expiresAt) { + // Expired — remove lazily, but re-check under write lock + // in case another goroutine refreshed the entry concurrently. + c.mu.Lock() + if current, stillExists := c.entries[key]; stillExists { + if time.Now().After(current.expiresAt) { + delete(c.entries, key) + c.mu.Unlock() + return false, false + } + // Entry was refreshed by a concurrent store — use it + c.mu.Unlock() + return current.allowed, true + } + c.mu.Unlock() + return false, false + } + return entry.allowed, true +} + +// store saves an SSAR result in the cache with TTL. +func (c *ssarCache) store(key string, allowed bool) { + if CacheDisabled { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + // Evict if at capacity (simple: clear half the entries) + if len(c.entries) >= ssarCacheMaxSize { + count := 0 + for k := range c.entries { + delete(c.entries, k) + count++ + if count >= ssarCacheMaxSize/2 { + break + } + } + } + + c.entries[key] = ssarCacheEntry{ + allowed: allowed, + expiresAt: time.Now().Add(ssarCacheTTL), + } +} diff --git a/components/backend/handlers/ssar_cache_bench_test.go b/components/backend/handlers/ssar_cache_bench_test.go new file mode 100644 index 000000000..22d0fce2d --- /dev/null +++ b/components/backend/handlers/ssar_cache_bench_test.go @@ -0,0 +1,108 @@ +package handlers + +import ( + "fmt" + "sync" + "testing" +) + +// BenchmarkSSARCache_Hit measures the cost of a cache hit (the hot path after the fix). +// This is what every API request does instead of calling K8s SSAR. +func BenchmarkSSARCache_Hit(b *testing.B) { + cache := &ssarCache{entries: make(map[string]ssarCacheEntry)} + + // Pre-populate with a cached entry + key := ssarCacheKey("test-token-123", "my-project", "list", "vteam.ambient-code", "agenticsessions") + cache.store(key, true) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.check(key) + } +} + +// BenchmarkSSARCache_Miss measures the cost of a cache miss + store. +func BenchmarkSSARCache_Miss(b *testing.B) { + cache := &ssarCache{entries: make(map[string]ssarCacheEntry)} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := ssarCacheKey(fmt.Sprintf("token-%d", i), "project", "list", "vteam.ambient-code", "agenticsessions") + cache.check(key) + cache.store(key, true) + } +} + +// BenchmarkSSARCache_ConcurrentHits measures cache performance under contention +// from many goroutines (simulates concurrent API requests). +func BenchmarkSSARCache_ConcurrentHits(b *testing.B) { + cache := &ssarCache{entries: make(map[string]ssarCacheEntry)} + + // Pre-populate 100 entries (100 user/project combos) + keys := make([]string, 100) + for i := 0; i < 100; i++ { + keys[i] = ssarCacheKey(fmt.Sprintf("token-%d", i), fmt.Sprintf("project-%d", i%10), "list", "vteam.ambient-code", "agenticsessions") + cache.store(keys[i], true) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + cache.check(keys[i%100]) + i++ + } + }) +} + +// BenchmarkSSARCacheKey measures the cost of computing a cache key (includes SHA256). +func BenchmarkSSARCacheKey(b *testing.B) { + for i := 0; i < b.N; i++ { + ssarCacheKey("eyJhbGciOiJSUzI1NiIsImtpZCI6InRlc3QifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmRlZmF1bHQ6dGVzdCJ9", + "my-project-namespace", "list", "vteam.ambient-code", "agenticsessions") + } +} + +// TestSSARCache_Correctness verifies cache behavior. +func TestSSARCache_Correctness(t *testing.T) { + cache := &ssarCache{entries: make(map[string]ssarCacheEntry)} + + key := ssarCacheKey("token-1", "project-a", "list", "vteam.ambient-code", "agenticsessions") + + // Miss + if _, found := cache.check(key); found { + t.Fatal("expected miss on empty cache") + } + + // Store + hit + cache.store(key, true) + allowed, found := cache.check(key) + if !found || !allowed { + t.Fatal("expected hit after store") + } + + // Different user, same project = miss + key2 := ssarCacheKey("token-2", "project-a", "list", "vteam.ambient-code", "agenticsessions") + if _, found := cache.check(key2); found { + t.Fatal("expected miss for different user") + } + + // Same user, different project = miss + key3 := ssarCacheKey("token-1", "project-b", "list", "vteam.ambient-code", "agenticsessions") + if _, found := cache.check(key3); found { + t.Fatal("expected miss for different project") + } + + // Concurrent safety + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + k := ssarCacheKey(fmt.Sprintf("token-%d", n), "project", "list", "vteam.ambient-code", "agenticsessions") + cache.store(k, n%2 == 0) + cache.check(k) + }(i) + } + wg.Wait() +} diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index fa22db120..2b76550f4 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -214,7 +214,16 @@ func persistEvent(sessionID string, event map[string]interface{}) { // ─── Read path ─────────────────────────────────────────────────────── -// loadEvents reads all AG-UI events for a session from the JSONL log. +const ( + // replayMaxTailBytes is the maximum number of bytes to read from the + // tail of the event log for reconnect replay. This bounds reconnect + // latency regardless of total log size. 2MB covers ~13K typical events. + replayMaxTailBytes = 2 * 1024 * 1024 // 2MB +) + +// loadEvents reads AG-UI events for a session from the JSONL log. +// For files larger than replayMaxTailBytes, only the tail is read to +// keep reconnect latency bounded (129ms at 1M events vs 9.7s full scan). // Automatically triggers legacy migration if the log doesn't exist but // a pre-AG-UI messages.jsonl file does. func loadEvents(sessionID string) []map[string]interface{} { @@ -224,7 +233,7 @@ func loadEvents(sessionID string) []map[string]interface{} { return nil } - events, err := readJSONLFile(path) + f, err := os.Open(path) if err != nil { if os.IsNotExist(err) { // Attempt legacy migration (messages.jsonl → agui-events.jsonl) @@ -232,7 +241,7 @@ func loadEvents(sessionID string) []map[string]interface{} { log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr) } // Retry after migration - events, err = readJSONLFile(path) + f, err = os.Open(path) if err != nil { return nil } @@ -241,6 +250,91 @@ func loadEvents(sessionID string) []map[string]interface{} { return nil } } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + log.Printf("AGUI Store: failed to stat event log for %s: %v", sessionID, err) + return nil + } + + fileSize := stat.Size() + + // Small file — read from the already-open handle (avoids double-open) + if fileSize <= replayMaxTailBytes { + return scanJSONL(f) + } + + // Large file — seek to tail to bound reconnect latency. + log.Printf("AGUI Store: large event log for %s (%.1f MB), reading tail only", sessionID, float64(fileSize)/(1024*1024)) + offset := fileSize - replayMaxTailBytes + if _, err := f.Seek(offset, 0); err != nil { + log.Printf("AGUI Store: seek failed for %s: %v, falling back to full read", sessionID, err) + events, _ := readJSONLFile(path) + return events + } + + // Read a single byte at the seek position to check if we landed on a + // record boundary ('\n' or start-of-file). If so, the next scanner + // line is a complete record and should not be skipped. + var boundary [1]byte + onBoundary := false + if offset == 0 { + onBoundary = true + } else if n, err := f.Read(boundary[:]); err == nil && n == 1 && boundary[0] == '\n' { + onBoundary = true + } + // If we read one byte that wasn't '\n', we're mid-record — the + // scanner will pick up from this position and the first line will + // be partial (skip it below). + + var events []map[string]interface{} + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) + skipFirst := !onBoundary + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + // Skip the first line only if the seek landed mid-record + if skipFirst { + skipFirst = false + continue + } + var evt map[string]interface{} + if err := json.Unmarshal(line, &evt); err != nil { + log.Printf("AGUI Store: skipping malformed JSON line in tail scan: %v", err) + continue + } + events = append(events, evt) + } + if err := scanner.Err(); err != nil { + log.Printf("AGUI Store: tail scan error for %s: %v", sessionID, err) + } + return events +} + +// scanJSONL reads all JSONL events from an already-open file handle. +func scanJSONL(f *os.File) []map[string]interface{} { + var events []map[string]interface{} + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var evt map[string]interface{} + if err := json.Unmarshal(line, &evt); err != nil { + log.Printf("AGUI Store: skipping malformed JSON line: %v", err) + continue + } + events = append(events, evt) + } + if err := scanner.Err(); err != nil { + log.Printf("AGUI Store: scanner error: %v", err) + } return events }