Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cmd/floop/cmd_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,22 @@ func runConsolidate(cmd *cobra.Command, args []string) error {
consolidator := consolidation.NewConsolidator(executor, llmClient, decisions, model)
runner := consolidation.NewRunner(consolidator)

result, err := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{
result, runErr := runner.Run(ctx, evts, graphStore, consolidation.RunOptions{
DryRun: dryRun,
})
if err != nil {
return fmt.Errorf("consolidation pipeline: %w", err)
}

// Mark processed events as consolidated (prevents re-processing)
if !dryRun && len(result.SourceEventIDs) > 0 {
if err := es.MarkConsolidated(ctx, result.SourceEventIDs); err != nil {
return fmt.Errorf("marking events consolidated: %w", err)
// Mark successfully-processed events as consolidated even if a later
// session errored. Without this, the pipeline re-processes the same
// events on every invocation, wasting rate limit on redundant work.
if !dryRun && result != nil && len(result.SourceEventIDs) > 0 {
if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "warning: failed to mark %d events consolidated: %v\n",
len(result.SourceEventIDs), markErr)
}
}
if runErr != nil {
return fmt.Errorf("consolidation pipeline: %w", runErr)
}

if jsonOut {
json.NewEncoder(out).Encode(map[string]interface{}{
Expand Down
129 changes: 129 additions & 0 deletions internal/consolidation/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consolidation
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/nvandessel/floop/internal/logging"
"github.com/nvandessel/floop/internal/models"
"github.com/nvandessel/floop/internal/store"
_ "modernc.org/sqlite"
)

// failOnSessionConsolidator wraps HeuristicConsolidator but fails Extract
Expand Down Expand Up @@ -101,6 +103,21 @@ func (s *stubConsolidator) Model() string {

// SetRunID records the run identifier on the stub so tests can inspect it.
func (s *stubConsolidator) SetRunID(id string) {
	s.runID = id
}

// newTestEventStore creates a real SQLite event store for integration tests.
//
// The store is backed by an in-memory database with its schema already
// initialized; the underlying connection is closed automatically when the
// test (and its subtests) finish.
func newTestEventStore(t *testing.T) *events.SQLiteEventStore {
	t.Helper()
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		t.Fatalf("opening in-memory DB: %v", err)
	}
	// database/sql maintains a connection pool, and every NEW connection to
	// ":memory:" opens its own fresh, empty database. Pin the pool to a
	// single connection so the schema created below is visible to all
	// subsequent queries instead of intermittently hitting "no such table".
	db.SetMaxOpenConns(1)
	t.Cleanup(func() { db.Close() })
	es := events.NewSQLiteEventStore(db)
	if err := es.InitSchema(context.Background()); err != nil {
		t.Fatalf("InitSchema: %v", err)
	}
	return es
}

func TestRunner_DryRun(t *testing.T) {
h := NewHeuristicConsolidator()
runner := NewRunner(h)
Expand Down Expand Up @@ -948,6 +965,118 @@ func TestRunner_ContextCancelAfterRelate(t *testing.T) {
}
}

// TestRunner_MarkConsolidatedOnPartialFailure simulates the caller pattern from
// cmd_consolidate.go and handler_consolidate.go: when runner.Run returns both a
// result and an error, MarkConsolidated should still be called with the
// successful session's event IDs before the error is propagated.
func TestRunner_MarkConsolidatedOnPartialFailure(t *testing.T) {
c := &failOnSessionConsolidator{failSession: "sess-fail"}
runner := NewRunner(c)

evts := []events.Event{
{ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"},
{ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"},
{ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"},
}

result, runErr := runner.Run(context.Background(), evts, nil, RunOptions{DryRun: true})
if runErr == nil {
t.Fatal("expected error from failing session")
}

// Simulate the fixed caller pattern: mark consolidated BEFORE checking error.
// This is what cmd_consolidate.go and handler_consolidate.go now do.
var markedIDs []string
if result != nil && len(result.SourceEventIDs) > 0 {
markedIDs = result.SourceEventIDs
}

// Successful session's events should be available for marking
if len(markedIDs) != 2 {
t.Fatalf("expected 2 event IDs from successful session, got %d: %v", len(markedIDs), markedIDs)
}

// Verify the correct IDs are present
idSet := map[string]bool{}
for _, id := range markedIDs {
idSet[id] = true
}
if !idSet["e1"] || !idSet["e2"] {
t.Errorf("expected e1 and e2 in marked IDs, got %v", markedIDs)
}
// e3 (from failed session) should NOT be in the list
if idSet["e3"] {
t.Error("e3 from failed session should not be in marked IDs")
}

// The original error should still propagate
if !strings.Contains(runErr.Error(), "sess-fail") {
t.Errorf("expected error mentioning sess-fail, got: %v", runErr)
}
}

// TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore is an integration
// test that uses a real SQLite event store to verify the end-to-end flow:
// events from successful sessions get marked consolidated even when a later
// session fails.
func TestRunner_MarkConsolidatedOnPartialFailure_WithEventStore(t *testing.T) {
	ctx := context.Background()

	// Seed a real event store with two sessions' worth of events.
	es := newTestEventStore(t)
	seed := []events.Event{
		{ID: "e1", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix A"},
		{ID: "e2", SessionID: "sess-ok", Actor: "user", Kind: "correction", Content: "fix B"},
		{ID: "e3", SessionID: "sess-fail", Actor: "user", Kind: "correction", Content: "fail here"},
	}
	for _, evt := range seed {
		if err := es.Add(ctx, evt); err != nil {
			t.Fatalf("Add(%s): %v", evt.ID, err)
		}
	}

	// Run with a consolidator that fails on sess-fail.
	runner := NewRunner(&failOnSessionConsolidator{failSession: "sess-fail"})
	result, runErr := runner.Run(ctx, seed, nil, RunOptions{DryRun: true})

	// Apply the fixed caller pattern: mark before inspecting the error.
	if result != nil && len(result.SourceEventIDs) > 0 {
		if markErr := es.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil {
			t.Fatalf("MarkConsolidated: %v", markErr)
		}
	}

	// The original error is still returned.
	if runErr == nil {
		t.Fatal("expected error from failing session")
	}

	// Only sess-ok's events should now be consolidated.
	unconsolidated, err := es.GetUnconsolidated(ctx)
	if err != nil {
		t.Fatalf("GetUnconsolidated: %v", err)
	}
	remaining := map[string]bool{}
	for _, evt := range unconsolidated {
		remaining[evt.ID] = true
	}

	// e1 and e2 must be consolidated (absent from the unconsolidated list).
	if remaining["e1"] {
		t.Error("e1 should be consolidated but is still unconsolidated")
	}
	if remaining["e2"] {
		t.Error("e2 should be consolidated but is still unconsolidated")
	}
	// e3 stays unconsolidated because its session failed.
	if !remaining["e3"] {
		t.Error("e3 should still be unconsolidated (from failed session)")
	}
}

func TestRunner_PromoteError(t *testing.T) {
stub := &stubConsolidator{
promoteFn: func(_ context.Context, _ []ClassifiedMemory, _ []store.Edge, _ []MergeProposal, _ []int, _ store.GraphStore) (PromoteResult, error) {
Expand Down
20 changes: 11 additions & 9 deletions internal/mcp/handler_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,21 @@ func (s *Server) handleFloopConsolidate(ctx context.Context, req *sdk.CallToolRe
}
}
c := consolidation.NewConsolidator(executor, s.llmClient, decisions, model)
result, err := consolidation.NewRunner(c).
result, runErr := consolidation.NewRunner(c).
Run(ctx, evts, s.store, consolidation.RunOptions{DryRun: args.DryRun})
if err != nil {
return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", err)
}

// Mark processed events as consolidated — fail the call if this errors,
// since leaving events unmarked will cause duplicate promotion on next run.
if !args.DryRun && len(result.SourceEventIDs) > 0 {
if err := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); err != nil {
return nil, FloopConsolidateOutput{}, fmt.Errorf("marking events consolidated: %w", err)
// Mark successfully-processed events as consolidated even if a later
// session errored. Without this, the pipeline re-processes the same
// events on every invocation, wasting rate limit on redundant work.
if !args.DryRun && result != nil && len(result.SourceEventIDs) > 0 {
if markErr := s.eventStore.MarkConsolidated(ctx, result.SourceEventIDs); markErr != nil {
s.logger.Warn("failed to mark events consolidated",
"count", len(result.SourceEventIDs), "error", markErr)
}
}
if runErr != nil {
return nil, FloopConsolidateOutput{}, fmt.Errorf("consolidation pipeline failed: %w", runErr)
}

// Build candidate summaries
var candidateSummaries []CandidateSummary
Expand Down
Loading