21 changes: 20 additions & 1 deletion cmd/thv-operator/Taskfile.yml
@@ -123,22 +123,41 @@ tasks:
- helm upgrade --install toolhive-operator deploy/charts/operator --namespace toolhive-system --create-namespace --kubeconfig kconfig.yaml

operator-deploy-local:
desc: Build the ToolHive runtime and Operator image locally and deploy it to the K8s cluster
desc: |
Build the ToolHive runtime and Operator image locally and deploy it to the K8s cluster.
Set ENABLE_EXPERIMENTAL_FEATURES=true to enable experimental features in the operator
and automatically build and deploy the registry API image.
Example: task operator-deploy-local ENABLE_EXPERIMENTAL_FEATURES=true
platforms: [linux, darwin]
vars:
ENABLE_EXPERIMENTAL_FEATURES: '{{.ENABLE_EXPERIMENTAL_FEATURES | default "false"}}'
OPERATOR_IMAGE:
sh: KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-operator | tail -n 1
TOOLHIVE_IMAGE:
sh: KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-proxyrunner | tail -n 1
REGISTRY_API_IMAGE:
sh: |
if [ "{{.ENABLE_EXPERIMENTAL_FEATURES}}" = "true" ]; then
KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-registry-api | tail -n 1
else
echo ""
fi
cmds:
- echo "Loading toolhive operator image {{.OPERATOR_IMAGE}} into kind..."
- kind load docker-image --name toolhive {{.OPERATOR_IMAGE}}
- echo "Loading toolhive image {{.TOOLHIVE_IMAGE}} into kind..."
- kind load docker-image --name toolhive {{.TOOLHIVE_IMAGE}}
- |
if [ "{{.ENABLE_EXPERIMENTAL_FEATURES}}" = "true" ]; then
echo "Loading registry API image {{.REGISTRY_API_IMAGE}} into kind..."
kind load docker-image --name toolhive {{.REGISTRY_API_IMAGE}}
fi
- |
helm upgrade --install toolhive-operator deploy/charts/operator \
--set operator.image={{.OPERATOR_IMAGE}} \
--set operator.toolhiveRunnerImage={{.TOOLHIVE_IMAGE}} \
--set operator.features.experimental={{.ENABLE_EXPERIMENTAL_FEATURES}} \
{{if eq .ENABLE_EXPERIMENTAL_FEATURES "true"}}--set registryAPI.image={{.REGISTRY_API_IMAGE}}{{end}} \
--namespace toolhive-system \
--create-namespace \
--kubeconfig kconfig.yaml \
12 changes: 12 additions & 0 deletions cmd/thv-operator/api/v1alpha1/mcpregistry_types.go
@@ -1,6 +1,8 @@
package v1alpha1

import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -250,6 +252,16 @@ type MCPRegistryList struct {
Items []MCPRegistry `json:"items"`
}

// GetStorageName returns the name used for registry storage resources
func (r *MCPRegistry) GetStorageName() string {
return fmt.Sprintf("%s-registry-storage", r.Name)
}

// GetAPIResourceName returns the base name for registry API resources (deployment, service)
func (r *MCPRegistry) GetAPIResourceName() string {
return fmt.Sprintf("%s-api", r.Name)
}

func init() {
SchemeBuilder.Register(&MCPRegistry{}, &MCPRegistryList{})
}
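
As an aside, a minimal sketch of how these naming helpers would be used by a caller; the registry name "community" and the standalone program are illustrative, not part of this change:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

func main() {
	// Hypothetical MCPRegistry; only the name matters for the helpers.
	registry := &mcpv1alpha1.MCPRegistry{
		ObjectMeta: metav1.ObjectMeta{Name: "community"},
	}

	// Deterministic child-resource names derived from the registry name.
	fmt.Println(registry.GetStorageName())     // community-registry-storage
	fmt.Println(registry.GetAPIResourceName()) // community-api
}
```
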
90 changes: 74 additions & 16 deletions cmd/thv-operator/controllers/mcpregistry_controller.go
Collaborator: in general, I'd move the functions to deploy the registry API to a dedicated package

Contributor (author): done in this push

@@ -2,8 +2,11 @@ package controllers

import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -12,6 +15,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/registryapi"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/sources"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/sync"
)
@@ -22,19 +27,27 @@ type MCPRegistryReconciler struct {
Scheme *runtime.Scheme

// Sync manager handles all sync operations
syncManager sync.Manager
syncManager sync.Manager
storageManager sources.StorageManager
sourceHandlerFactory sources.SourceHandlerFactory
// Registry API manager handles API deployment operations
registryAPIManager registryapi.Manager
}

// NewMCPRegistryReconciler creates a new MCPRegistryReconciler with required dependencies
func NewMCPRegistryReconciler(k8sClient client.Client, scheme *runtime.Scheme) *MCPRegistryReconciler {
sourceHandlerFactory := sources.NewSourceHandlerFactory(k8sClient)
storageManager := sources.NewConfigMapStorageManager(k8sClient, scheme)
syncManager := sync.NewDefaultSyncManager(k8sClient, scheme, sourceHandlerFactory, storageManager)
registryAPIManager := registryapi.NewManager(k8sClient, scheme, storageManager, sourceHandlerFactory)

return &MCPRegistryReconciler{
Client: k8sClient,
Scheme: scheme,
syncManager: syncManager,
Client: k8sClient,
Scheme: scheme,
syncManager: syncManager,
storageManager: storageManager,
sourceHandlerFactory: sourceHandlerFactory,
registryAPIManager: registryAPIManager,
}
}

@@ -43,9 +56,15 @@ func NewMCPRegistryReconciler(k8sClient client.Client, scheme *runtime.Scheme) *
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpregistries/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
//
// For creating registry-api deployment and service
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
//nolint:gocyclo // Complex reconciliation logic requires multiple conditions
func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
ctxLogger := log.FromContext(ctx)

@@ -106,8 +125,36 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// 3. Check if sync is needed before performing it
result, err := r.reconcileSync(ctx, mcpRegistry)
// 3. Create status collector for batched updates
statusCollector := mcpregistrystatus.NewCollector(mcpRegistry)

// 4. Reconcile sync operation
result, err := r.reconcileSync(ctx, mcpRegistry, statusCollector)

// 5. Reconcile API service (deployment and service, independent of sync status)
if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry, statusCollector); apiErr != nil {
ctxLogger.Error(apiErr, "Failed to reconcile API service")
if err == nil {
err = apiErr
}
}

// 6. Check if we need to requeue for API readiness
if err == nil && !r.registryAPIManager.IsAPIReady(ctx, mcpRegistry) {
ctxLogger.Info("API not ready yet, scheduling requeue to check readiness")
if result.RequeueAfter == 0 || result.RequeueAfter > time.Second*30 {
result.RequeueAfter = time.Second * 30
}
}

// 7. Apply all status changes in a single batch update
if statusUpdateErr := statusCollector.Apply(ctx, r.Status()); statusUpdateErr != nil {
ctxLogger.Error(statusUpdateErr, "Failed to apply batched status update")
// Return the status update error only if there was no main reconciliation error
if err == nil {
err = statusUpdateErr
}
}

// Log reconciliation completion
if err != nil {
@@ -124,16 +171,13 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// reconcileSync checks if sync is needed and performs it if necessary
func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (ctrl.Result, error) {
// This method only handles data synchronization to the target ConfigMap
func (r *MCPRegistryReconciler) reconcileSync(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
) (ctrl.Result, error) {
ctxLogger := log.FromContext(ctx)

// Refresh the object to get latest status for accurate timing calculations
if err := r.Get(ctx, client.ObjectKeyFromObject(mcpRegistry), mcpRegistry); err != nil {
ctxLogger.Error(err, "Failed to refresh MCPRegistry object for sync check")
return ctrl.Result{}, err
}

// Check if sync is needed
// Check if sync is needed - no need to refresh object here since we just fetched it
syncNeeded, syncReason, nextSyncTime, err := r.syncManager.ShouldSync(ctx, mcpRegistry)
if err != nil {
ctxLogger.Error(err, "Failed to determine if sync is needed")
@@ -155,6 +199,10 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *

ctxLogger.Info("Sync needed", "reason", syncReason)

// Set phase to syncing before starting the sync process
statusCollector.SetPhase(mcpv1alpha1.MCPRegistryPhaseSyncing)
statusCollector.SetMessage("Syncing registry data")

// Handle manual sync with no data changes - update trigger tracking only
if syncReason == sync.ReasonManualNoChanges {
return r.syncManager.UpdateManualSyncTriggerOnly(ctx, mcpRegistry)
@@ -165,6 +213,8 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
if err != nil {
// Sync failed - schedule retry with exponential backoff
ctxLogger.Error(err, "Sync failed, scheduling retry")
statusCollector.SetPhase(mcpv1alpha1.MCPRegistryPhaseFailed)
statusCollector.SetMessage(fmt.Sprintf("Sync failed: %v", err))
// Use a shorter retry interval instead of the full sync interval
retryAfter := time.Minute * 5 // Default retry interval
if result.RequeueAfter > 0 {
@@ -174,6 +224,11 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
return ctrl.Result{RequeueAfter: retryAfter}, err
}

// Sync successful - keep in syncing phase until API is also ready
statusCollector.SetMessage("Registry data synced successfully")

ctxLogger.Info("Registry data sync completed successfully")

// Schedule next automatic sync only if this was an automatic sync (not manual)
if mcpRegistry.Spec.SyncPolicy != nil && !sync.IsManualSync(syncReason) {
interval, parseErr := time.ParseDuration(mcpRegistry.Spec.SyncPolicy.Interval)
@@ -196,7 +251,7 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registry *mcpv1alpha1.MCPRegistry) error {
ctxLogger := log.FromContext(ctx)

// Update the MCPRegistry status to indicate termination
// Update the MCPRegistry status to indicate termination - immediate update needed since object is being deleted
registry.Status.Phase = mcpv1alpha1.MCPRegistryPhaseTerminating
registry.Status.Message = "MCPRegistry is being terminated"
if err := r.Status().Update(ctx, registry); err != nil {
@@ -211,7 +266,7 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
}

// TODO: Add additional cleanup logic when other features are implemented:
// - Clean up Registry API service
// - Clean up Registry API deployment and service (will be handled by owner references)
// - Cancel any running sync operations

ctxLogger.Info("MCPRegistry finalization completed", "registry", registry.Name)
@@ -222,5 +277,8 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
func (r *MCPRegistryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mcpv1alpha1.MCPRegistry{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
}
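
For readers without the new pkg/registryapi package in front of them, the Manager contract implied by the call sites above reduces to two methods. This is a sketch inferred from this diff, not the actual interface definition:

```go
package registryapi // sketch only; see cmd/thv-operator/pkg/registryapi for the real definition

import (
	"context"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
	"github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
)

// Manager reconciles the registry API Deployment and Service for an MCPRegistry.
type Manager interface {
	// ReconcileAPIService creates or updates the API Deployment and Service,
	// recording outcomes on the status collector.
	ReconcileAPIService(ctx context.Context, reg *mcpv1alpha1.MCPRegistry, status mcpregistrystatus.Collector) error

	// IsAPIReady reports whether the deployed registry API is ready to serve.
	IsAPIReady(ctx context.Context, reg *mcpv1alpha1.MCPRegistry) bool
}
```
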
105 changes: 105 additions & 0 deletions cmd/thv-operator/pkg/mcpregistrystatus/collector.go
@@ -0,0 +1,105 @@
// Package mcpregistrystatus provides status management and batched updates for MCPRegistry resources.
package mcpregistrystatus

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

// StatusCollector collects status changes during reconciliation
// and applies them in a single batch update at the end.
// It implements the Collector interface.
type StatusCollector struct {
mcpRegistry *mcpv1alpha1.MCPRegistry
hasChanges bool
phase *mcpv1alpha1.MCPRegistryPhase
message *string
apiEndpoint *string
conditions map[string]metav1.Condition
}

// NewCollector creates a new status update collector for the given MCPRegistry resource.
func NewCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) Collector {
return &StatusCollector{
mcpRegistry: mcpRegistry,
conditions: make(map[string]metav1.Condition),
}
}

// SetPhase sets the phase to be updated.
func (s *StatusCollector) SetPhase(phase mcpv1alpha1.MCPRegistryPhase) {
s.phase = &phase
s.hasChanges = true
}

// SetMessage sets the message to be updated.
func (s *StatusCollector) SetMessage(message string) {
s.message = &message
s.hasChanges = true
}

// SetAPIEndpoint sets the API endpoint to be updated.
func (s *StatusCollector) SetAPIEndpoint(endpoint string) {
s.apiEndpoint = &endpoint
s.hasChanges = true
}

// SetAPIReadyCondition adds or updates the API ready condition.
func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
s.conditions[mcpv1alpha1.ConditionAPIReady] = metav1.Condition{
Type: mcpv1alpha1.ConditionAPIReady,
Status: status,
Reason: reason,
Message: message,
}
s.hasChanges = true
}

// Apply applies all collected status changes in a single batch update.
func (s *StatusCollector) Apply(ctx context.Context, statusWriter client.StatusWriter) error {
if !s.hasChanges {
return nil
}

ctxLogger := log.FromContext(ctx)

// Apply phase change
if s.phase != nil {
s.mcpRegistry.Status.Phase = *s.phase
}

// Apply message change
if s.message != nil {
s.mcpRegistry.Status.Message = *s.message
}

// Apply API endpoint change
if s.apiEndpoint != nil {
s.mcpRegistry.Status.APIEndpoint = *s.apiEndpoint
}

// Apply condition changes
for _, condition := range s.conditions {
meta.SetStatusCondition(&s.mcpRegistry.Status.Conditions, condition)
}

// Single status update
if err := statusWriter.Update(ctx, s.mcpRegistry); err != nil {
ctxLogger.Error(err, "Failed to apply batched status update")
return fmt.Errorf("failed to apply batched status update: %w", err)
}

ctxLogger.V(1).Info("Applied batched status update",
"phase", s.phase,
"message", s.message,
"conditionsCount", len(s.conditions))

return nil
}
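
A usage sketch of the collector inside a reconcile pass; the helper function, the reason string, and the error handling are illustrative, not taken from this PR:

```go
package statusexample

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
	"github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
)

// updateStatus is an illustrative helper: collect changes as the reconcile
// pass progresses, then flush them with one Status().Update call.
func updateStatus(ctx context.Context, k8sClient client.Client, mcpRegistry *mcpv1alpha1.MCPRegistry) error {
	collector := mcpregistrystatus.NewCollector(mcpRegistry)

	collector.SetPhase(mcpv1alpha1.MCPRegistryPhaseSyncing)
	collector.SetMessage("Syncing registry data")
	// "DeploymentPending" is a hypothetical reason string for this sketch.
	collector.SetAPIReadyCondition("DeploymentPending", "API deployment not ready", metav1.ConditionFalse)

	// No-op when nothing was set; otherwise a single batched status update.
	return collector.Apply(ctx, k8sClient.Status())
}
```
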
31 changes: 31 additions & 0 deletions cmd/thv-operator/pkg/mcpregistrystatus/types.go
@@ -0,0 +1,31 @@
// Package mcpregistrystatus provides status management for MCPRegistry resources.
package mcpregistrystatus

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

// Collector defines the interface for collecting MCPRegistry status updates.
// It provides methods to collect status changes during reconciliation
// and apply them in a single batch update at the end.
type Collector interface {
// SetAPIReadyCondition sets the API ready condition with the specified reason, message, and status
SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus)

// SetAPIEndpoint sets the API endpoint in the status
SetAPIEndpoint(endpoint string)

// SetPhase sets the MCPRegistry phase in the status
SetPhase(phase mcpv1alpha1.MCPRegistryPhase)

// SetMessage sets the status message
SetMessage(message string)

// Apply applies all collected status changes in a single batch update
Apply(ctx context.Context, statusWriter client.StatusWriter) error
}
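
Because the controller depends on this interface rather than the concrete StatusCollector, a recording fake is straightforward for unit tests. A sketch, not included in this PR:

```go
package mcpregistrystatus_test

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

// fakeCollector records calls so tests can assert on the phase, message,
// and conditions a reconciler sets without touching the API server.
type fakeCollector struct {
	phase      *mcpv1alpha1.MCPRegistryPhase
	message    string
	endpoint   string
	conditions []metav1.Condition
	applied    bool
}

func (f *fakeCollector) SetPhase(p mcpv1alpha1.MCPRegistryPhase) { f.phase = &p }
func (f *fakeCollector) SetMessage(m string)                     { f.message = m }
func (f *fakeCollector) SetAPIEndpoint(e string)                 { f.endpoint = e }

func (f *fakeCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
	f.conditions = append(f.conditions, metav1.Condition{
		Type:    mcpv1alpha1.ConditionAPIReady,
		Status:  status,
		Reason:  reason,
		Message: message,
	})
}

func (f *fakeCollector) Apply(_ context.Context, _ client.StatusWriter) error {
	f.applied = true
	return nil
}
```
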