Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions client/allochealth/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -626,10 +625,6 @@ func evaluateConsulChecks(services []*structs.Service, registrations *servicereg
//
// Does not watch Consul service checks; see watchConsulEvents for those.
func (t *Tracker) watchNomadEvents() {
// checkTicker is the ticker that triggers us to look at the checks in Nomad
checkTicker, cancel := helper.NewSafeTimer(t.checkLookupInterval)
defer cancel()

// waiter is used to fire when the checks have been healthy for the MinHealthyTime
waiter := newHealthyFuture()

Expand All @@ -650,9 +645,8 @@ func (t *Tracker) watchNomadEvents() {
return

// it is time to check the checks
case <-checkTicker.C:
case <-time.After(t.checkLookupInterval):
results = t.checkStore.List(allocID)
checkTicker.Reset(t.checkLookupInterval)

// enough time has passed with healthy checks
case <-waiter.C():
Expand Down
11 changes: 3 additions & 8 deletions client/allocrunner/checks_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration/checks"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -42,10 +41,7 @@ type observer struct {
// start checking our check on its interval
func (o *observer) start() {
// compromise between immediate (too early) and waiting full interval (slow)
firstWait := o.check.Interval / 2

timer, cancel := helper.NewSafeTimer(firstWait)
defer cancel()
wait := o.check.Interval / 2

for {
select {
Expand All @@ -55,15 +51,14 @@ func (o *observer) start() {
return

// time to execute the check
case <-timer.C:
case <-time.After(wait):
query := checks.GetCheckQuery(o.check)
result := o.checker.Do(o.ctx, o.qc, query)

// and put the results into the store (already logged)
_ = o.checkStore.Set(o.allocID, result)

// setup timer for next interval
timer.Reset(o.check.Interval)
wait = o.check.Interval
}
}
}
Expand Down
41 changes: 14 additions & 27 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,33 +394,26 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C
var resp structs.CSIVolumeClaimResponse
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return nil, err
case <-t.C:
}

err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
if err == nil {
break
}

if !isRetryableClaimRPCError(err) {
break
}

select {
case <-ctx.Done():
return nil, err
case <-time.After(backoff):
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
backoff = min(backoff*2, c.maxBackoffInterval)
}
c.logger.Debug(
"volume could not be claimed because it is in use", "retry_in", backoff)
t.Reset(backoff)
}
return &resp, err
}
Expand Down Expand Up @@ -499,28 +492,22 @@ func (c *csiHook) unmountWithRetry(result *volumePublishResult) error {
defer cancel()
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return err
case <-t.C:
}

err = c.unmountImpl(result)
if err == nil {
break
}

select {
case <-ctx.Done():
return err
case <-time.After(backoff):
Comment thread
mismithhisler marked this conversation as resolved.
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
backoff = min(backoff*2, c.maxBackoffInterval)
}
c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
t.Reset(backoff)
}
return nil
}
Expand Down
5 changes: 1 addition & 4 deletions client/allocrunner/group_service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,10 @@ func (h *groupServiceHook) preKillLocked() {

h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay)

timer, cancel := helper.NewSafeTimer(h.delay)
defer cancel()

select {
// Wait for specified shutdown_delay unless ignored
// This will block an agent from shutting down.
case <-timer.C:
case <-time.After(h.delay):
case <-h.shutdownDelayCtx.Done():
}
}
Expand Down
8 changes: 2 additions & 6 deletions client/allocrunner/tasklifecycle/gate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
)

