Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/aga/v1beta1/globalaccelerator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
)

// PortRange defines the port range for Global Accelerator listeners.
// +kubebuilder:validation:XValidation:rule="self.fromPort <= self.toPort",message="FromPort must be less than or equal to ToPort"
type PortRange struct {
// FromPort is the first port in the range of ports, inclusive.
// +kubebuilder:validation:Minimum=1
Expand Down
3 changes: 3 additions & 0 deletions config/crd/aga/aga-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ spec:
- fromPort
- toPort
type: object
x-kubernetes-validations:
- message: FromPort must be less than or equal to ToPort
rule: self.fromPort <= self.toPort
maxItems: 10
minItems: 1
type: array
Expand Down
3 changes: 3 additions & 0 deletions config/crd/aga/aga.k8s.aws_globalaccelerators.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ spec:
- fromPort
- toPort
type: object
x-kubernetes-validations:
- message: FromPort must be less than or equal to ToPort
rule: self.fromPort <= self.toPort
maxItems: 10
minItems: 1
type: array
Expand Down
18 changes: 18 additions & 0 deletions config/webhook/globalaccelerator_validator_patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# This patch adds the GlobalAccelerator validator webhook configuration to the webhook configurations
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: webhook-configuration
webhooks:
- name: vglobalaccelerator.aga.k8s.aws
rules:
- apiGroups:
- "aga.k8s.aws"
apiVersions:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- globalaccelerators
scope: "Namespaced"
1 change: 1 addition & 0 deletions config/webhook/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ patchesStrategicMerge:
- pod_mutator_patch.yaml
- service_mutator_patch.yaml
- ingressclassparams_validator_patch.yaml
- globalaccelerator_validator_patch.yaml
21 changes: 21 additions & 0 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ kind: ValidatingWebhookConfiguration
metadata:
name: webhook
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-aga-k8s-aws-v1beta1-globalaccelerator
failurePolicy: Fail
matchPolicy: Equivalent
name: vglobalaccelerator.aga.k8s.aws
rules:
- apiGroups:
- aga.k8s.aws
apiVersions:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- globalaccelerators
sideEffects: None
- admissionReviewVersions:
- v1beta1
clientConfig:
Expand Down
136 changes: 112 additions & 24 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/service/globalaccelerator/types"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -34,6 +40,7 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
agadeploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking"
ctrlerrors "sigs.k8s.io/aws-load-balancer-controller/pkg/error"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
Expand All @@ -42,6 +49,7 @@ import (
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
)

const (
Expand All @@ -52,21 +60,27 @@ const (
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
globalAcceleratorKind = "GlobalAccelerator"

// Requeue constants for provisioning state monitoring
requeueMessage = "Monitoring provisioning state"
statusUpdateRequeueTime = 1 * time.Minute

// Metric stage constants
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
MetricStageAddFinalizers = "add_finalizers"
MetricStageBuildModel = "build_model"
MetricStageDeployStack = "deploy_stack"
MetricStageReconcileGlobalAccelerator = "reconcile_globalaccelerator"

// Metric error constants
MetricErrorAddFinalizers = "add_finalizers_error"
MetricErrorRemoveFinalizers = "remove_finalizers_error"
MetricErrorBuildModel = "build_model_error"
MetricErrorDeployStack = "deploy_stack_error"
MetricErrorReconcileGlobalAccelerator = "reconcile_globalaccelerator_error"
)

// NewGlobalAcceleratorReconciler constructs a new globalAcceleratorReconciler.
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {

// Create tracking provider
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName)
Expand All @@ -78,6 +92,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
trackingProvider,
config.FeatureGates,
config.ClusterName,
config.AWSConfig.Region,
config.DefaultTags,
config.ExternalManagedTags,
logger.WithName("aga-model-builder"),
Expand All @@ -87,17 +102,26 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
// Create stack marshaller
stackMarshaller := deploy.NewDefaultStackMarshaller()

// Create AGA stack deployer
stackDeployer := agadeploy.NewDefaultStackDeployer(cloud, config, agaTagPrefix, logger.WithName("aga-stack-deployer"), metricsCollector, controllerName)

// Create status updater
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)

return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
logger: logger,
modelBuilder: agaModelBuilder,
stackMarshaller: stackMarshaller,
stackDeployer: stackDeployer,
statusUpdater: statusUpdater,
metricsCollector: metricsCollector,
reconcileTracker: reconcileCounters.IncrementAGA,

maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
}
}

Expand All @@ -108,11 +132,14 @@ type globalAcceleratorReconciler struct {
finalizerManager k8s.FinalizerManager
modelBuilder aga.ModelBuilder
stackMarshaller deploy.StackMarshaller
stackDeployer agadeploy.StackDeployer
statusUpdater agastatus.StatusUpdater
logger logr.Logger
metricsCollector lbcmetrics.MetricCollector
reconcileTracker func(namespaceName types.NamespacedName)
reconcileTracker func(namespaceName ktypes.NamespacedName)

maxConcurrentReconciles int
maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
}

// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
Expand Down Expand Up @@ -155,26 +182,19 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorAddFinalizers, err, r.metricsCollector)
}

// TODO: Implement GlobalAccelerator resource management
// This would include:
// 1. Creating/updating AWS Global Accelerator
// 2. Managing listeners and endpoint groups
// 3. Handling endpoint discovery from Services/Ingresses/Gateways
reconcileResourceFn := func() {
err = r.reconcileGlobalAcceleratorResources(ctx, ga)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageReconcileGlobalAccelerator, reconcileResourceFn)
if err != nil {
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorReconcileGlobalAccelerator, err, r.metricsCollector)
}

