From 3fa9721adabf6f99f81c66cfe894676d4ff6c491 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Bojanowski?=
Date: Fri, 17 Apr 2026 10:04:26 +0200
Subject: [PATCH] StretchCluster: add ghost node ejection acceptance test
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds a multicluster acceptance test that verifies Redpanda's ghost node
ejection — the automatic decommission of a broker whose region becomes
permanently unreachable.

Simulates a regional outage by shutting down (via harpoon's
provider-agnostic `t.ShutdownNode`) the host worker node that the
vcluster's workloads are pinned to. Harpoon cleanly re-adds the node
with the same name and reloads the imported images onto it during
cleanup, so subsequent scenarios see the cluster at its original size.

## Response to review feedback

On @andrewstucki's suggestions:

- Using the existing `takeRegionOffline` (scale vcluster StatefulSet to
  0 + delete synced pods) did NOT reliably trigger Redpanda's
  auto-decommission path in our vcluster-on-k3d setup. The partition
  balancer saw the broker as unavailable, but its voting consistently
  produced empty decommission candidates — even with Joe's scaled
  timeouts. Tearing down the host worker node is the most faithful
  simulation of a regional outage and the one that reliably triggers
  auto-decommission, so this test keeps the node-teardown approach.
- Switched the raw `k3d node delete` + `k3d node create` machinery to
  harpoon's `t.ShutdownNode`, which uses the Provider interface
  (DeleteNode / AddNode / LoadImages) so the test remains
  provider-agnostic for future cloud-infra runs.
- Not merged into the regional-outage scenario: that scenario later
  brings the region back online and expects the operator in the
  returned region to resume reconciling. After an auto-decommission the
  returned broker's old identity is gone, so restoring would require a
  fresh operator deploy and re-registration — meaningfully different
  from the "transient outage" flow that scenario tests. Kept as a
  separate feature.

On @joe-redpanda's suggestion:

- Scaled node_status_interval, health_monitor_tick_interval,
  partition_autobalancing_tick_interval_ms,
  partition_autobalancing_node_availability_timeout_sec, and
  partition_autobalancing_node_autodecommission_timeout_sec together
  per validators.cc rules. Total expected ejection time is ~90s from
  the moment the node goes down.

## Supporting changes

- `stretch.go`: pin each vcluster's workloads (control plane + synced
  pods) to a single host worker node via
  `sync.fromHost.nodes.selector.labels: kubernetes.io/hostname` +
  virtual scheduler. Needed so shutting down the host node atomically
  kills the broker — a partial outage doesn't trigger the
  auto-decommission voting path.
- `stretch.go`: skip offline regions in `ApplyAll`, `DeleteAll`, and
  `DeleteNodepools` to avoid cleanup errors on unreachable vclusters.
- `Taskfile.yml`: pre-pull `redpandadata/redpanda:v26.1.5`,
  `ghcr.io/loft-sh/kubernetes:v1.33.4`, and cert-manager `v1.17.1`
  images so the test doesn't hit dockerhub rate limits on first run.
- `acceptance/main_test.go`: add `v26.1.5` to `WithImportedImages` so
  k3d loads it into the cluster.
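For orientation, a rough sketch of the scaled tunables as Go constants
(values copied from the feature file below; the constant names are
hypothetical and not part of this patch):

```go
package steps

import "time"

// Hypothetical constants mirroring the scaled Redpanda cluster-config
// tunables used by the scenario. The requirement that these be scaled
// together comes from Redpanda's validators.cc.
const (
	nodeStatusInterval          = 5 * time.Second  // node_status_interval: 5000 (ms)
	healthMonitorTickInterval   = 5 * time.Second  // health_monitor_tick_interval: 5000 (ms)
	autobalancingTickInterval   = 10 * time.Second // partition_autobalancing_tick_interval_ms: 10000
	nodeAvailabilityTimeout     = 45 * time.Second // partition_autobalancing_node_availability_timeout_sec: 45
	nodeAutodecommissionTimeout = 90 * time.Second // partition_autobalancing_node_autodecommission_timeout_sec: 90
)
```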
Co-Authored-By: Claude Opus 4.7 (1M context) --- Taskfile.yml | 9 + ...tretch-cluster-ghost-node-ejection.feature | 71 ++++ acceptance/main_test.go | 2 + acceptance/steps/register.go | 5 + acceptance/steps/stretch.go | 88 ++++- .../steps/stretch_ghost_node_ejection.go | 334 ++++++++++++++++++ 6 files changed, 504 insertions(+), 5 deletions(-) create mode 100644 acceptance/features/stretch-cluster-ghost-node-ejection.feature create mode 100644 acceptance/steps/stretch_ghost_node_ejection.go diff --git a/Taskfile.yml b/Taskfile.yml index f1420b753..7a1143ffc 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -384,6 +384,11 @@ tasks: - quay.io/jetstack/cert-manager-controller:{{.SECOND_TEST_CERTMANAGER_VERSION}} - quay.io/jetstack/cert-manager-cainjector:{{.SECOND_TEST_CERTMANAGER_VERSION}} - quay.io/jetstack/cert-manager-webhook:{{.SECOND_TEST_CERTMANAGER_VERSION}} + # cert-manager v1.17.1 is used inside vclusters (pkg/testutil.CertManagerVersion). + - quay.io/jetstack/cert-manager-controller:v1.17.1 + - quay.io/jetstack/cert-manager-cainjector:v1.17.1 + - quay.io/jetstack/cert-manager-webhook:v1.17.1 + - quay.io/jetstack/cert-manager-startupapicheck:v1.17.1 - '{{.TEST_REDPANDA_REPO}}:{{.TEST_REDPANDA_VERSION}}' - '{{.DEFAULT_TEST_UPGRADE_REDPANDA_REPO}}:{{.TEST_UPGRADE_REDPANDA_VERSION}}' - redpandadata/redpanda-operator:v25.1.3 @@ -391,6 +396,9 @@ tasks: - redpandadata/redpanda-operator:v25.3.1 - redpandadata/redpanda-operator:{{.TEST_UPGRADE_OPERATOR_VERSION}} - ghcr.io/loft-sh/vcluster-pro:{{.TEST_VCLUSTER_VERSION}} + # vcluster's embedded k8s distro — the tag is hardcoded in + # pkg/vcluster/vcluster.go's DefaultValues. + - ghcr.io/loft-sh/kubernetes:v1.33.4 - registry.k8s.io/kube-controller-manager:{{.TEST_KUBE_VERSION}} - registry.k8s.io/kube-apiserver:{{.TEST_KUBE_VERSION}} - coredns/coredns:{{.TEST_COREDNS_VERSION}} @@ -408,6 +416,7 @@ tasks: - redpandadata/redpanda:v25.2.1 - redpandadata/redpanda:v25.2.11 - redpandadata/redpanda:v26.1.1 + - redpandadata/redpanda:v26.1.5 cmds: - | diff --git a/acceptance/features/stretch-cluster-ghost-node-ejection.feature b/acceptance/features/stretch-cluster-ghost-node-ejection.feature new file mode 100644 index 000000000..20f88eb44 --- /dev/null +++ b/acceptance/features/stretch-cluster-ghost-node-ejection.feature @@ -0,0 +1,71 @@ +@multicluster +@serial +Feature: StretchCluster ghost node ejection + + When a node in a stretch cluster becomes permanently unreachable, Redpanda's + continuous data balancing should automatically decommission (eject) the ghost + node after the configured timeouts elapse. + + The partition balancer's default timeouts are measured in hours, so we scale + all the interrelated tunables together per Redpanda's validators.cc rules — + scaling one without the others breaks implicit assumptions. The ejection + timer runs for autodecommission_timeout seconds from the moment a quorum of + alive brokers agrees the node has been missing too long, so total expected + ejection time is ~90 seconds after the outage begins. 
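+
+  # The outage simulation below tears down a host worker node through
+  # harpoon's provider hooks, which (per the PR notes above) only the local
+  # k3d provider implements today; hence the managed-cloud skip tags.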
+ + @skip:gke @skip:aks @skip:eks + Scenario: Ghost node is ejected after a regional outage + Given I create a multicluster operator named "ghost" with 3 nodes + And I apply a multicluster Kubernetes manifest to "ghost": + """ + --- + apiVersion: cluster.redpanda.com/v1alpha2 + kind: StretchCluster + metadata: + name: cluster + namespace: default + spec: + external: + enabled: false + rbac: + enabled: true + config: + cluster: + partition_autobalancing_mode: continuous + partition_autobalancing_node_availability_timeout_sec: 45 + partition_autobalancing_node_autodecommission_timeout_sec: 90 + partition_autobalancing_tick_interval_ms: 10000 + health_monitor_tick_interval: 5000 + node_status_interval: 5000 + """ + And I apply a NodePool Kubernetes manifest to "ghost": + """ + spec: + clusterRef: + group: cluster.redpanda.com + kind: StretchCluster + name: cluster + replicas: 1 + image: + repository: redpandadata/redpanda + tag: v26.1.5 + sidecarImage: + repository: localhost/redpanda-operator + tag: dev + services: + perPod: + remote: + enabled: false + """ + And I expect 3 statefulsets in 3 kubernetes cluster to be created and eventually ready + And I expect all 3 NodePools in "ghost" to be eventually bound and deployed + # Verify the cluster starts healthy with all 3 nodes. + When I execute "rpk cluster health" command in the statefulset container in each cluster + Then the cluster health output should show 3 nodes across all clusters in "ghost" + # Simulate a regional outage by taking a non-controller region offline. We + # avoid the controller region so the remaining cluster retains a stable + # controller for the decommission decision. + When I take a non-controller region of "ghost" offline + # Wait for the ghost node to be ejected. With our config this takes ~90s + # (partition_autobalancing_node_autodecommission_timeout_sec). + Then the cluster health output should eventually show 2 nodes in the remaining clusters of "ghost" diff --git a/acceptance/main_test.go b/acceptance/main_test.go index 6a32dc5fd..157e25d7c 100644 --- a/acceptance/main_test.go +++ b/acceptance/main_test.go @@ -82,6 +82,8 @@ var setupSuite = sync.OnceValues(func() (*framework.Suite, error) { "redpandadata/redpanda-operator:v25.3.1", "redpandadata/redpanda:v25.1.1", "redpandadata/redpanda:v25.2.1", + // Image used by ghost node ejection feature. + "redpandadata/redpanda:v26.1.5", // Images used by upgrade and upgrade-regressions features. 
"redpandadata/redpanda:v25.2.11", "redpandadata/redpanda-unstable:v25.3.1-rc4", diff --git a/acceptance/steps/register.go b/acceptance/steps/register.go index cd8179ddc..493748a1d 100644 --- a/acceptance/steps/register.go +++ b/acceptance/steps/register.go @@ -129,6 +129,11 @@ func init() { framework.RegisterStep(`^the "([^"]*)" region of "([^"]*)" should reflect the updated StretchCluster spec$`, regionReflectsUpdatedSpec) framework.RegisterStep(`^the operator in the "([^"]*)" region of "([^"]*)" should eventually be running and reconciling$`, operatorInRegionRecovering) + // Ghost node ejection scenario steps + framework.RegisterStep(`^I take a non-controller region of "([^"]*)" offline$`, takeNonControllerRegionOffline) + framework.RegisterStep(`^the cluster health output should show (\d+) nodes across all clusters in "([^"]*)"$`, expectClusterHealthNodeCount) + framework.RegisterStep(`^the cluster health output should eventually show (\d+) nodes in the remaining clusters of "([^"]*)"$`, expectEventualNodeCountInRemainingClusters) + // Scaling scenario steps framework.RegisterStep(`^cluster "([^"]*)" should be stable with (\d+) nodes$`, checkClusterStableWithCount) framework.RegisterStep(`^cluster "([^"]*)" is stable with (\d+) nodes$`, checkClusterStableWithCount) diff --git a/acceptance/steps/stretch.go b/acceptance/steps/stretch.go index ef96bc9a6..8f42e1baa 100644 --- a/acceptance/steps/stretch.go +++ b/acceptance/steps/stretch.go @@ -243,6 +243,10 @@ func (v vclusterNodes) ApplyNodepoolsWithDifferentNamePerCluster(ctx context.Con func (v vclusterNodes) DeleteNodepools(ctx context.Context, manifest *godog.DocString) { t := framework.T(ctx) for _, node := range v { + if node.offline { + t.Logf("skipping NodePool cleanup for offline region %q", node.logicalName) + continue + } fullManifest := nodepoolManifest(nameMap[node.logicalName], manifest) t.Logf("applying manifest to %q", node.Name()) require.NoError(t, node.KubectlDelete(ctx, fullManifest)) @@ -252,6 +256,10 @@ func (v vclusterNodes) DeleteNodepools(ctx context.Context, manifest *godog.DocS func (v vclusterNodes) DeleteAll(ctx context.Context, manifest []byte) { t := framework.T(ctx) for _, node := range v { + if node.offline { + t.Logf("skipping manifest cleanup for offline region %q", node.logicalName) + continue + } require.NoError(t, node.KubectlDelete(ctx, manifest)) } } @@ -300,6 +308,11 @@ type vclusterNode struct { // offline is set to true when the region is intentionally taken offline for // disaster-recovery tests. ApplyAll and similar helpers skip offline nodes. offline bool + // k3dNodeName is the name of the k3d agent node this vcluster's workloads + // are pinned to via `sync.fromHost.nodes.selector.labels`. The + // ghost-node-ejection test deletes this node to simulate a regional + // outage; it's empty if pinning wasn't applied. 
+ k3dNodeName string } func (n *vclusterNode) APIServer() string { @@ -385,7 +398,8 @@ func createNetworkedVClusterOperators(ctx context.Context, t framework.TestingT, redpandaLicense := os.Getenv(LicenseEnvVar) require.NotEmpty(t, redpandaLicense, LicenseEnvVar+" env var must be set") - vclusters := createVClusters(ctx, t, clusters) + k3dNodeNames := pickK3dAgentNodes(ctx, t, clusters) + vclusters := createVClusters(ctx, t, clusters, k3dNodeNames) assignOperatorServiceIPs(ctx, t, vclusters, namespace) peers := bootstrapTLS(ctx, t, vclusters, namespace) deployOperators(ctx, t, vclusters, namespace, redpandaLicense, peers) @@ -399,8 +413,8 @@ func createNetworkedVClusterOperators(ctx context.Context, t framework.TestingT, return stashNodes(ctx, clusterName, vclusters) } -func createVClusters(ctx context.Context, t framework.TestingT, clusters int32) []*vclusterNode { - t.Logf("creating %d vclusters", clusters) +func createVClusters(ctx context.Context, t framework.TestingT, clusters int32, k3dNodeNames []string) []*vclusterNode { + t.Logf("creating %d vclusters pinned to k3d nodes %v", clusters, k3dNodeNames) // Generate a unique per-test suffix so that vcluster host namespaces never // collide when tests run in parallel or back-to-back (a terminating namespace @@ -425,13 +439,16 @@ func createVClusters(ctx context.Context, t framework.TestingT, clusters int32) // with a letter); apirand.String can return strings starting with digits. actualName := fmt.Sprintf("vc-%s-%d", suffix, i) - vClusterValues := vcluster.DefaultValues + networkingValues(i, clusters, suffix) + vClusterValues := vcluster.DefaultValues + + networkingValues(i, clusters, suffix) + + pinningValues(k3dNodeNames[i]) cluster, err := vcluster.New(ctx, t.RestConfig(), vcluster.WithName(actualName), vcluster.WithValues(helm.RawYAML(vClusterValues))) require.NoError(t, err) scheme := t.Scheme() cluster.SetScheme(scheme) - t.Logf("finished creating vcluster %d (logical: %q, actual: %q)", i+1, logicalName, cluster.Name()) + t.Logf("finished creating vcluster %d (logical: %q, actual: %q, pinned to k3d node %q)", + i+1, logicalName, cluster.Name(), k3dNodeNames[i]) cleanupWrapper(t, func(ctx context.Context) { if err := cluster.Delete(); err != nil { @@ -453,6 +470,7 @@ func createVClusters(ctx context.Context, t framework.TestingT, clusters int32) Cluster: cluster, apiServer: fmt.Sprintf("https://%s", actualName), logicalName: logicalName, + k3dNodeName: k3dNodeNames[i], } }(i) } @@ -461,6 +479,66 @@ func createVClusters(ctx context.Context, t framework.TestingT, clusters int32) return nodes } +// pickK3dAgentNodes returns `clusters` worker-node hostnames from the host +// k3d cluster so each vcluster can be pinned to a distinct host node. Uses +// the built-in `kubernetes.io/hostname` label — no extra labeling needed. +// Skips the control-plane node and fails if there are not enough workers. 
+func pickK3dAgentNodes(ctx context.Context, t framework.TestingT, clusters int32) []string {
+	hostClient, err := client.New(t.RestConfig(), client.Options{})
+	require.NoError(t, err)
+
+	var nodeList corev1.NodeList
+	require.NoError(t, hostClient.List(ctx, &nodeList))
+
+	var workerNodes []corev1.Node
+	for _, n := range nodeList.Items {
+		if _, isControlPlane := n.Labels["node-role.kubernetes.io/control-plane"]; isControlPlane {
+			continue
+		}
+		workerNodes = append(workerNodes, n)
+	}
+	require.GreaterOrEqual(t, int32(len(workerNodes)), clusters,
+		"need at least %d worker nodes in host cluster, got %d", clusters, len(workerNodes))
+
+	// Sort for deterministic assignment within a single test run.
+	slices.SortFunc(workerNodes, func(a, b corev1.Node) int {
+		return strings.Compare(a.Name, b.Name)
+	})
+
+	names := make([]string, clusters)
+	for i := int32(0); i < clusters; i++ {
+		names[i] = workerNodes[i].Name
+	}
+	return names
+}
+
+// pinningValues returns vcluster helm values that pin the control plane pod
+// to a specific k3d node via `kubernetes.io/hostname` and filter the nodes
+// visible inside the vcluster to just that same host node. Combined with the
+// virtual scheduler, this ensures all synced workloads (operator, Redpanda,
+// cert-manager) land on that single host node — so the ghost node ejection
+// test can take them all down at once by shutting down that host node (via
+// harpoon's `t.ShutdownNode`).
+func pinningValues(k3dNodeName string) string {
+	return fmt.Sprintf(`
+controlPlane:
+  statefulSet:
+    scheduling:
+      nodeSelector:
+        kubernetes.io/hostname: %s
+  advanced:
+    virtualScheduler:
+      enabled: true
+sync:
+  fromHost:
+    nodes:
+      enabled: true
+      selector:
+        all: false
+        labels:
+          kubernetes.io/hostname: %s
+`, k3dNodeName, k3dNodeName)
+}
+
 // stretchClusterResourceName is the StretchCluster resource name used in all
 // multicluster acceptance tests. Per-pod service names are prefixed with this
 // value (e.g. "cluster-first-0").
diff --git a/acceptance/steps/stretch_ghost_node_ejection.go b/acceptance/steps/stretch_ghost_node_ejection.go
new file mode 100644
index 000000000..a973cc7e0
--- /dev/null
+++ b/acceptance/steps/stretch_ghost_node_ejection.go
@@ -0,0 +1,334 @@
+// Copyright 2026 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package steps
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/redpanda-data/common-go/kube"
+	"github.com/stretchr/testify/require"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	framework "github.com/redpanda-data/redpanda-operator/harpoon"
+)
+
+// takeNonControllerRegionOffline picks a region whose Redpanda broker is NOT
+// the current controller, then simulates a regional outage by deleting the
+// k3d agent hosting that region. Keeping the controller alive lets the
+// partition balancer run steadily, so the auto-decommission decision
+// isn't delayed by a controller failover.
+// +// Unlike takeRegionOffline (which scales the vcluster control plane to 0 +// and deletes synced pods), this fully removes the host k3d node: kubelet, +// networking, and all pods pinned to it go away atomically — more faithful +// to a real regional outage and necessary for Redpanda's auto-decommission +// logic to reliably trigger in our vcluster-on-k3d setup. +func takeNonControllerRegionOffline(ctx context.Context, t framework.TestingT, clusterName string) context.Context { + nodes := getNodes(ctx, clusterName) + require.NotEmpty(t, nodes, "no vcluster nodes found") + + controllerRegion := discoverControllerRegionName(ctx, t, nodes[0]) + t.Logf("controller is in region %q, picking a non-controller region to take offline", controllerRegion) + + var target *vclusterNode + for _, n := range nodes { + if n.logicalName != controllerRegion { + target = n + break + } + } + require.NotNil(t, target, "no non-controller region found") + + deleteK3dNodeForRegion(ctx, t, target) + return ctx +} + +// deleteK3dNodeForRegion simulates a regional outage by deleting the host +// worker node that the vcluster's workloads are pinned to. Uses the harpoon +// framework's provider-agnostic ShutdownNode helper, which also registers a +// t.Cleanup that re-adds a node with the same name and reloads all imported +// images onto it — so subsequent scenarios in the same test run see the +// cluster at its original size without ImagePullBackOff. +func deleteK3dNodeForRegion(ctx context.Context, t framework.TestingT, node *vclusterNode) { + require.NotEmpty(t, node.k3dNodeName, + "region %s is not pinned to a k3d node — pinningValues may not have been applied", node.logicalName) + + t.Logf("simulating regional outage for region %s by shutting down host node %q", + node.logicalName, node.k3dNodeName) + + // ShutdownNode: delete via provider, re-add with the same name + reload + // imported images on cleanup. + t.ShutdownNode(ctx, node.k3dNodeName) + + // Wait for the Kubernetes Node to be gone or marked NotReady. The + // provider only removes the underlying runtime container; the Node + // object may linger until kube-controller-manager notices the missing + // kubelet. + hostClient, err := client.New(t.RestConfig(), client.Options{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + var n corev1.Node + err := hostClient.Get(ctx, client.ObjectKey{Name: node.k3dNodeName}, &n) + if err != nil { + t.Logf("node %s gone from kubernetes: %v", node.k3dNodeName, err) + return true + } + for _, cond := range n.Status.Conditions { + if cond.Type == corev1.NodeReady && cond.Status != corev1.ConditionTrue { + t.Logf("node %s is %s (status=%s)", node.k3dNodeName, cond.Type, cond.Status) + return true + } + } + t.Logf("node %s still Ready", node.k3dNodeName) + return false + }, 3*time.Minute, 5*time.Second, "node %s did not become NotReady", node.k3dNodeName) + + // Mark the region offline so other helpers (ApplyAll, diagnostics, + // expectEventualNodeCountInRemainingClusters) skip it. + node.offline = true + + t.Logf("regional outage simulated: node %q is down, region %s is offline", + node.k3dNodeName, node.logicalName) +} + +// discoverControllerRegionName runs `rpk cluster health` and `rpk redpanda +// admin brokers list` on the provided region's Redpanda pod to determine which +// region hosts the current controller broker. 
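+//
+// It relies on parseControllerID and parseBrokerHost (defined below), which
+// key off fixed prefixes and columns in rpk's output. Abbreviated
+// `rpk cluster health` example (shape assumed, not captured from a live run):
+//
+//	CLUSTER HEALTH OVERVIEW
+//	=======================
+//	Healthy:        true
+//	Controller ID:  0
+//	All nodes:      [0 1 2]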
+func discoverControllerRegionName(ctx context.Context, t framework.TestingT, node *vclusterNode) string {
+	pfCfg, err := node.PortForwardedRESTConfig(ctx)
+	require.NoError(t, err, "creating port-forwarded config for %s", node.Name())
+	ctl, err := kube.FromRESTConfig(pfCfg)
+	require.NoError(t, err)
+
+	pod := findRedpandaPod(ctx, t, node)
+	require.NotNil(t, pod, "no redpanda pod found in %s", node.Name())
+
+	execRpk := func(command string) string {
+		var out bytes.Buffer
+		err := ctl.Exec(ctx, pod, kube.ExecOptions{
+			Container: "redpanda",
+			Command:   []string{"/bin/bash", "-c", command},
+			Stdout:    &out,
+		})
+		require.NoError(t, err, "executing %q in %s: %s", command, node.Name(), out.String())
+		return out.String()
+	}
+
+	healthOut := execRpk("rpk cluster health")
+	controllerID := parseControllerID(healthOut)
+	require.GreaterOrEqual(t, controllerID, 0, "failed to parse Controller ID:\n%s", healthOut)
+	t.Logf("controller broker id: %d", controllerID)
+
+	brokersOut := execRpk("rpk redpanda admin brokers list")
+	controllerHost := parseBrokerHost(brokersOut, controllerID)
+	require.NotEmpty(t, controllerHost, "failed to find host for controller id %d:\n%s", controllerID, brokersOut)
+	t.Logf("controller broker host: %s", controllerHost)
+
+	// HOST column looks like "<cluster>-<nodepool>-0.default" — look
+	// for any nodepool name from nameMap as a substring.
+	for vc, nodepoolName := range nameMap {
+		if strings.Contains(controllerHost, "-"+nodepoolName+"-") || strings.HasPrefix(controllerHost, nodepoolName+"-") {
+			return vc
+		}
+	}
+	require.FailNowf(t, "unknown host", "could not map host %q to a region (known nodepools: %v)", controllerHost, nameMap)
+	return ""
+}
+
+func findRedpandaPod(ctx context.Context, _ framework.TestingT, node *vclusterNode) *corev1.Pod {
+	var stsList appsv1.StatefulSetList
+	if err := node.List(ctx, &stsList, client.InNamespace("default"), client.MatchingLabels{redpandaLabel: redpandaLabelValue}); err != nil {
+		return nil
+	}
+	if len(stsList.Items) == 0 {
+		return nil
+	}
+	selector, err := metav1.LabelSelectorAsSelector(stsList.Items[0].Spec.Selector)
+	if err != nil {
+		return nil
+	}
+	var pods corev1.PodList
+	if err := node.List(ctx, &pods, client.InNamespace("default"), client.MatchingLabelsSelector{Selector: selector}); err != nil {
+		return nil
+	}
+	if len(pods.Items) == 0 {
+		return nil
+	}
+	return &pods.Items[0]
+}
+
+// parseControllerID finds "Controller ID: N" in rpk cluster health output.
+func parseControllerID(output string) int {
+	for _, line := range strings.Split(output, "\n") {
+		line = strings.TrimSpace(line)
+		if !strings.HasPrefix(line, "Controller ID:") {
+			continue
+		}
+		fields := strings.Fields(line)
+		if len(fields) < 3 {
+			return -1
+		}
+		id, err := strconv.Atoi(fields[len(fields)-1])
+		if err != nil {
+			return -1
+		}
+		return id
+	}
+	return -1
+}
+
+// parseBrokerHost looks up the HOST column for the given broker ID in
+// `rpk redpanda admin brokers list` output.
+func parseBrokerHost(output string, brokerID int) string {
+	idStr := strconv.Itoa(brokerID)
+	for _, line := range strings.Split(output, "\n") {
+		fields := strings.Fields(line)
+		if len(fields) < 2 {
+			continue
+		}
+		if fields[0] == idStr {
+			return fields[1]
+		}
+	}
+	return ""
+}
+
+// parseHealthNodeIDs parses the "All nodes" line from `rpk cluster health`
+// output and returns the list of node IDs.
+//
+// Example line:
+//
+//	All nodes: [0 1 2]
+func parseHealthNodeIDs(output string) ([]int, error) {
+	for _, line := range strings.Split(output, "\n") {
+		line = strings.TrimSpace(line)
+		if !strings.HasPrefix(line, "All nodes:") {
+			continue
+		}
+		start := strings.Index(line, "[")
+		end := strings.Index(line, "]")
+		if start < 0 || end < start {
+			return nil, fmt.Errorf("malformed 'All nodes:' line: %q", line)
+		}
+		fields := strings.Fields(line[start+1 : end])
+		ids := make([]int, 0, len(fields))
+		for _, f := range fields {
+			id, err := strconv.Atoi(f)
+			if err != nil {
+				return nil, fmt.Errorf("parsing node ID %q: %w", f, err)
+			}
+			ids = append(ids, id)
+		}
+		return ids, nil
+	}
+	return nil, fmt.Errorf("no 'All nodes:' line found in output")
+}
+
+// expectClusterHealthNodeCount asserts that every `rpk cluster health` result
+// stashed in ctx (via executeCommandInStatefulsetContainers) reports the
+// expected number of node IDs in its "All nodes" line.
+func expectClusterHealthNodeCount(ctx context.Context, t framework.TestingT, expectedNodes int32, clusterName string) {
+	_ = clusterName // accepted for step symmetry, results are from last exec
+	results, ok := ctx.Value(rpkResultsKey{}).([]rpkExecResult)
+	require.True(t, ok, "no rpk execution results stashed in context (did the exec step run first?)")
+	require.NotEmpty(t, results, "no execution results found")
+
+	for _, result := range results {
+		ids, err := parseHealthNodeIDs(result.rawOutput)
+		require.NoError(t, err, "failed to parse output from %s:\n%s", result.clusterName, result.rawOutput)
+		t.Logf("cluster %s has nodes: %v", result.clusterName, ids)
+		require.Equal(t, int(expectedNodes), len(ids),
+			"expected %d nodes in cluster %s but got %d: %v",
+			expectedNodes, result.clusterName, len(ids), ids)
+	}
+}
+
+// expectEventualNodeCountInRemainingClusters polls `rpk cluster health` on
+// each still-reachable region and waits until all report the expected node
+// count (i.e. the ghost node has been auto-ejected). Offline regions are
+// skipped so we don't block on unreachable vclusters.
+func expectEventualNodeCountInRemainingClusters(ctx context.Context, t framework.TestingT, expectedNodes int32, clusterName string) {
+	nodes := getNodes(ctx, clusterName)
+
+	// Skip vclusters that were taken offline earlier in the scenario.
+ var remaining []*vclusterNode + for _, n := range nodes { + if n.offline { + t.Logf("skipping offline region %q", n.logicalName) + continue + } + remaining = append(remaining, n) + } + require.NotEmpty(t, remaining, "no reachable regions found") + t.Logf("polling %d reachable regions for ghost node ejection", len(remaining)) + + type execConfig struct { + node *vclusterNode + ctl *kube.Ctl + } + configs := make([]execConfig, 0, len(remaining)) + for _, node := range remaining { + pfCfg, err := node.PortForwardedRESTConfig(ctx) + require.NoError(t, err, "creating port-forwarded config for %s", node.Name()) + ctl, err := kube.FromRESTConfig(pfCfg) + require.NoError(t, err, "creating kube ctl for %s", node.Name()) + configs = append(configs, execConfig{node: node, ctl: ctl}) + } + + execInPod := func(cfg execConfig, pod *corev1.Pod, command string) string { + var out bytes.Buffer + if err := cfg.ctl.Exec(ctx, pod, kube.ExecOptions{ + Container: "redpanda", + Command: []string{"/bin/bash", "-c", command}, + Stdout: &out, + }); err != nil { + return fmt.Sprintf("error: %v (output: %s)", err, out.String()) + } + return out.String() + } + + require.Eventually(t, func() bool { + for _, cfg := range configs { + pod := findRedpandaPod(ctx, t, cfg.node) + if pod == nil { + t.Logf("no redpanda pod found in %s", cfg.node.Name()) + return false + } + + healthOutput := execInPod(cfg, pod, "rpk cluster health") + t.Logf("region %s rpk cluster health:\n%s", cfg.node.logicalName, healthOutput) + + ids, err := parseHealthNodeIDs(healthOutput) + if err != nil { + t.Logf("failed to parse health output from %s: %v", cfg.node.logicalName, err) + t.Logf("region %s broker list:\n%s", cfg.node.logicalName, + execInPod(cfg, pod, "rpk redpanda admin brokers list")) + return false + } + + if len(ids) != int(expectedNodes) { + t.Logf("region %s has %d nodes %v, expected %d", cfg.node.logicalName, len(ids), ids, expectedNodes) + t.Logf("region %s broker list:\n%s", cfg.node.logicalName, + execInPod(cfg, pod, "rpk redpanda admin brokers list")) + return false + } + } + return true + }, 10*time.Minute, 5*time.Second, "expected %d nodes in remaining regions after ghost node ejection", expectedNodes) + + t.Logf("all remaining regions report %d nodes as expected", expectedNodes) +}
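A quick way to sanity-check the two health parsers outside the acceptance
harness is a plain unit test. A minimal sketch (not part of this patch; the
sample output shape is assumed and abbreviated from `rpk cluster health`):

```go
package steps

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestParseClusterHealthOutput(t *testing.T) {
	out := "CLUSTER HEALTH OVERVIEW\n" +
		"=======================\n" +
		"Healthy:        true\n" +
		"Controller ID:  2\n" +
		"All nodes:      [0 1 2]\n"

	// parseControllerID keys off the "Controller ID:" prefix.
	require.Equal(t, 2, parseControllerID(out))

	// parseHealthNodeIDs keys off the "All nodes:" prefix and the
	// bracketed ID list.
	ids, err := parseHealthNodeIDs(out)
	require.NoError(t, err)
	require.Equal(t, []int{0, 1, 2}, ids)
}
```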