func TestGate(t *testing.T) {
Expand Down Expand Up @@ -101,18 +100,15 @@ func TestGate_shutdown(t *testing.T) {
close(closeCh)
}()

timer, stop := helper.NewSafeTimer(time.Second)
defer stop()

select {
case <-openCh:
case <-timer.C:
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for gate operations")
}

select {
case <-closeCh:
case <-timer.C:
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for gate operations")
}

Expand Down
9 changes: 1 addition & 8 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/client/widmgr"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/helper/users/dynamic"
Expand Down Expand Up @@ -622,10 +621,6 @@ func (tr *TaskRunner) Run() {
// Set the initial task state.
tr.stateUpdater.TaskStateUpdated()

// start with a stopped timer; actual restart delay computed later
timer, stop := helper.NewStoppedTimer()
defer stop()

MAIN:
for !tr.shouldShutdown() {
if dead {
Expand Down Expand Up @@ -730,11 +725,9 @@ MAIN:
break MAIN
}

timer.Reset(restartDelay)

// Actually restart by sleeping and also watching for destroy events
select {
case <-timer.C:
case <-time.After(restartDelay):
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
break MAIN
Expand Down
6 changes: 1 addition & 5 deletions client/allocrunner/taskrunner/task_runner_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
structsc "github.com/hashicorp/nomad/nomad/structs/config"

ctestutil "github.com/hashicorp/nomad/client/testutil"
Expand Down Expand Up @@ -2340,11 +2339,8 @@ func TestTaskRunner_TemplateWorkloadIdentity(t *testing.T) {
doneCh := make(chan struct{})
t.Cleanup(func() { close(doneCh) })

timer, stop := helper.NewSafeTimer(wait)
t.Cleanup(stop)

select {
case <-timer.C:
case <-time.After(wait):
case <-doneCh:
return
}
Expand Down
6 changes: 1 addition & 5 deletions client/allocrunner/taskrunner/vault_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,6 @@ func (h *vaultHook) deriveVaultToken() (string, int, bool) {
var attempts uint64
var backoff time.Duration

timer, stopTimer := helper.NewSafeTimer(0)
defer stopTimer()

for {
token, lease, err := h.deriveVaultTokenJWT()
if err == nil {
Expand All @@ -387,7 +384,6 @@ func (h *vaultHook) deriveVaultToken() (string, int, bool) {

// Handle the retry case
backoff = helper.Backoff(vaultBackoffBaseline, vaultBackoffLimit, attempts)
timer.Reset(backoff)
attempts++

h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff)
Expand All @@ -396,7 +392,7 @@ func (h *vaultHook) deriveVaultToken() (string, int, bool) {
select {
case <-h.ctx.Done():
return "", 0, true
case <-timer.C:
case <-time.After(backoff):
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions client/allocwatcher/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,31 +376,27 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Error("error querying previous alloc", "error", err, "wait", retry)
select {
case <-timer.C:
case <-time.After(retry):
continue
case <-ctx.Done():
stop()
return ctx.Err()
}
}

// Ensure that we didn't receive a stale response
if req.AllowStale && resp.Index < req.MinQueryIndex {
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Warn("received stale alloc; retrying",
"req_index", req.MinQueryIndex,
"resp_index", resp.Index,
"wait", retry,
)
select {
case <-timer.C:
case <-time.After(retry):
continue
case <-ctx.Done():
stop()
return ctx.Err()
}
}
Expand Down
4 changes: 1 addition & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2648,18 +2648,16 @@ OUTER:
// Node.GetClientAllocs which returns older results.
if allocsResp.Index <= allocsReq.MinQueryIndex {
retry := c.retryIntv(getAllocRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
c.logger.Warn("failed to retrieve updated allocs; retrying",
"req_index", allocsReq.MinQueryIndex,
"resp_index", allocsResp.Index,
"num_allocs", len(pull),
"wait", retry,
)
select {
case <-timer.C:
case <-time.After(retry):
continue
case <-c.shutdownCh:
stop()
return
}
}
Expand Down
6 changes: 1 addition & 5 deletions client/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ func (c *consulClient) RevokeTokens(tokens []*consulapi.ACLToken) error {
// TokenPreflightCheck verifies that a token has been replicated before we
// try to use it for registering services or bootstrapping Envoy
func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.ACLToken) error {
timer, timerStop := helper.NewStoppedTimer()
defer timerStop()

var retry uint64
var err error
ctx, cancel := context.WithTimeout(pctx, c.preflightCheckTimeout)
Expand All @@ -191,11 +188,10 @@ func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.AC
backoff := helper.Backoff(
c.preflightCheckBaseInterval, c.preflightCheckBaseInterval*2, retry)
c.logger.Trace("Consul token not ready", "error", err, "backoff", backoff)
timer.Reset(backoff)
select {
case <-ctx.Done():
return err
case <-timer.C:
case <-time.After(backoff):
continue
}
}
Expand Down
Loading
Loading