diff --git a/.github/workflows/ltb.yaml b/.github/workflows/ltb.yaml index 3ace630c..ebd78c3e 100644 --- a/.github/workflows/ltb.yaml +++ b/.github/workflows/ltb.yaml @@ -32,8 +32,7 @@ jobs: fail-fast: false max-parallel: 4 matrix: - #os: [ubuntu-latest, windows-latest, macos-latest] - os: [ubuntu-latest, macos-latest] + os: [ubuntu-latest] runs-on: ${{ matrix.os }} timeout-minutes: 10 steps: diff --git a/_test/helpers.go b/_test/helpers.go index 67e8b24f..841288c3 100644 --- a/_test/helpers.go +++ b/_test/helpers.go @@ -8,6 +8,7 @@ import ( "os" "strconv" "testing" + "time" "github.com/carlmjohnson/be" "github.com/nats-io/nats-server/v2/server" @@ -63,6 +64,9 @@ func StartNatsServer(t testing.TB, workDir string) *server.Server { }) server.Start() + if !server.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } return server } @@ -131,10 +135,22 @@ func StartNexus(t testing.TB, ctx context.Context, natsUrl string, size int, sta be.NilErr(t, err) be.NilErr(t, node.Start()) - for !node.IsReady() { - } + be.NilErr(t, node.IsReady(30*time.Second)) ret[i] = node } return ret } + +// WaitFor polls condition every 25ms until it returns true or timeout is reached. +func WaitFor(t testing.TB, timeout time.Duration, condition func() bool, msg string) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(25 * time.Millisecond) + } + t.Fatalf("timed out waiting: %s", msg) +} diff --git a/_test/nexlet_inmem/inmem_test.go b/_test/nexlet_inmem/inmem_test.go index cad89ba3..ab44239d 100644 --- a/_test/nexlet_inmem/inmem_test.go +++ b/_test/nexlet_inmem/inmem_test.go @@ -115,6 +115,9 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { StoreDir: t.TempDir(), }) server.Start() + if !server.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) @@ -142,17 +145,24 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { ) be.NilErr(t, err) be.NilErr(t, nn.Start()) - for !nn.IsReady() { - } + be.NilErr(t, nn.IsReady(10*time.Second)) - ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*30) defer cancel() nClient, err := client.NewClient(ctx, nc, models.SystemNamespace) be.NilErr(t, err) be.Nonzero(t, nClient) - aR, err := nClient.Auction("inmem", nil) + var aR []*models.AuctionResponse + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + aR, err = nClient.Auction("inmem", nil) + if err == nil && len(aR) == 1 { + break + } + time.Sleep(100 * time.Millisecond) + } be.NilErr(t, err) be.Equal(t, 1, len(aR)) diff --git a/agents/native/state_test.go b/agents/native/state_test.go index 91a5b48d..9b0d7780 100644 --- a/agents/native/state_test.go +++ b/agents/native/state_test.go @@ -17,6 +17,18 @@ import ( "github.com/synadia-io/nex/models" ) +func waitFor(t testing.TB, timeout time.Duration, condition func() bool, msg string) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(25 * time.Millisecond) + } + t.Fatalf("timed out waiting: %s", msg) +} + func MockRunner(t testing.TB) (*agent.Runner, error) { t.Helper() @@ -116,7 +128,9 @@ func TestNexletState(t *testing.T) { err = ns.RemoveWorkload("derp", "abc123") be.NilErr(t, err) - time.Sleep(300 * time.Millisecond) + waitFor(t, 5*time.Second, func() bool { + return ns.WorkloadCount() == 0 + }, "waiting for workload removal") be.Equal(t, 1, ns.NamespaceCount()) be.Equal(t, 0, ns.WorkloadCount()) @@ -131,7 +145,9 @@ func TestNexletState(t *testing.T) { err = ns.SetLameduckMode(time.Second) be.NilErr(t, err) - time.Sleep(2000 * time.Millisecond) + waitFor(t, 10*time.Second, func() bool { + return ns.NamespaceCount() == 0 && ns.WorkloadCount() == 0 + }, "waiting for lameduck cleanup") be.Equal(t, 0, ns.NamespaceCount()) be.Equal(t, 0, ns.WorkloadCount()) }) diff --git a/client/client_test.go b/client/client_test.go index fb4f2eb7..aea70121 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,12 +13,7 @@ import ( func TestNewNexClient(t *testing.T) { server := _test.StartNatsServer(t, t.TempDir()) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -35,12 +30,7 @@ func TestNewNexClient(t *testing.T) { func TestNewNexClientWithOptions(t *testing.T) { server := _test.StartNatsServer(t, t.TempDir()) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -67,12 +57,7 @@ func TestNewNexClientWithOptions(t *testing.T) { func TestNexClient_User(t *testing.T) { workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() ctx, cancel := context.WithCancel(t.Context()) defer cancel() @@ -88,9 +73,11 @@ func TestNexClient_User(t *testing.T) { be.NilErr(t, err) be.Nonzero(t, client) - ar, err := client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, 1, len(ar)) + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for auction to return 1 result") sr, err := client.StartWorkload(ar[0].BidderId, "tester", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) @@ -122,12 +109,7 @@ func TestNexClient_System(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -159,10 +141,11 @@ func TestNexClient_System(t *testing.T) { ldr, err := client.SetLameduck(_test.Node1Pub, 0, nil) be.NilErr(t, err) be.True(t, ldr.Success) - time.Sleep(250 * time.Millisecond) - nodes, err = client.ListNodes(nil) - be.NilErr(t, err) + _test.WaitFor(t, 10*time.Second, func() bool { + nodes, err = client.ListNodes(nil) + return err == nil && len(nodes) == tt.size-1 + }, "waiting for lameduck node to disappear from list") be.Equal(t, tt.size-1, len(nodes)) for _, node := range nexNodes { @@ -175,12 +158,7 @@ func TestNexClient_System(t *testing.T) { func TestNexClient_SystemAsUser(t *testing.T) { workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -228,12 +206,7 @@ func TestNexClient_ListWorkloads(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -249,23 +222,27 @@ func TestNexClient_ListWorkloads(t *testing.T) { be.NilErr(t, err) be.Nonzero(t, client) - ar, err := client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, 1, len(ar)) + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for auction to return 1 result") _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) - ar, err = client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, 1, len(ar)) + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for auction to return 1 result") _, err = client.StartWorkload(ar[0].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) - ar, err = client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, 1, len(ar)) + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for auction to return 1 result") _, err = client.StartWorkload(ar[0].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) @@ -285,11 +262,7 @@ func TestNexClient_ListWorkloads(t *testing.T) { func TestNexClient_List_NoNodes(t *testing.T) { server := _test.StartNatsServer(t, t.TempDir()) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -336,12 +309,7 @@ func TestNexClient_CloneWorkload(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -361,9 +329,11 @@ func TestNexClient_CloneWorkload(t *testing.T) { be.NilErr(t, err) be.Nonzero(t, client) - ar, err := client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, tt.size, len(ar)) + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) @@ -371,16 +341,18 @@ func TestNexClient_CloneWorkload(t *testing.T) { _, err = client.CloneWorkload(swr.Id, nil) be.NilErr(t, err) - time.Sleep(250 * time.Millisecond) - - wl, err := client.ListWorkloads(nil) - be.NilErr(t, err) - - totalCount := 0 - for _, w := range wl { - totalCount += len(*w) - } - + var totalCount int + _test.WaitFor(t, 10*time.Second, func() bool { + wl, wlErr := client.ListWorkloads(nil) + if wlErr != nil { + return false + } + totalCount = 0 + for _, w := range wl { + totalCount += len(*w) + } + return totalCount == 2 + }, "waiting for cloned workload to appear") be.Equal(t, 2, totalCount) for _, node := range nexNodes { @@ -405,12 +377,7 @@ func TestNexClient_GetNexusPTags(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -443,11 +410,7 @@ func TestNexClient_GetNexusPTags(t *testing.T) { func TestNexClient_GetNexusPTags_NoNodes(t *testing.T) { server := _test.StartNatsServer(t, t.TempDir()) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -477,12 +440,7 @@ func TestNexClient_StopWorkloadDNE(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -532,12 +490,7 @@ func TestNexClient_StopWorkload(t *testing.T) { t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) - defer func() { - for server.NumClients() == 0 { - server.Shutdown() - return - } - }() + defer server.Shutdown() nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -557,14 +510,26 @@ func TestNexClient_StopWorkload(t *testing.T) { be.NilErr(t, err) be.Nonzero(t, client) - ar, err := client.Auction("inmem", map[string]string{}) - be.NilErr(t, err) - be.Equal(t, tt.size, len(ar)) + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) - time.Sleep(250 * time.Millisecond) + _test.WaitFor(t, 10*time.Second, func() bool { + wl, wlErr := client.ListWorkloads(nil) + if wlErr != nil { + return false + } + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + return totalCount == 1 + }, "waiting for workload to be running") str, err := client.StopWorkload(swr.Id) be.NilErr(t, err) diff --git a/cmd/nex/logger_test.go b/cmd/nex/logger_test.go index c6bb2f59..2d148bc5 100644 --- a/cmd/nex/logger_test.go +++ b/cmd/nex/logger_test.go @@ -29,17 +29,15 @@ func startNatsServer(t testing.TB) *server.Server { be.NilErr(t, err) s.Start() + if !s.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } return s } func TestConfigureLogger(t *testing.T) { s := startNatsServer(t) - defer func() { - for s.NumClients() == 0 { - s.Shutdown() - return - } - }() + defer s.Shutdown() today := time.Now().Format("2006-01-02") diff --git a/cmd/nex/main_test.go b/cmd/nex/main_test.go index bec75391..8b9a3416 100644 --- a/cmd/nex/main_test.go +++ b/cmd/nex/main_test.go @@ -52,7 +52,7 @@ func TestCLIWithConfig(t *testing.T) { be.NilErr(t, err) _, _ = f.WriteString(config) - defer f.Close() + defer func() { be.NilErr(t, f.Close()) }() nex := NexCLI{} parser := kong.Must(&nex, diff --git a/cmd/nex/node.go b/cmd/nex/node.go index 906ca034..b7f9f923 100644 --- a/cmd/nex/node.go +++ b/cmd/nex/node.go @@ -307,9 +307,8 @@ func (u Up) Run(ctx context.Context, globals *Globals) error { return err } - for nex.IsReady() { - time.Sleep(100 * time.Millisecond) - break + if err := nex.IsReady(10 * time.Second); err != nil { + return err } quit := make(chan os.Signal, 1) diff --git a/cmd/nex/node_test.go b/cmd/nex/node_test.go index 747a2f2b..af5e41b8 100644 --- a/cmd/nex/node_test.go +++ b/cmd/nex/node_test.go @@ -75,12 +75,7 @@ func TestNodeCommandDefaults(t *testing.T) { func TestNodeUp(t *testing.T) { s := startNatsServer(t) - defer func() { - for s.NumClients() == 0 { - s.Shutdown() - return - } - }() + defer s.Shutdown() nex := NexCLI{ Globals: Globals{ @@ -112,7 +107,6 @@ func TestNodeUp(t *testing.T) { stdout := captureOutput(t, func() { err := nex.Node.Up.Run(ctx, &nex.Globals) be.NilErr(t, err) - time.Sleep(500 * time.Millisecond) }) be.True(t, strings.Contains(stdout, fmt.Sprintf("[INFO] Starting nex node version=0.0.0 commit=development build_date=unknown node_id=%s name=testnode nexus=testnexus nats_server=%s start_time=", TestServerPublicKey, s.ClientURL()))) @@ -122,12 +116,7 @@ func TestNodeUp(t *testing.T) { func TestNodeList(t *testing.T) { s := startNatsServer(t) - defer func() { - for s.NumClients() == 0 { - s.Shutdown() - return - } - }() + defer s.Shutdown() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -147,7 +136,6 @@ func TestNodeList(t *testing.T) { stdout := captureOutput(t, func() { err := nex.Node.List.Run(ctx, &nex.Globals) be.NilErr(t, err) - time.Sleep(500 * time.Millisecond) }) resp := []*models.NodePingResponse{} @@ -180,15 +168,13 @@ func captureOutput(t testing.TB, f func()) string { f() - w.Close() + be.NilErr(t, w.Close()) os.Stdout = origStdout var buf bytes.Buffer _, err = io.Copy(&buf, r) - if err != nil { - t.Fatal("failed to read from pipe: " + err.Error()) - } - r.Close() + be.NilErr(t, err) + be.NilErr(t, r.Close()) return buf.String() } diff --git a/cmd/nex/workloads.go b/cmd/nex/workloads.go index f45d2f15..7c3dcebd 100644 --- a/cmd/nex/workloads.go +++ b/cmd/nex/workloads.go @@ -90,7 +90,7 @@ func (r *StartWorkload) Run(ctx context.Context, globals *Globals) error { var nexfile models.Nexfile if r.WorkloadNexfile != nil { - defer r.WorkloadNexfile.Close() + defer func() { _ = r.WorkloadNexfile.Close() }() data, err := io.ReadAll(r.WorkloadNexfile) if err != nil { diff --git a/internal/agent_registration.go b/internal/agent_registration.go index b885f8cb..efa61cea 100644 --- a/internal/agent_registration.go +++ b/internal/agent_registration.go @@ -247,6 +247,7 @@ func (ar *AgentRegistrations) startHealthMonitor() { ar.rwLock.Unlock() return case <-ticker.C: + ar.rwLock.RLock() for _, reg := range ar.Registrations { reg.rwLock.Lock() switch { @@ -257,6 +258,7 @@ func (ar *AgentRegistrations) startHealthMonitor() { } reg.rwLock.Unlock() } + ar.rwLock.RUnlock() } } } diff --git a/internal/state/nats_kv_test.go b/internal/state/nats_kv_test.go index e3f85db4..afb6f4f5 100644 --- a/internal/state/nats_kv_test.go +++ b/internal/state/nats_kv_test.go @@ -3,6 +3,7 @@ package state import ( "context" "testing" + "time" "github.com/carlmjohnson/be" "github.com/nats-io/nats-server/v2/server" @@ -21,6 +22,9 @@ func startNatsServer(t testing.TB, workDir string) *server.Server { }) server.Start() + if !server.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } return server } diff --git a/internal/ttlmap_test.go b/internal/ttlmap_test.go index c01e2f92..c038059b 100644 --- a/internal/ttlmap_test.go +++ b/internal/ttlmap_test.go @@ -7,13 +7,28 @@ import ( "github.com/carlmjohnson/be" ) +func waitFor(t testing.TB, timeout time.Duration, condition func() bool, msg string) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(25 * time.Millisecond) + } + t.Fatalf("timed out waiting: %s", msg) +} + func TestTTLMap(t *testing.T) { m := NewTTLMap(time.Second) m.Put("key", "value", nil) be.True(t, m.Exists("key")) v, _ := m.Get("key") be.Equal(t, "value", v) - time.Sleep(time.Second * 2) + waitFor(t, 5*time.Second, func() bool { + v, _ := m.Get("key") + return v == "" + }, "waiting for TTL expiry") v, _ = m.Get("key") be.Equal(t, "", v) } diff --git a/internal/watcher_test.go b/internal/watcher_test.go index 82c2a733..96f8b7c2 100644 --- a/internal/watcher_test.go +++ b/internal/watcher_test.go @@ -8,6 +8,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/carlmjohnson/be" "github.com/nats-io/nats-server/v2/server" @@ -32,6 +33,9 @@ func startNatsServer(t testing.TB, workDir string) *server.Server { }) server.Start() + if !server.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } return server } diff --git a/node.go b/node.go index 21f6ad8c..66ce3707 100644 --- a/node.go +++ b/node.go @@ -325,8 +325,15 @@ func (n *NexNode) Start() error { return nil } -func (n *NexNode) IsReady() bool { - return n.nodeState == models.NodeStateRunning +func (n *NexNode) IsReady(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if n.nodeState == models.NodeStateRunning { + return nil + } + time.Sleep(25 * time.Millisecond) + } + return fmt.Errorf("node not ready after %s", timeout) } func (n *NexNode) Shutdown() error { diff --git a/node_test.go b/node_test.go index 17c07698..0b8ffb8f 100644 --- a/node_test.go +++ b/node_test.go @@ -36,6 +36,9 @@ func startNatsServer(t testing.TB) *server.Server { be.NilErr(t, err) s.Start() + if !s.ReadyForConnections(5 * time.Second) { + t.Fatal("nats server failed to start") + } return s } @@ -97,11 +100,7 @@ func TestNodeOptions(t *testing.T) { func TestNodeStartStop(t *testing.T) { s := startNatsServer(t) - defer func() { - for s.NumClients() == 0 { - s.Shutdown() - } - }() + defer s.Shutdown() nc, err := nats.Connect(s.ClientURL()) be.NilErr(t, err) @@ -134,7 +133,7 @@ func TestNodeStartStop(t *testing.T) { ) be.NilErr(t, err) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() go func() { @@ -144,9 +143,7 @@ func TestNodeStartStop(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(100 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) be.Equal(t, 1, nn.registeredAgents.Count()) be.Equal(t, 22, nc.NumSubscriptions()) @@ -212,7 +209,7 @@ func TestNodeInfoHandler(t *testing.T) { ) be.NilErr(t, err) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() go func() { @@ -222,9 +219,7 @@ func TestNodeInfoHandler(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(100 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) nodeInfo, err := nc.Request(models.NodeInfoRequestSubject(models.SystemNamespace, pub), []byte{}, time.Second) be.NilErr(t, err) @@ -277,9 +272,7 @@ func TestNodeLameduckHandlerWithTag(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(100 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) req := models.LameduckRequest{ Delay: "0s", @@ -332,9 +325,7 @@ func TestNodeLameduckHandlerWithoutTag(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(100 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) req := models.LameduckRequest{ Delay: "0s", @@ -373,9 +364,7 @@ func TestNodeShutdownExitCodes(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(10 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) // Normal shutdown go func() { @@ -412,9 +401,7 @@ func TestNodeShutdownExitCodes(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(10 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) // Trigger lameduck req := models.LameduckRequest{ @@ -433,11 +420,7 @@ func TestNodeShutdownExitCodes(t *testing.T) { func TestNodeDeployCloneUndeploy(t *testing.T) { s := startNatsServer(t) - defer func() { - for s.NumClients() == 0 { - s.Shutdown() - } - }() + defer s.Shutdown() output := new(bytes.Buffer) logger := slog.New(slog.NewJSONHandler(output, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -466,7 +449,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { ) be.NilErr(t, err) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() go func() { @@ -476,9 +459,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { be.NilErr(t, nn.Start()) - for !nn.IsReady() { - time.Sleep(100 * time.Millisecond) - } + be.NilErr(t, nn.IsReady(10*time.Second)) req := models.AuctionRequest{ AgentType: "inmem", @@ -487,7 +468,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { reqB, err := json.Marshal(req) be.NilErr(t, err) - auctionRespRaw, err := nc.Request(models.AuctionRequestSubject(models.SystemNamespace), reqB, time.Second*3) + auctionRespRaw, err := nc.Request(models.AuctionRequestSubject(models.SystemNamespace), reqB, time.Second*10) be.NilErr(t, err) auctionResp := models.AuctionResponse{} @@ -504,7 +485,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { startWorkloadReqB, err := json.Marshal(startWorkloadReq) be.NilErr(t, err) - startWorkloadRespRaw, err := nc.Request(models.AuctionDeployRequestSubject(models.SystemNamespace, auctionResp.BidderId), startWorkloadReqB, time.Second) + startWorkloadRespRaw, err := nc.Request(models.AuctionDeployRequestSubject(models.SystemNamespace, auctionResp.BidderId), startWorkloadReqB, time.Second*5) be.NilErr(t, err) startWorkloadResp := models.StartWorkloadResponse{} @@ -522,7 +503,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { cloneWorkloadReqB, err := json.Marshal(cloneWorkloadReq) be.NilErr(t, err) - cloneWorkloadRespRaw, err := nc.Request(models.CloneWorkloadRequestSubject(models.SystemNamespace, startWorkloadResp.Id), cloneWorkloadReqB, time.Second) + cloneWorkloadRespRaw, err := nc.Request(models.CloneWorkloadRequestSubject(models.SystemNamespace, startWorkloadResp.Id), cloneWorkloadReqB, time.Second*5) be.NilErr(t, err) cloneWorkloadResp := models.CloneWorkloadResponse{} @@ -535,7 +516,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { nsPingReqB, err := json.Marshal(nsPingReq) be.NilErr(t, err) - nsPingRespRaw, err := nc.Request(models.NamespacePingRequestSubject(models.SystemNamespace), nsPingReqB, time.Second*5) + nsPingRespRaw, err := nc.Request(models.NamespacePingRequestSubject(models.SystemNamespace), nsPingReqB, time.Second*10) be.NilErr(t, err) nsPingResp := models.AgentListWorkloadsResponse{} @@ -547,7 +528,7 @@ func TestNodeDeployCloneUndeploy(t *testing.T) { stopWorkloadReqB, err := json.Marshal(stopWorkloadReq) be.NilErr(t, err) - stopWorkloadRespRaw, err := nc.Request(models.UndeployRequestSubject(models.SystemNamespace, startWorkloadResp.Id), stopWorkloadReqB, time.Second) + stopWorkloadRespRaw, err := nc.Request(models.UndeployRequestSubject(models.SystemNamespace, startWorkloadResp.Id), stopWorkloadReqB, time.Second*5) be.NilErr(t, err) stopWorkloadResp := models.StopWorkloadResponse{}