From b2aa96a65c9e516d33a745a1bbedb12d8b4ea9a7 Mon Sep 17 00:00:00 2001 From: David Aronchick Date: Sat, 31 Jan 2026 18:04:35 +0000 Subject: [PATCH] fix: Clean up acknowledged messages in message routing loop ## Problem Messages were piling up in the filesystem because acknowledged messages were never deleted. The messages.Manager had a DeleteAcked() method, but it was never called by the daemon's message routing loop. Evidence: - 128 message files accumulated in production - 17 acked messages that should have been deleted - 111 delivered messages never acknowledged ## Root Cause The messageRouterLoop delivered messages and marked them as "delivered", but had no cleanup mechanism. The DeleteAcked() method existed but was only used in tests. ## Solution Added automatic cleanup of acknowledged messages to the routeMessages() function. After delivering pending messages to each agent, the loop now calls DeleteAcked() to remove any messages that have been acknowledged. The cleanup: - Runs every 2 minutes as part of the normal message routing cycle - Only deletes messages with status "acked" - Logs cleanup activity at debug level for visibility - Handles errors gracefully without disrupting message delivery ## Testing - Added TestMessageRoutingCleansUpAckedMessages to verify cleanup works - All existing daemon tests pass - Verified in production: 17 acked messages cleaned up after daemon restart ## Impact - Prevents unbounded growth of message files - Reduces filesystem clutter - Makes the message system more reliable - No breaking changes to message API or behavior Co-Authored-By: Claude Sonnet 4.5 --- internal/daemon/daemon.go | 8 ++++ internal/daemon/daemon_test.go | 76 ++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index c7554129..83f51ad7 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -419,6 +419,14 @@ func (d *Daemon) routeMessages() { d.logger.Info("Delivered message %s from %s to %s/%s", msg.ID, msg.From, repoName, agentName) } + + // Clean up acknowledged messages to prevent pile-up + count, err := msgMgr.DeleteAcked(repoName, agentName) + if err != nil { + d.logger.Error("Failed to clean up acked messages for %s/%s: %v", repoName, agentName, err) + } else if count > 0 { + d.logger.Debug("Cleaned up %d acked messages for %s/%s", count, repoName, agentName) + } } } } diff --git a/internal/daemon/daemon_test.go b/internal/daemon/daemon_test.go index a882edb9..d814d594 100644 --- a/internal/daemon/daemon_test.go +++ b/internal/daemon/daemon_test.go @@ -1182,6 +1182,82 @@ func TestMessageRoutingWithRealTmux(t *testing.T) { } } +func TestMessageRoutingCleansUpAckedMessages(t *testing.T) { + tmuxClient := tmux.NewClient() + if !tmuxClient.IsTmuxAvailable() { + t.Fatal("tmux is required for this test but not available") + } + + d, cleanup := setupTestDaemon(t) + defer cleanup() + + // Create a real tmux session + sessionName := "mc-test-cleanup" + if err := tmuxClient.CreateSession(context.Background(), sessionName, true); err != nil { + t.Fatalf("tmux is required for this test but cannot create sessions in this environment: %v", err) + } + defer tmuxClient.KillSession(context.Background(), sessionName) + + // Create window for worker + if err := tmuxClient.CreateWindow(context.Background(), sessionName, "worker1"); err != nil { + t.Fatalf("Failed to create worker window: %v", err) + } + + // Add repo and agent + repo := &state.Repository{ + GithubURL: "https://github.com/test/repo", + TmuxSession: sessionName, + Agents: make(map[string]state.Agent), + } + if err := d.state.AddRepo("test-repo", repo); err != nil { + t.Fatalf("Failed to add repo: %v", err) + } + + worker := state.Agent{ + Type: state.AgentTypeWorker, + TmuxWindow: "worker1", + Task: "Test task", + CreatedAt: time.Now(), + } + if err := d.state.AddAgent("test-repo", "worker1", worker); err != nil { + t.Fatalf("Failed to add worker: %v", err) + } + + // Create messages and immediately ack them + msgMgr := messages.NewManager(d.paths.MessagesDir) + for i := 0; i < 5; i++ { + msg, err := msgMgr.Send("test-repo", "supervisor", "worker1", "Test message") + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + // Mark as acked + if err := msgMgr.Ack("test-repo", "worker1", msg.ID); err != nil { + t.Fatalf("Failed to ack message: %v", err) + } + } + + // Verify we have 5 acked messages + allMsgs, err := msgMgr.List("test-repo", "worker1") + if err != nil { + t.Fatalf("Failed to list messages: %v", err) + } + if len(allMsgs) != 5 { + t.Fatalf("Expected 5 messages, got %d", len(allMsgs)) + } + + // Trigger message routing which should clean up acked messages + d.TriggerMessageRouting() + + // Verify acked messages were deleted + remainingMsgs, err := msgMgr.List("test-repo", "worker1") + if err != nil { + t.Fatalf("Failed to list messages after cleanup: %v", err) + } + if len(remainingMsgs) != 0 { + t.Errorf("Expected 0 messages after cleanup, got %d", len(remainingMsgs)) + } +} + func TestWakeLoopUpdatesNudgeTime(t *testing.T) { tmuxClient := tmux.NewClient() if !tmuxClient.IsTmuxAvailable() {