Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ go 1.23
require gopkg.in/yaml.v3 v3.0.1

require github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82

require golang.org/x/sync v0.10.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f
github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
14 changes: 11 additions & 3 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
return
}

_, report, err := s.getResult()
_, report, err := s.getResult(r.Context())
if err != nil {
// If the client disconnected, http.Error writes are best-effort
// — drop them rather than logging a confusing 500.
if r.Context().Err() != nil {
return
}
http.Error(w, "Analysis failed: "+err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -45,9 +50,12 @@ func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
}

// handleAnalyze returns the analysis report as JSON.
func (s *Server) handleAnalyze(w http.ResponseWriter, _ *http.Request) {
_, report, err := s.getResult()
func (s *Server) handleAnalyze(w http.ResponseWriter, r *http.Request) {
_, report, err := s.getResult(r.Context())
if err != nil {
if r.Context().Err() != nil {
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
Expand Down
104 changes: 79 additions & 25 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
// Referrer-Policy) are set on every response.
// - Read-only flag enforces HTTP 405 on state-changing endpoints.
//
// Known issues tracked for 0.2.1:
// - The analysis call inside getResult holds Server.mu for the full
// analysis duration, so one long analysis serializes all server
// requests behind it. Singleflight-based fix is on the 0.2.1 list.
// - HTTP handlers don't thread the request context into the engine,
// so client disconnect doesn't cancel in-flight analyses. Same PR.
// Concurrency model:
// - Cache reads use a sync.RWMutex; warm-cache hits don't block writers.
// - The slow path runs the analysis under singleflight so concurrent
// callers wait on a single in-flight analysis instead of stacking up.
// - Each handler threads r.Context() through getResult; a client
// disconnect returns ctx.Err() immediately, but the underlying
// analysis continues for any other waiters. (A future iteration
// could ref-count waiters and cancel when none remain.)
//
// Sandboxing AI eval execution and an actual auth model are 0.3 work;
// until then, this is a *local development tool*, not a team dashboard.
Expand All @@ -39,6 +41,8 @@ import (
"sync"
"time"

"golang.org/x/sync/singleflight"

"github.com/pmclSF/terrain/internal/analyze"
"github.com/pmclSF/terrain/internal/engine"
)
Expand Down Expand Up @@ -73,7 +77,12 @@ type Server struct {
root string
cfg Config

mu sync.Mutex
// flight deduplicates concurrent in-flight analyses. Multiple
// pending requests for the same root share one analysis call;
// other handlers (e.g. /api/health) are not blocked.
flight singleflight.Group

mu sync.RWMutex
cachedAt time.Time
cachedResult *engine.PipelineResult
cachedReport *analyze.Report
Expand Down Expand Up @@ -222,29 +231,74 @@ func (s *Server) originAllowed(r *http.Request) bool {
const cacheTTL = 5 * time.Second

// getResult returns a cached or fresh pipeline result and report.
func (s *Server) getResult() (*engine.PipelineResult, *analyze.Report, error) {
s.mu.Lock()
defer s.mu.Unlock()

//
// The fast path is read-locked: cache hits don't block writers or each
// other. The slow path runs the analysis once per cache window even
// under concurrent load via singleflight; additional callers wait on
// the in-flight analysis instead of running their own. The caller's
// context (typically r.Context()) controls how long this function
// blocks: when the client disconnects, the function returns with
// ctx.Err() and the analysis continues in the background for any other
// waiters. A future iteration could reference-count waiters and cancel
// the analysis when none remain.
func (s *Server) getResult(ctx context.Context) (*engine.PipelineResult, *analyze.Report, error) {
// Fast path: cached and fresh.
s.mu.RLock()
if s.cachedResult != nil && time.Since(s.cachedAt) < cacheTTL {
return s.cachedResult, s.cachedReport, nil
result, report := s.cachedResult, s.cachedReport
s.mu.RUnlock()
return result, report, nil
}
s.mu.RUnlock()

result, err := engine.RunPipeline(s.root, engine.PipelineOptions{
EngineVersion: "serve",
})
if err != nil {
return nil, nil, err
type cached struct {
result *engine.PipelineResult
report *analyze.Report
}

report := analyze.Build(&analyze.BuildInput{
Snapshot: result.Snapshot,
HasPolicy: result.HasPolicy,
})
ch := s.flight.DoChan("analyze", func() (any, error) {
// Re-check the cache under singleflight: another caller might
// have populated it while we were queued.
s.mu.RLock()
if s.cachedResult != nil && time.Since(s.cachedAt) < cacheTTL {
c := &cached{result: s.cachedResult, report: s.cachedReport}
s.mu.RUnlock()
return c, nil
}
s.mu.RUnlock()

s.cachedResult = result
s.cachedReport = report
s.cachedAt = time.Now()
// The shared analysis runs with context.Background() so a single
// caller's disconnect doesn't cancel an analysis that other
// waiters depend on. Per-caller cancellation is handled by the
// select below.
result, err := engine.RunPipelineContext(context.Background(), s.root, engine.PipelineOptions{
EngineVersion: "serve",
})
if err != nil {
return nil, err
}
report := analyze.Build(&analyze.BuildInput{
Snapshot: result.Snapshot,
HasPolicy: result.HasPolicy,
})

s.mu.Lock()
s.cachedResult = result
s.cachedReport = report
s.cachedAt = time.Now()
s.mu.Unlock()

return &cached{result: result, report: report}, nil
})

return result, report, nil
select {
case res := <-ch:
if res.Err != nil {
return nil, nil, res.Err
}
c := res.Val.(*cached)
return c.result, c.report, nil
case <-ctx.Done():
return nil, nil, ctx.Err()
}
}
93 changes: 93 additions & 0 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package server

import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -258,3 +261,93 @@ func TestSecurityMiddleware_BlocksHostileOrigin(t *testing.T) {
t.Errorf("hostile-origin should not reach the inner handler")
}
}

// TestGetResult_CacheHit verifies that a fresh cache short-circuits
// before the singleflight call (no analysis runs, regardless of
// context state).
func TestGetResult_CacheHit(t *testing.T) {
t.Parallel()

s := newServerWithCachedReport()
want := s.cachedReport

got, _, err := s.getResultReports(context.Background())
if err != nil {
t.Fatalf("getResult on warm cache: %v", err)
}
if got != want {
t.Errorf("warm cache returned a different report pointer; expected the cached one")
}
}

// TestGetResult_RespectsCanceledContext verifies that a request whose
// context is already canceled returns ctx.Err() promptly rather than
// blocking on analysis. Pre-fix, getResult held s.mu for the analysis
// duration and ignored the request context entirely.
func TestGetResult_RespectsCanceledContext(t *testing.T) {
t.Parallel()

s := New(t.TempDir(), 0)
// Pre-cancel the context so the singleflight select returns via
// ctx.Done() without waiting on the analysis.
ctx, cancel := context.WithCancel(context.Background())
cancel()

done := make(chan error, 1)
go func() {
_, _, err := s.getResultReports(ctx)
done <- err
}()

select {
case err := <-done:
if !errors.Is(err, context.Canceled) {
t.Errorf("getResult on canceled context: got %v, want context.Canceled", err)
}
case <-time.After(2 * time.Second):
t.Fatal("getResult did not return within 2s on canceled context")
}
}

// TestGetResult_ConcurrentCallsShareCache verifies that N concurrent
// callers that hit the cache observe the same report pointer and don't
// trigger N analyses. The slow-path dedup is exercised by
// TestGetResult_RespectsCanceledContext (which cancels before the
// analysis completes); this test exercises the fast path.
func TestGetResult_ConcurrentCallsShareCache(t *testing.T) {
t.Parallel()

s := newServerWithCachedReport()
want := s.cachedReport

const N = 50
var wg sync.WaitGroup
results := make([]*analyze.Report, N)
errs := make([]error, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, r, err := s.getResultReports(context.Background())
results[i] = r
errs[i] = err
}(i)
}
wg.Wait()

for i := 0; i < N; i++ {
if errs[i] != nil {
t.Errorf("call %d: unexpected error: %v", i, errs[i])
}
if results[i] != want {
t.Errorf("call %d: returned different report pointer", i)
}
}
}

// getResultReports is a test helper that swaps the (result, report,
// error) tuple ordering for tests that only care about the report.
func (s *Server) getResultReports(ctx context.Context) (*analyze.Report, *analyze.Report, error) {
_, report, err := s.getResult(ctx)
return report, report, err
}
Loading