Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type TaskExecutor interface {
Shutdown() chan bool
}

// An executor schedules and executes tasks.
// It also reschedules fixed-rate and fixed-delay periodic tasks,
// though trigger tasks will reschedule themselves.
type SimpleTaskExecutor struct {
nextSequence int
isShutdown bool
Expand Down Expand Up @@ -206,6 +209,7 @@ func (executor *SimpleTaskExecutor) run() {
executor.taskQueue = append(executor.taskQueue, rescheduledTask)
case stoppedChan := <-executor.shutdownChannel:
executor.timer.Stop()
// Wait for all current tasks to finish executing.
executor.taskWaitGroup.Wait()
stoppedChan <- true
return
Expand All @@ -231,16 +235,26 @@ func (executor *SimpleTaskExecutor) startTask(scheduledRunnableTask *ScheduledRu

executor.taskWaitGroup.Done()

// If this task is non-periodic, it should be cancelled after execution.
if !scheduledRunnableTask.isPeriodic() {
scheduledRunnableTask.Cancel()
} else {
// If the task is periodic but not at a fixed rate,
// then it must be a fixed delay task. This means
// that the period is measured from the end of the previous
// execution.
if !scheduledRunnableTask.isFixedRate() {
scheduledRunnableTask.triggerTime = executor.calculateTriggerTime(scheduledRunnableTask.period)
executor.rescheduleTaskChannel <- scheduledRunnableTask
}
}
}()

// If the task is a fixed-rate periodic task,
// then we can schedule the next task
// before we begin executing the current task.
// We assume that the triggerTime was incremented
// before startTask() was called.
if scheduledRunnableTask.isPeriodic() && scheduledRunnableTask.isFixedRate() {
executor.rescheduleTaskChannel <- scheduledRunnableTask
}
Expand Down
3 changes: 3 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"time"
)

// A TaskScheduler is the simplified top-level object
// that most user code will interface with.
// All state is handled internally by the executor.
type TaskScheduler interface {
Schedule(task Task, options ...Option) (ScheduledTask, error)
ScheduleWithCron(task Task, expression string, options ...Option) (ScheduledTask, error)
Expand Down
16 changes: 16 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (
"time"
)

// A Task represents a function which will be executed by the executor.
type Task func(ctx context.Context)

// A SchedulerTask represents a task that is about to be scheduled.
// This struct supports setting values via option functions
// such as WithTime() and WithLocation().
type SchedulerTask struct {
task Task
startTime time.Time
Expand Down Expand Up @@ -90,11 +94,18 @@ func WithLocation(location string) Option {
}
}

// A ScheduledTask is a minimal interface that represents
// one-off, fixed period, and dynamic period tasks
// that have been scheduled.
type ScheduledTask interface {
Cancel()
IsCancelled() bool
}

// A ScheduledRunnableTask either represents a fixed-rate task
// or a fixed-delay task. In either case, finding the time of
// next execution is a fairly simple operation.
// For more advanced cases, see TriggerTask.
type ScheduledRunnableTask struct {
id int
task Task
Expand Down Expand Up @@ -165,6 +176,8 @@ func (queue ScheduledTaskQueue) SorByTriggerTime() {
sort.Sort(queue)
}

// A TriggerTask represents a task which re-schedules itself.
// Useful for dynamic periods, such as those represented via CRON expressions.
type TriggerTask struct {
task Task
currentScheduledTask *ScheduledRunnableTask
Expand Down Expand Up @@ -208,6 +221,8 @@ func (task *TriggerTask) IsCancelled() bool {
return task.currentScheduledTask.IsCancelled()
}

// Schedule the task onto an executor. Note that when the task is run,
// it schedules itself again.
func (task *TriggerTask) Schedule() (ScheduledTask, error) {
task.triggerContextMu.Lock()
defer task.triggerContextMu.Unlock()
Expand All @@ -230,6 +245,7 @@ func (task *TriggerTask) Schedule() (ScheduledTask, error) {
return task, nil
}

// Run the trigger task and schedule it again.
func (task *TriggerTask) Run(ctx context.Context) {
task.triggerContextMu.Lock()

Expand Down
6 changes: 6 additions & 0 deletions trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package chrono

import "time"

// A trigger context is a store of previous times
// that can be used to calculate the next trigger time.
// For example, if our schedule runs every 2 days,
// it is important to know which day our schedule started.
type TriggerContext interface {
LastCompletionTime() time.Time
LastExecutionTime() time.Time
Expand Down Expand Up @@ -36,6 +40,8 @@ func (ctx *SimpleTriggerContext) LastTriggeredExecutionTime() time.Time {
return ctx.lastTriggeredExecutionTime
}

// A trigger is a strategy which can be used to generate the next
// execution time, when supplied with context of previous times.
type Trigger interface {
NextExecutionTime(ctx TriggerContext) time.Time
}
Expand Down