diff --git a/cmd/workload-manager/main.go b/cmd/workload-manager/main.go
index b6f56a24..8dc27720 100644
--- a/cmd/workload-manager/main.go
+++ b/cmd/workload-manager/main.go
@@ -139,8 +139,36 @@ func main() {
 	select {
 	case <-sigCh:
 		klog.Info("Received shutdown signal, shutting down gracefully...")
+
+		// Create a shutdown context with a 15-second deadline
+		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
+		defer shutdownCancel()
+
+		// 1. Stop accepting new requests and drain in-flight ones
+		if err := server.Shutdown(shutdownCtx); err != nil {
+			klog.Errorf("server shutdown error: %v", err)
+		}
+
+		// 2. Cancel root context to stop background workers (GC, informers, controller manager)
 		cancel()
-		time.Sleep(2 * time.Second) // Give server time to shutdown gracefully
+
+		// 3. Wait for background workers to finish their current operations.
+		// The wait is bounded so that a stuck worker cannot block process exit forever.
+		workersDone := make(chan struct{})
+		go func() {
+			defer close(workersDone)
+			server.WaitForBackgroundWorkers()
+		}()
+		select {
+		case <-workersDone:
+		case <-time.After(5 * time.Second):
+			klog.Warning("Timed out waiting for background workers to stop")
+		}
+
+		// 4. Close store connections once workers have stopped or the wait has timed out
+		if err := server.CloseStore(); err != nil {
+			klog.Errorf("store close error: %v", err)
+		}
 	case err := <-errCh:
 		klog.Fatalf("Server error: %v", err)
 	}
diff --git a/pkg/router/session_manager_test.go b/pkg/router/session_manager_test.go
index bdf6d6b8..8740813b 100644
--- a/pkg/router/session_manager_test.go
+++ b/pkg/router/session_manager_test.go
@@ -94,6 +94,10 @@ func (f *fakeStoreClient) UpdateSandboxLastActivity(_ context.Context, _ string,
 	return nil
 }
 
+func (f *fakeStoreClient) Close() error {
+	return nil
+}
+
 // ---- tests: GetSandboxBySession ----
 
 func TestGetSandboxBySession_Success(t *testing.T) {
diff --git a/pkg/store/interface.go b/pkg/store/interface.go
index 29c35f0c..72e74749 100644
--- a/pkg/store/interface.go
+++ b/pkg/store/interface.go
@@ -40,4 +40,6 @@ type Store interface {
 	ListInactiveSandboxes(ctx context.Context, before time.Time, limit int64) ([]*types.SandboxInfo, error)
 	// UpdateSessionLastActivity updates the last-activity index for the given session
 	UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error
+	// Close releases all resources held by the store (e.g. connection pools)
+	Close() error
 }
diff --git a/pkg/store/store_redis.go b/pkg/store/store_redis.go
index 69bdd994..a1d60061 100644
--- a/pkg/store/store_redis.go
+++ b/pkg/store/store_redis.go
@@ -269,6 +269,11 @@ func (rs *redisStore) ListInactiveSandboxes(ctx context.Context, before time.Tim
 	return rs.loadSandboxesBySessionIDs(ctx, ids)
 }
 
+// Close releases all resources held by the redis store.
+func (rs *redisStore) Close() error {
+	return rs.cli.Close()
+}
+
 // UpdateSessionLastActivity updates the last-activity index for the given session.
 func (rs *redisStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error {
 	if sessionID == "" {
diff --git a/pkg/store/store_valkey.go b/pkg/store/store_valkey.go
index 31920a5a..4b8faff0 100644
--- a/pkg/store/store_valkey.go
+++ b/pkg/store/store_valkey.go
@@ -272,6 +272,12 @@ func (vs *valkeyStore) ListInactiveSandboxes(ctx context.Context, before time.Ti
 	return vs.loadSandboxesBySessionIDs(ctx, ids)
 }
 
+// Close releases all resources held by the valkey store.
+func (vs *valkeyStore) Close() error {
+	vs.cli.Close()
+	return nil
+}
+
 // UpdateSessionLastActivity updates the last-activity index for the given session
 func (vs *valkeyStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error {
 	if sessionID == "" {
diff --git a/pkg/workloadmanager/handlers_test.go b/pkg/workloadmanager/handlers_test.go
index 5fd0bb7e..a5f78712 100644
--- a/pkg/workloadmanager/handlers_test.go
+++ b/pkg/workloadmanager/handlers_test.go
@@ -72,6 +72,7 @@ func (f *fakeStore) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int6
 func (f *fakeStore) UpdateSessionLastActivity(_ context.Context, _ string, _ time.Time) error {
 	return nil
 }
+func (f *fakeStore) Close() error { return nil }
 
 func readySandbox() *sandboxv1alpha1.Sandbox {
 	return &sandboxv1alpha1.Sandbox{
diff --git a/pkg/workloadmanager/server.go b/pkg/workloadmanager/server.go
index d4199829..5b873b8f 100644
--- a/pkg/workloadmanager/server.go
+++ b/pkg/workloadmanager/server.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -40,6 +41,7 @@ type Server struct {
 	tokenCache  *TokenCache
 	informers   *Informers
 	storeClient store.Store
+	wg          sync.WaitGroup
 }
 
 type Config struct {
@@ -130,7 +132,7 @@ func (s *Server) Start(ctx context.Context) error {
 
 	// Create HTTP/2 server for better performance
 	h2s := &http2.Server{}
-	
+
 	// Wrap handler with h2c for HTTP/2 cleartext support
 	h2cHandler := h2c.NewHandler(s.router, h2s)
 
@@ -141,21 +143,14 @@ func (s *Server) Start(ctx context.Context) error {
 		IdleTimeout: 90 * time.Second, // golang http default transport's idletimeout is 90s
 	}
 
-	// Listen for shutdown signal in goroutine
-	go func() {
-		<-ctx.Done()
-		klog.Info("Shutting down server...")
-		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-		defer cancel()
-		if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
-			klog.Errorf("Server shutdown error: %v", err)
-		}
-	}()
-
 	klog.Infof("Server listening on %s", addr)
 
 	gc := newGarbageCollector(s.k8sClient, s.storeClient, 15*time.Second)
-	go gc.run(ctx.Done())
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		gc.run(ctx.Done())
+	}()
 
 	// Start HTTP or HTTPS server
 	if s.config.EnableTLS {
@@ -168,6 +163,46 @@ func (s *Server) Start(ctx context.Context) error {
 	return s.httpServer.ListenAndServe()
 }
 
+// Shutdown gracefully stops the HTTP server, waiting for in-flight requests
+// until ctx expires. It only logs and returns nil when the HTTP server has not
+// been initialized. Errors are returned, not logged, so the caller reports them once.
+func (s *Server) Shutdown(ctx context.Context) error {
+	if s.httpServer == nil {
+		klog.Info("HTTP server not initialized, skipping HTTP shutdown")
+		return nil
+	}
+
+	klog.Info("Shutting down HTTP server...")
+	if err := s.httpServer.Shutdown(ctx); err != nil {
+		return fmt.Errorf("HTTP server shutdown: %w", err)
+	}
+	klog.Info("HTTP server stopped")
+	return nil
+}
+
+// WaitForBackgroundWorkers blocks until all background workers (e.g. garbage collector)
+// have finished their current operations and exited.
+func (s *Server) WaitForBackgroundWorkers() {
+	s.wg.Wait()
+}
+
+// CloseStore releases all resources held by the store (e.g. connection pools).
+// It only logs and returns nil when no store is configured. Errors are
+// returned, not logged, so the caller reports them once.
+func (s *Server) CloseStore() error {
+	if s.storeClient == nil {
+		klog.Info("Store not initialized, skipping store close")
+		return nil
+	}
+
+	klog.Info("Closing store connections...")
+	if err := s.storeClient.Close(); err != nil {
+		return fmt.Errorf("store close: %w", err)
+	}
+	klog.Info("Store connections closed")
+	return nil
+}
+
 // loggingMiddleware logs each request (except /health)
 func (s *Server) loggingMiddleware(c *gin.Context) {
 	start := time.Now()