diff --git a/cmd/floop/cmd_consolidate.go b/cmd/floop/cmd_consolidate.go index c8a7afb..20c41ce 100644 --- a/cmd/floop/cmd_consolidate.go +++ b/cmd/floop/cmd_consolidate.go @@ -149,19 +149,22 @@ func runConsolidate(cmd *cobra.Command, args []string) error { consolidator := consolidation.NewConsolidator(executor, llmClient, decisions, model) runner := consolidation.NewRunner(consolidator) - result, err := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{ + result, runErr := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{ DryRun: dryRun, }) - if err != nil { - return fmt.Errorf("consolidation pipeline: %w", err) - } - // Mark processed events as consolidated (prevents re-processing) - if !dryRun && len(result.SourceEventIDs) > 0 { - if err := es.MarkConsolidated(ctx, result.SourceEventIDs); err != nil { - return fmt.Errorf("marking events consolidated: %w", err) + // Mark successfully-processed events as consolidated even if a later + // session errored. Without this, the pipeline re-processes the same + // events on every invocation, wasting rate limit on redundant work. + if !dryRun && result != nil && len(result.SourceEventIDs) > 0 { + if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "warning: failed to mark %d events consolidated: %v\n", + len(result.SourceEventIDs), markErr) } } + if runErr != nil { + return fmt.Errorf("consolidation pipeline: %w", runErr) + } if jsonOut { json.NewEncoder(out).Encode(map[string]interface{}{ diff --git a/internal/consolidation/runner_test.go b/internal/consolidation/runner_test.go index 312aebc..e960b7b 100644 --- a/internal/consolidation/runner_test.go +++ b/internal/consolidation/runner_test.go @@ -3,6 +3,7 @@ package consolidation import ( "bufio" "context" + "database/sql" "encoding/json" "fmt" "os" @@ -14,6 +15,7 @@ import ( "github.com/nvandessel/floop/internal/logging" "github.com/nvandessel/floop/internal/models" "github.com/nvandessel/floop/internal/store" + _ "modernc.org/sqlite" ) // failOnSessionConsolidator wraps HeuristicConsolidator but fails Extract @@ -101,6 +103,21 @@ func (s *stubConsolidator) Model() string { func (s *stubConsolidator) SetRunID(id string) { s.runID = id } +// newTestEventStore creates a real SQLite event store for integration tests. +func newTestEventStore(t *testing.T) *events.SQLiteEventStore { + t.Helper() + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("opening in-memory DB: %v", err) + } + t.Cleanup(func() { db.Close() }) + es := events.NewSQLiteEventStore(db) + if err := es.InitSchema(context.Background()); err != nil { + t.Fatalf("InitSchema: %v", err) + } + return es +} + func TestRunner_DryRun(t *testing.T) { h := NewHeuristicConsolidator() runner := NewRunner(h) @@ -948,6 +965,118 @@ func TestRunner_ContextCancelAfterRelate(t *testing.T) { } } +// TestRunner_MarkConsolidatedOnPartialFailure simulates the caller pattern from +// cmd_consolidate.go and handler_consolidate.go: when runner.Run returns both a +// result and an error, MarkConsolidated should still be called with the +// successful session's event IDs before the error is propagated. +func TestRunner_MarkConsolidatedOnPartialFailure(t *testing.T) { + c := &failOnSessionConsolidator{failSession: "sess-fail"} + runner := NewRunner(c) + + evts := []events.Event{ + {ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"}, + {ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"}, + {ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"}, + } + + result, runErr := runner.Run(context.Background(), evts, nil, RunOptions{DryRun: true}) + if runErr == nil { + t.Fatal("expected error from failing session") + } + + // Simulate the fixed caller pattern: mark consolidated BEFORE checking error. + // This is what cmd_consolidate.go and handler_consolidate.go now do. + var markedIDs []string + if result != nil && len(result.SourceEventIDs) > 0 { + markedIDs = result.SourceEventIDs + } + + // Successful session's events should be available for marking + if len(markedIDs) != 2 { + t.Fatalf("expected 2 event IDs from successful session, got %d: %v", len(markedIDs), markedIDs) + } + + // Verify the correct IDs are present + idSet := map[string]bool{} + for _, id := range markedIDs { + idSet[id] = true + } + if !idSet["e1"] || !idSet["e2"] { + t.Errorf("expected e1 and e2 in marked IDs, got %v", markedIDs) + } + // e3 (from failed session) should NOT be in the list + if idSet["e3"] { + t.Error("e3 from failed session should not be in marked IDs") + } + + // The original error should still propagate + if !strings.Contains(runErr.Error(), "sess-fail") { + t.Errorf("expected error mentioning sess-fail, got: %v", runErr) + } +} + +// TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore is an integration +// test that uses a real SQLite event store to verify the end-to-end flow: +// events from successful sessions get marked consolidated even when a later +// session fails. +func TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore(t *testing.T) { + // Set up a real event store + es := newTestEventStore(t) + + evts := []events.Event{ + {ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"}, + {ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"}, + {ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"}, + } + + ctx := context.Background() + for _, evt := range evts { + if err := es.Add(ctx, evt); err != nil { + t.Fatalf("Add(%s): %v", evt.ID, err) + } + } + + // Run with a consolidator that fails on sess-fail + c := &failOnSessionConsolidator{failSession: "sess-fail"} + runner := NewRunner(c) + result, runErr := runner.Run(ctx, evts, nil, RunOptions{DryRun: true}) + + // Apply the fixed caller pattern + if result != nil && len(result.SourceEventIDs) > 0 { + if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + t.Fatalf("MarkConsolidated: %v", markErr) + } + } + + // Verify the original error is still returned + if runErr == nil { + t.Fatal("expected error from failing session") + } + + // Verify sess-ok events are marked consolidated + unconsolidated, err := es.GetUnconsolidated(ctx) + if err != nil { + t.Fatalf("GetUnconsolidated: %v", err) + } + + unconsolidatedIDs := map[string]bool{} + for _, evt := range unconsolidated { + unconsolidatedIDs[evt.ID] = true + } + + // e1 and e2 should be consolidated (not in unconsolidated list) + if unconsolidatedIDs["e1"] { + t.Error("e1 should be consolidated but is still unconsolidated") + } + if unconsolidatedIDs["e2"] { + t.Error("e2 should be consolidated but is still unconsolidated") + } + // e3 should still be unconsolidated + if !unconsolidatedIDs["e3"] { + t.Error("e3 should still be unconsolidated (from failed session)") + } +} + func TestRunner_PromoteError(t *testing.T) { stub := &stubConsolidator{ promoteFn: func(_ context.Context, _ []ClassifiedMemory, _ []store.Edge, _ []MergeProposal, _ []int, _ store.GraphStore) (PromoteResult, error) { diff --git a/internal/mcp/handler_consolidate.go b/internal/mcp/handler_consolidate.go index 691ed71..42c8949 100644 --- a/internal/mcp/handler_consolidate.go +++ b/internal/mcp/handler_consolidate.go @@ -117,19 +117,21 @@ func (s *Server) handleFloopConsolidate(ctx context.Context, req *sdk.CallToolRe } } c := consolidation.NewConsolidator(executor, s.llmClient, decisions, model) - result, err := consolidation.NewRunner(c). + result, runErr := consolidation.NewRunner(c). Run(ctx, evts, s.store, consolidation.RunOptions{DryRun: args.DryRun}) - if err != nil { - return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", err) - } - // Mark processed events as consolidated — fail the call if this errors, - // since leaving events unmarked will cause duplicate promotion on next run. - if !args.DryRun && len(result.SourceEventIDs) > 0 { - if err := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); err != nil { - return nil, FloopConsolidateOutput{}, fmt.Errorf("marking events consolidated: %w", err) + // Mark successfully-processed events as consolidated even if a later + // session errored. Without this, the pipeline re-processes the same + // events on every invocation, wasting rate limit on redundant work. + if !args.DryRun && result != nil && len(result.SourceEventIDs) > 0 { + if markErr := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil { + s.logger.Warn("failed to mark events consolidated", + "count", len(result.SourceEventIDs), "error", markErr) } } + if runErr != nil { + return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", runErr) + } // Build candidate summaries var candidateSummaries []CandidateSummary