Skip to content

Commit a9a4300

Browse files
authored
Merge branch 'main' into use-status-file-for-pids
2 parents 41d19a2 + 24a2f37 commit a9a4300

File tree

5 files changed

+1065
-11
lines changed

5 files changed

+1065
-11
lines changed

cmd/thv-operator/controllers/mcpserver_controller.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,19 @@ const mcpContainerName = "mcp"
8484
// trueValue is the string value "true" used for environment variable comparisons
8585
const trueValue = "true"
8686

87+
// Restart annotation keys for triggering pod restart
88+
const (
89+
RestartedAtAnnotationKey = "mcpserver.toolhive.stacklok.dev/restarted-at"
90+
RestartStrategyAnnotationKey = "mcpserver.toolhive.stacklok.dev/restart-strategy"
91+
LastProcessedRestartAnnotationKey = "mcpserver.toolhive.stacklok.dev/last-processed-restart"
92+
)
93+
94+
// Restart strategy constants
95+
const (
96+
RestartStrategyRolling = "rolling"
97+
RestartStrategyImmediate = "immediate"
98+
)
99+
87100
// Authorization ConfigMap label constants
88101
const (
89102
// authzLabelKey is the label key for authorization configuration type
@@ -161,6 +174,15 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
161174
return ctrl.Result{}, err
162175
}
163176

177+
// Check if the restart annotation has been updated and trigger a rolling restart if needed
178+
if shouldTriggerRestart, err := r.handleRestartAnnotation(ctx, mcpServer); err != nil {
179+
ctxLogger.Error(err, "Failed to handle restart annotation")
180+
return ctrl.Result{}, err
181+
} else if shouldTriggerRestart {
182+
// Return and requeue to avoid double-processing after triggering restart
183+
return ctrl.Result{Requeue: true}, nil
184+
}
185+
164186
// Check if MCPToolConfig is referenced and handle it
165187
if err := r.handleToolConfig(ctx, mcpServer); err != nil {
166188
ctxLogger.Error(err, "Failed to handle MCPToolConfig")
@@ -329,6 +351,148 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
329351
return ctrl.Result{}, nil
330352
}
331353