r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
// TODO: Implement cleanup logic for AWS Global Accelerator resources
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
return err
Expand Down Expand Up @@ -203,7 +223,7 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
}

func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
r.logger.Info("Reconciling GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
var stack core.Stack
var accelerator *agamodel.Accelerator
var err error
Expand All @@ -212,25 +232,92 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
if err != nil {
// Update status to indicate model building failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
}
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorBuildModel, err, r.metricsCollector)
}

// Log the built model for debugging
r.logger.Info("Built model successfully", "accelerator", accelerator.ID(), "stackID", stack.StackID())
// Deploy the stack to create/update AWS Global Accelerator resources
deployStackFn := func() {
err = r.stackDeployer.Deploy(ctx, stack, r.metricsCollector, controllerName)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))

// Update status to indicate deployment failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
}

return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorDeployStack, err, r.metricsCollector)
}

r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())

// Update GlobalAccelerator status after successful deployment
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
if requeueNeeded {
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
}

// TODO: Implement the deploy phase
// This would include:
// 1. Deploy the stack to create/update AWS Global Accelerator resources
// 2. Update the GlobalAccelerator status with the created resources
// 3. Handle any deployment errors and update status accordingly
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")

return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
// TODO: Implement the actual AWS Global Accelerator resource cleanup
// This is a placeholder implementation
r.logger.Info("Cleaning up GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
r.logger.Info("Cleaning up GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))

// TODO we will handle cleaning up dependent resources when we implement those
// 1. Find the accelerator ARN from the CRD status
if ga.Status.AcceleratorARN == nil {
r.logger.Info("No accelerator ARN found in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

acceleratorARN := *ga.Status.AcceleratorARN
if acceleratorARN == "" {
r.logger.Info("Empty accelerator ARN in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

// 2. Delete the accelerator using accelerator delete manager
acceleratorManager := r.stackDeployer.GetAcceleratorManager()
r.logger.Info("Deleting accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))

// Initialize reference to existing accelerator for deletion
acceleratorWithTags := agadeploy.AcceleratorWithTags{
Accelerator: &types.Accelerator{
AcceleratorArn: &acceleratorARN,
},
Tags: nil,
}

if err := acceleratorManager.Delete(ctx, acceleratorWithTags); err != nil {
// Check if it's an AcceleratorNotDisabledError
var notDisabledErr *agadeploy.AcceleratorNotDisabledError
if errors.As(err, &notDisabledErr) {
// Update status to indicate we're waiting for the accelerator to be disabled
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
}
// Requeue to check again (no explicit delay is specified in this call)
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
}

// Any other error
r.logger.Error(err, "Failed to delete accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
return fmt.Errorf("failed to delete accelerator %s: %w", acceleratorARN, err)
}

r.logger.Info("Successfully cleaned up all GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

Expand Down Expand Up @@ -259,6 +346,7 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
Named(controllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Millisecond, r.maxExponentialBackoffDelay),
}).
Complete(r)
}
Expand Down
2 changes: 2 additions & 0 deletions docs/deploy/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ Currently, you can set only 1 namespace to watch in this flag. See [this Kuberne
| [sync-period](#sync-period) | duration | 10h0m0s | Period at which the controller forces the repopulation of its local object stores |
| targetgroupbinding-max-concurrent-reconciles | int | 3 | Maximum number of concurrently running reconcile loops for targetGroupBinding |
| targetgroupbinding-max-exponential-backoff-delay | duration | 16m40s | Maximum duration of exponential backoff for targetGroupBinding reconcile failures |
| globalaccelerator-max-concurrent-reconciles | int | 3 | Maximum number of concurrently running reconcile loops for GlobalAccelerator objects |
| globalaccelerator-max-exponential-backoff-delay | duration | 16m40s | Maximum duration of exponential backoff for GlobalAccelerator reconcile failures |
| [lb-stabilization-monitor-interval](#lb-stabilization-monitor-interval) | duration | 2m | Interval at which the controller monitors the state of load balancer after creation |
| tolerate-non-existent-backend-service | boolean | true | Whether to allow rules which refer to backend services that do not exist (when enabled, a 503 error is returned if the backend service does not exist) |
| tolerate-non-existent-backend-action | boolean | true | Whether to allow rules which refer to backend actions that do not exist (when enabled, a 503 error is returned if the backend action does not exist) |
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/appmesh v1.27.7
github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3
github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi v1.23.3
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.31.7
github.com/aws/aws-sdk-go-v2/service/shield v1.27.3
Expand Down Expand Up @@ -147,6 +148,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0 h1:ta62lid9JkIpKZtZZXSj6rP2AqY
github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0/go.mod h1:o6QDjdVKpP5EF0dp/VlvqckzuSDATr1rLdHt3A5m0YY=
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0 h1:Zy1yjx+R6cR4pAwzFFJ8nWJh4ri8I44H76PDJ77tcJo=
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0/go.mod h1:RuZwE3p8IrWqK1kZhwH2TymlHLPuiI/taBMb8vrD39Q=
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3 h1:G8qcrur/MG4c7Wu+LMtpAPUSzmmaOa4ssHgYtefeJoo=
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3/go.mod h1:SJbyMV7JHSdKF1V0femihek4k7t2u5quWKiHzG8pihc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrxZlQ044RiM+WdoZxp0p+EGM62y3L6pwA4olE=
Expand Down
Loading
Loading