Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changes/unreleased/operator-Fixed-20260423-224719.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
project: operator
kind: Fixed
body: Fixed an issue where in-use features from an enterprise cluster did not have deterministic sort order and could cause reconciliation storms due to status changes.
time: 2026-04-23T22:47:19.373877-04:00
4 changes: 2 additions & 2 deletions ci/rp-controller-gen.nix
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

buildGo126Module rec {
pname = "rp-controller-gen";
version = "59451d668eb28f01f91354a2463d766866148ef4";
version = "f56efc4bf824e209eba669c8f3607c04f7e25ce9";

src = fetchFromGitHub {
owner = "redpanda-data";
repo = "common-go";
rev = "${version}";
hash = "sha256-KoVBYGE0pkXpcPb+8145Wmu0m8lex8hc8c0QzFVzhho=";
hash = "sha256-gcSvonPNqEKUqT+s7/a/KG7xqLhW8s9bj951w2x09rk=";
};

vendorHash = "sha256-PIKAvpLy0tTYkkzxg1UvHhDMhQGysPQ06k1J+5llN84=";
Expand Down
1 change: 1 addition & 0 deletions gen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ require (
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/kustomize/api v0.20.1 // indirect
sigs.k8s.io/kustomize/kyaml v0.20.1 // indirect
sigs.k8s.io/mcs-api v0.4.1 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v6 v6.3.2 // indirect
)
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,7 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0/go.mod h1:Cz6ft6Dkn3Et6l2v2a9/RpN7epQ1GtDlO6lj8bEcOvw=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 h1:UQ0AhxogsIRZDkElkblfnwjc3IaltCm2HUMvezQaL7s=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1/go.mod h1:jyqM3eLpJ3IbIFDTKVz2rF9T/xWGW0rIriGwnz8l9Tk=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50.0/go.mod h1:ZV4VOm0/eHR06JLrXWe09068dHpr3TRpY9Uo7T+anuA=
Expand Down
12 changes: 7 additions & 5 deletions operator/chart/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func render(dot *helmette.Dot) []kube.Object {
ConfigMap(dot),
MetricsService(dot),
WebhookService(dot),
OperatorService(dot),
OperatorServiceExport(dot),
MutatingWebhookConfiguration(dot),
ValidatingWebhookConfiguration(dot),
ServiceAccount(dot),
Expand All @@ -65,10 +67,14 @@ func render(dot *helmette.Dot) []kube.Object {
MigrationJobServiceAccount(dot),
}

for _, svc := range StretchClusterService(dot) {
for _, svc := range OperatorPeerServices(dot) {
manifests = append(manifests, &svc)
}
Comment thread
andrewstucki marked this conversation as resolved.

for _, si := range OperatorServiceImports(dot) {
manifests = append(manifests, &si)
}

for _, cr := range ClusterRoles(dot) {
manifests = append(manifests, &cr)
}
Expand All @@ -77,10 +83,6 @@ func render(dot *helmette.Dot) []kube.Object {
manifests = append(manifests, &crb)
}

for _, svc := range StretchClusterService(dot) {
manifests = append(manifests, &svc)
}

// NB: This slice may contain nil interfaces!
// Filtering happens elsewhere, don't call this function directly if you
// can avoid it.
Expand Down
202 changes: 192 additions & 10 deletions operator/chart/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,229 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/redpanda-data/redpanda-operator/gotohelm/helmette"
)

func StretchClusterService(dot *helmette.Dot) []corev1.Service {
// OperatorServicePort is the gRPC port the operator listens on for
// cross-cluster raft traffic. Matches the --raft-address default.
const OperatorServicePort = 9443

// OperatorService renders the peer-facing gRPC Service in front of the
// operator Deployment when multicluster.service.enabled is true. The
// Service type and annotations come straight from values — any mesh
// that configures routing via annotations (Cilium ClusterMesh,
// Submariner, …) or any cloud-specific LoadBalancer knob fits without
// the chart hard-coding any particular implementation.
//
// The Service name equals the helm fullname so peers can address it as
// `<fullname>.<namespace>.svc.cluster.local:9443`. That matches the
// naming convention the bootstrap tooling already uses for the TLS
// secret prefix.
func OperatorService(dot *helmette.Dot) *corev1.Service {
values := helmette.Unwrap[Values](dot.Values)

if !values.Multicluster.ServicePerOperatorDeployment {
if !values.Multicluster.Enabled || !values.Multicluster.Service.Enabled {
return nil
}

var svcs []corev1.Service
annotations := helmette.Default(map[string]string{}, values.Annotations)
svcType := values.Multicluster.Service.Type
if svcType == "" {
svcType = corev1.ServiceTypeClusterIP
}

annotations := helmette.Merge(
map[string]string{},
helmette.Default(map[string]string{}, values.Annotations),
helmette.Default(map[string]string{}, values.Multicluster.Service.Annotations),
)

return &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: Fullname(dot),
Namespace: dot.Release.Namespace,
Labels: Labels(dot),
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Type: svcType,
Selector: SelectorLabels(dot),
Ports: []corev1.ServicePort{
{
Name: "raft",
Port: int32(OperatorServicePort),
TargetPort: intstr.FromInt32(int32(OperatorServicePort)),
Protocol: corev1.ProtocolTCP,
},
},
// Peers need to reach this Service before the local operator
// is Ready (Ready is gated on raft quorum forming), so don't
// wait for readiness to publish endpoint addresses.
PublishNotReadyAddresses: true,
},
}
}

// OperatorPeerServices renders a selectorless placeholder Service for
// every remote peer listed in multicluster.peers (excluding the local
// cluster itself). Rendered only when multicluster.service.mesh=true.
//
// Why: with Cilium ClusterMesh "global services", a Service named X
// on cluster A merges its endpoints into a Service also named X on
// cluster B — but only if both Services exist. Without the placeholder
// on cluster B, a pod on B looking up `X.<ns>.svc.cluster.local` gets
// NXDOMAIN. This function emits the placeholders so every cluster can
// resolve every peer's operator Service. The real endpoints come from
// the OperatorService on the peer's own cluster; ClusterMesh merges
// them in via the matching name.
//
// Placeholders are always ClusterIP — they carry no selector and
// therefore no local endpoints, so a LoadBalancer type here would
// provision a cloud LB with nothing behind it. Only the local
// OperatorService respects Multicluster.Service.Type.
//
// Annotations are Multicluster.Service.Annotations merged with the
// peer's own Peer.Annotations (peer wins on conflict), so a user can
// set mesh-wide defaults and peer-specific overrides like Cilium
// `service.cilium.io/affinity: <cluster-name>`.
func OperatorPeerServices(dot *helmette.Dot) []corev1.Service {
values := helmette.Unwrap[Values](dot.Values)

if !values.Multicluster.Enabled || !values.Multicluster.Service.Enabled {
return nil
}
if !values.Multicluster.Service.Mesh {
return nil
}

self := values.Multicluster.Name
var svcs []corev1.Service
for _, p := range values.Multicluster.Peers {
if p.Name == self {
continue
}
annotations := helmette.Merge(
map[string]string{},
helmette.Default(map[string]string{}, values.Annotations),
helmette.Default(map[string]string{}, values.Multicluster.Service.Annotations),
helmette.Default(map[string]string{}, p.Annotations),
)
svcs = append(svcs, corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: cleanForK8sWithSuffix(fmt.Sprintf("%s-%s", p.Name, helmette.Default(dot.Chart.Name, values.NameOverride)), "raft-service"),
Name: p.Name,
Namespace: dot.Release.Namespace,
Labels: Labels(dot),
Annotations: helmette.Merge(annotations, helmette.Default(map[string]string{}, p.AdditionalAnnotation)),
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Selector: p.SelectorOverwrite,
Type: corev1.ServiceTypeClusterIP,
// No selector → no local endpoints. Cilium ClusterMesh
// merges in the remote peer's endpoints.
Ports: []corev1.ServicePort{
{
Port: int32(9443),
TargetPort: intstr.FromInt32(9443),
Name: "raft",
Port: int32(OperatorServicePort),
TargetPort: intstr.FromInt32(int32(OperatorServicePort)),
Protocol: corev1.ProtocolTCP,
},
},
PublishNotReadyAddresses: true,
},
})
}
return svcs
}

