From 4db1d9a1fc23d8c7e98080a5bfd029ef9239c18a Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Thu, 5 Mar 2026 12:37:56 +0000 Subject: [PATCH 1/2] fix: reconcile pipeline directory to remove orphaned configs After each successful poll, scan the pipelines directory and remove any YAML files (and their .vf-metrics.yaml sidecars) not present in the server response. This prevents stale configs from accumulating after node re-enrollment into a different environment or missed undeploys. Also extracts a configFetcher interface from the poller to enable unit testing without an HTTP server. --- agent/internal/agent/poller.go | 30 ++++++- agent/internal/agent/poller_test.go | 120 ++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 agent/internal/agent/poller_test.go diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index 3423e48..ca2a224 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -6,11 +6,16 @@ import ( "log/slog" "os" "path/filepath" + "strings" "github.com/TerrifiedBug/vectorflow/agent/internal/client" "github.com/TerrifiedBug/vectorflow/agent/internal/config" ) +type configFetcher interface { + GetConfig() (*client.ConfigResponse, error) +} + type pipelineState struct { checksum string version int @@ -18,13 +23,13 @@ type pipelineState struct { type poller struct { cfg *config.Config - client *client.Client + client configFetcher known map[string]pipelineState // pipelineId -> last known state sampleRequests []client.SampleRequestMsg pendingAction *client.PendingAction } -func newPoller(cfg *config.Config, c *client.Client) *poller { +func newPoller(cfg *config.Config, c configFetcher) *poller { return &poller{ cfg: cfg, client: c, @@ -146,6 +151,27 @@ func (p *poller) Poll() ([]PipelineAction, error) { } } + // Reconcile: remove any pipeline files on disk not in the server response. + // This catches orphans from previous enrollments or missed undeploys. + entries, readErr := os.ReadDir(pipelinesDir) + if readErr == nil { + for _, entry := range entries { + name := entry.Name() + if strings.HasSuffix(name, ".vf-metrics.yaml") { + continue + } + if !strings.HasSuffix(name, ".yaml") { + continue + } + id := strings.TrimSuffix(name, ".yaml") + if !seen[id] { + slog.Warn("removing orphaned pipeline config", "pipelineId", id) + os.Remove(filepath.Join(pipelinesDir, name)) + os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml")) + } + } + } + // Store sample requests for the agent to process p.sampleRequests = resp.SampleRequests diff --git a/agent/internal/agent/poller_test.go b/agent/internal/agent/poller_test.go new file mode 100644 index 0000000..2102e73 --- /dev/null +++ b/agent/internal/agent/poller_test.go @@ -0,0 +1,120 @@ +package agent + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/TerrifiedBug/vectorflow/agent/internal/client" + "github.com/TerrifiedBug/vectorflow/agent/internal/config" +) + +type mockConfigFetcher struct { + resp *client.ConfigResponse +} + +func (m *mockConfigFetcher) GetConfig() (*client.ConfigResponse, error) { + return m.resp, nil +} + +func TestPoll_RemovesOrphanedPipelineFiles(t *testing.T) { + tmpDir := t.TempDir() + pipelinesDir := filepath.Join(tmpDir, "pipelines") + os.MkdirAll(pipelinesDir, 0700) + + // Pre-existing files: one valid pipeline, one orphan, one orphan with metrics sidecar + os.WriteFile(filepath.Join(pipelinesDir, "pipeline-a.yaml"), []byte("valid"), 0600) + os.WriteFile(filepath.Join(pipelinesDir, "orphan-b.yaml"), []byte("stale"), 0600) + os.WriteFile(filepath.Join(pipelinesDir, "orphan-b.yaml.vf-metrics.yaml"), []byte("stale-metrics"), 0600) + os.WriteFile(filepath.Join(pipelinesDir, "orphan-c.yaml"), []byte("stale2"), 0600) + // Non-yaml file should be left alone + os.WriteFile(filepath.Join(pipelinesDir, "notes.txt"), []byte("keep"), 0600) + + mc := &mockConfigFetcher{ + resp: &client.ConfigResponse{ + Pipelines: []client.PipelineConfig{ + { + PipelineID: "pipeline-a", + PipelineName: "Pipeline A", + ConfigYaml: "valid", + Checksum: "abc123", + Version: 1, + }, + }, + }, + } + + p := &poller{ + cfg: &config.Config{DataDir: tmpDir}, + client: mc, + known: make(map[string]pipelineState), + } + + actions, err := p.Poll() + if err != nil { + t.Fatalf("Poll() error: %v", err) + } + + // pipeline-a should be started (new to known map) + if len(actions) != 1 || actions[0].PipelineID != "pipeline-a" { + t.Errorf("expected 1 start action for pipeline-a, got %d actions", len(actions)) + } + + // pipeline-a.yaml should still exist + if _, err := os.Stat(filepath.Join(pipelinesDir, "pipeline-a.yaml")); err != nil { + t.Error("pipeline-a.yaml should exist") + } + + // orphan-b.yaml and its metrics sidecar should be deleted + if _, err := os.Stat(filepath.Join(pipelinesDir, "orphan-b.yaml")); !os.IsNotExist(err) { + t.Error("orphan-b.yaml should be deleted") + } + if _, err := os.Stat(filepath.Join(pipelinesDir, "orphan-b.yaml.vf-metrics.yaml")); !os.IsNotExist(err) { + t.Error("orphan-b.yaml.vf-metrics.yaml should be deleted") + } + + // orphan-c.yaml should be deleted + if _, err := os.Stat(filepath.Join(pipelinesDir, "orphan-c.yaml")); !os.IsNotExist(err) { + t.Error("orphan-c.yaml should be deleted") + } + + // notes.txt (non-yaml) should be left alone + if _, err := os.Stat(filepath.Join(pipelinesDir, "notes.txt")); err != nil { + t.Error("notes.txt should still exist") + } +} + +func TestPoll_EmptyResponseCleansAllFiles(t *testing.T) { + tmpDir := t.TempDir() + pipelinesDir := filepath.Join(tmpDir, "pipelines") + os.MkdirAll(pipelinesDir, 0700) + + os.WriteFile(filepath.Join(pipelinesDir, "old-a.yaml"), []byte("stale"), 0600) + os.WriteFile(filepath.Join(pipelinesDir, "old-a.yaml.vf-metrics.yaml"), []byte("stale-metrics"), 0600) + os.WriteFile(filepath.Join(pipelinesDir, "old-b.yaml"), []byte("stale"), 0600) + + mc := &mockConfigFetcher{ + resp: &client.ConfigResponse{ + Pipelines: []client.PipelineConfig{}, + }, + } + + p := &poller{ + cfg: &config.Config{DataDir: tmpDir}, + client: mc, + known: make(map[string]pipelineState), + } + + _, err := p.Poll() + if err != nil { + t.Fatalf("Poll() error: %v", err) + } + + entries, _ := os.ReadDir(pipelinesDir) + for _, e := range entries { + if strings.HasSuffix(e.Name(), ".yaml") { + t.Errorf("expected all yaml files deleted, found: %s", e.Name()) + } + } +} From 851efcba4607909739f83da2499ba29d49a0dc7d Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Thu, 5 Mar 2026 12:54:51 +0000 Subject: [PATCH 2/2] fix: address greptile review findings Log ReadDir errors during reconciliation instead of silently skipping. Log os.Remove failures for orphaned files, consistent with supervisor cleanup pattern. Guard with !os.IsNotExist to avoid noisy logs. --- agent/internal/agent/poller.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index ca2a224..c4d5410 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -154,7 +154,9 @@ func (p *poller) Poll() ([]PipelineAction, error) { // Reconcile: remove any pipeline files on disk not in the server response. // This catches orphans from previous enrollments or missed undeploys. entries, readErr := os.ReadDir(pipelinesDir) - if readErr == nil { + if readErr != nil { + slog.Warn("failed to read pipelines dir for reconciliation", "error", readErr) + } else { for _, entry := range entries { name := entry.Name() if strings.HasSuffix(name, ".vf-metrics.yaml") { @@ -166,8 +168,12 @@ func (p *poller) Poll() ([]PipelineAction, error) { id := strings.TrimSuffix(name, ".yaml") if !seen[id] { slog.Warn("removing orphaned pipeline config", "pipelineId", id) - os.Remove(filepath.Join(pipelinesDir, name)) - os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml")) + if err := os.Remove(filepath.Join(pipelinesDir, name)); err != nil && !os.IsNotExist(err) { + slog.Warn("failed to remove orphaned pipeline config", "path", name, "error", err) + } + if err := os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml")); err != nil && !os.IsNotExist(err) { + slog.Warn("failed to remove orphaned metrics sidecar", "path", name+".vf-metrics.yaml", "error", err) + } } } }