Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions agent/internal/agent/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,30 @@ import (
"log/slog"
"os"
"path/filepath"
"strings"

"github.com/TerrifiedBug/vectorflow/agent/internal/client"
"github.com/TerrifiedBug/vectorflow/agent/internal/config"
)

// configFetcher is the one capability the poller requires from its client:
// fetching the current configuration response. Declaring it as an interface
// lets the poller be driven by something other than the concrete client,
// such as a stub in tests.
type configFetcher interface {
// GetConfig returns the latest configuration response, or an error if it
// could not be obtained.
GetConfig() (*client.ConfigResponse, error)
}

// pipelineState is the last known state the poller keeps per pipeline (it is
// the value type of poller.known).
// NOTE(review): presumably checksum and version mirror the Checksum and
// Version fields of the server's pipeline config — confirm against Poll.
type pipelineState struct {
checksum string
version int
}

// poller holds the agent configuration, the source it fetches configuration
// from, and the state it carries between polls.
type poller struct {
	cfg *config.Config
	// client is the configuration source. It is typed as the configFetcher
	// interface (not the concrete *client.Client) so a stub can be injected.
	// The struct previously listed this field twice, once per type, which
	// is a duplicate-field compile error; only the interface-typed field
	// is kept because newPoller assigns a configFetcher to it.
	client         configFetcher
	known          map[string]pipelineState // pipelineId -> last known state
	sampleRequests []client.SampleRequestMsg
	// NOTE(review): pendingAction is not written in the visible code;
	// presumably set from the config response — confirm in Poll.
	pendingAction *client.PendingAction
}

func newPoller(cfg *config.Config, c *client.Client) *poller {
func newPoller(cfg *config.Config, c configFetcher) *poller {
return &poller{
cfg: cfg,
client: c,
Expand Down Expand Up @@ -146,6 +151,33 @@ func (p *poller) Poll() ([]PipelineAction, error) {
}
}

// Reconcile: remove any pipeline files on disk not in the server response.
// This catches orphans from previous enrollments or missed undeploys.
entries, readErr := os.ReadDir(pipelinesDir)
if readErr != nil {
slog.Warn("failed to read pipelines dir for reconciliation", "error", readErr)
} else {
for _, entry := range entries {
name := entry.Name()
if strings.HasSuffix(name, ".vf-metrics.yaml") {
continue
}
if !strings.HasSuffix(name, ".yaml") {
continue
}
id := strings.TrimSuffix(name, ".yaml")
if !seen[id] {
slog.Warn("removing orphaned pipeline config", "pipelineId", id)
if err := os.Remove(filepath.Join(pipelinesDir, name)); err != nil && !os.IsNotExist(err) {
slog.Warn("failed to remove orphaned pipeline config", "path", name, "error", err)
}
if err := os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml")); err != nil && !os.IsNotExist(err) {
slog.Warn("failed to remove orphaned metrics sidecar", "path", name+".vf-metrics.yaml", "error", err)
}
}
Comment on lines +170 to +177
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlogged os.Remove errors in reconciliation block

Both os.Remove calls discard their return values silently, even though a slog.Warn is already emitted a line above. Compare this to the supervisor's equivalent cleanup at supervisor.go:205–206, which logs a warning when the remove fails. If an orphaned file can't be deleted (e.g., locked by another process or a permission issue), there's no indication of the failure.

Suggested change
slog.Warn("removing orphaned pipeline config", "pipelineId", id)
os.Remove(filepath.Join(pipelinesDir, name))
os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml"))
}
slog.Warn("removing orphaned pipeline config", "pipelineId", id)
if err := os.Remove(filepath.Join(pipelinesDir, name)); err != nil && !os.IsNotExist(err) {
slog.Warn("failed to remove orphaned pipeline config", "path", name, "error", err)
}
if err := os.Remove(filepath.Join(pipelinesDir, name+".vf-metrics.yaml")); err != nil && !os.IsNotExist(err) {
slog.Warn("failed to remove orphaned metrics sidecar", "path", name+".vf-metrics.yaml", "error", err)
}

}
}

// Store sample requests for the agent to process
p.sampleRequests = resp.SampleRequests

Expand Down
120 changes: 120 additions & 0 deletions agent/internal/agent/poller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package agent

import (
"os"
"path/filepath"
"strings"
"testing"

"github.com/TerrifiedBug/vectorflow/agent/internal/client"
"github.com/TerrifiedBug/vectorflow/agent/internal/config"
)

// mockConfigFetcher is a test stub that serves a fixed, pre-built
// configuration response.
type mockConfigFetcher struct {
// resp is the canned response handed back by GetConfig.
resp *client.ConfigResponse
}

// GetConfig returns the stub's canned response and never reports an error.
func (m *mockConfigFetcher) GetConfig() (*client.ConfigResponse, error) {
return m.resp, nil
}

