diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index f79c544..1631323 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -28,8 +28,10 @@ type Agent struct { vectorVersion string deploymentMode string - mu sync.Mutex - sampleResults []client.SampleResultMsg + mu sync.Mutex + sampleResults []client.SampleResultMsg + failedUpdateVersion string // skip retries for this version + updateError string // report failure to server } func New(cfg *config.Config) (*Agent, error) { @@ -146,10 +148,16 @@ func (a *Agent) handlePendingAction(action *client.PendingAction) { if a.deploymentMode == "DOCKER" { slog.Warn("received update command but running in Docker — ignoring", "targetVersion", action.TargetVersion) + a.updateError = "running in Docker" return } + if action.TargetVersion == a.failedUpdateVersion { + return // already failed for this version, don't retry + } if err := a.handleSelfUpdate(action); err != nil { slog.Error("self-update failed", "error", err) + a.failedUpdateVersion = action.TargetVersion + a.updateError = err.Error() } default: slog.Warn("unknown pending action type", "type", action.Type) @@ -164,6 +172,10 @@ func (a *Agent) sendHeartbeat() { a.mu.Unlock() hb := buildHeartbeat(a.supervisor, a.vectorVersion, a.deploymentMode, results) + updateErr := a.updateError + if updateErr != "" { + hb.UpdateError = updateErr + } if err := a.client.SendHeartbeat(hb); err != nil { slog.Warn("heartbeat error", "error", err) // Put results back so they retry on the next heartbeat @@ -173,6 +185,7 @@ func (a *Agent) sendHeartbeat() { a.mu.Unlock() } } else { + a.updateError = "" // clear only after successful delivery slog.Debug("heartbeat sent", "pipelines", len(hb.Pipelines), "sampleResults", len(results)) } } diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index 0c9d8fe..20303ae 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -202,6 +202,7 @@ type HeartbeatRequest struct { VectorVersion string `json:"vectorVersion,omitempty"` DeploymentMode string `json:"deploymentMode,omitempty"` SampleResults []SampleResultMsg `json:"sampleResults,omitempty"` + UpdateError string `json:"updateError,omitempty"` } // SampleRequestMsg is received from the server via config poll. diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 68ea223..f0cc20b 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -66,6 +66,7 @@ const heartbeatSchema = z.object({ schema: z.array(z.object({ path: z.string(), type: z.string(), sample: z.string() })).optional(), error: z.string().optional(), })).optional(), + updateError: z.string().max(500).optional(), }); let lastCleanup = 0; @@ -113,7 +114,7 @@ export async function POST(request: Request) { ); } - const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode } = parsed.data; + const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode, updateError } = parsed.data; // Validate pipeline ownership: only accept pipelines belonging to this agent's environment const validPipelineIds = new Set( @@ -126,9 +127,13 @@ export async function POST(request: Request) { const now = new Date(); - // Check if pendingAction should be cleared (agent has updated to target version) + // Check if pendingAction should be cleared let clearPendingAction = false; - if (agentVersion) { + if (updateError) { + // Agent reported update failure — clear to stop retry loop + clearPendingAction = true; + console.warn("Agent update failed, clearing pending action:", agent.nodeId, updateError); + } else if (agentVersion) { const currentNode = await prisma.vectorNode.findUnique({ where: { id: agent.nodeId }, select: { pendingAction: true },