From bf375305d4a293d7d8dad86d3ad4d78e98a145fe Mon Sep 17 00:00:00 2001 From: Mahil Patel Date: Sat, 14 Feb 2026 17:25:39 +0530 Subject: [PATCH 1/4] fix: implemented graceful shutdown for workloadmanager Signed-off-by: Mahil Patel --- cmd/workload-manager/main.go | 12 ++++++++++- pkg/router/session_manager_test.go | 4 ++++ pkg/store/interface.go | 2 ++ pkg/store/store_redis.go | 5 +++++ pkg/store/store_valkey.go | 6 ++++++ pkg/workloadmanager/server.go | 32 +++++++++++++++++++----------- 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/cmd/workload-manager/main.go b/cmd/workload-manager/main.go index b6f56a24..c2ce87ca 100644 --- a/cmd/workload-manager/main.go +++ b/cmd/workload-manager/main.go @@ -139,8 +139,18 @@ func main() { select { case <-sigCh: klog.Info("Received shutdown signal, shutting down gracefully...") + + // Create a shutdown context with a 15-second deadline + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) + defer shutdownCancel() + + // 1. Shutdown server (stops HTTP, closes store) + if err := server.Shutdown(shutdownCtx); err != nil { + klog.Errorf("Server shutdown error: %v", err) + } + + // 2. Cancel root context to stop background workers (GC, informers, controller manager) cancel() - time.Sleep(2 * time.Second) // Give server time to shutdown gracefully case err := <-errCh: klog.Fatalf("Server error: %v", err) } diff --git a/pkg/router/session_manager_test.go b/pkg/router/session_manager_test.go index bdf6d6b8..8740813b 100644 --- a/pkg/router/session_manager_test.go +++ b/pkg/router/session_manager_test.go @@ -94,6 +94,10 @@ func (f *fakeStoreClient) UpdateSandboxLastActivity(_ context.Context, _ string, return nil } +func (f *fakeStoreClient) Close() error { + return nil +} + // ---- tests: GetSandboxBySession ---- func TestGetSandboxBySession_Success(t *testing.T) { diff --git a/pkg/store/interface.go b/pkg/store/interface.go index 29c35f0c..72e74749 100644 --- a/pkg/store/interface.go +++ b/pkg/store/interface.go @@ -40,4 +40,6 @@ type Store interface { ListInactiveSandboxes(ctx context.Context, before time.Time, limit int64) ([]*types.SandboxInfo, error) // UpdateSessionLastActivity updates the last-activity index for the given session UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error + // Close releases all resources held by the store (e.g. connection pools) + Close() error } diff --git a/pkg/store/store_redis.go b/pkg/store/store_redis.go index 69bdd994..a1d60061 100644 --- a/pkg/store/store_redis.go +++ b/pkg/store/store_redis.go @@ -269,6 +269,11 @@ func (rs *redisStore) ListInactiveSandboxes(ctx context.Context, before time.Tim return rs.loadSandboxesBySessionIDs(ctx, ids) } +// Close releases all resources held by the redis store. +func (rs *redisStore) Close() error { + return rs.cli.Close() +} + // UpdateSessionLastActivity updates the last-activity index for the given session. func (rs *redisStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error { if sessionID == "" { diff --git a/pkg/store/store_valkey.go b/pkg/store/store_valkey.go index 31920a5a..4b8faff0 100644 --- a/pkg/store/store_valkey.go +++ b/pkg/store/store_valkey.go @@ -272,6 +272,12 @@ func (vs *valkeyStore) ListInactiveSandboxes(ctx context.Context, before time.Ti return vs.loadSandboxesBySessionIDs(ctx, ids) } +// Close releases all resources held by the valkey store. +func (vs *valkeyStore) Close() error { + vs.cli.Close() + return nil +} + // UpdateSessionLastActivity updates the last-activity index for the given session func (vs *valkeyStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error { if sessionID == "" { diff --git a/pkg/workloadmanager/server.go b/pkg/workloadmanager/server.go index d4199829..96031ac0 100644 --- a/pkg/workloadmanager/server.go +++ b/pkg/workloadmanager/server.go @@ -130,7 +130,7 @@ func (s *Server) Start(ctx context.Context) error { // Create HTTP/2 server for better performance h2s := &http2.Server{} - + // Wrap handler with h2c for HTTP/2 cleartext support h2cHandler := h2c.NewHandler(s.router, h2s) @@ -141,17 +141,6 @@ func (s *Server) Start(ctx context.Context) error { IdleTimeout: 90 * time.Second, // golang http default transport's idletimeout is 90s } - // Listen for shutdown signal in goroutine - go func() { - <-ctx.Done() - klog.Info("Shutting down server...") - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := s.httpServer.Shutdown(shutdownCtx); err != nil { - klog.Errorf("Server shutdown error: %v", err) - } - }() - klog.Infof("Server listening on %s", addr) gc := newGarbageCollector(s.k8sClient, s.storeClient, 15*time.Second) @@ -168,6 +157,25 @@ func (s *Server) Start(ctx context.Context) error { return s.httpServer.ListenAndServe() } +// Shutdown performs graceful shutdown of the server. +func (s *Server) Shutdown(ctx context.Context) error { + klog.Info("Shutting down HTTP server...") + if err := s.httpServer.Shutdown(ctx); err != nil { + klog.Errorf("HTTP server shutdown error: %v", err) + return fmt.Errorf("HTTP server shutdown: %w", err) + } + klog.Info("HTTP server stopped") + + klog.Info("Closing store connections...") + if err := s.storeClient.Close(); err != nil { + klog.Errorf("Store close error: %v", err) + return fmt.Errorf("store close: %w", err) + } + klog.Info("Store connections closed") + + return nil +} + // loggingMiddleware logs each request (except /health) func (s *Server) loggingMiddleware(c *gin.Context) { start := time.Now() From f210565080d1c651ccf91946a4d0052df79d7541 Mon Sep 17 00:00:00 2001 From: Mahil Patel Date: Sat, 14 Feb 2026 17:42:22 +0530 Subject: [PATCH 2/4] fix: resolve shutdown race condition and separate Shutdown/CloseStore responsibilities Signed-off-by: Mahil Patel --- cmd/workload-manager/main.go | 10 +++++++++- pkg/workloadmanager/server.go | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cmd/workload-manager/main.go b/cmd/workload-manager/main.go index c2ce87ca..b6c4da8c 100644 --- a/cmd/workload-manager/main.go +++ b/cmd/workload-manager/main.go @@ -144,13 +144,21 @@ func main() { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() - // 1. Shutdown server (stops HTTP, closes store) + // 1. Stop accepting new requests and drain in-flight ones if err := server.Shutdown(shutdownCtx); err != nil { klog.Errorf("Server shutdown error: %v", err) } // 2. Cancel root context to stop background workers (GC, informers, controller manager) cancel() + + // 3. Allow background workers time to finish their current operations + time.Sleep(2 * time.Second) + + // 4. Close store connections after all workers have stopped + if err := server.CloseStore(); err != nil { + klog.Errorf("Store close error: %v", err) + } case err := <-errCh: klog.Fatalf("Server error: %v", err) } diff --git a/pkg/workloadmanager/server.go b/pkg/workloadmanager/server.go index 96031ac0..2fc27048 100644 --- a/pkg/workloadmanager/server.go +++ b/pkg/workloadmanager/server.go @@ -157,7 +157,7 @@ func (s *Server) Start(ctx context.Context) error { return s.httpServer.ListenAndServe() } -// Shutdown performs graceful shutdown of the server. +// Shutdown performs graceful shutdown of the HTTP server. func (s *Server) Shutdown(ctx context.Context) error { klog.Info("Shutting down HTTP server...") if err := s.httpServer.Shutdown(ctx); err != nil { @@ -165,14 +165,17 @@ func (s *Server) Shutdown(ctx context.Context) error { return fmt.Errorf("HTTP server shutdown: %w", err) } klog.Info("HTTP server stopped") + return nil +} +// CloseStore releases all resources held by the store (e.g. connection pools). +func (s *Server) CloseStore() error { klog.Info("Closing store connections...") if err := s.storeClient.Close(); err != nil { klog.Errorf("Store close error: %v", err) return fmt.Errorf("store close: %w", err) } klog.Info("Store connections closed") - return nil } From b609273ad4d5053ba269fee15590b90ab1736893 Mon Sep 17 00:00:00 2001 From: Mahil Patel Date: Mon, 16 Feb 2026 18:51:31 +0530 Subject: [PATCH 3/4] added the nil gaurd around s.httpServer.shutdown() Signed-off-by: Mahil Patel --- pkg/workloadmanager/server.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/workloadmanager/server.go b/pkg/workloadmanager/server.go index 2fc27048..0ea4e1c9 100644 --- a/pkg/workloadmanager/server.go +++ b/pkg/workloadmanager/server.go @@ -159,12 +159,16 @@ func (s *Server) Start(ctx context.Context) error { // Shutdown performs graceful shutdown of the HTTP server. func (s *Server) Shutdown(ctx context.Context) error { - klog.Info("Shutting down HTTP server...") - if err := s.httpServer.Shutdown(ctx); err != nil { - klog.Errorf("HTTP server shutdown error: %v", err) - return fmt.Errorf("HTTP server shutdown: %w", err) + if s.httpServer != nil { + klog.Info("Shutting down HTTP server...") + if err := s.httpServer.Shutdown(ctx); err != nil { + klog.Errorf("HTTP server shutdown error: %v", err) + return fmt.Errorf("HTTP server shutdown: %w", err) + } + klog.Info("HTTP server stopped") + } else { + klog.Info("HTTP server not initialized, skipping HTTP shutdown") } - klog.Info("HTTP server stopped") return nil } From 44898eb164169a4d880e993a2fb92c2e65a5aef9 Mon Sep 17 00:00:00 2001 From: Mahil Patel Date: Mon, 16 Feb 2026 19:12:42 +0530 Subject: [PATCH 4/4] fix: address PR review comments for graceful shutdown Signed-off-by: Mahil Patel --- cmd/workload-manager/main.go | 8 ++++---- pkg/workloadmanager/handlers_test.go | 1 + pkg/workloadmanager/server.go | 16 ++++++++++++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/workload-manager/main.go b/cmd/workload-manager/main.go index b6c4da8c..8dc27720 100644 --- a/cmd/workload-manager/main.go +++ b/cmd/workload-manager/main.go @@ -146,18 +146,18 @@ func main() { // 1. Stop accepting new requests and drain in-flight ones if err := server.Shutdown(shutdownCtx); err != nil { - klog.Errorf("Server shutdown error: %v", err) + klog.Errorf("server shutdown error: %v", err) } // 2. Cancel root context to stop background workers (GC, informers, controller manager) cancel() - // 3. Allow background workers time to finish their current operations - time.Sleep(2 * time.Second) + // 3. Wait for background workers to finish their current operations + server.WaitForBackgroundWorkers() // 4. Close store connections after all workers have stopped if err := server.CloseStore(); err != nil { - klog.Errorf("Store close error: %v", err) + klog.Errorf("store close error: %v", err) } case err := <-errCh: klog.Fatalf("Server error: %v", err) diff --git a/pkg/workloadmanager/handlers_test.go b/pkg/workloadmanager/handlers_test.go index 5fd0bb7e..a5f78712 100644 --- a/pkg/workloadmanager/handlers_test.go +++ b/pkg/workloadmanager/handlers_test.go @@ -72,6 +72,7 @@ func (f *fakeStore) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int6 func (f *fakeStore) UpdateSessionLastActivity(_ context.Context, _ string, _ time.Time) error { return nil } +func (f *fakeStore) Close() error { return nil } func readySandbox() *sandboxv1alpha1.Sandbox { return &sandboxv1alpha1.Sandbox{ diff --git a/pkg/workloadmanager/server.go b/pkg/workloadmanager/server.go index 0ea4e1c9..5b873b8f 100644 --- a/pkg/workloadmanager/server.go +++ b/pkg/workloadmanager/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" "github.com/gin-gonic/gin" @@ -40,6 +41,7 @@ type Server struct { tokenCache *TokenCache informers *Informers storeClient store.Store + wg sync.WaitGroup } type Config struct { @@ -144,7 +146,11 @@ func (s *Server) Start(ctx context.Context) error { klog.Infof("Server listening on %s", addr) gc := newGarbageCollector(s.k8sClient, s.storeClient, 15*time.Second) - go gc.run(ctx.Done()) + s.wg.Add(1) + go func() { + defer s.wg.Done() + gc.run(ctx.Done()) + }() // Start HTTP or HTTPS server if s.config.EnableTLS { @@ -172,11 +178,17 @@ func (s *Server) Shutdown(ctx context.Context) error { return nil } +// WaitForBackgroundWorkers blocks until all background workers (e.g. garbage collector) +// have finished their current operations and exited. +func (s *Server) WaitForBackgroundWorkers() { + s.wg.Wait() +} + // CloseStore releases all resources held by the store (e.g. connection pools). func (s *Server) CloseStore() error { klog.Info("Closing store connections...") if err := s.storeClient.Close(); err != nil { - klog.Errorf("Store close error: %v", err) + klog.Errorf("store close error: %v", err) return fmt.Errorf("store close: %w", err) } klog.Info("Store connections closed")