-
-
Notifications
You must be signed in to change notification settings - Fork 1
feat(worker): add cron scheduling and durable Redis coordination #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
hyp3rd
commented
Feb 3, 2026
- Add cron task registration APIs (standard + durable) with validation and logging
- Wire cron lifecycle into TaskManager; default cron location to UTC and add WithCronLocation
- Add Redis durable coordination: global rate limiting + leader lock using Lua scripts/keys
- Add unit + integration coverage for cron tasks and Redis coordination
- Add robfig/cron/v3 dependency
- Add `workerctl durable enqueue` to create durable tasks (payload from JSON/YAML, file, or base64) - Add unit + integration tests covering payload decoding and enqueue flow - Improve durable delete output labels to include task source and limits - Make durable snapshot scanning more robust (larger scanner buffer, clean paths) - Document enqueue examples and add yaml dependency
- Add cron task registration APIs (standard + durable) with validation and logging - Wire cron lifecycle into TaskManager; default cron location to UTC and add WithCronLocation - Add Redis durable coordination: global rate limiting + leader lock using Lua scripts/keys - Add unit + integration coverage for cron tasks and Redis coordination - Add robfig/cron/v3 dependency
Serialize Publish and Close with an input mutex so sends can’t occur while the input channel is being closed, avoiding “send on closed channel” panics. Removes the recover-based fallback in Publish in favor of explicit locking + closed checks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds cron-based task scheduling and Redis-based coordination features (global rate limiting and leader election) to the worker library.
Changes:
- Adds cron task scheduling with support for both in-memory and durable tasks using robfig/cron/v3
- Implements Redis-based global rate limiting using token bucket algorithm via Lua script
- Implements Redis-based leader lock for single-leader coordination across distributed workers
- Adds
workerctl durable enqueuecommand for creating durable tasks from JSON/YAML/base64 payloads
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| cron.go | New file implementing cron task registration, scheduling, and lifecycle management with 5-field and 6-field schedule support |
| worker.go | Integrates cron scheduler lifecycle (init, start, stop) into TaskManager |
| options.go | Adds WithCronLocation option for configuring cron timezone (defaults to UTC) |
| durable_redis.go | Adds global rate limiting and leader lock coordination features with Lua scripts for atomic operations |
| tests/cron_test.go | Unit tests for standard and durable cron tasks |
| tests/durable_redis_integration_test.go | Integration tests for global rate limit and leader lock features |
| cmd/workerctl/durable_enqueue.go | New command for enqueueing durable tasks with JSON/YAML/base64 payload support |
| cmd/workerctl/durable_enqueue_test.go | Unit tests for enqueue command payload handling |
| cmd/workerctl/durable_enqueue_integration_test.go | Integration test for enqueue command |
| cmd/workerctl/durable_snapshot.go | Adds buffer size configuration for handling large snapshot lines and fixes file path handling |
| cmd/workerctl/durable_delete.go | Improves delete target labels to show source queue information |
| cmd/workerctl/durable.go | Registers new enqueue subcommand |
| pkg/worker/v1/*.pb.go | Removes trailing blank lines (formatting) |
| go.mod, go.sum | Adds robfig/cron/v3 dependency and promotes gopkg.in/yaml.v3 from indirect to direct |
| cspell.json | Adds "PEXPIRE" and "robfig" to spell check dictionary |
| README.md | Documents cron scheduling, Redis coordination features, and enqueue command usage |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func cronParserStandard(_ *time.Location) cron.Parser { | ||
| return cron.NewParser( | ||
| cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, | ||
| ) | ||
| } | ||
|
|
||
| func cronParserSeconds(_ *time.Location) cron.Parser { | ||
| return cron.NewParser( | ||
| cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, | ||
| ) | ||
| } |
Copilot
AI
Feb 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The location parameter in cronParserStandard and cronParserSeconds is unused. While cron parsers can be configured with a location, the robfig/cron/v3 library uses the location at the cron instance level (set via WithLocation in initCron), not at the parser level. These parameters should be removed since they're not used.