From 0dbf9aaff8cb88ab07b08ad1a945b67965027541 Mon Sep 17 00:00:00 2001
From: PMCLSF
Date: Sat, 2 May 2026 03:32:58 -0700
Subject: [PATCH] fix(0.2.1): wire request context + dedup in-flight server
 analyses
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two related defects in `terrain serve`, both flagged by the launch-
readiness review and verified in the codebase:

1. **Request context not wired.** The HTTP handlers ignored r.Context()
   and called engine.RunPipeline (which wraps context.Background()), so
   a client disconnect during a long analysis left the analysis running
   in the background with no handler waiting on it.

2. **Mutex-blocking analysis cache.** getResult held Server.mu via
   defer-Unlock for the full analysis duration. One slow analysis
   serialized every other request needing a pipeline result behind a
   single goroutine, regardless of whether the cache was warm enough
   for them.

Both ship together because the right fix to (2) replaces the cache
mutex with an RWMutex + singleflight, which also gives us a clean seam
for (1):

* Fast path: RWMutex.RLock so warm-cache hits don't contend.
* Slow path: singleflight.Group.DoChan dedups concurrent in-flight
  analyses (one analysis per cache window, even with N waiters).
* Per-caller cancellation: each handler threads r.Context() through
  getResult; the select on (ch | ctx.Done()) returns ctx.Err()
  immediately on disconnect. The shared analysis runs with
  context.Background() so a single caller's disconnect doesn't kill
  work other waiters depend on.

Tradeoff documented inline: a single-waiter request whose context is
canceled won't (yet) cancel the underlying analysis. Reference-
counting waiters is on the 0.3 list.

Tests added:

* TestGetResult_CacheHit — fast path returns cached pointer
* TestGetResult_RespectsCanceledContext — pre-canceled context returns
  context.Canceled within 2s rather than blocking on analysis (pre-fix
  this hung until pipeline completion)
* TestGetResult_ConcurrentCallsShareCache — 50 concurrent callers on a
  warm cache observe the same report pointer

Dep: golang.org/x/sync v0.10.0 (singleflight). Pinned to v0.10.0
because v0.20.0 bumps the go directive to 1.25; v0.10.0 is compatible
with the existing go 1.23.

Server-package doc-comment refreshed to describe the new concurrency
model and remove the now-fixed "known issues" block that was added in
PR #131.
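
For reviewers who haven't used singleflight before, here is a minimal,
self-contained sketch of the DoChan + select shape that getResult now
relies on. Illustrative only: slowAnalysis and getReport below are
hypothetical stand-ins, not code in this change.

    // Illustrative sketch (not part of this patch): DoChan dedups
    // concurrent work under one key while each caller still honors
    // its own context.
    package main

    import (
        "context"
        "fmt"
        "time"

        "golang.org/x/sync/singleflight"
    )

    // slowAnalysis is a hypothetical stand-in for the real pipeline run.
    func slowAnalysis() (any, error) {
        time.Sleep(500 * time.Millisecond)
        return "report", nil
    }

    func getReport(ctx context.Context, g *singleflight.Group) (any, error) {
        // Every caller using the same key shares one in-flight slowAnalysis.
        ch := g.DoChan("analyze", slowAnalysis)
        select {
        case res := <-ch:
            return res.Val, res.Err
        case <-ctx.Done():
            // This caller stops waiting; the shared work keeps running
            // for any remaining waiters.
            return nil, ctx.Err()
        }
    }

    func main() {
        var g singleflight.Group

        // A caller that gives up after 100ms returns ctx.Err() early...
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        if _, err := getReport(ctx, &g); err != nil {
            fmt.Println("impatient caller:", err)
        }

        // ...while a patient caller still gets the shared result.
        if v, err := getReport(context.Background(), &g); err == nil {
            fmt.Println("patient caller:", v)
        }
    }

getResult in the diff below follows the same shape, with the cache
re-check and the report build inside the singleflight callback.
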
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 go.mod                         |   2 +
 go.sum                         |   2 +
 internal/server/handlers.go    |  14 ++++-
 internal/server/server.go      | 104 +++++++++++++++++++++++++--------
 internal/server/server_test.go |  93 +++++++++++++++++++++++++++++
 5 files changed, 187 insertions(+), 28 deletions(-)

diff --git a/go.mod b/go.mod
index 27d3957d..59e3fdd1 100644
--- a/go.mod
+++ b/go.mod
@@ -5,3 +5,5 @@ go 1.23
 require gopkg.in/yaml.v3 v3.0.1
 
 require github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82
+
+require golang.org/x/sync v0.10.0 // indirect
diff --git a/go.sum b/go.sum
index 1d27f2d6..0978db4d 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f
 github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
+golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
diff --git a/internal/server/handlers.go b/internal/server/handlers.go
index 923a90d6..d39ac0a0 100644
--- a/internal/server/handlers.go
+++ b/internal/server/handlers.go
@@ -16,8 +16,13 @@ func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	_, report, err := s.getResult()
+	_, report, err := s.getResult(r.Context())
 	if err != nil {
+		// If the client disconnected, http.Error writes are best-effort
+		// — drop them rather than logging a confusing 500.
+		if r.Context().Err() != nil {
+			return
+		}
 		http.Error(w, "Analysis failed: "+err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -45,9 +50,12 @@ func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
 }
 
 // handleAnalyze returns the analysis report as JSON.
-func (s *Server) handleAnalyze(w http.ResponseWriter, _ *http.Request) {
-	_, report, err := s.getResult()
+func (s *Server) handleAnalyze(w http.ResponseWriter, r *http.Request) {
+	_, report, err := s.getResult(r.Context())
 	if err != nil {
+		if r.Context().Err() != nil {
+			return
+		}
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusInternalServerError)
 		json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
diff --git a/internal/server/server.go b/internal/server/server.go
index b17188e1..abcd4e70 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -19,12 +19,14 @@
 // Referrer-Policy) are set on every response.
 // - Read-only flag enforces HTTP 405 on state-changing endpoints.
 //
-// Known issues tracked for 0.2.1:
-// - The analysis call inside getResult holds Server.mu for the full
-//   analysis duration, so one long analysis serializes all server
-//   requests behind it. Singleflight-based fix is on the 0.2.1 list.
-// - HTTP handlers don't thread the request context into the engine,
-//   so client disconnect doesn't cancel in-flight analyses. Same PR.
+// Concurrency model:
+// - Cache reads use a sync.RWMutex; warm-cache hits don't contend.
+// - The slow path runs the analysis under singleflight so concurrent
+//   callers wait on a single in-flight analysis instead of stacking up.
+// - Each handler threads r.Context() through getResult; a client
+//   disconnect returns ctx.Err() immediately, but the underlying
+//   analysis continues for any other waiters. (A future iteration
+//   could ref-count waiters and cancel when none remain.)
 //
 // Sandboxing AI eval execution and an actual auth model are 0.3 work;
 // until then, this is a *local development tool*, not a team dashboard.
@@ -39,6 +41,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/sync/singleflight"
+
 	"github.com/pmclSF/terrain/internal/analyze"
 	"github.com/pmclSF/terrain/internal/engine"
 )
@@ -73,7 +77,12 @@ type Server struct {
 	root string
 	cfg  Config
 
-	mu           sync.Mutex
+	// flight deduplicates concurrent in-flight analyses. Multiple
+	// pending requests for the same root share one analysis call;
+	// other handlers (e.g. /api/health) are not blocked.
+	flight singleflight.Group
+
+	mu           sync.RWMutex
 	cachedAt     time.Time
 	cachedResult *engine.PipelineResult
 	cachedReport *analyze.Report
@@ -222,29 +231,74 @@ func (s *Server) originAllowed(r *http.Request) bool {
 const cacheTTL = 5 * time.Second
 
 // getResult returns a cached or fresh pipeline result and report.
-func (s *Server) getResult() (*engine.PipelineResult, *analyze.Report, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
+//
+// The fast path is read-locked: warm-cache hits don't contend with one
+// another. The slow path runs the analysis once per cache window even
+// under concurrent load via singleflight; additional callers wait on
+// the in-flight analysis instead of running their own. The caller's
+// context (typically r.Context()) controls how long this function
+// blocks: when the client disconnects, the function returns with
+// ctx.Err() and the analysis continues in the background for any other
+// waiters. A future iteration could reference-count waiters and cancel
+// the analysis when none remain.
+func (s *Server) getResult(ctx context.Context) (*engine.PipelineResult, *analyze.Report, error) {
+	// Fast path: cached and fresh.
+	s.mu.RLock()
 	if s.cachedResult != nil && time.Since(s.cachedAt) < cacheTTL {
-		return s.cachedResult, s.cachedReport, nil
+		result, report := s.cachedResult, s.cachedReport
+		s.mu.RUnlock()
+		return result, report, nil
 	}
+	s.mu.RUnlock()
 
-	result, err := engine.RunPipeline(s.root, engine.PipelineOptions{
-		EngineVersion: "serve",
-	})
-	if err != nil {
-		return nil, nil, err
+	type cached struct {
+		result *engine.PipelineResult
+		report *analyze.Report
 	}
 
-	report := analyze.Build(&analyze.BuildInput{
-		Snapshot:  result.Snapshot,
-		HasPolicy: result.HasPolicy,
-	})
+	ch := s.flight.DoChan("analyze", func() (any, error) {
+		// Re-check the cache under singleflight: another caller might
+		// have populated it while we were queued.
+		s.mu.RLock()
+		if s.cachedResult != nil && time.Since(s.cachedAt) < cacheTTL {
+			c := &cached{result: s.cachedResult, report: s.cachedReport}
+			s.mu.RUnlock()
+			return c, nil
+		}
+		s.mu.RUnlock()
 
-	s.cachedResult = result
-	s.cachedReport = report
-	s.cachedAt = time.Now()
+		// The shared analysis runs with context.Background() so a single
+		// caller's disconnect doesn't cancel an analysis that other
+		// waiters depend on. Per-caller cancellation is handled by the
+		// select below.
+		result, err := engine.RunPipelineContext(context.Background(), s.root, engine.PipelineOptions{
+			EngineVersion: "serve",
+		})
+		if err != nil {
+			return nil, err
+		}
+		report := analyze.Build(&analyze.BuildInput{
+			Snapshot:  result.Snapshot,
+			HasPolicy: result.HasPolicy,
+		})
+
+		s.mu.Lock()
+		s.cachedResult = result
+		s.cachedReport = report
+		s.cachedAt = time.Now()
+		s.mu.Unlock()
+
+		return &cached{result: result, report: report}, nil
+	})
 
-	return result, report, nil
+	select {
+	case res := <-ch:
+		if res.Err != nil {
+			return nil, nil, res.Err
+		}
+		c := res.Val.(*cached)
+		return c.result, c.report, nil
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	}
 }
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index 0a25733c..dbc34775 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -1,10 +1,13 @@
 package server
 
 import (
+	"context"
 	"encoding/json"
+	"errors"
 	"net/http"
 	"net/http/httptest"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -258,3 +261,93 @@ func TestSecurityMiddleware_BlocksHostileOrigin(t *testing.T) {
 		t.Errorf("hostile-origin should not reach the inner handler")
 	}
 }
+
+// TestGetResult_CacheHit verifies that a fresh cache short-circuits
+// before the singleflight call (no analysis runs, regardless of
+// context state).
+func TestGetResult_CacheHit(t *testing.T) {
+	t.Parallel()
+
+	s := newServerWithCachedReport()
+	want := s.cachedReport
+
+	got, _, err := s.getResultReports(context.Background())
+	if err != nil {
+		t.Fatalf("getResult on warm cache: %v", err)
+	}
+	if got != want {
+		t.Errorf("warm cache returned a different report pointer; expected the cached one")
+	}
+}
+
+// TestGetResult_RespectsCanceledContext verifies that a request whose
+// context is already canceled returns ctx.Err() promptly rather than
+// blocking on analysis. Pre-fix, getResult held s.mu for the analysis
+// duration and ignored the request context entirely.
+func TestGetResult_RespectsCanceledContext(t *testing.T) {
+	t.Parallel()
+
+	s := New(t.TempDir(), 0)
+	// Pre-cancel the context so the singleflight select returns via
+	// ctx.Done() without waiting on the analysis.
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	done := make(chan error, 1)
+	go func() {
+		_, _, err := s.getResultReports(ctx)
+		done <- err
+	}()
+
+	select {
+	case err := <-done:
+		if !errors.Is(err, context.Canceled) {
+			t.Errorf("getResult on canceled context: got %v, want context.Canceled", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("getResult did not return within 2s on canceled context")
+	}
+}
+
+// TestGetResult_ConcurrentCallsShareCache verifies that N concurrent
+// callers that hit the cache observe the same report pointer and don't
+// trigger N analyses. The slow path is exercised by
+// TestGetResult_RespectsCanceledContext (which cancels before the
+// analysis completes); this test exercises the fast path.
+func TestGetResult_ConcurrentCallsShareCache(t *testing.T) {
+	t.Parallel()
+
+	s := newServerWithCachedReport()
+	want := s.cachedReport
+
+	const N = 50
+	var wg sync.WaitGroup
+	results := make([]*analyze.Report, N)
+	errs := make([]error, N)
+	for i := 0; i < N; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			_, r, err := s.getResultReports(context.Background())
+			results[i] = r
+			errs[i] = err
+		}(i)
+	}
+	wg.Wait()
+
+	for i := 0; i < N; i++ {
+		if errs[i] != nil {
+			t.Errorf("call %d: unexpected error: %v", i, errs[i])
+		}
+		if results[i] != want {
+			t.Errorf("call %d: returned different report pointer", i)
+		}
+	}
+}
+
+// getResultReports is a test helper that calls getResult and returns the
+// report in both positions, for tests that only care about the report.
+func (s *Server) getResultReports(ctx context.Context) (*analyze.Report, *analyze.Report, error) {
+	_, report, err := s.getResult(ctx)
+	return report, report, err
+}