// OperatorServiceExport renders an MCS ServiceExport for the operator
// Service when multicluster.service.mcs is true. A compliant MCS
// controller in the cluster mirrors the Service into every peer cluster
// under `<name>.<namespace>.svc.clusterset.local`.
func OperatorServiceExport(dot *helmette.Dot) *mcsv1alpha1.ServiceExport {
values := helmette.Unwrap[Values](dot.Values)

if !values.Multicluster.Enabled ||
!values.Multicluster.Service.Enabled ||
!values.Multicluster.Service.MCS {
return nil
}

return &mcsv1alpha1.ServiceExport{
TypeMeta: metav1.TypeMeta{
APIVersion: "multicluster.x-k8s.io/v1alpha1",
Kind: "ServiceExport",
},
ObjectMeta: metav1.ObjectMeta{
Name: Fullname(dot),
Namespace: dot.Release.Namespace,
Labels: Labels(dot),
Annotations: helmette.Default(map[string]string{}, values.Annotations),
},
}
}

// OperatorServiceImports renders one ServiceImport per remote peer
// when multicluster.service.mcs is true. Each import gives this
// cluster a local clusterset-scoped entry point for that peer's
// exported operator Service, resolvable at
// `<peer>.<namespace>.svc.clusterset.local`.
//
// The local cluster is skipped: its own ServiceExport causes the MCS
// controller to auto-create a matching ServiceImport on every cluster
// in the clusterset including this one, so a chart-managed import for
// self would collide with the controller-managed one.
//
// Peers are trusted to be named after each remote operator's helm
// fullname — the same convention the chart uses elsewhere for matching
// peer.Name against the remote cluster's multicluster.name.
func OperatorServiceImports(dot *helmette.Dot) []mcsv1alpha1.ServiceImport {
values := helmette.Unwrap[Values](dot.Values)

if !values.Multicluster.Enabled ||
!values.Multicluster.Service.Enabled ||
!values.Multicluster.Service.MCS {
return nil
}

self := values.Multicluster.Name
var imports []mcsv1alpha1.ServiceImport
for _, p := range values.Multicluster.Peers {
if p.Name == self {
continue
}
imports = append(imports, mcsv1alpha1.ServiceImport{
Comment thread
andrewstucki marked this conversation as resolved.
TypeMeta: metav1.TypeMeta{
APIVersion: "multicluster.x-k8s.io/v1alpha1",
Kind: "ServiceImport",
},
ObjectMeta: metav1.ObjectMeta{
Name: p.Name,
Namespace: dot.Release.Namespace,
Labels: Labels(dot),
Annotations: helmette.Default(map[string]string{}, values.Annotations),
},
Spec: mcsv1alpha1.ServiceImportSpec{
Type: mcsv1alpha1.ClusterSetIP,
Ports: []mcsv1alpha1.ServicePort{
{
Name: "raft",
Protocol: corev1.ProtocolTCP,
Port: int32(OperatorServicePort),
},
},
},
})
}
return imports
}

