
Commit 8f1a7e9

Thv registry deploy from controller (#1931)
* Add a service account and RBAC permissions for thv-registry-api
* Explicitly annotate the mcpregistry controller so it can create deployments and services
* Optionally push the registry-api server image to the kind cluster
* Create the deployment
* Create the service
* Actually deploy the service and the deployment
* Fix lint
* Fix the actual deployment step
* Replace DEPLOY_REGISTRY_API with ENABLE_EXPERIMENTAL_FEATURES
  - Update the Taskfile to use ENABLE_EXPERIMENTAL_FEATURES instead of DEPLOY_REGISTRY_API
  - Replace the registryAPI.serviceAccount.create condition with operator.features.experimental
  - Registry API RBAC resources (ServiceAccount, ClusterRole, ClusterRoleBinding) are now only created when experimental features are enabled
  - Simplify deployment logic by consolidating feature flags
* Refactor registry-api deployment into a separate package
* Regenerate helm docs
* Reduce duplication with helper methods to get the storage name and resource name for MCPRegistry
* Consolidate constants
* Add unit tests
* Consolidate the deployment update logic so it matches the service update logic
* Review feedback: only reconcile the API service if the sync error is nil
* Remove unneeded ConfigMap RBAC role
* Bump charts
1 parent 36d52a7 · commit 8f1a7e9

27 files changed, +2879 -21 lines

cmd/thv-operator/Taskfile.yml

Lines changed: 20 additions & 1 deletion
@@ -123,22 +123,41 @@ tasks:
       - helm upgrade --install toolhive-operator deploy/charts/operator --namespace toolhive-system --create-namespace --kubeconfig kconfig.yaml
 
   operator-deploy-local:
-    desc: Build the ToolHive runtime and Operator image locally and deploy it to the K8s cluster
+    desc: |
+      Build the ToolHive runtime and Operator image locally and deploy it to the K8s cluster.
+      Set ENABLE_EXPERIMENTAL_FEATURES=true to enable experimental features in the operator
+      and automatically build and deploy the registry API image.
+      Example: task operator-deploy-local ENABLE_EXPERIMENTAL_FEATURES=true
     platforms: [linux, darwin]
     vars:
+      ENABLE_EXPERIMENTAL_FEATURES: '{{.ENABLE_EXPERIMENTAL_FEATURES | default "false"}}'
       OPERATOR_IMAGE:
         sh: KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-operator | tail -n 1
       TOOLHIVE_IMAGE:
         sh: KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-proxyrunner | tail -n 1
+      REGISTRY_API_IMAGE:
+        sh: |
+          if [ "{{.ENABLE_EXPERIMENTAL_FEATURES}}" = "true" ]; then
+            KO_DOCKER_REPO=kind.local ko build --local -B ./cmd/thv-registry-api | tail -n 1
+          else
+            echo ""
+          fi
     cmds:
       - echo "Loading toolhive operator image {{.OPERATOR_IMAGE}} into kind..."
       - kind load docker-image --name toolhive {{.OPERATOR_IMAGE}}
       - echo "Loading toolhive image {{.TOOLHIVE_IMAGE}} into kind..."
      - kind load docker-image --name toolhive {{.TOOLHIVE_IMAGE}}
+      - |
+        if [ "{{.ENABLE_EXPERIMENTAL_FEATURES}}" = "true" ]; then
+          echo "Loading registry API image {{.REGISTRY_API_IMAGE}} into kind..."
+          kind load docker-image --name toolhive {{.REGISTRY_API_IMAGE}}
+        fi
       - |
         helm upgrade --install toolhive-operator deploy/charts/operator \
           --set operator.image={{.OPERATOR_IMAGE}} \
           --set operator.toolhiveRunnerImage={{.TOOLHIVE_IMAGE}} \
+          --set operator.features.experimental={{.ENABLE_EXPERIMENTAL_FEATURES}} \
+          {{if eq .ENABLE_EXPERIMENTAL_FEATURES "true"}}--set registryAPI.image={{.REGISTRY_API_IMAGE}}{{end}} \
           --namespace toolhive-system \
           --create-namespace \
           --kubeconfig kconfig.yaml \

cmd/thv-operator/api/v1alpha1/mcpregistry_types.go

Lines changed: 12 additions & 0 deletions
@@ -1,6 +1,8 @@
 package v1alpha1
 
 import (
+    "fmt"
+
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -250,6 +252,16 @@ type MCPRegistryList struct {
     Items []MCPRegistry `json:"items"`
 }
 
+// GetStorageName returns the name used for registry storage resources
+func (r *MCPRegistry) GetStorageName() string {
+    return fmt.Sprintf("%s-registry-storage", r.Name)
+}
+
+// GetAPIResourceName returns the base name for registry API resources (deployment, service)
+func (r *MCPRegistry) GetAPIResourceName() string {
+    return fmt.Sprintf("%s-api", r.Name)
+}
+
 func init() {
     SchemeBuilder.Register(&MCPRegistry{}, &MCPRegistryList{})
 }
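
As a quick illustration of the naming scheme these helpers establish, here is a minimal, hypothetical sketch. It assumes MCPRegistry embeds metav1.ObjectMeta in the usual kubebuilder fashion, which is what makes r.Name available:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

func main() {
    // Hypothetical registry used only for illustration.
    reg := &mcpv1alpha1.MCPRegistry{
        ObjectMeta: metav1.ObjectMeta{Name: "prod", Namespace: "toolhive-system"},
    }

    fmt.Println(reg.GetStorageName())     // prod-registry-storage
    fmt.Println(reg.GetAPIResourceName()) // prod-api
}

For a registry named "prod", the storage resources derive from prod-registry-storage and the API deployment and service presumably share the prod-api base name.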

cmd/thv-operator/controllers/mcpregistry_controller.go

Lines changed: 80 additions & 16 deletions
@@ -2,8 +2,11 @@ package controllers
 
 import (
     "context"
+    "fmt"
     "time"
 
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
     ctrl "sigs.k8s.io/controller-runtime"
@@ -12,6 +15,8 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/log"
 
     mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
+    "github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
+    "github.com/stacklok/toolhive/cmd/thv-operator/pkg/registryapi"
     "github.com/stacklok/toolhive/cmd/thv-operator/pkg/sources"
     "github.com/stacklok/toolhive/cmd/thv-operator/pkg/sync"
 )
@@ -22,19 +27,27 @@ type MCPRegistryReconciler struct {
     Scheme *runtime.Scheme
 
     // Sync manager handles all sync operations
-    syncManager sync.Manager
+    syncManager          sync.Manager
+    storageManager       sources.StorageManager
+    sourceHandlerFactory sources.SourceHandlerFactory
+    // Registry API manager handles API deployment operations
+    registryAPIManager registryapi.Manager
 }
 
 // NewMCPRegistryReconciler creates a new MCPRegistryReconciler with required dependencies
 func NewMCPRegistryReconciler(k8sClient client.Client, scheme *runtime.Scheme) *MCPRegistryReconciler {
     sourceHandlerFactory := sources.NewSourceHandlerFactory(k8sClient)
     storageManager := sources.NewConfigMapStorageManager(k8sClient, scheme)
     syncManager := sync.NewDefaultSyncManager(k8sClient, scheme, sourceHandlerFactory, storageManager)
+    registryAPIManager := registryapi.NewManager(k8sClient, scheme, storageManager, sourceHandlerFactory)
 
     return &MCPRegistryReconciler{
-        Client:      k8sClient,
-        Scheme:      scheme,
-        syncManager: syncManager,
+        Client:               k8sClient,
+        Scheme:               scheme,
+        syncManager:          syncManager,
+        storageManager:       storageManager,
+        sourceHandlerFactory: sourceHandlerFactory,
+        registryAPIManager:   registryAPIManager,
     }
 }
 
@@ -43,9 +56,15 @@ func NewMCPRegistryReconciler(k8sClient client.Client, scheme *runtime.Scheme) *
 // +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpregistries/finalizers,verbs=update
 // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
+//
+// For creating registry-api deployment and service
+// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
+//
+//nolint:gocyclo // Complex reconciliation logic requires multiple conditions
 func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     ctxLogger := log.FromContext(ctx)
 
@@ -106,8 +125,42 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         return ctrl.Result{}, nil
     }
 
-    // 3. Check if sync is needed before performing it
-    result, err := r.reconcileSync(ctx, mcpRegistry)
+    // 3. Create status collector for batched updates
+    statusCollector := mcpregistrystatus.NewCollector(mcpRegistry)
+
+    // 4. Reconcile sync operation
+    result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusCollector)
+
+    // 5. Reconcile API service (deployment and service, independent of sync status)
+    if syncErr == nil {
+        if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry, statusCollector); apiErr != nil {
+            ctxLogger.Error(apiErr, "Failed to reconcile API service")
+            if syncErr == nil {
+                err = apiErr
+            }
+        }
+    }
+
+    // 6. Check if we need to requeue for API readiness
+    if syncErr == nil && !r.registryAPIManager.IsAPIReady(ctx, mcpRegistry) {
+        ctxLogger.Info("API not ready yet, scheduling requeue to check readiness")
+        if result.RequeueAfter == 0 || result.RequeueAfter > time.Second*30 {
+            result.RequeueAfter = time.Second * 30
+        }
+    }
+
+    // 7. Apply all status changes in a single batch update
+    if statusUpdateErr := statusCollector.Apply(ctx, r.Status()); statusUpdateErr != nil {
+        ctxLogger.Error(statusUpdateErr, "Failed to apply batched status update")
+        // Return the status update error only if there was no main reconciliation error
+        if syncErr == nil {
+            err = statusUpdateErr
+        }
+    }
+
+    if err == nil {
+        err = syncErr
+    }
 
     // Log reconciliation completion
     if err != nil {
@@ -124,16 +177,13 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 }
 
 // reconcileSync checks if sync is needed and performs it if necessary
-func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (ctrl.Result, error) {
+// This method only handles data synchronization to the target ConfigMap
+func (r *MCPRegistryReconciler) reconcileSync(
+    ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
+) (ctrl.Result, error) {
     ctxLogger := log.FromContext(ctx)
 
-    // Refresh the object to get latest status for accurate timing calculations
-    if err := r.Get(ctx, client.ObjectKeyFromObject(mcpRegistry), mcpRegistry); err != nil {
-        ctxLogger.Error(err, "Failed to refresh MCPRegistry object for sync check")
-        return ctrl.Result{}, err
-    }
-
-    // Check if sync is needed
+    // Check if sync is needed - no need to refresh object here since we just fetched it
     syncNeeded, syncReason, nextSyncTime, err := r.syncManager.ShouldSync(ctx, mcpRegistry)
     if err != nil {
         ctxLogger.Error(err, "Failed to determine if sync is needed")
@@ -155,6 +205,10 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
 
     ctxLogger.Info("Sync needed", "reason", syncReason)
 
+    // Set phase to syncing before starting the sync process
+    statusCollector.SetPhase(mcpv1alpha1.MCPRegistryPhaseSyncing)
+    statusCollector.SetMessage("Syncing registry data")
+
     // Handle manual sync with no data changes - update trigger tracking only
     if syncReason == sync.ReasonManualNoChanges {
         return r.syncManager.UpdateManualSyncTriggerOnly(ctx, mcpRegistry)
@@ -165,6 +219,8 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
     if err != nil {
         // Sync failed - schedule retry with exponential backoff
         ctxLogger.Error(err, "Sync failed, scheduling retry")
+        statusCollector.SetPhase(mcpv1alpha1.MCPRegistryPhaseFailed)
+        statusCollector.SetMessage(fmt.Sprintf("Sync failed: %v", err))
         // Use a shorter retry interval instead of the full sync interval
         retryAfter := time.Minute * 5 // Default retry interval
         if result.RequeueAfter > 0 {
@@ -174,6 +230,11 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
         return ctrl.Result{RequeueAfter: retryAfter}, err
     }
 
+    // Sync successful - keep in syncing phase until API is also ready
+    statusCollector.SetMessage("Registry data synced successfully")
+
+    ctxLogger.Info("Registry data sync completed successfully")
+
     // Schedule next automatic sync only if this was an automatic sync (not manual)
     if mcpRegistry.Spec.SyncPolicy != nil && !sync.IsManualSync(syncReason) {
         interval, parseErr := time.ParseDuration(mcpRegistry.Spec.SyncPolicy.Interval)
@@ -196,7 +257,7 @@ func (r *MCPRegistryReconciler) reconcileSync(ctx context.Context, mcpRegistry *
 func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registry *mcpv1alpha1.MCPRegistry) error {
     ctxLogger := log.FromContext(ctx)
 
-    // Update the MCPRegistry status to indicate termination
+    // Update the MCPRegistry status to indicate termination - immediate update needed since object is being deleted
     registry.Status.Phase = mcpv1alpha1.MCPRegistryPhaseTerminating
     registry.Status.Message = "MCPRegistry is being terminated"
     if err := r.Status().Update(ctx, registry); err != nil {
@@ -211,7 +272,7 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
     }
 
     // TODO: Add additional cleanup logic when other features are implemented:
-    // - Clean up Registry API service
+    // - Clean up Registry API deployment and service (will be handled by owner references)
     // - Cancel any running sync operations
 
     ctxLogger.Info("MCPRegistry finalization completed", "registry", registry.Name)
@@ -222,5 +283,8 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
 func (r *MCPRegistryReconciler) SetupWithManager(mgr ctrl.Manager) error {
     return ctrl.NewControllerManagedBy(mgr).
         For(&mcpv1alpha1.MCPRegistry{}).
+        Owns(&appsv1.Deployment{}).
+        Owns(&corev1.Service{}).
+        Owns(&corev1.ConfigMap{}).
         Complete(r)
 }
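
The registryapi.Manager and mcpregistrystatus.Collector types used above are not shown in full in this commit. The following sketch is only an inference of their shapes from the controller's call sites and from the StatusCollector implementation below; the local names and grouping into one package are hypothetical, not the actual definitions:

package sketch

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

// Manager mirrors how the reconciler uses registryapi.Manager: it creates or
// updates the registry-api Deployment and Service and reports API readiness.
type Manager interface {
    ReconcileAPIService(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, status Collector) error
    IsAPIReady(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) bool
}

// Collector mirrors mcpregistrystatus.Collector: status mutations are collected
// during reconciliation and written in a single batched update via Apply.
type Collector interface {
    SetPhase(phase mcpv1alpha1.MCPRegistryPhase)
    SetMessage(message string)
    SetAPIEndpoint(endpoint string)
    SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus)
    Apply(ctx context.Context, statusWriter client.StatusWriter) error
}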
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+// Package mcpregistrystatus provides status management and batched updates for MCPRegistry resources.
+package mcpregistrystatus
+
+import (
+    "context"
+    "fmt"
+
+    "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/log"
+
+    mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
+)
+
+// StatusCollector collects status changes during reconciliation
+// and applies them in a single batch update at the end.
+// It implements the Collector interface.
+type StatusCollector struct {
+    mcpRegistry *mcpv1alpha1.MCPRegistry
+    hasChanges  bool
+    phase       *mcpv1alpha1.MCPRegistryPhase
+    message     *string
+    apiEndpoint *string
+    conditions  map[string]metav1.Condition
+}
+
+// NewCollector creates a new status update collector for the given MCPRegistry resource.
+func NewCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) Collector {
+    return &StatusCollector{
+        mcpRegistry: mcpRegistry,
+        conditions:  make(map[string]metav1.Condition),
+    }
+}
+
+// SetPhase sets the phase to be updated.
+func (s *StatusCollector) SetPhase(phase mcpv1alpha1.MCPRegistryPhase) {
+    s.phase = &phase
+    s.hasChanges = true
+}
+
+// SetMessage sets the message to be updated.
+func (s *StatusCollector) SetMessage(message string) {
+    s.message = &message
+    s.hasChanges = true
+}
+
+// SetAPIEndpoint sets the API endpoint to be updated.
+func (s *StatusCollector) SetAPIEndpoint(endpoint string) {
+    s.apiEndpoint = &endpoint
+    s.hasChanges = true
+}
+
+// SetAPIReadyCondition adds or updates the API ready condition.
+func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
+    s.conditions[mcpv1alpha1.ConditionAPIReady] = metav1.Condition{
+        Type:    mcpv1alpha1.ConditionAPIReady,
+        Status:  status,
+        Reason:  reason,
+        Message: message,
+    }
+    s.hasChanges = true
+}
+
+// Apply applies all collected status changes in a single batch update.
+func (s *StatusCollector) Apply(ctx context.Context, statusWriter client.StatusWriter) error {
+    if !s.hasChanges {
+        return nil
+    }
+
+    ctxLogger := log.FromContext(ctx)
+
+    // Apply phase change
+    if s.phase != nil {
+        s.mcpRegistry.Status.Phase = *s.phase
+    }
+
+    // Apply message change
+    if s.message != nil {
+        s.mcpRegistry.Status.Message = *s.message
+    }
+
+    // Apply API endpoint change
+    if s.apiEndpoint != nil {
+        s.mcpRegistry.Status.APIEndpoint = *s.apiEndpoint
+    }
+
+    // Apply condition changes
+    for _, condition := range s.conditions {
+        meta.SetStatusCondition(&s.mcpRegistry.Status.Conditions, condition)
+    }
+
+    // Single status update
+    if err := statusWriter.Update(ctx, s.mcpRegistry); err != nil {
+        ctxLogger.Error(err, "Failed to apply batched status update")
+        return fmt.Errorf("failed to apply batched status update: %w", err)
+    }
+
+    ctxLogger.V(1).Info("Applied batched status update",
+        "phase", s.phase,
+        "message", s.message,
+        "conditionsCount", len(s.conditions))
+
+    return nil
+}
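
A minimal usage sketch of the collect-then-apply pattern follows. The helper name syncAndReport, the example package name, and the condition reason "DeploymentPending" are illustrative only; the collector calls themselves match the methods above:

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
    "github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
)

// syncAndReport is a hypothetical helper showing the intended usage pattern:
// collect every status change during reconciliation, then write once at the end.
func syncAndReport(ctx context.Context, statusWriter client.StatusWriter, reg *mcpv1alpha1.MCPRegistry) error {
    collector := mcpregistrystatus.NewCollector(reg)

    // Record intermediate state while work is in progress.
    collector.SetPhase(mcpv1alpha1.MCPRegistryPhaseSyncing)
    collector.SetMessage("Syncing registry data")

    // ... perform the sync and API reconciliation here ...

    // Record the API readiness condition ("DeploymentPending" is an illustrative reason).
    collector.SetAPIReadyCondition("DeploymentPending", "registry API deployment not ready yet", metav1.ConditionFalse)

    // One status write for all of the above instead of one write per change.
    return collector.Apply(ctx, statusWriter)
}

In the controller this is exactly the role of statusCollector in Reconcile: reconcileSync and ReconcileAPIService both record their changes, and a single statusCollector.Apply(ctx, r.Status()) call at the end flushes them.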
