From 3ef98458f67ef66a814c3af4886c7f6de2e1a8b6 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 14 Apr 2026 14:59:31 +1000 Subject: [PATCH] jobscheduler: recover panics in worker goroutines Panics in scheduled jobs (e.g. from metadatadb, logging, or feature flags) would crash the entire process with exit code 2. Wrap job execution in a deferred recover so panics are captured as errors with a stack trace, logged, and the worker stays alive. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d8a56-4eee-7490-8132-1d5bbea7d251 --- internal/jobscheduler/jobs.go | 52 +++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 61fe365..7da63f2 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -3,6 +3,7 @@ package jobscheduler import ( "context" + "log/slog" "runtime" "strings" "sync" @@ -214,27 +215,44 @@ func (q *RootScheduler) worker(ctx context.Context, id int) { if !ok { continue } - jobAttrs := attribute.String("job.type", jobType(job.id)) - start := time.Now() - logger.InfoContext(ctx, "Starting job", "job", job) - err := job.run(ctx) - elapsed := time.Since(start) - status := "success" - if err != nil { - status = "error" - logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed) - } else { - logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed) - } - statusAttr := attribute.String("status", status) - q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr)) - q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr)) - q.markQueueInactive(job.queue) - q.workAvailable <- true + q.runJob(ctx, logger, job) } } } +func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) { + defer q.markQueueInactive(job.queue) + defer func() { q.workAvailable <- true }() + + jobAttrs := attribute.String("job.type", jobType(job.id)) + start := time.Now() + logger.InfoContext(ctx, "Starting job", "job", job) + + var err error + func() { + defer func() { + if r := recover(); r != nil { + stack := make([]byte, 4096) + stack = stack[:runtime.Stack(stack, false)] + err = errors.Errorf("panic: %v\n%s", r, stack) + } + }() + err = job.run(ctx) + }() + + elapsed := time.Since(start) + status := "success" + if err != nil { + status = "error" + logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed) + } else { + logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed) + } + statusAttr := attribute.String("status", status) + q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr)) + q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr)) +} + // jobType extracts a normalised job type from the job ID for metric labels. func jobType(id string) string { switch {