Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Agent struct {
vectorVersion string
deploymentMode string

mu sync.Mutex
sampleResults []client.SampleResultMsg
mu sync.Mutex
sampleResults []client.SampleResultMsg
failedUpdateVersion string // skip retries for this version
updateError string // report failure to server
}

func New(cfg *config.Config) (*Agent, error) {
Expand Down Expand Up @@ -146,10 +148,16 @@ func (a *Agent) handlePendingAction(action *client.PendingAction) {
if a.deploymentMode == "DOCKER" {
slog.Warn("received update command but running in Docker — ignoring",
"targetVersion", action.TargetVersion)
a.updateError = "running in Docker"
return
}
if action.TargetVersion == a.failedUpdateVersion {
return // already failed for this version, don't retry
}
if err := a.handleSelfUpdate(action); err != nil {
slog.Error("self-update failed", "error", err)
a.failedUpdateVersion = action.TargetVersion
a.updateError = err.Error()
}
default:
slog.Warn("unknown pending action type", "type", action.Type)
Expand All @@ -164,6 +172,10 @@ func (a *Agent) sendHeartbeat() {
a.mu.Unlock()

hb := buildHeartbeat(a.supervisor, a.vectorVersion, a.deploymentMode, results)
updateErr := a.updateError
if updateErr != "" {
hb.UpdateError = updateErr
}
if err := a.client.SendHeartbeat(hb); err != nil {
slog.Warn("heartbeat error", "error", err)
// Put results back so they retry on the next heartbeat
Expand All @@ -173,6 +185,7 @@ func (a *Agent) sendHeartbeat() {
a.mu.Unlock()
}
} else {
a.updateError = "" // clear only after successful delivery
slog.Debug("heartbeat sent", "pipelines", len(hb.Pipelines), "sampleResults", len(results))
}
}
Expand Down
1 change: 1 addition & 0 deletions agent/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type HeartbeatRequest struct {
VectorVersion string `json:"vectorVersion,omitempty"`
DeploymentMode string `json:"deploymentMode,omitempty"`
SampleResults []SampleResultMsg `json:"sampleResults,omitempty"`
UpdateError string `json:"updateError,omitempty"`
}

// SampleRequestMsg is received from the server via config poll.
Expand Down
11 changes: 8 additions & 3 deletions src/app/api/agent/heartbeat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const heartbeatSchema = z.object({
schema: z.array(z.object({ path: z.string(), type: z.string(), sample: z.string() })).optional(),
error: z.string().optional(),
})).optional(),
updateError: z.string().max(500).optional(),
});

let lastCleanup = 0;
Expand Down Expand Up @@ -113,7 +114,7 @@ export async function POST(request: Request) {
);
}

const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode } = parsed.data;
const { pipelines: rawPipelines, hostMetrics, agentVersion, vectorVersion, deploymentMode, updateError } = parsed.data;

// Validate pipeline ownership: only accept pipelines belonging to this agent's environment
const validPipelineIds = new Set(
Expand All @@ -126,9 +127,13 @@ export async function POST(request: Request) {

const now = new Date();

// Check if pendingAction should be cleared (agent has updated to target version)
// Check if pendingAction should be cleared
let clearPendingAction = false;
if (agentVersion) {
if (updateError) {
// Agent reported update failure — clear to stop retry loop
clearPendingAction = true;
console.warn("Agent update failed, clearing pending action:", agent.nodeId, updateError);
} else if (agentVersion) {
const currentNode = await prisma.vectorNode.findUnique({
where: { id: agent.nodeId },
select: { pendingAction: true },
Expand Down
Loading