// TestPoll_RemovesOrphanedPipelineFiles checks that Poll deletes pipeline
// YAML files (and their ".vf-metrics.yaml" sidecars) that exist on disk but
// are absent from the server response, while keeping the pipeline the server
// still reports and leaving non-YAML files untouched.
func TestPoll_RemovesOrphanedPipelineFiles(t *testing.T) {
	tmpDir := t.TempDir()
	pipelinesDir := filepath.Join(tmpDir, "pipelines")
	if err := os.MkdirAll(pipelinesDir, 0700); err != nil {
		t.Fatalf("creating pipelines dir: %v", err)
	}

	// Pre-existing files: one valid pipeline, one orphan with a metrics
	// sidecar, one plain orphan, and a non-YAML file that must be ignored.
	// Fixture errors are fatal: if a file was never written, the "should be
	// deleted" assertions below would pass without proving anything.
	fixtures := []struct{ name, body string }{
		{"pipeline-a.yaml", "valid"},
		{"orphan-b.yaml", "stale"},
		{"orphan-b.yaml.vf-metrics.yaml", "stale-metrics"},
		{"orphan-c.yaml", "stale2"},
		{"notes.txt", "keep"},
	}
	for _, f := range fixtures {
		if err := os.WriteFile(filepath.Join(pipelinesDir, f.name), []byte(f.body), 0600); err != nil {
			t.Fatalf("writing fixture %s: %v", f.name, err)
		}
	}

	mc := &mockConfigFetcher{
		resp: &client.ConfigResponse{
			Pipelines: []client.PipelineConfig{
				{
					PipelineID:   "pipeline-a",
					PipelineName: "Pipeline A",
					ConfigYaml:   "valid",
					Checksum:     "abc123",
					Version:      1,
				},
			},
		},
	}

	p := &poller{
		cfg:    &config.Config{DataDir: tmpDir},
		client: mc,
		known:  make(map[string]pipelineState),
	}

	actions, err := p.Poll()
	if err != nil {
		t.Fatalf("Poll() error: %v", err)
	}

	// pipeline-a should be started (new to known map)
	if len(actions) != 1 || actions[0].PipelineID != "pipeline-a" {
		t.Errorf("expected 1 start action for pipeline-a, got %d actions", len(actions))
	}

	// The still-deployed pipeline and the non-YAML file must survive.
	for _, name := range []string{"pipeline-a.yaml", "notes.txt"} {
		if _, err := os.Stat(filepath.Join(pipelinesDir, name)); err != nil {
			t.Errorf("%s should still exist: %v", name, err)
		}
	}

	// Orphaned configs and the orphan's metrics sidecar must be gone.
	for _, name := range []string{"orphan-b.yaml", "orphan-b.yaml.vf-metrics.yaml", "orphan-c.yaml"} {
		if _, err := os.Stat(filepath.Join(pipelinesDir, name)); !os.IsNotExist(err) {
			t.Errorf("%s should be deleted (stat error: %v)", name, err)
		}
	}
}

// TestPoll_EmptyResponseCleansAllFiles checks that when the server reports no
// pipelines, Poll leaves no ".yaml" files behind in the pipelines directory.
func TestPoll_EmptyResponseCleansAllFiles(t *testing.T) {
	tmpDir := t.TempDir()
	pipelinesDir := filepath.Join(tmpDir, "pipelines")
	if err := os.MkdirAll(pipelinesDir, 0700); err != nil {
		t.Fatalf("creating pipelines dir: %v", err)
	}

	// Stale configs from an earlier deployment, one with a metrics sidecar.
	fixtures := []struct{ name, body string }{
		{"old-a.yaml", "stale"},
		{"old-a.yaml.vf-metrics.yaml", "stale-metrics"},
		{"old-b.yaml", "stale"},
	}
	for _, f := range fixtures {
		if err := os.WriteFile(filepath.Join(pipelinesDir, f.name), []byte(f.body), 0600); err != nil {
			t.Fatalf("writing fixture %s: %v", f.name, err)
		}
	}

	mc := &mockConfigFetcher{
		resp: &client.ConfigResponse{
			Pipelines: []client.PipelineConfig{},
		},
	}

	p := &poller{
		cfg:    &config.Config{DataDir: tmpDir},
		client: mc,
		known:  make(map[string]pipelineState),
	}

	if _, err := p.Poll(); err != nil {
		t.Fatalf("Poll() error: %v", err)
	}

	// A failed ReadDir yields no entries, which would make the loop below
	// pass without checking anything, so treat it as fatal.
	entries, err := os.ReadDir(pipelinesDir)
	if err != nil {
		t.Fatalf("reading pipelines dir: %v", err)
	}
	for _, e := range entries {
		if strings.HasSuffix(e.Name(), ".yaml") {
			t.Errorf("expected all yaml files deleted, found: %s", e.Name())
		}
	}
}
Loading