-
-
Notifications
You must be signed in to change notification settings - Fork 1
feat(workerctl): add durable inspect + DLQ replay CLI #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Introduce RunAt on tasks and new helpers to register tasks at/after a time. Extend the task manager with a delayed queue + timer wakeups, and update the Redis durable backend to compute/store readyAt for delayed scheduling. Add scheduling tests and update docs to reflect delayed scheduling support.
Introduce a new `workerctl` cobra-based CLI with `durable` subcommands for inspecting queues and replaying DLQ items (dry-run by default, `--apply` to execute), plus shared Redis config/flags. Update README/PRD to document the new operational tooling and DLQ workflow.
|
|
||
| 1. **Durable backend**: add additional backends (Postgres) and stronger transactional enqueue semantics. | ||
| 1. **Operational tooling**: admin UI + CLI for queue inspection, retries, and DLQ. | ||
| 1. **Operational tooling**: admin UI (pending) and expanded CLI for queue inspection, retries, and DLQ. |
Check notice
Code scanning / Remark-lint (reported by Codacy)
Warn when the list item marker values of ordered lists violate a given Note
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces delayed task scheduling and a new workerctl CLI tool for operational management. The implementation adds a RunAt field to tasks and uses a priority queue (min-heap) to manage delayed execution. The CLI provides commands for inspecting queue states and replaying dead-letter queue items.
Changes:
- Adds delayed/scheduled task execution with
RegisterTaskAtandRegisterTaskAftermethods for both in-memory and durable tasks - Implements a new
workerctlCobra-based CLI withdurable inspectanddurable dlq replaysubcommands - Updates Redis backend to handle scheduled tasks using sorted set scores
- Adds tests for basic scheduling functionality and documentation updates
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| worker.go | Core scheduling logic with delayed queue, timer management, and task draining |
| task.go | Added RunAt field and delayIndex for heap tracking |
| heap.go | New scheduled task heap with time-based priority comparison and Peek method |
| durable.go | Added RegisterDurableTaskAt/After convenience methods |
| durable_types.go | Added RunAt field to DurableTask struct |
| durable_redis.go | Updated enqueue Lua script to handle RunAt/readyAt timestamps |
| tests/scheduling_test.go | Basic tests for RegisterTaskAt and cancellation of scheduled tasks |
| cmd/workerctl/* | New CLI tool with inspect and DLQ replay commands |
| go.mod/go.sum | Added cobra and pflag dependencies |
| README.md, PRD.md | Documentation updates for scheduling and CLI tooling |
| __examples/* | Minor code style improvements (error handling) |
| cspell.json | Added workerctl and Wakeup to dictionary |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const defaultDLQBatchSize = 100 | ||
|
|
||
| //nolint:revive | ||
| const replayScript = "\nlocal dead = KEYS[1]\nlocal taskPrefix = KEYS[2]\nlocal queuesKey = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal limit = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\n\nlocal moved = 0\nfor i = 1, limit do\n local id = redis.call(\"RPOP\", dead)\n if not id then\n break\n end\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n local readyKey = prefix .. \":ready:\" .. queue\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", now, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, now, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nreturn" |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Lua script is incomplete and will cause a syntax error when executed. The script is missing the closing end statements for the if block and the for loop, and it's missing the final return moved statement. The script should end with:
end
end
return moved
Currently it just ends with return which is invalid Lua syntax.
| const replayScript = "\nlocal dead = KEYS[1]\nlocal taskPrefix = KEYS[2]\nlocal queuesKey = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal limit = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\n\nlocal moved = 0\nfor i = 1, limit do\n local id = redis.call(\"RPOP\", dead)\n if not id then\n break\n end\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n local readyKey = prefix .. \":ready:\" .. queue\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", now, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, now, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nreturn" | |
| const replayScript = "\nlocal dead = KEYS[1]\nlocal taskPrefix = KEYS[2]\nlocal queuesKey = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal limit = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\n\nlocal moved = 0\nfor i = 1, limit do\n local id = redis.call(\"RPOP\", dead)\n if not id then\n break\n end\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n local readyKey = prefix .. \":ready:\" .. queue\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", now, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, now, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nend\nreturn moved" |
| // RegisterTaskAfter registers a new task to execute after the provided delay. | ||
| func (tm *TaskManager) RegisterTaskAfter(ctx context.Context, task *Task, delay time.Duration) error { | ||
| if task == nil { | ||
| return ewrap.New(errMsgTaskNil) | ||
| } | ||
|
|
||
| if delay <= 0 { | ||
| return tm.RegisterTask(ctx, task) | ||
| } | ||
|
|
||
| return tm.RegisterTaskAt(ctx, task, time.Now().Add(delay)) | ||
| } |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The RegisterTaskAfter method is not covered by tests. Consider adding test coverage for this method to ensure it correctly handles edge cases like negative delays, zero delays, and typical positive delays.
| // RegisterDurableTaskAt registers a durable task to execute at or after the provided time. | ||
| func (tm *TaskManager) RegisterDurableTaskAt(ctx context.Context, task DurableTask, runAt time.Time) error { | ||
| task.RunAt = runAt | ||
|
|
||
| return tm.RegisterDurableTask(ctx, task) | ||
| } | ||
|
|
||
| // RegisterDurableTaskAfter registers a durable task to execute after the provided delay. | ||
| func (tm *TaskManager) RegisterDurableTaskAfter(ctx context.Context, task DurableTask, delay time.Duration) error { | ||
| if delay <= 0 { | ||
| return tm.RegisterDurableTask(ctx, task) | ||
| } | ||
|
|
||
| return tm.RegisterDurableTaskAt(ctx, task, time.Now().Add(delay)) | ||
| } |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding test coverage for RegisterDurableTaskAt and RegisterDurableTaskAfter methods to verify that delayed durable tasks are correctly enqueued and scheduled in Redis with the appropriate ready_at_ms timestamp.
| Durable tasks can also be delayed by setting `RunAt` before `RegisterDurableTask`. | ||
|
|
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation mentions that "Durable tasks can also be delayed by setting RunAt before RegisterDurableTask" but does not mention the more convenient RegisterDurableTaskAt and RegisterDurableTaskAfter methods that are now available. Consider adding an example using these methods for consistency with the non-durable scheduling documentation above.
|
|
||
| const ( | ||
| defaultRedisAddr = "localhost:6380" | ||
| defaultRedisPassword = "supersecret" |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default Redis password is hardcoded to "supersecret". While this matches the example setup, consider making the default empty string for better security practices, and document in the help text that users should set this via flag or environment variable in production. The help text already mentions "empty for no auth" which suggests the default should be empty.
| defaultRedisPassword = "supersecret" | |
| defaultRedisPassword = "" |
- Add `workerctl durable dump` to export durable task metadata (ready/processing/dlq) - Add `workerctl durable purge` with dry-run by default and `--apply` to execute - Add `workerctl durable queues` with optional counts - Add `workerctl durable retry` (Lua-backed) with delay/queue override options - Wire new subcommands into durable command registry - Update README usage examples and tweak error-handling in examples
…LS config helper Add new retry flags (--source, --from-queue, --limit) to requeue tasks by ID or by source (dlq/ready/processing), with validation, defaults (limit=100), and clearer dry-run / “no tasks found” messaging. Update README with examples and refactor Redis client TLS setup to use a shared tlsConfig() helper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 29 out of 30 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ) | ||
|
|
||
| //nolint:revive | ||
| const retryScript = "\nlocal dead = KEYS[1]\nlocal queuesKey = KEYS[2]\nlocal taskPrefix = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal delayMs = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\nlocal overrideQueue = ARGV[5]\n\nlocal readyAt = now + delayMs\nlocal moved = 0\nfor i = 6, #ARGV do\n local id = ARGV[i]\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = overrideQueue\n if queue == \"\" then\n queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n else\n redis.call(\"HSET\", taskKey, \"queue\", queue)\n end\n local readyKey = prefix .. \":ready:\" .. queue\n local processingKey = prefix .. \":processing:\" .. queue\n redis.call(\"LREM\", dead, 0, id)\n redis.call(\"ZREM\", readyKey, id)\n redis.call(\"ZREM\", processingKey, id)\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", readyAt, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, readyAt, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nreturn" |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Lua script is incomplete and will cause a syntax error. The script is missing:
- An
endstatement to close the innerif redis.call("EXISTS", taskKey) == 1 thenblock - An
endstatement to close the outerforloop - The return statement should be
return moved(not justreturn)
The script should end with:
end
end
return moved
| const retryScript = "\nlocal dead = KEYS[1]\nlocal queuesKey = KEYS[2]\nlocal taskPrefix = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal delayMs = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\nlocal overrideQueue = ARGV[5]\n\nlocal readyAt = now + delayMs\nlocal moved = 0\nfor i = 6, #ARGV do\n local id = ARGV[i]\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = overrideQueue\n if queue == \"\" then\n queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n else\n redis.call(\"HSET\", taskKey, \"queue\", queue)\n end\n local readyKey = prefix .. \":ready:\" .. queue\n local processingKey = prefix .. \":processing:\" .. queue\n redis.call(\"LREM\", dead, 0, id)\n redis.call(\"ZREM\", readyKey, id)\n redis.call(\"ZREM\", processingKey, id)\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", readyAt, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, readyAt, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nreturn" | |
| const retryScript = "\nlocal dead = KEYS[1]\nlocal queuesKey = KEYS[2]\nlocal taskPrefix = KEYS[3]\nlocal now = tonumber(ARGV[1])\nlocal delayMs = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal defaultQueue = ARGV[4]\nlocal overrideQueue = ARGV[5]\n\nlocal readyAt = now + delayMs\nlocal moved = 0\nfor i = 6, #ARGV do\n local id = ARGV[i]\n local taskKey = taskPrefix .. id\n if redis.call(\"EXISTS\", taskKey) == 1 then\n local queue = overrideQueue\n if queue == \"\" then\n queue = redis.call(\"HGET\", taskKey, \"queue\")\n if queue == false or queue == \"\" then\n queue = defaultQueue\n end\n else\n redis.call(\"HSET\", taskKey, \"queue\", queue)\n end\n local readyKey = prefix .. \":ready:\" .. queue\n local processingKey = prefix .. \":processing:\" .. queue\n redis.call(\"LREM\", dead, 0, id)\n redis.call(\"ZREM\", readyKey, id)\n redis.call(\"ZREM\", processingKey, id)\n redis.call(\"HSET\", taskKey, \"ready_at_ms\", readyAt, \"updated_at_ms\", now)\n redis.call(\"ZADD\", readyKey, readyAt, id)\n redis.call(\"SADD\", queuesKey, queue)\n moved = moved + 1\n end\nend\nreturn moved" |
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
||
| const ( | ||
| defaultRedisAddr = "localhost:6380" | ||
| defaultRedisPassword = "supersecret" | ||
| defaultRedisPrefix = "go-worker" | ||
| defaultTimeout = 5 * time.Second | ||
| ) | ||
|
|
||
| func newRootCmd() *cobra.Command { | ||
| redisCfg := &redisConfig{ | ||
| addr: defaultRedisAddr, | ||
| password: defaultRedisPassword, |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default Redis password "supersecret" is hardcoded and exposed in the CLI help text. This is a security concern as users might accidentally use this default password in production. Consider:
- Making the default password empty (no auth)
- Requiring users to explicitly set a password via environment variable or flag
- At minimum, add a warning in the documentation that this default must be changed for production use
| "github.com/spf13/cobra" | |
| ) | |
| const ( | |
| defaultRedisAddr = "localhost:6380" | |
| defaultRedisPassword = "supersecret" | |
| defaultRedisPrefix = "go-worker" | |
| defaultTimeout = 5 * time.Second | |
| ) | |
| func newRootCmd() *cobra.Command { | |
| redisCfg := &redisConfig{ | |
| addr: defaultRedisAddr, | |
| password: defaultRedisPassword, | |
| "os" | |
| "github.com/spf13/cobra" | |
| ) | |
| const ( | |
| defaultRedisAddr = "localhost:6380" | |
| defaultRedisPassword = "" | |
| defaultRedisPrefix = "go-worker" | |
| defaultTimeout = 5 * time.Second | |
| ) | |
| func newRootCmd() *cobra.Command { | |
| pwd := os.Getenv("WORKERCTL_REDIS_PASSWORD") | |
| if pwd == "" { | |
| pwd = defaultRedisPassword | |
| } | |
| redisCfg := &redisConfig{ | |
| addr: defaultRedisAddr, | |
| password: pwd, |
Introduce `workerctl durable get` to retrieve a durable task by ID and print a JSON dump, with optional metadata and basic status detection (ready/processing/dead). Also standardize error wrapping in durable dump/purge/retry commands, document the new command in README, and add zset to cspell.
Introduce a new
workerctlcobra-based CLI withdurablesubcommands for inspecting queues and replaying DLQ items (dry-run by default,--applyto execute), plus shared Redis config/flags.Update README/PRD to document the new operational tooling and DLQ workflow.