Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions operator/cmd/multicluster/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/redpanda-data/redpanda-operator/operator/internal/controller"
"github.com/redpanda-data/redpanda-operator/operator/internal/controller/pvcunbinder"
redpandacontrollers "github.com/redpanda-data/redpanda-operator/operator/internal/controller/redpanda"
"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
"github.com/redpanda-data/redpanda-operator/pkg/multicluster/watcher"
"github.com/redpanda-data/redpanda-operator/pkg/pflagutil"
)

// NB: these annotations are necessary because we want the ability to manager service accounts
Expand Down Expand Up @@ -77,6 +79,10 @@ type MulticlusterOptions struct {
peersStrings []string

LicenseFilePath string

UnbindPVCsAfter time.Duration
AllowPVRebinding bool
UnbinderSelector pflagutil.LabelSelectorValue
}

func (o *MulticlusterOptions) validate() error {
Expand Down Expand Up @@ -165,6 +171,9 @@ func (o *MulticlusterOptions) BindFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar(&o.LeaderElectionLeaseDuration, "local-leader-election-lease-duration", 0, "Duration of the local leader election lease (0 uses controller-runtime default of 15s)")
cmd.Flags().DurationVar(&o.LeaderElectionRenewDeadline, "local-leader-election-renew-deadline", 0, "Renew deadline for the local leader election lease (0 uses controller-runtime default of 10s)")
cmd.Flags().DurationVar(&o.LeaderElectionRetryPeriod, "local-leader-election-retry-period", 0, "Retry period for the local leader election lease (0 uses controller-runtime default of 2s)")
cmd.Flags().DurationVar(&o.UnbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration")
cmd.Flags().BoolVar(&o.AllowPVRebinding, "allow-pv-rebinding", false, "controls whether or not PVs unbound by the PVCUnbinder have their .ClaimRef cleared, which allows them to be reused")
cmd.Flags().Var(&o.UnbinderSelector, "unbinder-label-selector", "if provided, a Kubernetes label selector that will filter Pods to be considered by the PVCUnbinder.")
}

func Command() *cobra.Command {
Expand Down Expand Up @@ -361,5 +370,20 @@ func Run(
return err
}

if opts.UnbindPVCsAfter <= 0 {
setupLog.Info("PVCUnbinder controller not active", "unbind-after", opts.UnbindPVCsAfter, "selector", opts.UnbinderSelector, "allow-pv-rebinding", opts.AllowPVRebinding)
} else {
setupLog.Info("starting PVCUnbinder controller", "unbind-after", opts.UnbindPVCsAfter, "selector", opts.UnbinderSelector, "allow-pv-rebinding", opts.AllowPVRebinding)
if err := (&pvcunbinder.MulticlusterController{
Manager: manager,
Timeout: opts.UnbindPVCsAfter,
Selector: opts.UnbinderSelector.Selector,
AllowRebinding: opts.AllowPVRebinding,
}).SetupWithMultiClusterManager(); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PVCUnbinder")
return err
}
}

return manager.Start(ctrl.SetupSignalHandler())
}
53 changes: 53 additions & 0 deletions operator/internal/controller/pvcunbinder/pvcunbinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"

"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
)

var schedulingFailureRE = regexp.MustCompile(`(^0/[1-9]\d* nodes are available)|(volume node affinity)`)
Expand Down Expand Up @@ -69,6 +73,55 @@ type Controller struct {
AllowRebinding bool
}

// MulticlusterController is a multicluster-aware version of Controller that
// watches Pods across all clusters managed by a multicluster.Manager.
type MulticlusterController struct {
Manager multicluster.Manager
Timeout time.Duration
Selector labels.Selector
AllowRebinding bool
}

func (r *MulticlusterController) SetupWithMultiClusterManager() error {
selectorPredicate := predicate.NewPredicateFuncs(func(object client.Object) bool {
if r.Selector == nil {
return true
}
lbls := object.GetLabels()
if lbls == nil {
lbls = map[string]string{}
}
return r.Selector.Matches(labels.Set(lbls))
})
unbinderPredicate := predicate.NewPredicateFuncs(pvcUnbinderPredicate)

return mcbuilder.ControllerManagedBy(r.Manager).
For(
&corev1.Pod{},
mcbuilder.WithEngageWithLocalCluster(true),
mcbuilder.WithEngageWithProviderClusters(true),
).
WithEventFilter(selectorPredicate).
WithEventFilter(unbinderPredicate).
Complete(r)
}

func (r *MulticlusterController) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
k8sCluster, err := r.Manager.GetCluster(ctx, req.ClusterName)
if err != nil {
log.FromContext(ctx).Error(err, "unable to fetch cluster, skipping reconciliation", "cluster", req.ClusterName)
return ctrl.Result{}, nil
}

c := &Controller{
Client: k8sCluster.GetClient(),
Timeout: r.Timeout,
Selector: r.Selector,
AllowRebinding: r.AllowRebinding,
}
return c.Reconcile(ctx, req.Request)
}

// +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch;patch

// +kubebuilder:rbac:groups=core,namespace=default,resources=pods,verbs=get;list;watch;delete
Expand Down
Loading