37 commits
605395d
core: support batch job timeouts via new field max_run_duration
pkazmierczak Apr 7, 2026
2388417
refine batch timeout watcher
pkazmierczak Apr 7, 2026
617fe2b
client status
pkazmierczak Apr 7, 2026
1f4c4db
integration-style test
pkazmierczak Apr 7, 2026
180aea3
event stream
pkazmierczak Apr 7, 2026
5fff828
AllocTimeoutReasonMaxRunDuration
pkazmierczak Apr 7, 2026
8fbfff3
batch timeout watcher improvements
pkazmierczak Apr 7, 2026
697c5c3
added batchtimeout package to test-core.json
pkazmierczak Apr 7, 2026
a31525a
move to allocrunner
pkazmierczak Apr 8, 2026
2a7fc3b
remove nomad/batchtimeout
pkazmierczak Apr 8, 2026
e9d30e7
timer tweaks
pkazmierczak Apr 8, 2026
d560d47
max_run_duration_hook
pkazmierczak Apr 8, 2026
737f23d
TestJobs_ApiJobToStructsJob
pkazmierczak Apr 8, 2026
48a9652
don't need that interface
pkazmierczak Apr 8, 2026
eda5a4f
it's actually not a hook
pkazmierczak Apr 9, 2026
ab7565d
refinements
pkazmierczak Apr 9, 2026
c44ba70
corrections
pkazmierczak Apr 9, 2026
9abba4f
test refactor
pkazmierczak Apr 9, 2026
0aecff5
tidying up
pkazmierczak Apr 9, 2026
f454526
more tidying up
pkazmierczak Apr 9, 2026
9b735a0
bugfix
pkazmierczak Apr 9, 2026
7e0509a
unify fully running since
pkazmierczak Apr 13, 2026
ea2f1ad
hook alloc can never be nil
pkazmierczak Apr 13, 2026
f996731
event stream corrections
pkazmierczak Apr 13, 2026
a521239
desired status comment
pkazmierczak Apr 14, 2026
2f420ac
allocrunner unnecessary copy fix
pkazmierczak Apr 14, 2026
ae2d08a
fix UTC and interface
pkazmierczak Apr 14, 2026
bc6344c
max_run_duration cleanups (thanks for the comments @mismithhisler)
pkazmierczak Apr 14, 2026
f246b59
client: enforce max_run_duration regardless of task state (#27827)
pkazmierczak Apr 17, 2026
3b6f4df
allow for updates to max_run_duration during job run
pkazmierczak Apr 17, 2026
53f6210
metrics
pkazmierczak Apr 17, 2026
7841eda
better logs
pkazmierczak Apr 17, 2026
ce45ae8
fix clock arming bug
pkazmierczak Apr 17, 2026
6380561
don't start poststop tasks if the timeout has passed
pkazmierczak Apr 17, 2026
879219a
TestMaxRunDurationHook_EmitMetrics correction
pkazmierczak Apr 17, 2026
6a6e6a6
TestServiceSched_JobModify_MaxRunDuration_InPlace fix
pkazmierczak Apr 17, 2026
e34dbc9
yet another fix to TestMaxRunDurationHook_EmitMetrics
pkazmierczak Apr 17, 2026
1 change: 1 addition & 0 deletions api/tasks.go
Expand Up @@ -507,6 +507,7 @@ type TaskGroup struct {
Meta map[string]string `hcl:"meta,block"`
Services []*Service `hcl:"service,block"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
MaxRunDuration *time.Duration `mapstructure:"max_run_duration" hcl:"max_run_duration,optional"`
Member

I wonder if we'd regret naming this ttl? It is literally the amount of time we allow the allocation to live, but it might be too ambiguous a name for the group block. You could interpret group.ttl as including time sitting in the eval queue as well, whereas the run aspect of this name makes the intent more obvious.

So I think this is fine, but since jobspec fields are our core user interface, I think they're worth careful consideration. 🤔

Contributor Author

Yeah, I don't know about this. ttl has a loaded meaning in my opinion. We can think of a better name for this setting, sure, but I don't think it should be ttl. (See the usage sketch after this diff.)

// Deprecated: StopAfterClientDisconnect is deprecated in Nomad 1.8 and ignored in Nomad 1.10. Use Disconnect.StopOnClientAfter.
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect" hcl:"stop_after_client_disconnect,optional"`
// Deprecated: MaxClientDisconnect is deprecated in Nomad 1.8.0 and ignored in Nomad 1.10. Use Disconnect.LostAfter.
Expand Down
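To make the naming discussion above concrete, here is a minimal, hypothetical sketch of setting the new group-level field through the `api` package; the group name and the two-hour limit are made up for illustration and are not part of this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Hypothetical values: cap the allocation's total run time at two hours.
	name := "transform"
	maxRun := 2 * time.Hour

	tg := &api.TaskGroup{
		Name:           &name,
		MaxRunDuration: &maxRun, // the field added by this change
	}

	fmt.Printf("group %q is stopped after %s of run time\n", *tg.Name, *tg.MaxRunDuration)
}
```

Read this way, max_run_duration signals a limit on run time rather than on total lifetime, which is the distinction the thread above draws against ttl.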
57 changes: 52 additions & 5 deletions client/allocrunner/alloc_runner.go
Expand Up @@ -737,8 +737,14 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
return nil
}

return structs.NewTaskEvent(structs.TaskKilling).
event := structs.NewTaskEvent(structs.TaskKilling).
SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)

if ar.state.MaxRunDurationExceeded {
event.SetDisplayMessage(structs.AllocTimeoutReasonMaxRunDuration)
}

return event
}

// Kill leader first, synchronously
Expand Down Expand Up @@ -809,6 +815,23 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
}
wg.Wait()

// Skip poststop tasks entirely when max_run_duration has been exceeded so
// they are not started after the allocation has timed out.
if ar.state.MaxRunDurationExceeded {
for name, tr := range ar.tasks {
if !tr.IsPoststopTask() {
continue
}

state := tr.TaskState()
if state != nil {
states[name] = state
}
}

return states
}

// Perform no action on post stop tasks, but retain their states if they exist. This
// commonly happens at the time of alloc GC from the client node.
for name, tr := range ar.tasks {
Expand Down Expand Up @@ -845,7 +868,10 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st
}

// Compute the ClientStatus
if ar.state.ClientStatus != "" {
if ar.state.MaxRunDurationExceeded {
a.ClientStatus = structs.AllocClientStatusComplete
a.ClientDescription = structs.AllocTimeoutReasonMaxRunDuration
} else if ar.state.ClientStatus != "" {
// The client status is being forced
a.ClientStatus, a.ClientDescription = ar.state.ClientStatus, ar.state.ClientDescription
} else {
Expand Down Expand Up @@ -983,7 +1009,7 @@ func (ar *allocRunner) AllocState() *state.State {
// If TaskStateUpdated has not been called yet, ar.state.TaskStates
// won't be set as it is not the canonical source of TaskStates.
if len(state.TaskStates) == 0 {
ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks))
state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks))
for k, tr := range ar.tasks {
state.TaskStates[k] = tr.TaskState()
}
Expand Down Expand Up @@ -1081,6 +1107,27 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}

func (ar *allocRunner) EnforceMaxRunDurationTimeout(deadline time.Time) {
now := time.Now()

if ar.isShuttingDown() {
return
}

if now.Before(deadline) {
return
}

ar.stateLock.Lock()
ar.state.MaxRunDurationExceeded = true
ar.state.ClientStatus = structs.AllocClientStatusComplete
ar.state.ClientDescription = structs.AllocTimeoutReasonMaxRunDuration
ar.stateLock.Unlock()

ar.logger.Debug("allocation exceeded max_run_duration, killing tasks", "deadline", deadline)
ar.killTasks()
}

func (ar *allocRunner) destroyImpl() {
// Stop any running tasks and persist states in case the client is
// shutdown before Destroy finishes.
Expand Down Expand Up @@ -1255,8 +1302,8 @@ func (ar *allocRunner) Shutdown() {
go func() {
ar.logger.Trace("shutting down")

// Shutdown tasks gracefully if they were run
wg := sync.WaitGroup{}
// Shutdown task runners
var wg sync.WaitGroup
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_hooks.go
Expand Up @@ -111,6 +111,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
ar.runnerHooks = []interfaces.RunnerHook{
newIdentityHook(hookLogger, ar.widmgr),
newAllocDirHook(hookLogger, ar.allocDir),
newMaxRunDurationHook(hookLogger, alloc, ar.clientBaseLabels, ar.EnforceMaxRunDurationTimeout),
newConsulHook(consulHookConfig{
alloc: ar.alloc,
allocdir: ar.allocDir,
Expand Down
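The hook file itself is not part of the hunks shown here, so the following is only a rough sketch of the shape such a hook could take, not the PR's implementation: compute a deadline from the group's MaxRunDuration and hand it to the EnforceMaxRunDurationTimeout callback registered above. Every name other than MaxRunDuration and EnforceMaxRunDurationTimeout is illustrative.

```go
// Sketch only: not the hook added by this PR, just one plausible shape for it.
package sketch

import "time"

type maxRunDurationTimer struct {
	deadline time.Time
	enforce  func(deadline time.Time) // e.g. allocRunner.EnforceMaxRunDurationTimeout
	timer    *time.Timer
}

// newMaxRunDurationTimer derives the deadline from the group's
// max_run_duration and keeps the enforcement callback for later use.
func newMaxRunDurationTimer(maxRun time.Duration, enforce func(time.Time)) *maxRunDurationTimer {
	return &maxRunDurationTimer{
		deadline: time.Now().UTC().Add(maxRun),
		enforce:  enforce,
	}
}

// arm schedules the callback for when the deadline passes. The callback in
// the diff above re-checks the deadline itself, so firing slightly early or
// late is harmless.
func (t *maxRunDurationTimer) arm() {
	t.timer = time.AfterFunc(time.Until(t.deadline), func() {
		t.enforce(t.deadline)
	})
}

// stop cancels the pending timer, e.g. when the allocation finishes on its
// own or the deadline is extended by a job update.
func (t *maxRunDurationTimer) stop() {
	if t.timer != nil {
		t.timer.Stop()
	}
}
```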
207 changes: 207 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Expand Up @@ -497,6 +497,95 @@ func TestAllocRunner_Lifecycle_Poststop(t *testing.T) {

}

func TestAllocRunner_MaxRunDuration_SkipsPoststopTasks(t *testing.T) {
ci.Parallel(t)

alloc := mock.LifecycleAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

alloc.Job.Type = structs.JobTypeBatch
maxRunDuration := 50 * time.Millisecond
alloc.Job.TaskGroups[0].MaxRunDuration = &maxRunDuration

mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"
mainTask.KillTimeout = 10 * time.Millisecond

poststopTask := alloc.Job.TaskGroups[0].Tasks[1]
poststopTask.Name = "poststop"
poststopTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
poststopTask.Config["run_for"] = "10s"

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, poststopTask}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
mainTask.Name: tr,
poststopTask.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

arIface, err := NewAllocRunner(conf)
must.NoError(t, err)
ar := arIface.(*allocRunner)

go ar.Run()
defer destroy(ar)

upd := conf.StateUpdater.(*MockStateUpdater)

testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("no updates")
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}

if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStatePending {
return false, fmt.Errorf("expected poststop task to be pending not %s", s)
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})

testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("no updates")
}

if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("expected alloc to be complete not %s", last.ClientStatus)
}

if last.ClientDescription != structs.AllocTimeoutReasonMaxRunDuration {
return false, fmt.Errorf("expected alloc description %q not %q", structs.AllocTimeoutReasonMaxRunDuration, last.ClientDescription)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}

if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStatePending {
return false, fmt.Errorf("expected poststop task to remain pending not %s", s)
}

return true, nil
}, func(err error) {
last := upd.Last()
t.Fatalf("error waiting for max_run_duration state:\n%v\nlast=%#v", err, last)
})
}

func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
ci.Parallel(t)

Expand Down Expand Up @@ -2676,6 +2765,124 @@ func TestAllocRunner_GetUpdatePriority(t *testing.T) {
must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.GetUpdatePriority(calloc))
}

func TestAllocRunner_MaxRunDuration_StopsExpiredAlloc(t *testing.T) {
ci.Parallel(t)

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
}
task.KillTimeout = 10 * time.Millisecond
maxRunDuration := 50 * time.Millisecond
alloc.Job.TaskGroups[0].MaxRunDuration = &maxRunDuration

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

arIface, err := NewAllocRunner(conf)
must.NoError(t, err)
ar := arIface.(*allocRunner)

go ar.Run()
defer destroy(ar)

testutil.WaitForResult(func() (bool, error) {
state := ar.AllocState()
if state == nil {
return false, fmt.Errorf("no alloc state")
}
if state.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete)
}
if state.ClientDescription != structs.AllocTimeoutReasonMaxRunDuration {
return false, fmt.Errorf("got description %q; want %q", state.ClientDescription, structs.AllocTimeoutReasonMaxRunDuration)
}
if !state.MaxRunDurationExceeded {
return false, fmt.Errorf("max run duration was not marked exceeded")
}
return true, nil
}, func(err error) {
state := ar.AllocState()
t.Fatalf("timed out waiting for alloc runner max_run_duration enforcement: %v; state=%#v", err, state)
})
}

func TestAllocRunner_MaxRunDuration_UpdateExtendsRunningAlloc(t *testing.T) {
ci.Parallel(t)

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
}
task.KillTimeout = 10 * time.Millisecond

initialMaxRunDuration := 75 * time.Millisecond
alloc.Job.TaskGroups[0].MaxRunDuration = &initialMaxRunDuration

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

arIface, err := NewAllocRunner(conf)
must.NoError(t, err)
ar := arIface.(*allocRunner)

go ar.Run()
defer destroy(ar)

testutil.WaitForResult(func() (bool, error) {
state := ar.AllocState()
if state == nil {
return false, fmt.Errorf("no alloc state")
}
if state.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
state := ar.AllocState()
t.Fatalf("timed out waiting for alloc runner to start: %v; state=%#v", err, state)
})

time.Sleep(40 * time.Millisecond)

updatedAlloc := ar.Alloc().Copy()
updatedAlloc.AllocModifyIndex++
updatedMaxRunDuration := 200 * time.Millisecond
updatedAlloc.Job.TaskGroups[0].MaxRunDuration = &updatedMaxRunDuration
ar.Update(updatedAlloc)

time.Sleep(60 * time.Millisecond)

state := ar.AllocState()
must.NotNil(t, state)
must.False(t, state.MaxRunDurationExceeded)
must.Eq(t, structs.AllocClientStatusRunning, state.ClientStatus)

testutil.WaitForResult(func() (bool, error) {
state := ar.AllocState()
if state == nil {
return false, fmt.Errorf("no alloc state")
}
if state.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete)
}
if state.ClientDescription != structs.AllocTimeoutReasonMaxRunDuration {
return false, fmt.Errorf("got description %q; want %q", state.ClientDescription, structs.AllocTimeoutReasonMaxRunDuration)
}
if !state.MaxRunDurationExceeded {
return false, fmt.Errorf("max run duration was not marked exceeded")
}
return true, nil
}, func(err error) {
state := ar.AllocState()
t.Fatalf("timed out waiting for alloc runner max_run_duration enforcement after update: %v; state=%#v", err, state)
})
}

func TestAllocRunner_setHookStatsHandler(t *testing.T) {
ci.Parallel(t)

Expand Down