From 59a098bbfcfa0bc9b1f5be9d0afd9f11dce0b43c Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 27 Apr 2026 16:58:38 -0400 Subject: [PATCH 1/5] timer: removes NewSafeTimer in favor of time.After --- client/allochealth/tracker.go | 8 +--- client/allocrunner/checks_hook.go | 9 +---- client/allocrunner/csi_hook.go | 10 +---- client/allocrunner/group_service_hook.go | 5 +-- client/allocrunner/tasklifecycle/gate_test.go | 8 +--- client/allocrunner/taskrunner/task_runner.go | 9 +---- .../taskrunner/task_runner_linux_test.go | 6 +-- client/allocrunner/taskrunner/vault_hook.go | 6 +-- client/allocwatcher/alloc_watcher.go | 8 +--- client/client.go | 4 +- client/consul/consul.go | 6 +-- client/drain.go | 13 +------ client/dynamicplugins/registry.go | 7 +--- client/heartbeatstop.go | 7 +--- client/rpc.go | 7 +--- client/serviceregistration/watcher.go | 32 ++-------------- client/vaultclient/vaultclient_test.go | 6 +-- client/widmgr/widmgr.go | 6 +-- command/agent/consul/version_checker.go | 6 +-- command/agent/monitor/monitor.go | 8 +--- command/agent/retry_join_test.go | 15 +------- command/agent/testagent.go | 7 +--- command/operator_debug.go | 5 +-- drivers/docker/cpuset.go | 7 +--- drivers/shared/eventer/eventer.go | 8 +--- helper/backoff.go | 6 +-- helper/broker/notify.go | 9 +---- helper/funcs.go | 38 +------------------ helper/funcs_test.go | 25 ------------ helper/winsvc/events_windows.go | 6 +-- helper/winsvc/windows_service_windows.go | 9 +---- nomad/blocked_evals.go | 9 +---- nomad/drainer/watch_jobs.go | 9 +---- nomad/drainer/watch_nodes.go | 8 +--- nomad/encrypter_test.go | 29 +++++--------- nomad/eval_broker.go | 8 +--- nomad/leader.go | 28 ++------------ nomad/lock/delay.go | 7 +--- nomad/lock/ttl.go | 7 +--- nomad/plan_apply_node_tracker.go | 8 +--- nomad/plan_queue.go | 8 +--- nomad/server.go | 7 +--- nomad/volumewatcher/volume_watcher.go | 7 +--- nomad/worker_test.go | 5 +-- 44 files changed, 66 insertions(+), 380 deletions(-) diff --git 
a/client/allochealth/tracker.go b/client/allochealth/tracker.go index 1d9525b0e7d..c068a778368 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -626,10 +625,6 @@ func evaluateConsulChecks(services []*structs.Service, registrations *servicereg // // Does not watch Consul service checks; see watchConsulEvents for those. func (t *Tracker) watchNomadEvents() { - // checkTicker is the ticker that triggers us to look at the checks in Nomad - checkTicker, cancel := helper.NewSafeTimer(t.checkLookupInterval) - defer cancel() - // waiter is used to fire when the checks have been healthy for the MinHealthyTime waiter := newHealthyFuture() @@ -650,9 +645,8 @@ func (t *Tracker) watchNomadEvents() { return // it is time to check the checks - case <-checkTicker.C: + case <-time.After(t.checkLookupInterval): results = t.checkStore.List(allocID) - checkTicker.Reset(t.checkLookupInterval) // enough time has passed with healthy checks case <-waiter.C(): diff --git a/client/allocrunner/checks_hook.go b/client/allocrunner/checks_hook.go index db042ce715a..6ad3c0a0f87 100644 --- a/client/allocrunner/checks_hook.go +++ b/client/allocrunner/checks_hook.go @@ -13,7 +13,6 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/checks" "github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -44,9 +43,6 @@ func (o *observer) start() { // compromise between immediate (too early) and waiting full interval (slow) firstWait := o.check.Interval / 2 - timer, cancel := helper.NewSafeTimer(firstWait) - defer cancel() - for { select { 
@@ -55,15 +51,12 @@ func (o *observer) start() { return // time to execute the check - case <-timer.C: + case <-time.After(firstWait): firstWait = o.check.Interval query := checks.GetCheckQuery(o.check) result := o.checker.Do(o.ctx, o.qc, query) // and put the results into the store (already logged) _ = o.checkStore.Set(o.allocID, result) - - // setup timer for next interval - timer.Reset(o.check.Interval) } } } diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index dab26fa2786..d6e529af395 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -394,13 +394,11 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C var resp structs.CSIVolumeClaimResponse var err error backoff := c.minBackoffInterval - t, stop := helper.NewSafeTimer(0) - defer stop() for { select { case <-ctx.Done(): return nil, err - case <-t.C: + case <-time.After(backoff): } err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp) @@ -420,7 +418,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C } c.logger.Debug( "volume could not be claimed because it is in use", "retry_in", backoff) - t.Reset(backoff) } return &resp, err } @@ -499,13 +496,11 @@ func (c *csiHook) unmountWithRetry(result *volumePublishResult) error { defer cancel() var err error backoff := c.minBackoffInterval - t, stop := helper.NewSafeTimer(0) - defer stop() for { select { case <-ctx.Done(): return err - case <-t.C: + case <-time.After(backoff): } err = c.unmountImpl(result) @@ -520,7 +515,6 @@ func (c *csiHook) unmountWithRetry(result *volumePublishResult) error { } } c.logger.Debug("volume could not be unmounted", "retry_in", backoff) - t.Reset(backoff) } return nil } diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 89b8e04ec93..620a4591f28 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -248,13 +248,10 @@ func (h *groupServiceHook)
preKillLocked() { h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay) - timer, cancel := helper.NewSafeTimer(h.delay) - defer cancel() - select { // Wait for specified shutdown_delay unless ignored // This will block an agent from shutting down. - case <-timer.C: + case <-time.After(h.delay): case <-h.shutdownDelayCtx.Done(): } } diff --git a/client/allocrunner/tasklifecycle/gate_test.go b/client/allocrunner/tasklifecycle/gate_test.go index 537c846bc9a..84b177c3f48 100644 --- a/client/allocrunner/tasklifecycle/gate_test.go +++ b/client/allocrunner/tasklifecycle/gate_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper" ) func TestGate(t *testing.T) { @@ -101,18 +100,15 @@ func TestGate_shutdown(t *testing.T) { close(closeCh) }() - timer, stop := helper.NewSafeTimer(time.Second) - defer stop() - select { case <-openCh: - case <-timer.C: + case <-time.After(1 * time.Second): t.Fatalf("timeout waiting for gate operations") } select { case <-closeCh: - case <-timer.C: + case <-time.After(1 * time.Second): t.Fatalf("timeout waiting for gate operations") } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 9f1f8030691..02ab5e6164b 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -36,7 +36,6 @@ import ( "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/client/widmgr" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/users/dynamic" @@ -622,10 +621,6 @@ func (tr *TaskRunner) Run() { // Set the initial task state. 
tr.stateUpdater.TaskStateUpdated() - // start with a stopped timer; actual restart delay computed later - timer, stop := helper.NewStoppedTimer() - defer stop() - MAIN: for !tr.shouldShutdown() { if dead { @@ -730,11 +725,9 @@ MAIN: break MAIN } - timer.Reset(restartDelay) - // Actually restart by sleeping and also watching for destroy events select { - case <-timer.C: + case <-time.After(restartDelay): case <-tr.killCtx.Done(): tr.logger.Trace("task killed between restarts", "delay", restartDelay) break MAIN diff --git a/client/allocrunner/taskrunner/task_runner_linux_test.go b/client/allocrunner/taskrunner/task_runner_linux_test.go index 1343e81d4de..9b8fce01b81 100644 --- a/client/allocrunner/taskrunner/task_runner_linux_test.go +++ b/client/allocrunner/taskrunner/task_runner_linux_test.go @@ -39,7 +39,6 @@ import ( cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/helper" structsc "github.com/hashicorp/nomad/nomad/structs/config" ctestutil "github.com/hashicorp/nomad/client/testutil" @@ -2340,11 +2339,8 @@ func TestTaskRunner_TemplateWorkloadIdentity(t *testing.T) { doneCh := make(chan struct{}) t.Cleanup(func() { close(doneCh) }) - timer, stop := helper.NewSafeTimer(wait) - t.Cleanup(stop) - select { - case <-timer.C: + case <-time.After(wait): case <-doneCh: return } diff --git a/client/allocrunner/taskrunner/vault_hook.go b/client/allocrunner/taskrunner/vault_hook.go index 2368d59ab96..2d168bb995f 100644 --- a/client/allocrunner/taskrunner/vault_hook.go +++ b/client/allocrunner/taskrunner/vault_hook.go @@ -366,9 +366,6 @@ func (h *vaultHook) deriveVaultToken() (string, int, bool) { var attempts uint64 var backoff time.Duration - timer, stopTimer := helper.NewSafeTimer(0) - defer stopTimer() - for { token, lease, err := h.deriveVaultTokenJWT() if err == nil { @@ -387,7 +384,6 @@ func (h *vaultHook) deriveVaultToken() (string, int, 
bool) { // Handle the retry case backoff = helper.Backoff(vaultBackoffBaseline, vaultBackoffLimit, attempts) - timer.Reset(backoff) attempts++ h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff) @@ -396,7 +392,7 @@ func (h *vaultHook) deriveVaultToken() (string, int, bool) { select { case <-h.ctx.Done(): return "", 0, true - case <-timer.C: + case <-time.After(backoff): } } } diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go index b5027c619bb..84374b77f7d 100644 --- a/client/allocwatcher/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -376,13 +376,11 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) if err != nil { retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) - timer, stop := helper.NewSafeTimer(retry) p.logger.Error("error querying previous alloc", "error", err, "wait", retry) select { - case <-timer.C: + case <-time.After(retry): continue case <-ctx.Done(): - stop() return ctx.Err() } } @@ -390,17 +388,15 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { // Ensure that we didn't receive a stale response if req.AllowStale && resp.Index < req.MinQueryIndex { retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) - timer, stop := helper.NewSafeTimer(retry) p.logger.Warn("received stale alloc; retrying", "req_index", req.MinQueryIndex, "resp_index", resp.Index, "wait", retry, ) select { - case <-timer.C: + case <-time.After(retry): continue case <-ctx.Done(): - stop() return ctx.Err() } } diff --git a/client/client.go b/client/client.go index 0aba249f472..4fb7e49b267 100644 --- a/client/client.go +++ b/client/client.go @@ -2648,7 +2648,6 @@ OUTER: // Node.GetClientAllocs which returns older results. 
if allocsResp.Index <= allocsReq.MinQueryIndex { retry := c.retryIntv(getAllocRetryIntv) - timer, stop := helper.NewSafeTimer(retry) c.logger.Warn("failed to retrieve updated allocs; retrying", "req_index", allocsReq.MinQueryIndex, "resp_index", allocsResp.Index, @@ -2656,10 +2655,9 @@ OUTER: "wait", retry, ) select { - case <-timer.C: + case <-time.After(retry): continue case <-c.shutdownCh: - stop() return } } diff --git a/client/consul/consul.go b/client/consul/consul.go index 5b30afb0cfb..d5616f31d51 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -168,9 +168,6 @@ func (c *consulClient) RevokeTokens(tokens []*consulapi.ACLToken) error { // TokenPreflightCheck verifies that a token has been replicated before we // try to use it for registering services or bootstrapping Envoy func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.ACLToken) error { - timer, timerStop := helper.NewStoppedTimer() - defer timerStop() - var retry uint64 var err error ctx, cancel := context.WithTimeout(pctx, c.preflightCheckTimeout) @@ -191,11 +188,10 @@ func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.AC backoff := helper.Backoff( c.preflightCheckBaseInterval, c.preflightCheckBaseInterval*2, retry) c.logger.Trace("Consul token not ready", "error", err, "backoff", backoff) - timer.Reset(backoff) select { case <-ctx.Done(): return err - case <-timer.C: + case <-time.After(backoff): continue } } diff --git a/client/drain.go b/client/drain.go index 7dc6a8bb39f..a43d4e93ecf 100644 --- a/client/drain.go +++ b/client/drain.go @@ -9,7 +9,6 @@ import ( "time" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -89,9 +88,6 @@ func (c *Client) DrainSelf() error { // drain status, returning an error if the context expires or get any error from // the RPC call. If this function returns nil, the drain was successful. 
func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Duration) error { - timer, stop := helper.NewSafeTimer(0) - defer stop() - statusReq := &structs.NodeSpecificRequest{ NodeID: c.NodeID(), SecretID: c.secretNodeID(), @@ -106,7 +102,7 @@ func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Dur select { case <-ctx.Done(): return ctx.Err() - case <-timer.C: + case <-time.After(interval): err := c.RPC("Node.GetNode", statusReq, &statusResp) if err != nil { return err @@ -114,7 +110,6 @@ func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Dur if &statusResp != nil && statusResp.Node.DrainStrategy == nil { return nil } - timer.Reset(interval) } } } @@ -152,18 +147,14 @@ func (c *Client) pollLocalStatusForDrainStatus(ctx context.Context, return true } - timer, stop := helper.NewSafeTimer(0) - defer stop() - for { select { case <-ctx.Done(): return ctx.Err() - case <-timer.C: + case <-time.After(interval): if drainIsDone() { return nil } - timer.Reset(interval) } } diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index 94a2c6e9480..3f35c3e05a5 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -13,8 +13,6 @@ import ( "fmt" "sync" "time" - - "github.com/hashicorp/nomad/helper" ) const ( @@ -339,14 +337,12 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) ctx, cancel := context.WithTimeout(ctx, 24*time.Hour) defer cancel() - timer, stop := helper.NewSafeTimer(time.Duration(delay) * time.Millisecond) - defer stop() for { select { case <-ctx.Done(): // an externally-defined timeout wins the day return nil, ctx.Err() - case <-timer.C: + case <-time.After(time.Duration(delay) * time.Millisecond): // continue after our internal delay } @@ -360,7 +356,6 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) if delay > maxDelay { delay = maxDelay } -
timer.Reset(time.Duration(delay) * time.Millisecond) } } diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index 6d0415144e9..cc497183edb 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -10,7 +10,6 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -77,8 +76,8 @@ func (h *heartbeatStop) watch() { h.setLastOk(time.Now()) allocIntervals := map[string]time.Duration{} - timer, stopTimer := helper.NewStoppedTimer() - defer stopTimer() + timer := time.NewTimer(1 * time.Second) + timer.Stop() for { // we want to fire the ticker only once the shortest @@ -92,8 +91,6 @@ func (h *heartbeatStop) watch() { } if interval != 0 { timer.Reset(interval) - } else { - timer.Stop() } select { diff --git a/client/rpc.go b/client/rpc.go index d1e9c7f0e80..e9f3bbafb58 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -144,12 +144,9 @@ TRY: return rpcErr } - // Wait to avoid thundering herd - timer, cancel := helper.NewSafeTimer(helper.RandomStagger(conf.RPCHoldTimeout / structs.JitterFraction)) - defer cancel() - select { - case <-timer.C: + // Wait to avoid thundering herd + case <-time.After(helper.RandomStagger(conf.RPCHoldTimeout / structs.JitterFraction)): // If we are going to retry a blocking query we need to update the time // to block so it finishes by our deadline. 
diff --git a/client/serviceregistration/watcher.go b/client/serviceregistration/watcher.go index a8bbb836acb..522f4754b77 100644 --- a/client/serviceregistration/watcher.go +++ b/client/serviceregistration/watcher.go @@ -10,7 +10,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-set/v3" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -225,26 +224,7 @@ func (w *UniversalCheckWatcher) Run(ctx context.Context) { // map of checkID to their restarter handle (contains only checks we are watching) watched := make(map[string]*restarter) - checkTimer, cleanupCheckTimer := helper.NewSafeTimer(0) - defer cleanupCheckTimer() - - stopCheckTimer := func() { // todo: refactor using that other pattern - checkTimer.Stop() - select { - case <-checkTimer.C: - default: - } - } - - // initialize with checkTimer disabled - stopCheckTimer() - for { - // disable polling if there are no checks - if len(watched) == 0 { - stopCheckTimer() - } - select { // caller cancelled us; goodbye case <-ctx.Done(): @@ -263,16 +243,12 @@ func (w *UniversalCheckWatcher) Run(ctx context.Context) { checkName := update.restart.checkName w.logger.Trace("now watching check", "alloc_i", allocID, "task", taskName, "check", checkName) - // turn on the timer if we are now active - if len(watched) == 1 { - stopCheckTimer() - checkTimer.Reset(w.pollFrequency) - } - // poll time; refresh check statuses - case now := <-checkTimer.C: + case now := <-time.After(w.pollFrequency): + if len(watched) == 0 { + continue + } w.interval(ctx, now, watched) - checkTimer.Reset(w.pollFrequency) } } } diff --git a/client/vaultclient/vaultclient_test.go b/client/vaultclient/vaultclient_test.go index 2244423b4a2..96669cc7d48 100644 --- a/client/vaultclient/vaultclient_test.go +++ b/client/vaultclient/vaultclient_test.go @@ -19,7 +19,6 @@ import ( josejwt "github.com/go-jose/go-jose/v3/jwt" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/widmgr" -
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/useragent" @@ -505,9 +504,6 @@ func TestVaultClient_RenewalConcurrent(t *testing.T) { } // Collect results with timeout. - timer, stop := helper.NewSafeTimer(3 * time.Second) - defer stop() - sawInitial := 0 sawRenew := 0 for { @@ -526,7 +522,7 @@ func TestVaultClient_RenewalConcurrent(t *testing.T) { } case got := <-resultCh: must.Nil(t, got, must.Sprintf("token renewal error: %v", got)) - case <-timer.C: + case <-time.After(3 * time.Second): t.Fatalf("timeout waiting for expected token renewals (initial: %d renewed: %d)", sawInitial, sawRenew) } diff --git a/client/widmgr/widmgr.go b/client/widmgr/widmgr.go index e943ebb1a37..f32b68a008c 100644 --- a/client/widmgr/widmgr.go +++ b/client/widmgr/widmgr.go @@ -364,9 +364,6 @@ func (m *WIDMgr) renew() { wait = helper.ExpiryToRenewTime(minExp, time.Now, m.minWait) } - timer, timerStop := helper.NewStoppedTimer() - defer timerStop() - var retry uint64 for { @@ -378,9 +375,8 @@ func (m *WIDMgr) renew() { } m.logger.Debug("waiting to renew identities", "num", len(reqs), "wait", wait) - timer.Reset(wait) select { - case <-timer.C: + case <-time.After(wait): m.logger.Trace("getting new signed identities", "num", len(reqs)) case <-m.stopCtx.Done(): // close watchers and shutdown diff --git a/command/agent/consul/version_checker.go b/command/agent/consul/version_checker.go index e0e9d0f8c10..60fe165fdda 100644 --- a/command/agent/consul/version_checker.go +++ b/command/agent/consul/version_checker.go @@ -23,9 +23,6 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age defer close(done) - timer, stop := helper.NewSafeTimer(limit) - defer stop() - var attempts uint64 var backoff time.Duration @@ -43,12 +40,11 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age backoff = helper.Backoff(baseline, limit, 
attempts) attempts++ - timer.Reset(backoff) select { case <-ctx.Done(): return - case <-timer.C: + case <-time.After(backoff): } } } diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 66b80ae3bdb..0d5bb14fe79 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -9,7 +9,6 @@ import ( "time" log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/helper" ) // Monitor provides a mechanism to stream logs using go-hclog @@ -111,17 +110,12 @@ func (d *monitor) Start() <-chan []byte { // dropped messages and makes room on the logCh // to add a dropped message count warning go func() { - timer, stop := helper.NewSafeTimer(d.droppedDuration) - defer stop() - // loop and check for dropped messages for { - timer.Reset(d.droppedDuration) - select { case <-d.doneCh: return - case <-timer.C: + case <-time.After(d.droppedDuration): d.Lock() // Check if there have been any dropped messages. diff --git a/command/agent/retry_join_test.go b/command/agent/retry_join_test.go index 09481495a7a..3dd94add33a 100644 --- a/command/agent/retry_join_test.go +++ b/command/agent/retry_join_test.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-netaddrs" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" @@ -272,11 +271,6 @@ func TestRetryJoin_RetryMaxAttempts(t *testing.T) { // to test this apart from inspecting log entries. errCh := make(chan struct{}) - // Create a timeout to protect against problems within the test blocking - // for arbitrary long times. 
- timeout, timeoutStop := helper.NewSafeTimer(2 * time.Second) - defer timeoutStop() - var output []string joiner := retryJoiner{ @@ -307,7 +301,7 @@ func TestRetryJoin_RetryMaxAttempts(t *testing.T) { must.Len(t, 0, output) case <-doneCh: t.Fatal("retry join completed without closing error channel") - case <-timeout.C: + case <-time.After(2 * time.Second): t.Fatal("timeout reached without error channel close") } } @@ -321,11 +315,6 @@ func TestRetryJoin_joinFuncFailure(t *testing.T) { // Create an output for logging that can be inspected logOutput := bytes.NewBufferString("") - // Create a timeout to protect against problems within the test blocking - // for arbitrary long times. - timeout, timeoutStop := helper.NewSafeTimer(2 * time.Second) - defer timeoutStop() - var output []string l := testlog.HCLogger(t) @@ -362,7 +351,7 @@ func TestRetryJoin_joinFuncFailure(t *testing.T) { must.Len(t, 0, output) case <-doneCh: t.Fatal("retry join completed without closing error channel") - case <-timeout.C: + case <-time.After(2 * time.Second): t.Fatal("timeout reached without error channel close") } diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 39e6b7c0f53..1bdf8ebc110 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -20,7 +20,6 @@ import ( "github.com/hashicorp/nomad/ci" client "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" @@ -299,16 +298,12 @@ func (a *TestAgent) Shutdown() { ch <- a.Agent.Shutdown() }() - // one minute grace period on shutdown - timer, cancel := helper.NewSafeTimer(1 * time.Minute) - defer cancel() - select { case err := <-ch: if err != nil { a.T.Fatalf("agent shutdown error: %v", err) } - case <-timer.C: + case <-time.After(1 * time.Minute): a.T.Fatal("agent shutdown timeout") } } diff --git 
a/command/operator_debug.go b/command/operator_debug.go index 72f2c0d2936..10a29f71321 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -1080,18 +1080,15 @@ func (c *OperatorDebugCommand) collectPeriodicPprofs(client *api.Client) { go func() { ctx, cancel := context.WithTimeout(c.ctx, c.duration) defer cancel() - timer, stop := helper.NewSafeTimer(c.pprofInterval) - defer stop() pprofIntervalCount := 1 for { select { case <-ctx.Done(): return - case <-timer.C: + case <-time.After(c.pprofInterval): c.Ui.Output(fmt.Sprintf(" Capture pprofInterval %04d", pprofIntervalCount)) c.collectPprofs(client, pprofServerIDs, pprofNodeIDs, pprofIntervalCount) - timer.Reset(c.pprofInterval) pprofIntervalCount++ } } diff --git a/drivers/docker/cpuset.go b/drivers/docker/cpuset.go index 7397fcde2cc..08a7bc5e77d 100644 --- a/drivers/docker/cpuset.go +++ b/drivers/docker/cpuset.go @@ -9,7 +9,6 @@ import ( "time" "github.com/hashicorp/nomad/client/lib/cgroupslib" - "github.com/hashicorp/nomad/helper" ) const ( @@ -36,16 +35,12 @@ func (c *cpuset) watch() { c.sync = c.copyCpuset } - ticks, cancel := helper.NewSafeTimer(cpusetSyncPeriod) - defer cancel() - for { select { case <-c.doneCh: return - case <-ticks.C: + case <-time.After(cpusetSyncPeriod): c.sync(c.source, c.destination) - ticks.Reset(cpusetSyncPeriod) } } } diff --git a/drivers/shared/eventer/eventer.go b/drivers/shared/eventer/eventer.go index 05b90ccd2a5..8462ca090c6 100644 --- a/drivers/shared/eventer/eventer.go +++ b/drivers/shared/eventer/eventer.go @@ -9,7 +9,6 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -66,19 +65,14 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer { // eventLoop is the main logic which pulls events from the channel and broadcasts // them to all consumers func (e *Eventer) eventLoop() { - timer, stop := helper.NewSafeTimer(ConsumerGCInterval) - defer stop() 
- for { - timer.Reset(ConsumerGCInterval) - select { case <-e.ctx.Done(): e.logger.Trace("task event loop shutdown") return case event := <-e.events: e.iterateConsumers(event) - case <-timer.C: + case <-time.After(ConsumerGCInterval): e.gcConsumers() } } diff --git a/helper/backoff.go b/helper/backoff.go index 675b131d47e..45eefa11ae6 100644 --- a/helper/backoff.go +++ b/helper/backoff.go @@ -38,13 +38,11 @@ func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint func WithBackoffFunc(ctx context.Context, minBackoff, maxBackoff time.Duration, fn func() error) error { var err error backoff := minBackoff - t, stop := NewSafeTimer(0) - defer stop() for { select { case <-ctx.Done(): return fmt.Errorf("operation cancelled: %w", err) - case <-t.C: + case <-time.After(backoff): } err = fn() @@ -58,7 +56,5 @@ func WithBackoffFunc(ctx context.Context, minBackoff, maxBackoff time.Duration, backoff = maxBackoff } } - - t.Reset(backoff) } } diff --git a/helper/broker/notify.go b/helper/broker/notify.go index 75812dad060..52c871510f5 100644 --- a/helper/broker/notify.go +++ b/helper/broker/notify.go @@ -6,8 +6,6 @@ package broker import ( "context" "time" - - "github.com/hashicorp/nomad/helper" ) // GenericNotifier allows a process to send updates to many subscribers in an @@ -93,10 +91,6 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { case g.subscribeCh <- updateCh: } - // Create a timeout timer and use the helper to ensure this routine doesn't - // panic and making the stop call clear. - timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout) - // Defer a function which performs all the required cleanup of the // subscriber once it has been notified of a change, or reached its wait // timeout. 
@@ -106,7 +100,6 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { case g.unsubscribeCh <- updateCh: } close(updateCh) - timeoutStop() }() // Enter the main loop which listens for an update or timeout and returns @@ -114,7 +107,7 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { select { case <-g.ctx.Done(): return "shutting down" - case <-timeoutTimer.C: + case <-time.After(timeout): return "wait timed out after " + timeout.String() case update := <-updateCh: return update diff --git a/helper/funcs.go b/helper/funcs.go index 9b87ef8eceb..ce1795b0e9e 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -7,7 +7,6 @@ import ( "crypto/sha512" "fmt" "maps" - "math" "net/http" "os" "path/filepath" @@ -358,36 +357,9 @@ func CheckNamespaceScope(provided string, requested []string) []string { return nil } -// StopFunc is used to stop a time.Timer created with NewSafeTimer +// StopFunc is used to stop a time.Timer created with NewSafeTicker type StopFunc func() -// NewSafeTimer creates a time.Timer but does not panic if duration is <= 0. -// -// Using a time.Timer is recommended instead of time.After when it is necessary -// to avoid leaking goroutines (e.g. in a select inside a loop). -// -// Returns the time.Timer and also a StopFunc, forcing the caller to deal -// with stopping the time.Timer to avoid leaking a goroutine. -// -// Note: If creating a Timer that should do nothing until Reset is called, use -// NewStoppedTimer instead for safely creating the timer in a stopped state. -func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) { - if duration <= 0 { - // Avoid panic by using the smallest positive value. This is close enough - // to the behavior of time.After(0), which this helper is intended to - // replace. 
- // https://go.dev/play/p/EIkm9MsPbHY - duration = 1 - } - - t := time.NewTimer(duration) - cancel := func() { - t.Stop() - } - - return t, cancel -} - // NewSafeTicker creates a time.Ticker but does not panic if duration is <= 0. // // Returns the time.Ticker and also a StopFunc, forcing the caller to deal @@ -409,14 +381,6 @@ func NewSafeTicker(duration time.Duration) (*time.Ticker, StopFunc) { return t, cancel } -// NewStoppedTimer creates a time.Timer in a stopped state. This is useful when -// the actual wait time will computed and set later via Reset. -func NewStoppedTimer() (*time.Timer, StopFunc) { - t, f := NewSafeTimer(math.MaxInt64) - t.Stop() - return t, f -} - // ConvertSlice takes the input slice and generates a new one using the // supplied conversion function to covert the element. This is useful when // converting a slice of strings to a slice of structs which wraps the string. diff --git a/helper/funcs_test.go b/helper/funcs_test.go index 0d4af67354a..4239d21f00e 100644 --- a/helper/funcs_test.go +++ b/helper/funcs_test.go @@ -335,31 +335,6 @@ func TestCheckNamespaceScope(t *testing.T) { } } -func TestTimer_NewSafeTimer(t *testing.T) { - t.Run("zero", func(t *testing.T) { - timer, stop := NewSafeTimer(0) - defer stop() - <-timer.C - }) - - t.Run("positive", func(t *testing.T) { - timer, stop := NewSafeTimer(1) - defer stop() - <-timer.C - }) -} - -func TestTimer_NewStoppedTimer(t *testing.T) { - timer, stop := NewStoppedTimer() - defer stop() - - select { - case <-timer.C: - must.Unreachable(t) - default: - } -} - func Test_ConvertSlice(t *testing.T) { t.Run("string wrapper", func(t *testing.T) { diff --git a/helper/winsvc/events_windows.go b/helper/winsvc/events_windows.go index 63cc9f27f43..1176d1fdf2e 100644 --- a/helper/winsvc/events_windows.go +++ b/helper/winsvc/events_windows.go @@ -7,19 +7,15 @@ import ( "time" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/helper" ) var chanEvents = make(chan Event) // SendEvent sends an 
event to the Windows eventlog func SendEvent(e Event) { - timer, stop := helper.NewSafeTimer(100 * time.Millisecond) - defer stop() - select { case chanEvents <- e: - case <-timer.C: + case <-time.After(100 * time.Millisecond): hclog.L().Error("failed to send event to windows eventlog, timed out", "event", e) } diff --git a/helper/winsvc/windows_service_windows.go b/helper/winsvc/windows_service_windows.go index ef254c455e7..017db949d4c 100644 --- a/helper/winsvc/windows_service_windows.go +++ b/helper/winsvc/windows_service_windows.go @@ -14,7 +14,6 @@ import ( "slices" "time" - "github.com/hashicorp/nomad/helper" "golang.org/x/sys/windows/registry" "golang.org/x/sys/windows/svc" "golang.org/x/sys/windows/svc/eventlog" @@ -231,13 +230,7 @@ func waitFor(ctx context.Context, condition func() (bool, error)) error { ctx, stop := signal.NotifyContext(ctx, os.Interrupt) defer stop() - pauseDur := time.Millisecond * 250 - t, timerStop := helper.NewSafeTimer(pauseDur) - defer timerStop() - for { - t.Reset(pauseDur) - complete, err := condition() if err != nil { return err @@ -250,7 +243,7 @@ func waitFor(ctx context.Context, condition func() (bool, error)) error { select { case <-ctx.Done(): return fmt.Errorf("timeout exceeded waiting for process") - case <-t.C: + case <-time.After(250 * time.Millisecond): } } } diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 88846d1918f..f83078a5c0f 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/go-hclog" metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -800,15 +799,9 @@ func (b *BlockedEvals) Flush() { // EmitStats is used to export metrics about the blocked eval tracker while enabled func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) - select { - - 
case <-timer.C: + case <-time.After(period): stats := b.stats.Copy() metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit)) metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked)) diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index b2041200fa6..0471be35696 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -7,11 +7,11 @@ import ( "context" "fmt" "sync" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "golang.org/x/time/rate" @@ -142,14 +142,9 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) { // watch is the long lived watching routine that detects job drain changes. func (w *drainingJobWatcher) watch() { - timer, stop := helper.NewSafeTimer(stateReadErrorDelay) - defer stop() - waitIndex := uint64(1) for { - timer.Reset(stateReadErrorDelay) - w.logger.Trace("getting job allocs at index", "index", waitIndex) jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex) @@ -173,7 +168,7 @@ func (w *drainingJobWatcher) watch() { case <-w.ctx.Done(): w.logger.Trace("shutting down") return - case <-timer.C: + case <-time.After(stateReadErrorDelay): continue } } diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 83fdb125c7a..361c0b332cb 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -5,10 +5,10 @@ package drainer import ( "context" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -146,13 +146,9 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat // watch is the long lived watching routine that 
detects node changes. func (w *nodeDrainWatcher) watch() { - timer, stop := helper.NewSafeTimer(stateReadErrorDelay) - defer stop() - nindex := uint64(1) for { - timer.Reset(stateReadErrorDelay) nodes, index, err := w.getNodes(nindex) if err != nil { if err == context.Canceled { @@ -163,7 +159,7 @@ func (w *nodeDrainWatcher) watch() { select { case <-w.ctx.Done(): return - case <-timer.C: + case <-time.After(stateReadErrorDelay): continue } } diff --git a/nomad/encrypter_test.go b/nomad/encrypter_test.go index 0f3fdd27530..b44c44dfde4 100644 --- a/nomad/encrypter_test.go +++ b/nomad/encrypter_test.go @@ -25,7 +25,6 @@ import ( wrapping "github.com/hashicorp/go-kms-wrapping/v2" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -1046,29 +1045,25 @@ func TestEncrypter_IsReady_eventuallyReady(t *testing.T) { respCh <- encrypter.IsReady(timeoutCtx) }() + select { // Create a timer at 1/3 the value of the timeout. When this triggers, we // add a new decryption task to the encrypter. This simulates Nomad // upserting a new key into state which was not part of the original // snapshot or trailing logs and therefore should not block the readiness // check. - taskAddTimer, stop := helper.NewSafeTimer(timeout / 3) - t.Cleanup(stop) + case <-time.After(timeout / 3): + encrypter.decryptTasksLock.Lock() + encrypter.decryptTasks["id2"] = struct{}{} + encrypter.decryptTasksLock.Unlock() // Create a timer at half the value of the timeout. When this triggers, we // will remove the task from the encrypter simulating it finishing and the // encrypter becoming ready. 
- taskDeleteTimer, stop := helper.NewSafeTimer(timeout / 2) - t.Cleanup(stop) - - select { - case <-taskAddTimer.C: - encrypter.decryptTasksLock.Lock() - encrypter.decryptTasks["id2"] = struct{}{} - encrypter.decryptTasksLock.Unlock() - case <-taskDeleteTimer.C: + case <-time.After(timeout / 2): encrypter.decryptTasksLock.Lock() delete(encrypter.decryptTasks, "id1") encrypter.decryptTasksLock.Unlock() + case err := <-respCh: must.NoError(t, err) encrypter.decryptTasksLock.RLock() @@ -1352,17 +1347,11 @@ func TestEncrypter_decryptWrappedKeyTask_contextCancel(t *testing.T) { // // Canceling the context should cause the routine to exit and send an error // which we can check to ensure we correctly unblock. - timer, timerStop := helper.NewSafeTimer(500 * time.Millisecond) - defer timerStop() - - <-timer.C + <-time.After(500 * time.Millisecond) cancel() - timer, timerStop = helper.NewSafeTimer(200 * time.Millisecond) - defer timerStop() - select { - case <-timer.C: + case <-time.After(200 * time.Millisecond): t.Fatal("timed out waiting for decryptWrappedKeyTask to send its error") case err := <-errorCh: must.ErrorContains(t, err, "context canceled") diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e1904794941..87f1f49a06d 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -15,7 +15,6 @@ import ( metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/broker" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" @@ -982,14 +981,9 @@ func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) - select { - case <-timer.C: + case <-time.After(period): stats := b.Stats() metrics.SetGauge([]string{"nomad", 
"broker", "total_ready"}, float32(stats.TotalReady)) metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) diff --git a/nomad/leader.go b/nomad/leader.go index 26a5f1d9521..794f692cebd 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/go-memdb" metrics "github.com/hashicorp/go-metrics/compat" "github.com/hashicorp/go-version" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/peers" "github.com/hashicorp/nomad/nomad/state" @@ -957,10 +956,6 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { variablesRekey := time.NewTicker(s.config.VariablesRekeyInterval) defer variablesRekey.Stop() - // Set up the expired ACL local token garbage collection timer. - localTokenExpiredGC, localTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval) - defer localTokenExpiredGCStop() - for { select { @@ -1000,11 +995,10 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { if index, ok := s.getLatestIndex(); ok { s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobOneTimeTokenGC, index)) } - case <-localTokenExpiredGC.C: + case <-time.After(s.config.ACLTokenExpirationGCInterval): if index, ok := s.getLatestIndex(); ok { s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobLocalTokenExpiredGC, index)) } - localTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval) case <-rootKeyGC.C: if index, ok := s.getLatestIndex(); ok { s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobRootKeyRotateOrGC, index)) @@ -1024,18 +1018,12 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { // onto the _core scheduler for ACL based activities such as removing expired // global ACL tokens. func (s *Server) schedulePeriodicAuthoritative(stopCh chan struct{}) { - - // Set up the expired ACL global token garbage collection timer. 
- globalTokenExpiredGC, globalTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval) - defer globalTokenExpiredGCStop() - for { select { - case <-globalTokenExpiredGC.C: + case <-time.After(s.config.ACLTokenExpirationGCInterval): if index, ok := s.getLatestIndex(); ok { s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobGlobalTokenExpiredGC, index)) } - globalTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval) case <-stopCh: return } @@ -1192,18 +1180,14 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} { wakeCh := make(chan struct{}, 1) go func() { - - timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval) - defer cancel() for { select { case <-stopCh: return case <-wakeCh: cancelCancelableEvals(s) - case <-timer.C: + case <-time.After(s.config.EvalReapCancelableInterval): cancelCancelableEvals(s) - timer.Reset(s.config.EvalReapCancelableInterval) } } }() @@ -2583,12 +2567,8 @@ func diffACLBindingRules( // return // } func (s *Server) replicationBackoffContinue(stopCh chan struct{}) bool { - - timer, timerStopFn := helper.NewSafeTimer(s.config.ReplicationBackoff) - defer timerStopFn() - select { - case <-timer.C: + case <-time.After(s.config.ReplicationBackoff): return true case <-stopCh: return false diff --git a/nomad/lock/delay.go b/nomad/lock/delay.go index a04a2c1f20c..ea460d24168 100644 --- a/nomad/lock/delay.go +++ b/nomad/lock/delay.go @@ -8,7 +8,6 @@ import ( "time" metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/nomad/helper" ) // DelayTimer is used to mark certain locks as unacquirable. When a locks TTL @@ -68,13 +67,9 @@ func (d *DelayTimer) RemoveAll() { // EmitMetrics is a long-running routine used to emit periodic metrics about // the Delay. 
func (d *DelayTimer) EmitMetrics(period time.Duration, shutdownCh chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) select { - case <-timer.C: + case <-time.After(period): metrics.SetGauge([]string{"variables", "locks", "delay_timer", "num"}, float32(d.timerNum())) case <-shutdownCh: return diff --git a/nomad/lock/ttl.go b/nomad/lock/ttl.go index 7f3d3f87fd5..d3e7a24770a 100644 --- a/nomad/lock/ttl.go +++ b/nomad/lock/ttl.go @@ -8,7 +8,6 @@ import ( "time" metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/nomad/helper" ) // TTLTimer provides a map of named timers which is safe for concurrent use. @@ -84,13 +83,9 @@ func (t *TTLTimer) StopAndRemoveAll() { // EmitMetrics is a long-running routine used to emit periodic metrics about // the Timer. func (t *TTLTimer) EmitMetrics(period time.Duration, shutdownCh chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) select { - case <-timer.C: + case <-time.After(period): metrics.SetGauge([]string{"variables", "locks", "ttl_timer", "num"}, float32(t.TimerNum())) case <-shutdownCh: return diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go index 09738cdd9f0..d9873be1b59 100644 --- a/nomad/plan_apply_node_tracker.go +++ b/nomad/plan_apply_node_tracker.go @@ -10,7 +10,6 @@ import ( "github.com/hashicorp/go-hclog" metrics "github.com/hashicorp/go-metrics/compat" lru "github.com/hashicorp/golang-lru/v2" - "github.com/hashicorp/nomad/helper" "golang.org/x/time/rate" ) @@ -111,14 +110,9 @@ func (c *CachedBadNodeTracker) Add(nodeID string) bool { // EmitStats generates metrics for the bad nodes being currently tracked. Must // be called in a goroutine. 
func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) - select { - case <-timer.C: + case <-time.After(period): c.emitStats() case <-stopCh: return diff --git a/nomad/plan_queue.go b/nomad/plan_queue.go index 1de6c739f2a..46d4ee3a29d 100644 --- a/nomad/plan_queue.go +++ b/nomad/plan_queue.go @@ -10,7 +10,6 @@ import ( "time" metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -200,14 +199,9 @@ func (q *PlanQueue) Stats() *QueueStats { // EmitStats is used to export metrics about the broker while enabled func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) - select { - case <-timer.C: + case <-time.After(period): stats := q.Stats() metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth)) case <-stopCh: diff --git a/nomad/server.go b/nomad/server.go index 987efee78b6..6a49a6715e8 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -2200,14 +2200,9 @@ func (s *Server) Stats() map[string]map[string]string { // EmitRaftStats is used to export metrics about raft indexes and state store snapshot index func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) { - timer, stop := helper.NewSafeTimer(period) - defer stop() - for { - timer.Reset(period) - select { - case <-timer.C: + case <-time.After(period): lastIndex := s.raft.LastIndex() metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex)) appliedIndex := s.raft.AppliedIndex() diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index 163e16038cc..0f92b73fb74 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -11,7 +11,6 @@ import ( log 
"github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "golang.org/x/time/rate" @@ -118,9 +117,6 @@ func (vw *volumeWatcher) watch() { defer vw.deleteFn() defer vw.Stop() - timer, stop := helper.NewSafeTimer(vw.quiescentTimeout) - defer stop() - for { select { // TODO(tgross): currently server->client RPC have no cancellation @@ -137,8 +133,7 @@ func (vw *volumeWatcher) watch() { return } vw.volumeReap(vol) - timer.Reset(vw.quiescentTimeout) - case <-timer.C: + case <-time.After(vw.quiescentTimeout): // Wait until the volume has "settled" before stopping this // goroutine so that we can handle the burst of updates around // freeing claims without having to spin it back up diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 470c2135055..86f85841feb 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -15,7 +15,6 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -365,12 +364,10 @@ func TestWorker_runBackoff(t *testing.T) { // We expect to be paused for 10ms + 1ms but otherwise can't be all that // precise here because of concurrency. 
But checking coverage for this test // shows we've covered the logic - t1, cancelT1 := helper.NewSafeTimer(100 * time.Millisecond) - defer cancelT1() select { case <-doneCh: t.Fatal("returned early") - case <-t1.C: + case <-time.After(100 * time.Millisecond): } workerCancel() From 9e94559ae30e66b08e82ba19dd3cb91f3653b6de Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 28 Apr 2026 08:07:27 -0400 Subject: [PATCH 2/5] fix backoff function for decrypting --- helper/backoff.go | 26 +++++++++++--------------- nomad/encrypter_test.go | 12 +----------- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/helper/backoff.go b/helper/backoff.go index 45eefa11ae6..5a763c3a2af 100644 --- a/helper/backoff.go +++ b/helper/backoff.go @@ -24,10 +24,7 @@ func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint } // Compute deadline and clamp it to backoffLimit - deadline := 1 << attempt * backoffBase - if deadline > backoffLimit { - deadline = backoffLimit - } + deadline := min(1< maxBackoff { - backoff = maxBackoff - } + backoff = min(backoff*2+RandomStagger(minBackoff/10), maxBackoff) } } } diff --git a/nomad/encrypter_test.go b/nomad/encrypter_test.go index b44c44dfde4..7ceed18f175 100644 --- a/nomad/encrypter_test.go +++ b/nomad/encrypter_test.go @@ -1312,23 +1312,13 @@ func TestEncrypter_decryptWrappedKeyTask_contextCancel(t *testing.T) { respCh := make(chan *cipherSet, 1) - // Generate a context and immediately cancel it. - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - // Ensure we receive an error indicating we hit the context done case and - // check no cipher response was sent. - err = encrypter.decryptWrappedKeyTask(ctx, kmsWrapper, key.Meta, wrappedKey, respCh) - must.ErrorContains(t, err, "operation cancelled") - must.Eq(t, 0, len(respCh)) - // Recreate the response channel so that it is no longer buffered. The // decrypt task should now block on attempting to send to it. 
respCh = make(chan *cipherSet) // Generate a new context and an error channel so we can gather the response // of decryptWrappedKeyTask running inside a goroutine. - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) errorCh := make(chan error, 1) From 94afb8ac71f58d505a8ff785267c2bccc25d6fc1 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 15:19:57 -0400 Subject: [PATCH 3/5] fix some bad time.Afters --- client/allocrunner/checks_hook.go | 6 ++++-- client/allocrunner/csi_hook.go | 33 ++++++++++++------------------- client/drain.go | 27 +++++++++++++------------ client/heartbeatstop.go | 9 ++++++--- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/client/allocrunner/checks_hook.go b/client/allocrunner/checks_hook.go index 6ad3c0a0f87..2a3ba5aead2 100644 --- a/client/allocrunner/checks_hook.go +++ b/client/allocrunner/checks_hook.go @@ -41,7 +41,7 @@ type observer struct { // start checking our check on its interval func (o *observer) start() { // compromise between immediate (too early) and waiting full interval (slow) - firstWait := o.check.Interval / 2 + wait := o.check.Interval / 2 for { select { @@ -51,12 +51,14 @@ func (o *observer) start() { return // time to execute the check - case <-time.After(firstWait): + case <-time.After(wait): query := checks.GetCheckQuery(o.check) result := o.checker.Do(o.ctx, o.qc, query) // and put the results into the store (already logged) _ = o.checkStore.Set(o.allocID, result) + + wait = o.check.Interval } } } diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index d6e529af395..d8f98062c10 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -395,26 +395,22 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C var err error backoff := c.minBackoffInterval for { - select { - case <-ctx.Done(): - return nil, err - case <-time.After(backoff): - } 
- err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp) if err == nil { break } - if !isRetryableClaimRPCError(err) { break } + select { + case <-ctx.Done(): + return nil, err + case <-time.After(backoff): + } + if backoff < c.maxBackoffInterval { - backoff = backoff * 2 - if backoff > c.maxBackoffInterval { - backoff = c.maxBackoffInterval - } + backoff = min(backoff*2, c.maxBackoffInterval) } c.logger.Debug( "volume could not be claimed because it is in use", "retry_in", backoff) @@ -497,22 +493,19 @@ func (c *csiHook) unmountWithRetry(result *volumePublishResult) error { var err error backoff := c.minBackoffInterval for { + err = c.unmountImpl(result) + if err == nil { + break + } + select { case <-ctx.Done(): return err case <-time.After(backoff): } - err = c.unmountImpl(result) - if err == nil { - break - } - if backoff < c.maxBackoffInterval { - backoff = backoff * 2 - if backoff > c.maxBackoffInterval { - backoff = c.maxBackoffInterval - } + backoff = min(backoff*2, c.maxBackoffInterval) } c.logger.Debug("volume could not be unmounted", "retry_in", backoff) } diff --git a/client/drain.go b/client/drain.go index a43d4e93ecf..da6f684d1e9 100644 --- a/client/drain.go +++ b/client/drain.go @@ -99,17 +99,18 @@ func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Dur var statusResp structs.SingleNodeResponse for { + err := c.RPC("Node.GetNode", statusReq, &statusResp) + if err != nil { + return err + } + if statusResp.Node.DrainStrategy == nil { + return nil + } + select { case <-ctx.Done(): return ctx.Err() - case <-time.After(1 * time.Second): - err := c.RPC("Node.GetNode", statusReq, &statusResp) - if err != nil { - return err - } - if &statusResp != nil && statusResp.Node.DrainStrategy == nil { - return nil - } + case <-time.After(interval): } } } @@ -148,14 +149,14 @@ func (c *Client) pollLocalStatusForDrainStatus(ctx context.Context, } for { + if drainIsDone() { + return nil + } + select { case <-ctx.Done(): return ctx.Err() - case 
<-time.After(1 * time.Second): - if drainIsDone() { - return nil - } + case <-time.After(interval): } - } } diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index cc497183edb..e3b82a82dee 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -76,8 +76,7 @@ func (h *heartbeatStop) watch() { h.setLastOk(time.Now()) allocIntervals := map[string]time.Duration{} - timer := time.NewTimer(1 * time.Second) - timer.Stop() + var timer *time.Timer for { // we want to fire the ticker only once the shortest @@ -90,7 +89,11 @@ func (h *heartbeatStop) watch() { } } if interval != 0 { - timer.Reset(interval) + if timer == nil { + timer = time.NewTimer(interval) + } else { + timer.Reset(interval) + } } select { From 01cfdefbc6c8b0038a62433ffe10b70e3e45b3d4 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 15:35:01 -0400 Subject: [PATCH 4/5] fix nil timer --- client/heartbeatstop.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index e3b82a82dee..544f8a8b9f8 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -76,7 +76,8 @@ func (h *heartbeatStop) watch() { h.setLastOk(time.Now()) allocIntervals := map[string]time.Duration{} - var timer *time.Timer + timer := time.NewTimer(100) + timer.Stop() for { // we want to fire the ticker only once the shortest @@ -89,11 +90,7 @@ func (h *heartbeatStop) watch() { } } if interval != 0 { - if timer == nil { - timer = time.NewTimer(interval) - } else { - timer.Reset(interval) - } + timer.Reset(interval) } select { From 70c21ed64943a37387385d70ce178dddef2dfe13 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 16:00:35 -0400 Subject: [PATCH 5/5] fix poll frequency --- client/serviceregistration/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/serviceregistration/watcher.go b/client/serviceregistration/watcher.go index 522f4754b77..4e526521ce9 
100644 --- a/client/serviceregistration/watcher.go +++ b/client/serviceregistration/watcher.go @@ -244,7 +244,7 @@ func (w *UniversalCheckWatcher) Run(ctx context.Context) { w.logger.Trace("now watching check", "alloc_i", allocID, "task", taskName, "check", checkName) // poll time; refresh check statuses - case now := <-time.After(1 * time.Second): + case now := <-time.After(w.pollFrequency): if len(watched) == 0 { continue }