func WebhookService(dot *helmette.Dot) *corev1.Service {
values := helmette.Unwrap[Values](dot.Values)

Expand Down
16 changes: 8 additions & 8 deletions operator/chart/templates/_chart.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@
{{- $dot := (index .a 0) -}}
{{- range $_ := (list 1) -}}
{{- $_is_returning := false -}}
{{- $manifests := (list (get (fromJson (include "operator.Issuer" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.Certificate" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ConfigMap" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MetricsService" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.WebhookService" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MutatingWebhookConfiguration" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ValidatingWebhookConfiguration" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ServiceAccount" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ServiceMonitor" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.Deployment" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.PreInstallCRDJob" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.CRDJobServiceAccount" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.PostUpgradeMigrationJob" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MigrationJobServiceAccount" (dict "a" (list $dot)))) "r")) -}}
{{- range $_, $svc := (get (fromJson (include "operator.StretchClusterService" (dict "a" (list $dot)))) "r") -}}
{{- $manifests := (list (get (fromJson (include "operator.Issuer" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.Certificate" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ConfigMap" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MetricsService" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.WebhookService" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.OperatorService" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.OperatorServiceExport" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MutatingWebhookConfiguration" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ValidatingWebhookConfiguration" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ServiceAccount" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.ServiceMonitor" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.Deployment" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.PreInstallCRDJob" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.CRDJobServiceAccount" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.PostUpgradeMigrationJob" (dict "a" (list $dot)))) "r") (get (fromJson (include "operator.MigrationJobServiceAccount" (dict "a" (list $dot)))) "r")) -}}
{{- range $_, $svc := (get (fromJson (include "operator.OperatorPeerServices" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $svc)) -}}
{{- end -}}
{{- if $_is_returning -}}
{{- break -}}
{{- end -}}
{{- range $_, $cr := (get (fromJson (include "operator.ClusterRoles" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $cr)) -}}
{{- range $_, $si := (get (fromJson (include "operator.OperatorServiceImports" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $si)) -}}
{{- end -}}
{{- if $_is_returning -}}
{{- break -}}
{{- end -}}
{{- range $_, $crb := (get (fromJson (include "operator.ClusterRoleBindings" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $crb)) -}}
{{- range $_, $cr := (get (fromJson (include "operator.ClusterRoles" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $cr)) -}}
{{- end -}}
{{- if $_is_returning -}}
{{- break -}}
{{- end -}}
{{- range $_, $svc := (get (fromJson (include "operator.StretchClusterService" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $svc)) -}}
{{- range $_, $crb := (get (fromJson (include "operator.ClusterRoleBindings" (dict "a" (list $dot)))) "r") -}}
{{- $manifests = (concat (default (list) $manifests) (list $crb)) -}}
{{- end -}}
{{- if $_is_returning -}}
{{- break -}}
Expand Down
Loading
Loading