354+
// handleRestartAnnotation checks if the restart annotation has been updated and triggers a restart if needed
355+
// Returns true if a restart was triggered and the reconciliation should be requeued
356+
func (r *MCPServerReconciler) handleRestartAnnotation(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) (bool, error) {
357+
ctxLogger := log.FromContext(ctx)
358+
359+
// Get the current restarted-at annotation value from the CR
360+
currentRestartedAt := ""
361+
if mcpServer.Annotations != nil {
362+
currentRestartedAt = mcpServer.Annotations[RestartedAtAnnotationKey]
363+
}
364+
365+
// Skip if no restart annotation is present
366+
if currentRestartedAt == "" {
367+
return false, nil
368+
}
369+
370+
// Parse the timestamp from the annotation
371+
requestTime, err := time.Parse(time.RFC3339, currentRestartedAt)
372+
if err != nil {
373+
ctxLogger.Error(err, "Invalid timestamp format in restart annotation",
374+
"annotation", RestartedAtAnnotationKey,
375+
"value", currentRestartedAt)
376+
return false, nil
377+
}
378+
379+
// Check if we've already processed this restart request
380+
lastProcessedRestart := ""
381+
if mcpServer.Annotations != nil {
382+
lastProcessedRestart = mcpServer.Annotations[LastProcessedRestartAnnotationKey]
383+
}
384+
385+
if lastProcessedRestart != "" {
386+
lastProcessedTime, err := time.Parse(time.RFC3339, lastProcessedRestart)
387+
if err == nil && !requestTime.After(lastProcessedTime) {
388+
// This request has already been processed
389+
return false, nil
390+
}
391+
}
392+
393+
// Get restart strategy (default to rolling)
394+
strategy := RestartStrategyRolling
395+
if mcpServer.Annotations != nil {
396+
if strategyValue, exists := mcpServer.Annotations[RestartStrategyAnnotationKey]; exists {
397+
strategy = strategyValue
398+
}
399+
}
400+
401+
ctxLogger.Info("Processing restart request",
402+
"annotation", RestartedAtAnnotationKey,
403+
"timestamp", currentRestartedAt,
404+
"strategy", strategy)
405+
406+
// Perform the restart based on strategy
407+
err = r.performRestart(ctx, mcpServer, strategy)
408+
if err != nil {
409+
return false, fmt.Errorf("failed to perform restart: %w", err)
410+
}
411+
412+
// Update the last processed restart timestamp in annotations
413+
if mcpServer.Annotations == nil {
414+
mcpServer.Annotations = make(map[string]string)
415+
}
416+
mcpServer.Annotations[LastProcessedRestartAnnotationKey] = currentRestartedAt
417+
err = r.Update(ctx, mcpServer)
418+
if err != nil {
419+
return false, fmt.Errorf("failed to update MCPServer with last processed restart annotation: %w", err)
420+
}
421+
422+
return true, nil
423+
}
424+
425+
// performRestart executes the restart based on the specified strategy
426+
func (r *MCPServerReconciler) performRestart(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer, strategy string) error {
427+
switch strategy {
428+
case RestartStrategyRolling:
429+
return r.performRollingRestart(ctx, mcpServer)
430+
case RestartStrategyImmediate:
431+
return r.performImmediateRestart(ctx, mcpServer)
432+
default:
433+
ctxLogger := log.FromContext(ctx)
434+
ctxLogger.Info("Unknown restart strategy, defaulting to rolling", "strategy", strategy)
435+
return r.performRollingRestart(ctx, mcpServer)
436+
}
437+
}
438+
439+
// performRollingRestart triggers a rolling restart by updating the deployment's pod template annotation
440+
func (r *MCPServerReconciler) performRollingRestart(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) error {
441+
ctxLogger := log.FromContext(ctx)
442+
deployment := &appsv1.Deployment{}
443+
err := r.Get(ctx, types.NamespacedName{Name: mcpServer.Name, Namespace: mcpServer.Namespace}, deployment)
444+
if err != nil {
445+
if errors.IsNotFound(err) {
446+
ctxLogger.Info("Deployment not found, skipping rolling restart")
447+
return nil
448+
}
449+
return fmt.Errorf("failed to get deployment for rolling restart: %w", err)
450+
}
451+
452+
// Update the deployment's pod template annotation to trigger a rolling restart
453+
if deployment.Spec.Template.Annotations == nil {
454+
deployment.Spec.Template.Annotations = map[string]string{}
455+
}
456+
deployment.Spec.Template.Annotations[RestartedAtAnnotationKey] = time.Now().Format(time.RFC3339)
457+
458+
err = r.Update(ctx, deployment)
459+
if err != nil {
460+
return fmt.Errorf("failed to update deployment for rolling restart: %w", err)
461+
}
462+
463+
ctxLogger.Info("Successfully triggered rolling restart of deployment", "deployment", deployment.Name)
464+
return nil
465+
}
466+
467+
// performImmediateRestart triggers an immediate restart by deleting the pods directly
468+
func (r *MCPServerReconciler) performImmediateRestart(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) error {
469+
ctxLogger := log.FromContext(ctx)
470+
471+
// List pods belonging to this MCPServer
472+
podList := &corev1.PodList{}
473+
listOpts := []client.ListOption{
474+
client.InNamespace(mcpServer.Namespace),
475+
client.MatchingLabels(labelsForMCPServer(mcpServer.Name)),
476+
}
477+
478+
err := r.List(ctx, podList, listOpts...)
479+
if err != nil {
480+
return fmt.Errorf("failed to list pods for immediate restart: %w", err)
481+
}
482+
483+
// Delete each pod to trigger immediate restart
484+
for _, pod := range podList.Items {
485+
ctxLogger.Info("Deleting pod for immediate restart", "pod", pod.Name)
486+
err = r.Delete(ctx, &pod)
487+
if err != nil && !errors.IsNotFound(err) {
488+
return fmt.Errorf("failed to delete pod %s for immediate restart: %w", pod.Name, err)
489+
}
490+
}
491+
492+
ctxLogger.Info("Successfully triggered immediate restart", "podsDeleted", len(podList.Items))
493+
return nil
494+
}
495+
332496
// ensureRBACResource is a generic helper function to ensure a Kubernetes resource exists and is up to date
333497
func (r *MCPServerReconciler) ensureRBACResource(
334498
ctx context.Context,

0 commit comments

Comments
 (0)