diff --git a/operator/cmd/multicluster/multicluster.go b/operator/cmd/multicluster/multicluster.go
index bd2ccae52..9d7b5b8f5 100644
--- a/operator/cmd/multicluster/multicluster.go
+++ b/operator/cmd/multicluster/multicluster.go
@@ -29,11 +29,13 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/webhook"
 
 	"github.com/redpanda-data/redpanda-operator/operator/internal/controller"
+	"github.com/redpanda-data/redpanda-operator/operator/internal/controller/pvcunbinder"
 	redpandacontrollers "github.com/redpanda-data/redpanda-operator/operator/internal/controller/redpanda"
 	"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
 	internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
 	"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
 	"github.com/redpanda-data/redpanda-operator/pkg/multicluster/watcher"
+	"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
 )
 
 // NB: these annotations are necessary because we want the ability to manager service accounts
@@ -77,6 +79,10 @@ type MulticlusterOptions struct {
 	peersStrings []string
 
 	LicenseFilePath string
+
+	UnbindPVCsAfter  time.Duration
+	AllowPVRebinding bool
+	UnbinderSelector pflagutil.LabelSelectorValue
 }
 
 func (o *MulticlusterOptions) validate() error {
@@ -165,6 +171,9 @@ func (o *MulticlusterOptions) BindFlags(cmd *cobra.Command) {
 	cmd.Flags().DurationVar(&o.LeaderElectionLeaseDuration, "local-leader-election-lease-duration", 0, "Duration of the local leader election lease (0 uses controller-runtime default of 15s)")
 	cmd.Flags().DurationVar(&o.LeaderElectionRenewDeadline, "local-leader-election-renew-deadline", 0, "Renew deadline for the local leader election lease (0 uses controller-runtime default of 10s)")
 	cmd.Flags().DurationVar(&o.LeaderElectionRetryPeriod, "local-leader-election-retry-period", 0, "Retry period for the local leader election lease (0 uses controller-runtime default of 2s)")
+	cmd.Flags().DurationVar(&o.UnbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration")
+	cmd.Flags().BoolVar(&o.AllowPVRebinding, "allow-pv-rebinding", false, "controls whether or not PVs unbound by the PVCUnbinder have their .ClaimRef cleared, which allows them to be reused")
+	cmd.Flags().Var(&o.UnbinderSelector, "unbinder-label-selector", "if provided, a Kubernetes label selector that will filter Pods to be considered by the PVCUnbinder.")
 }
 
 func Command() *cobra.Command {
@@ -361,5 +370,20 @@ func Run(
 		return err
 	}
 
+	if opts.UnbindPVCsAfter <= 0 {
+		setupLog.Info("PVCUnbinder controller not active", "unbind-after", opts.UnbindPVCsAfter, "selector", opts.UnbinderSelector, "allow-pv-rebinding", opts.AllowPVRebinding)
+	} else {
+		setupLog.Info("starting PVCUnbinder controller", "unbind-after", opts.UnbindPVCsAfter, "selector", opts.UnbinderSelector, "allow-pv-rebinding", opts.AllowPVRebinding)
+		if err := (&pvcunbinder.MulticlusterController{
+			Manager:        manager,
+			Timeout:        opts.UnbindPVCsAfter,
+			Selector:       opts.UnbinderSelector.Selector,
+			AllowRebinding: opts.AllowPVRebinding,
+		}).SetupWithMultiClusterManager(); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder")
+			return err
+		}
+	}
+
 	return manager.Start(ctrl.SetupSignalHandler())
 }
diff --git a/operator/internal/controller/pvcunbinder/pvcunbinder.go b/operator/internal/controller/pvcunbinder/pvcunbinder.go
index a170ed1d6..aad2b2815 100644
--- a/operator/internal/controller/pvcunbinder/pvcunbinder.go
+++ b/operator/internal/controller/pvcunbinder/pvcunbinder.go
@@ -27,6 +27,10 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
+	mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
+
+	"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
 )
 
 var schedulingFailureRE = regexp.MustCompile(`(^0/[1-9]\d* nodes are available)|(volume node affinity)`)
@@ -69,6 +73,62 @@ type Controller struct {
 	AllowRebinding bool
 }
 
+// MulticlusterController is a multicluster-aware version of Controller that
+// watches Pods across all clusters managed by a multicluster.Manager.
+type MulticlusterController struct {
+	Manager        multicluster.Manager
+	Timeout        time.Duration
+	Selector       labels.Selector
+	AllowRebinding bool
+}
+
+// SetupWithMultiClusterManager registers the controller with the
+// multicluster manager, watching Pods in both the local cluster and all
+// provider-managed clusters, filtered by the optional label selector and
+// the shared pvcUnbinderPredicate.
+func (r *MulticlusterController) SetupWithMultiClusterManager() error {
+	selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool {
+		if r.Selector == nil {
+			return true
+		}
+		lbls := object.GetLabels()
+		if lbls == nil {
+			lbls = map[string]string{}
+		}
+		return r.Selector.Matches(labels.Set(lbls))
+	})
+	unbinderPredicate := predicate.NewPredicateFuncs(pvcUnbinderPredicate)
+
+	return mcbuilder.ControllerManagedBy(r.Manager).
+		For(
+			&corev1.Pod{},
+			mcbuilder.WithEngageWithLocalCluster(true),
+			mcbuilder.WithEngageWithProviderClusters(true),
+		).
+		WithEventFilter(selectorPredicate).
+		WithEventFilter(unbinderPredicate).
+		Complete(r)
+}
+
+// Reconcile resolves the request's cluster and delegates to a
+// single-cluster Controller bound to that cluster's client. A cluster that
+// cannot be resolved (e.g. it was recently removed from the manager) is
+// logged and skipped rather than retried.
+func (r *MulticlusterController) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
+	k8sCluster, err := r.Manager.GetCluster(ctx, req.ClusterName)
+	if err != nil {
+		log.FromContext(ctx).Error(err, "unable to fetch cluster, skipping reconciliation", "cluster", req.ClusterName)
+		return ctrl.Result{}, nil
+	}
+
+	c := &Controller{
+		Client:         k8sCluster.GetClient(),
+		Timeout:        r.Timeout,
+		Selector:       r.Selector,
+		AllowRebinding: r.AllowRebinding,
+	}
+	return c.Reconcile(ctx, req.Request)
+}
+
 // +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch;patch
 // +kubebuilder:rbac:groups=core,namespace=default,resources=pods,verbs=get;list;watch;delete