From c189f3b4995650ced0be2f0e7d70fd185d402fd2 Mon Sep 17 00:00:00 2001 From: Nic van Dessel <51134175+nvandessel@users.noreply.github.com> Date: Sun, 29 Mar 2026 17:09:31 +0000 Subject: [PATCH 1/3] fix: mark consolidated events even when later sessions error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When runner.Run() returned an error (e.g., one session's LLM call timed out), both cmd_consolidate.go and handler_consolidate.go returned early without calling MarkConsolidated. This left ALL events — including those from successfully-processed sessions — unmarked, causing the pipeline to re-process the same events on every invocation and burning rate limit on redundant work. Now MarkConsolidated runs before the error check, using the SourceEventIDs that the runner already preserves from successful sessions. The mark failure is logged as a warning rather than masking the original pipeline error. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/floop/cmd_consolidate.go | 19 +++++++++++-------- internal/mcp/handler_consolidate.go | 21 ++++++++++++--------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/cmd/floop/cmd_consolidate.go b/cmd/floop/cmd_consolidate.go index c8a7afb..20c41ce 100644 --- a/cmd/floop/cmd_consolidate.go +++ b/cmd/floop/cmd_consolidate.go @@ -149,19 +149,22 @@ func runConsolidate(cmd *cobra.Command, args []string) error { consolidator := consolidation.NewConsolidator(executor, llmClient, decisions, model) runner := consolidation.NewRunner(consolidator) - result, err := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{ + result, runErr := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{ DryRun: dryRun, }) - if err != nil { - return fmt.Errorf("consolidation pipeline: %w", err) - } - // Mark processed events as consolidated (prevents re-processing) - if !dryRun && len(result.SourceEventIDs) > 0 { - if err := es.MarkConsolidated(ctx, result.SourceEventIDs); err != nil { - return fmt.Errorf("marking events consolidated: %w", err) + // Mark successfully-processed events as consolidated even if a later + // session errored. Without this, the pipeline re-processes the same + // events on every invocation, wasting rate limit on redundant work. + if !dryRun && result != nil && len(result.SourceEventIDs) > 0 { + if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "warning: failed to mark %d events consolidated: %v\n", + len(result.SourceEventIDs), markErr) } } + if runErr != nil { + return fmt.Errorf("consolidation pipeline: %w", runErr) + } if jsonOut { json.NewEncoder(out).Encode(map[string]interface{}{ diff --git a/internal/mcp/handler_consolidate.go b/internal/mcp/handler_consolidate.go index 691ed71..f4331a4 100644 --- a/internal/mcp/handler_consolidate.go +++ b/internal/mcp/handler_consolidate.go @@ -3,6 +3,7 @@ package mcp import ( "context" "fmt" + "log/slog" "path/filepath" "time" @@ -117,19 +118,21 @@ func (s *Server) handleFloopConsolidate(ctx context.Context, req *sdk.CallToolRe } } c := consolidation.NewConsolidator(executor, s.llmClient, decisions, model) - result, err := consolidation.NewRunner(c). + result, runErr := consolidation.NewRunner(c). Run(ctx, evts, s.store, consolidation.RunOptions{DryRun: args.DryRun}) - if err != nil { - return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", err) - } - // Mark processed events as consolidated — fail the call if this errors, - // since leaving events unmarked will cause duplicate promotion on next run. - if !args.DryRun && len(result.SourceEventIDs) > 0 { - if err := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); err != nil { - return nil, FloopConsolidateOutput{}, fmt.Errorf("marking events consolidated: %w", err) + // Mark successfully-processed events as consolidated even if a later + // session errored. Without this, the pipeline re-processes the same + // events on every invocation, wasting rate limit on redundant work. + if !args.DryRun && result != nil && len(result.SourceEventIDs) > 0 { + if markErr := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + slog.Warn("failed to mark events consolidated", + "count", len(result.SourceEventIDs), "error", markErr) } } + if runErr != nil { + return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", runErr) + } // Build candidate summaries var candidateSummaries []CandidateSummary From 9df4d5467acfd2db120827f6072e65d8a9bb2468 Mon Sep 17 00:00:00 2001 From: Nic van Dessel <51134175+nvandessel@users.noreply.github.com> Date: Sun, 29 Mar 2026 20:35:14 +0000 Subject: [PATCH 2/3] fix: use s.logger instead of global slog per Greptile review Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/mcp/handler_consolidate.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/mcp/handler_consolidate.go b/internal/mcp/handler_consolidate.go index f4331a4..42c8949 100644 --- a/internal/mcp/handler_consolidate.go +++ b/internal/mcp/handler_consolidate.go @@ -3,7 +3,6 @@ package mcp import ( "context" "fmt" - "log/slog" "path/filepath" "time" @@ -126,7 +125,7 @@ func (s *Server) handleFloopConsolidate(ctx context.Context, req *sdk.CallToolRe // events on every invocation, wasting rate limit on redundant work. if !args.DryRun && result != nil && len(result.SourceEventIDs) > 0 { if markErr := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { - slog.Warn("failed to mark events consolidated", + s.logger.Warn("failed to mark events consolidated", "count", len(result.SourceEventIDs), "error", markErr) } } From 1da3818dbedef9d0fa58b6285166a64a338f952d Mon Sep 17 00:00:00 2001 From: Nic van Dessel <51134175+nvandessel@users.noreply.github.com> Date: Sun, 29 Mar 2026 23:05:42 +0000 Subject: [PATCH 3/3] test: add coverage for mark-consolidated-on-partial-failure pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new tests: - TestRunner_MarkConsolidatedOnPartialFailure: verifies SourceEventIDs from successful sessions are available for marking when a later session fails (unit test, no I/O) - TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore: end-to-end integration test with real SQLite event store — verifies that events from successful sessions are marked consolidated while events from failed sessions remain unconsolidated Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/consolidation/runner_test.go | 129 ++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/internal/consolidation/runner_test.go b/internal/consolidation/runner_test.go index 312aebc..e960b7b 100644 --- a/internal/consolidation/runner_test.go +++ b/internal/consolidation/runner_test.go @@ -3,6 +3,7 @@ package consolidation import ( "bufio" "context" + "database/sql" "encoding/json" "fmt" "os" @@ -14,6 +15,7 @@ import ( "github.com/nvandessel/floop/internal/logging" "github.com/nvandessel/floop/internal/models" "github.com/nvandessel/floop/internal/store" + _ "modernc.org/sqlite" ) // failOnSessionConsolidator wraps HeuristicConsolidator but fails Extract @@ -101,6 +103,21 @@ func (s *stubConsolidator) Model() string { func (s *stubConsolidator) SetRunID(id string) { s.runID = id } +// newTestEventStore creates a real SQLite event store for integration tests. +func newTestEventStore(t *testing.T) *events.SQLiteEventStore { + t.Helper() + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("opening in-memory DB: %v", err) + } + t.Cleanup(func() { db.Close() }) + es := events.NewSQLiteEventStore(db) + if err := es.InitSchema(context.Background()); err != nil { + t.Fatalf("InitSchema: %v", err) + } + return es +} + func TestRunner_DryRun(t *testing.T) { h := NewHeuristicConsolidator() runner := NewRunner(h) @@ -948,6 +965,118 @@ func TestRunner_ContextCancelAfterRelate(t *testing.T) { } } +// TestRunner_MarkConsolidatedOnPartialFailure simulates the caller pattern from +// cmd_consolidate.go and handler_consolidate.go: when runner.Run returns both a +// result and an error, MarkConsolidated should still be called with the +// successful session's event IDs before the error is propagated. +func TestRunner_MarkConsolidatedOnPartialFailure(t *testing.T) { + c := &failOnSessionConsolidator{failSession: "sess-fail"} + runner := NewRunner(c) + + evts := []events.Event{ + {ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"}, + {ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"}, + {ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"}, + } + + result, runErr := runner.Run(context.Background(), evts, nil, RunOptions{DryRun: true}) + if runErr == nil { + t.Fatal("expected error from failing session") + } + + // Simulate the fixed caller pattern: mark consolidated BEFORE checking error. + // This is what cmd_consolidate.go and handler_consolidate.go now do. + var markedIDs []string + if result != nil && len(result.SourceEventIDs) > 0 { + markedIDs = result.SourceEventIDs + } + + // Successful session's events should be available for marking + if len(markedIDs) != 2 { + t.Fatalf("expected 2 event IDs from successful session, got %d: %v", len(markedIDs), markedIDs) + } + + // Verify the correct IDs are present + idSet := map[string]bool{} + for _, id := range markedIDs { + idSet[id] = true + } + if !idSet["e1"] || !idSet["e2"] { + t.Errorf("expected e1 and e2 in marked IDs, got %v", markedIDs) + } + // e3 (from failed session) should NOT be in the list + if idSet["e3"] { + t.Error("e3 from failed session should not be in marked IDs") + } + + // The original error should still propagate + if !strings.Contains(runErr.Error(), "sess-fail") { + t.Errorf("expected error mentioning sess-fail, got: %v", runErr) + } +} + +// TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore is an integration +// test that uses a real SQLite event store to verify the end-to-end flow: +// events from successful sessions get marked consolidated even when a later +// session fails. +func TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore(t *testing.T) { + // Set up a real event store + es := newTestEventStore(t) + + evts := []events.Event{ + {ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"}, + {ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"}, + {ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"}, + } + + ctx := context.Background() + for _, evt := range evts { + if err := es.Add(ctx, evt); err != nil { + t.Fatalf("Add(%s): %v", evt.ID, err) + } + } + + // Run with a consolidator that fails on sess-fail + c := &failOnSessionConsolidator{failSession: "sess-fail"} + runner := NewRunner(c) + result, runErr := runner.Run(ctx, evts, nil, RunOptions{DryRun: true}) + + // Apply the fixed caller pattern + if result != nil && len(result.SourceEventIDs) > 0 { + if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + t.Fatalf("MarkConsolidated: %v", markErr) + } + } + + // Verify the original error is still returned + if runErr == nil { + t.Fatal("expected error from failing session") + } + + // Verify sess-ok events are marked consolidated + unconsolidated, err := es.GetUnconsolidated(ctx) + if err != nil { + t.Fatalf("GetUnconsolidated: %v", err) + } + + unconsolidatedIDs := map[string]bool{} + for _, evt := range unconsolidated { + unconsolidatedIDs[evt.ID] = true + } + + // e1 and e2 should be consolidated (not in unconsolidated list) + if unconsolidatedIDs["e1"] { + t.Error("e1 should be consolidated but is still unconsolidated") + } + if unconsolidatedIDs["e2"] { + t.Error("e2 should be consolidated but is still unconsolidated") + } + // e3 should still be unconsolidated + if !unconsolidatedIDs["e3"] { + t.Error("e3 should still be unconsolidated (from failed session)") + } +} + func TestRunner_PromoteError(t *testing.T) { stub := &stubConsolidator{ promoteFn: func(_ context.Context, _ []ClassifiedMemory, _ []store.Edge, _ []MergeProposal, _ []int, _ store.GraphStore) (PromoteResult, error) {