From 98844505ed20c3443947f7911bc8f1741de941c1 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Thu, 5 Mar 2026 18:15:30 +0000 Subject: [PATCH 1/3] fix: stop infinite self-update retry loop on failure When a self-update failed (e.g., checksum mismatch from stale server cache), the pendingAction was never cleared. The server kept sending it every 15s config poll, causing an infinite retry loop. Fix: - Agent tracks failed update version and skips retries for same version - Agent reports updateError in heartbeat payload - Server clears pendingAction when agent reports update failure --- agent/internal/agent/agent.go | 16 ++++++++++++++-- agent/internal/client/client.go | 1 + src/app/api/agent/heartbeat/route.ts | 11 ++++++++--- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index f79c544..b933110 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -28,8 +28,10 @@ type Agent struct { vectorVersion string deploymentMode string - mu sync.Mutex - sampleResults []client.SampleResultMsg + mu sync.Mutex + sampleResults []client.SampleResultMsg + failedUpdateVersion string // skip retries for this version + updateError string // report failure to server } func New(cfg *config.Config) (*Agent, error) { @@ -146,10 +148,16 @@ func (a *Agent) handlePendingAction(action *client.PendingAction) { if a.deploymentMode == "DOCKER" { slog.Warn("received update command but running in Docker — ignoring", "targetVersion", action.TargetVersion) + a.updateError = "running in Docker" return } + if action.TargetVersion == a.failedUpdateVersion { + return // already failed for this version, don't retry + } if err := a.handleSelfUpdate(action); err != nil { slog.Error("self-update failed", "error", err) + a.failedUpdateVersion = action.TargetVersion + a.updateError = err.Error() } default: slog.Warn("unknown pending action type", "type", action.Type) @@ -164,6 +172,10 @@ func (a *Agent) sendHeartbeat() { a.mu.Unlock() hb := buildHeartbeat(a.supervisor, a.vectorVersion, a.deploymentMode, results) + if a.updateError != "" { + hb.UpdateError = a.updateError + a.updateError = "" // only report once + } if err := a.client.SendHeartbeat(hb); err != nil { slog.Warn("heartbeat error", "error", err) // Put results back so they retry on the next heartbeat diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index 0c9d8fe..20303ae 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -202,6 +202,7 @@ type HeartbeatRequest struct { VectorVersion string `json:"vectorVersion,omitempty"` DeploymentMode string `json:"deploymentMode,omitempty"` SampleResults []SampleResultMsg `json:"sampleResults,omitempty"` + UpdateError string `json:"updateError,omitempty"` } // SampleRequestMsg is received from the server via config poll. diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 68ea223..8d98ddd 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -66,6 +66,7 @@ const heartbeatSchema = z.object({ schema: z.array(z.object({ path: z.string(), type: z.string(), sample: z.string() })).optional(), error: z.string().optional(), })).optional(), + updateError: z.string().optional(), }); let lastCleanup = 0; @@ -113,7 +114,7 @@ export async function POST(request: Request) { ); } - const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode } = parsed.data; + const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode, updateError } = parsed.data; // Validate pipeline ownership: only accept pipelines belonging to this agent's environment const validPipelineIds = new Set( @@ -126,9 +127,13 @@ export async function POST(request: Request) { const now = new Date(); - // Check if pendingAction should be cleared (agent has updated to target version) + // Check if pendingAction should be cleared let clearPendingAction = false; - if (agentVersion) { + if (updateError) { + // Agent reported update failure — clear to stop retry loop + clearPendingAction = true; + console.warn("Agent update failed, clearing pending action:", agent.nodeId, updateError); + } else if (agentVersion) { const currentNode = await prisma.vectorNode.findUnique({ where: { id: agent.nodeId }, select: { pendingAction: true }, From 2cd016b4b15569a1638d08cbe20c73b9fda0a9e4 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Thu, 5 Mar 2026 18:24:48 +0000 Subject: [PATCH 2/3] fix: cap updateError field length in heartbeat schema --- src/app/api/agent/heartbeat/route.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 8d98ddd..f0cc20b 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -66,7 +66,7 @@ const heartbeatSchema = z.object({ schema: z.array(z.object({ path: z.string(), type: z.string(), sample: z.string() })).optional(), error: z.string().optional(), })).optional(), - updateError: z.string().optional(), + updateError: z.string().max(500).optional(), }); let lastCleanup = 0; From cbbc7dd4b04eec05eb4f9daf6cbb2d9dd90f78b6 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Thu, 5 Mar 2026 18:32:47 +0000 Subject: [PATCH 3/3] fix: retry updateError delivery on heartbeat failure Move updateError clearing to the success path so it retries on the next heartbeat if delivery fails, matching the existing sampleResults retry pattern. --- agent/internal/agent/agent.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index b933110..1631323 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -172,9 +172,9 @@ func (a *Agent) sendHeartbeat() { a.mu.Unlock() hb := buildHeartbeat(a.supervisor, a.vectorVersion, a.deploymentMode, results) - if a.updateError != "" { - hb.UpdateError = a.updateError - a.updateError = "" // only report once + updateErr := a.updateError + if updateErr != "" { + hb.UpdateError = updateErr } if err := a.client.SendHeartbeat(hb); err != nil { slog.Warn("heartbeat error", "error", err) @@ -185,6 +185,7 @@ func (a *Agent) sendHeartbeat() { a.mu.Unlock() } } else { + a.updateError = "" // clear only after successful delivery slog.Debug("heartbeat sent", "pipelines", len(hb.Pipelines), "sampleResults", len(results)) } }