Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 4 additions & 163 deletions backend/oldoperationscanner/operations_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

arohcpv1alpha1 "github.com/openshift-online/ocm-sdk-go/arohcp/v1alpha1"
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
ocmerrors "github.com/openshift-online/ocm-sdk-go/errors"

"github.com/Azure/ARO-HCP/backend/pkg/controllers/operationcontrollers"
"github.com/Azure/ARO-HCP/backend/pkg/listers"
backendtracing "github.com/Azure/ARO-HCP/backend/pkg/tracing"
Expand All @@ -60,23 +56,6 @@ const (
pollExternalAuthOperationLabel = "poll_external_auth"
)

// NodePoolStateValue names a node pool state. The OCM SDK does not
// define these values, so they are copied from uhc-clusters-service.
type NodePoolStateValue string

// Node pool states, each spelled exactly as its wire string.
const (
	NodePoolStateValidating       = NodePoolStateValue("validating")
	NodePoolStatePending          = NodePoolStateValue("pending")
	NodePoolStateInstalling       = NodePoolStateValue("installing")
	NodePoolStateReady            = NodePoolStateValue("ready")
	NodePoolStateUpdating         = NodePoolStateValue("updating")
	NodePoolStateValidatingUpdate = NodePoolStateValue("validating_update")
	NodePoolStatePendingUpdate    = NodePoolStateValue("pending_update")
	NodePoolStateUninstalling     = NodePoolStateValue("uninstalling")
	NodePoolStateRecoverableError = NodePoolStateValue("recoverable_error")
	NodePoolStateError            = NodePoolStateValue("error")
)

type operation struct {
id string
doc *api.Operation
Expand All @@ -95,25 +74,6 @@ func listOperationLabelValues() iter.Seq[string] {
})
}

// setSpanAttributes annotates span with the operation's ID, request type
// and status. When the operation document carries an ExternalID, the
// resource group name, resource name and resource type are added as well.
func (o *operation) setSpanAttributes(span trace.Span) {
	doc := o.doc

	// Operation attributes are always present.
	span.SetAttributes(
		tracing.OperationIDKey.String(string(o.id)),
		tracing.OperationTypeKey.String(string(doc.Request)),
		tracing.OperationStatusKey.String(string(doc.Status)),
	)

	// Resource attributes need an external ID to read from.
	externalID := doc.ExternalID
	if externalID == nil {
		return
	}
	span.SetAttributes(
		tracing.ResourceGroupNameKey.String(externalID.ResourceGroupName),
		tracing.ResourceNameKey.String(externalID.Name),
		tracing.ResourceTypeKey.String(externalID.ResourceType.Type),
	)
}

