-
-
Notifications
You must be signed in to change notification settings - Fork 1
feat(workerctl): add durable pause/resume and snapshot import/export #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add `workerctl completion` command - Implement durable pause/resume commands (dry-run by default, `--apply` to persist) - Add snapshot export/import (stdin/stdout, limits, queue/DLQ options) - Add `durable stats --watch` and adjust stats output for watch-friendly JSON - Add Redis paused-key handling so durable dequeue can be skipped when paused - Update README and cspell terms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR extends the durable Redis worker tooling and workerctl CLI with operational controls (pause/resume), snapshot export/import, stats watching, and shell completion, plus a small test robustness tweak and Redis pause handling in the backend.
Changes:
- Backend: add a durable “paused” key in Redis and
RedisDurableBackend.Dequeuelogic to skip dequeues when paused; slightly relax a test timeout. - CLI (
workerctl): wire newdurable pause/durable resumecommands,durable snapshot export/import(JSONL),durable stats --watch, and a top-levelcompletioncommand; share Redis key helpers. - Docs/tooling: update README usage examples and spelling dictionary entries (cspell) to cover the new commands and Redis operations.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
tests/durable_test.go |
Increases a result wait timeout via a new testResultTimeout constant to reduce flakiness in durable loop tests. |
durable_redis.go |
Introduces a paused Redis key and isPaused helper so Dequeue returns no leases while paused, integrating with the new CLI pause/resume controls. |
cspell.json |
Adds new words (e.g., Redis commands and CLI-related terms) to prevent false-positive spell-check failures. |
cmd/workerctl/root.go |
Registers the new completion subcommand on the root workerctl command. |
cmd/workerctl/redis.go |
Adds a pausedKey helper to compute the durable pause key consistently with other Redis key helpers. |
cmd/workerctl/durable_stats.go |
Refactors stats collection into collectDurableStats, adds a --watch option, and outputs periodic JSON or human-readable stats until interrupted. |
cmd/workerctl/durable_snapshot.go |
Implements durable snapshot export and durable snapshot import commands, including JSONL encoding/decoding, interaction with ready/processing/DLQ structures, and Redis write-back logic. |
cmd/workerctl/durable_pause.go |
Adds durable pause and durable resume commands, with dry-run by default and an --apply flag that toggles the shared pause key in Redis. |
cmd/workerctl/durable_delete.go |
Switches one error path to use ewrap.Wrap for consistency with surrounding error handling. |
cmd/workerctl/durable.go |
Wires in the new durable subcommands (pause, resume, snapshot) under workerctl durable. |
cmd/workerctl/completion.go |
Adds a completion command to generate Bash/Zsh/Fish/PowerShell completion scripts based on the root Cobra command. |
README.md |
Documents durable stats --watch, durable pause/resume, snapshot export/import workflows, and the shell completion generation workflow. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func openInput(path string) (*os.File, func(), error) { | ||
| if path == "" || path == "-" { | ||
| return os.Stdin, func() {}, nil | ||
| } | ||
|
|
||
| file, err := os.Open(filepath.Base(path)) | ||
| if err != nil { | ||
| return nil, nil, ewrap.Wrap(err, "open input") |
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
openInput also uses filepath.Base(path), so --in values that include directories will be opened from the current working directory instead of the specified location. This can cause confusing "file not found" behaviour when the user points at a file outside the current directory; the import path handling should honor the full path rather than only the basename.
| return file, func() { | ||
| err = file.Close() | ||
| if err != nil { | ||
| fmt.Fprintf(os.Stderr, "close output: %v\n", err) |
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message printed in the openInput close function says "close output" even though it is closing the input file, which can be misleading when debugging issues around input handling. Consider updating this message to reference the input file to match the operation being performed.
| fmt.Fprintf(os.Stderr, "close output: %v\n", err) | |
| fmt.Fprintf(os.Stderr, "close input: %v\n", err) |
| scanner := bufio.NewScanner(reader) | ||
| entries := make([]snapshotEntry, 0, snapshotEntriesInitCap) | ||
|
|
||
| for scanner.Scan() { | ||
| line := strings.TrimSpace(scanner.Text()) | ||
| if line == "" { | ||
| continue | ||
| } | ||
|
|
||
| var entry snapshotEntry | ||
|
|
||
| err := json.Unmarshal([]byte(line), &entry) | ||
| if err != nil { | ||
| return ewrap.Wrapf(err, "decode snapshot entry") | ||
| } | ||
|
|
||
| entries = append(entries, entry) | ||
| } | ||
|
|
||
| err = scanner.Err() | ||
| if err != nil { | ||
| return ewrap.Wrap(err, "read snapshot") | ||
| } | ||
|
|
||
| if !opts.apply { | ||
| fmt.Fprintf(os.Stdout, "dry-run: use --apply to import %d snapshot item(s)\n", len(entries)) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| client, err := cfg.client() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer client.Close() | ||
|
|
||
| ctx, cancel := cfg.context() | ||
| defer cancel() | ||
|
|
||
| prefix := keyPrefix(cfg.prefix) | ||
| imported := 0 | ||
|
|
||
| for _, entry := range entries { | ||
| err := importSnapshotEntry(ctx, client, prefix, entry) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| imported++ | ||
| } | ||
|
|
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runSnapshotImport accumulates all snapshot entries into a slice before applying them, so importing a large snapshot file can lead to high memory usage or even out‑of‑memory conditions. To make imports more scalable, consider processing entries in a streaming fashion (e.g. applying each decoded line as you scan) instead of materializing the entire snapshot in memory first.
| func openOutput(path string) (*os.File, func(), error) { | ||
| if path == "" || path == "-" { | ||
| return os.Stdout, func() {}, nil | ||
| } | ||
|
|
||
| file, err := os.Create(filepath.Base(path)) | ||
| if err != nil { | ||
| return nil, nil, ewrap.Wrap(err, "open output") | ||
| } | ||
|
|
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
openOutput uses filepath.Base(path) when creating the file, which strips any directory components from the user‑supplied path and always writes into the current working directory. This means --out ./backups/snapshot.jsonl will ignore the ./backups directory and can produce files in unexpected locations; the function should respect the full path instead of only the basename.
workerctl completioncommand--applyto persist)durable stats --watchand adjust stats output for watch-friendly JSON