type OperationsScanner struct {
dbClient database.DBClient
lockClient database.LockClientInterface
Expand Down Expand Up @@ -466,18 +426,15 @@ func (s *OperationsScanner) processOperations(ctx context.Context, subscriptionI
// processOperation processes a single operation on a resource.
//
// It opens a "processOperation" child span that ends on return, and logs
// "Processing" on entry and "Processed" on return.
//
// NOTE(review): this listing appears to carry both sides of a diff hunk:
// two startChildSpan assignments, and a Kind() switch followed by a comment
// saying that logic moved to controllers. Confirm which lines are current
// before building on this function.
func (s *OperationsScanner) processOperation(ctx context.Context, op operation) {
logger := utils.LoggerFromContext(ctx)
ctx, span := startChildSpan(ctx, "processOperation")
_, span := startChildSpan(ctx, "processOperation")
defer span.End()

logger.Info("Processing")
defer logger.Info("Processed")

// Dispatch on the kind of the operation's internal ID; kinds other than
// node pool and external auth fall through without any work.
switch op.doc.InternalID.Kind() {
case arohcpv1alpha1.NodePoolKind:
s.pollNodePoolOperation(ctx, op)
case cmv1.ExternalAuthKind:
s.pollExternalAuthOperation(ctx, op)
}
// XXX The previous business logic of OperationsScanner has
// been converted to various Kubernetes-style controllers
// that fulfill the OperationSynchronizer interface.
}

func (s *OperationsScanner) recordOperationError(ctx context.Context, operationName string, err error) {
Expand All @@ -490,76 +447,6 @@ func (s *OperationsScanner) recordOperationError(ctx context.Context, operationN
span.RecordError(err)
}

// pollNodePoolOperation refreshes the status of a node pool operation from
// the node pool status returned by the cluster service client.
//
// A "404 Not Found" answer while the operation is a delete request marks the
// operation as completed. Failures are recorded through recordOperationError
// and logged; nothing is returned to the caller.
func (s *OperationsScanner) pollNodePoolOperation(ctx context.Context, op operation) {
	logger := utils.LoggerFromContext(ctx)
	ctx, span := startChildSpan(ctx, "pollNodePoolOperation")
	defer span.End()
	defer s.updateOperationMetrics(pollNodePoolOperationLabel)()
	op.setSpanAttributes(span)

	nodePoolStatus, err := s.clusterService.GetNodePoolStatus(ctx, op.doc.InternalID)
	if err != nil {
		var svcErr *ocmerrors.Error
		deletionFinished := errors.As(err, &svcErr) &&
			svcErr.Status() == http.StatusNotFound &&
			op.doc.Request == database.OperationRequestDelete

		// Anything other than "gone during a delete" is a plain lookup failure.
		if !deletionFinished {
			s.recordOperationError(ctx, pollNodePoolOperationLabel, err)
			logger.Error(err, "Failed to get node pool status")
			return
		}

		if completeErr := operationcontrollers.SetDeleteOperationAsCompleted(ctx, s.dbClient, op.doc, s.postAsyncNotification); completeErr != nil {
			s.recordOperationError(ctx, pollNodePoolOperationLabel, completeErr)
			logger.Error(completeErr, "Failed to handle a completed deletion")
		}
		return
	}

	newStatus, cloudErr, err := convertNodePoolStatus(op, nodePoolStatus)
	if err != nil {
		// Conversion problems are recorded but logged at info level only.
		s.recordOperationError(ctx, pollNodePoolOperationLabel, err)
		logger.Info("Node pool status conversion warning", "error", err.Error())
		return
	}

	if err = operationcontrollers.UpdateOperationStatus(ctx, s.dbClient, op.doc, newStatus, cloudErr, s.postAsyncNotification); err != nil {
		s.recordOperationError(ctx, pollNodePoolOperationLabel, err)
		logger.Error(err, "Failed to update operation status")
	}
}

// pollExternalAuthOperation refreshes the status of an external auth
// operation by fetching the external auth through the cluster service client.
//
// A successful fetch moves the operation to ProvisioningStateSucceeded. A
// "404 Not Found" answer while the operation is a delete request marks the
// operation as completed. Failures are recorded through recordOperationError
// and logged; nothing is returned to the caller.
func (s *OperationsScanner) pollExternalAuthOperation(ctx context.Context, op operation) {
	logger := utils.LoggerFromContext(ctx)
	ctx, span := startChildSpan(ctx, "pollExternalAuthOperation")
	defer span.End()
	defer s.updateOperationMetrics(pollExternalAuthOperationLabel)()
	op.setSpanAttributes(span)

	if _, err := s.clusterService.GetExternalAuth(ctx, op.doc.InternalID); err != nil {
		var svcErr *ocmerrors.Error
		deletionFinished := errors.As(err, &svcErr) &&
			svcErr.Status() == http.StatusNotFound &&
			op.doc.Request == database.OperationRequestDelete

		// Anything other than "gone during a delete" is a plain lookup failure.
		if !deletionFinished {
			s.recordOperationError(ctx, pollExternalAuthOperationLabel, err)
			logger.Error(err, "Failed to get external auth status")
			return
		}

		if completeErr := operationcontrollers.SetDeleteOperationAsCompleted(ctx, s.dbClient, op.doc, s.postAsyncNotification); completeErr != nil {
			s.recordOperationError(ctx, pollExternalAuthOperationLabel, completeErr)
			logger.Error(completeErr, "Failed to handle a completed deletion")
		}
		return
	}

	// The external auth exists, so the operation is reported as succeeded.
	updateErr := operationcontrollers.UpdateOperationStatus(ctx, s.dbClient, op.doc, arm.ProvisioningStateSucceeded, nil, s.postAsyncNotification)
	if updateErr != nil {
		s.recordOperationError(ctx, pollExternalAuthOperationLabel, updateErr)
		logger.Error(updateErr, "Failed to update operation status")
	}
}

// withSubscriptionLock holds a subscription lock while executing the given function.
// In the event the subscription lock is lost, the context passed to the function will
// be canceled.
Expand Down Expand Up @@ -602,52 +489,6 @@ func (s *OperationsScanner) postAsyncNotification(ctx context.Context, operation
return operationcontrollers.PostAsyncNotification(ctx, s.notificationClient, operation)
}

// convertNodePoolStatus translates a NodePoolStatus object from Cluster
// Service into an ARM provisioning state and, for failed node pools, a
// structured error body.
//
// The returned state defaults to the operation's current status and changes
// only for node pool states with a matching ProvisioningState. A non-nil
// error reports a node pool state that is unhandled or that is unexpected
// for the current provisioning state.
func convertNodePoolStatus(op operation, nodePoolStatus *arohcpv1alpha1.NodePoolStatus) (arm.ProvisioningState, *arm.CloudErrorBody, error) {
	current := op.doc.Status
	state := NodePoolStateValue(nodePoolStatus.State().NodePoolStateValue())

	switch state {
	case NodePoolStateValidating, NodePoolStatePending, NodePoolStateValidatingUpdate, NodePoolStatePendingUpdate:
		// Valid node pool states for ARO-HCP that have no ProvisioningState
		// of their own. They are only expected while the operation is still
		// Accepted.
		if current != arm.ProvisioningStateAccepted {
			return current, nil, fmt.Errorf("got NodePoolStatusValue '%s' while ProvisioningState was '%s' instead of '%s'", state, current, arm.ProvisioningStateAccepted)
		}
		return current, nil, nil

	case NodePoolStateInstalling:
		return arm.ProvisioningStateProvisioning, nil, nil

	case NodePoolStateReady:
		// A deletion succeeds only once Cluster Service answers "404 Not
		// Found" for the resource, so a "Ready" node pool seen during a
		// delete leaves the provisioning state untouched.
		if op.doc.Request == database.OperationRequestDelete {
			return current, nil, nil
		}
		return arm.ProvisioningStateSucceeded, nil, nil

	case NodePoolStateUpdating:
		return arm.ProvisioningStateUpdating, nil, nil

	case NodePoolStateUninstalling:
		return arm.ProvisioningStateDeleting, nil, nil

	case NodePoolStateRecoverableError, NodePoolStateError:
		// XXX OCM SDK offers no error code for failed node pool operations,
		// so an "Internal Server Error" body is used, carrying the status
		// message when one is set.
		// https://issues.redhat.com/browse/ARO-14969
		failure := arm.NewInternalServerError().CloudErrorBody
		if msg, ok := nodePoolStatus.GetMessage(); ok {
			failure.Message = msg
		}
		return arm.ProvisioningStateFailed, failure, nil

	default:
		return current, nil, fmt.Errorf("unhandled NodePoolState '%s'", state)
	}
}

// StartRootSpan initiates a new parent trace.
func StartRootSpan(ctx context.Context, name string) (context.Context, trace.Span) {
return otel.GetTracerProvider().
Expand Down
86 changes: 73 additions & 13 deletions backend/pkg/app/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -31,7 +30,6 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
utilsclock "k8s.io/utils/clock"

"github.com/Azure/ARO-HCP/backend/oldoperationscanner"
"github.com/Azure/ARO-HCP/backend/pkg/controllers"
"github.com/Azure/ARO-HCP/backend/pkg/controllers/controllerutils"
"github.com/Azure/ARO-HCP/backend/pkg/controllers/mismatchcontrollers"
Expand Down Expand Up @@ -240,9 +238,6 @@ func (b *Backend) runBackendControllersUnderLeaderElection(ctx context.Context,
activeOperationInformer, activeOperationLister := backendInformers.ActiveOperations()
clusterInformer, _ := backendInformers.Clusters()

startedLeading := atomic.Bool{}
operationsScanner := oldoperationscanner.NewOperationsScanner(
b.options.CosmosDBClient, b.options.ClustersServiceClient, b.options.AzureLocation, subscriptionLister)
dataDumpController := controllerutils.NewClusterWatchingController(
"DataDump", b.options.CosmosDBClient, clusterInformer, 1*time.Minute, controllers.NewDataDumpController(activeOperationLister, b.options.CosmosDBClient))
doNothingController := controllers.NewDoNothingExampleController(b.options.CosmosDBClient, subscriptionLister)
Expand Down Expand Up @@ -280,6 +275,72 @@ func (b *Backend) runBackendControllersUnderLeaderElection(ctx context.Context,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationNodePoolCreateController := operationcontrollers.NewGenericOperationController(
"OperationNodePoolCreate",
operationcontrollers.NewOperationNodePoolCreateSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationNodePoolUpdateController := operationcontrollers.NewGenericOperationController(
"OperationNodePoolUpdate",
operationcontrollers.NewOperationNodePoolUpdateSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationNodePoolDeleteController := operationcontrollers.NewGenericOperationController(
"OperationNodePoolDelete",
operationcontrollers.NewOperationNodePoolDeleteSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationExternalAuthCreateController := operationcontrollers.NewGenericOperationController(
"OperationExternalAuthCreate",
operationcontrollers.NewOperationExternalAuthCreateSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationExternalAuthUpdateController := operationcontrollers.NewGenericOperationController(
"OperationExternalAuthUpdate",
operationcontrollers.NewOperationExternalAuthUpdateSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationExternalAuthDeleteController := operationcontrollers.NewGenericOperationController(
"OperationExternalAuthDelete",
operationcontrollers.NewOperationExternalAuthDeleteSynchronizer(
b.options.CosmosDBClient,
b.options.ClustersServiceClient,
http.DefaultClient,
),
10*time.Second,
activeOperationInformer,
b.options.CosmosDBClient,
)
operationRequestCredentialController := operationcontrollers.NewGenericOperationController(
"OperationRequestCredential",
operationcontrollers.NewOperationRequestCredentialSynchronizer(
Expand Down Expand Up @@ -333,18 +394,20 @@ func (b *Backend) runBackendControllersUnderLeaderElection(ctx context.Context,
RetryPeriod: leaderElectionRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
operationsScanner.LeaderGauge.Set(1)
startedLeading.Store(true)

// start the SharedInformers
go backendInformers.RunWithContext(ctx)

go operationsScanner.Run(ctx)
go dataDumpController.Run(ctx, 20)
go doNothingController.Run(ctx, 20)
go operationClusterCreateController.Run(ctx, 20)
go operationClusterUpdateController.Run(ctx, 20)
go operationClusterDeleteController.Run(ctx, 20)
go operationNodePoolCreateController.Run(ctx, 20)
go operationNodePoolUpdateController.Run(ctx, 20)
go operationNodePoolDeleteController.Run(ctx, 20)
go operationExternalAuthCreateController.Run(ctx, 20)
go operationExternalAuthUpdateController.Run(ctx, 20)
go operationExternalAuthDeleteController.Run(ctx, 20)
go operationRequestCredentialController.Run(ctx, 20)
go operationRevokeCredentialsController.Run(ctx, 20)
go clusterServiceMatchingClusterController.Run(ctx, 20)
Expand All @@ -356,10 +419,7 @@ func (b *Backend) runBackendControllersUnderLeaderElection(ctx context.Context,
go triggerControlPlaneUpgradeController.Run(ctx, 20)
},
OnStoppedLeading: func() {
operationsScanner.LeaderGauge.Set(0)
if startedLeading.Load() {
operationsScanner.Join()
}
// This needs to be defined even though it does nothing.
},
},
ReleaseOnCancel: true,
Expand Down
Loading