Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion cmd/workload-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,26 @@ func main() {
select {
case <-sigCh:
klog.Info("Received shutdown signal, shutting down gracefully...")

// Create a shutdown context with a 15-second deadline
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel()

// 1. Stop accepting new requests and drain in-flight ones
if err := server.Shutdown(shutdownCtx); err != nil {
klog.Errorf("server shutdown error: %v", err)
}

// 2. Cancel root context to stop background workers (GC, informers, controller manager)
cancel()
time.Sleep(2 * time.Second) // Give server time to shutdown gracefully

// 3. Wait for background workers to finish their current operations
server.WaitForBackgroundWorkers()

// 4. Close store connections after all workers have stopped
if err := server.CloseStore(); err != nil {
klog.Errorf("store close error: %v", err)
}
case err := <-errCh:
klog.Fatalf("Server error: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/router/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (f *fakeStoreClient) UpdateSandboxLastActivity(_ context.Context, _ string,
return nil
}

// Close is a no-op on the fake: it performs no work and always returns nil.
func (f *fakeStoreClient) Close() error {
return nil
}

// ---- tests: GetSandboxBySession ----

func TestGetSandboxBySession_Success(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ type Store interface {
ListInactiveSandboxes(ctx context.Context, before time.Time, limit int64) ([]*types.SandboxInfo, error)
// UpdateSessionLastActivity updates the last-activity index for the given session
UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error
// Close releases all resources held by the store (e.g. connection pools)
Close() error
}
5 changes: 5 additions & 0 deletions pkg/store/store_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ func (rs *redisStore) ListInactiveSandboxes(ctx context.Context, before time.Tim
return rs.loadSandboxesBySessionIDs(ctx, ids)
}

// Close closes the store's client (rs.cli) and returns whatever error that
// call reports. The error is passed through as-is, without wrapping.
func (rs *redisStore) Close() error {
return rs.cli.Close()
}

// UpdateSessionLastActivity updates the last-activity index for the given session.
func (rs *redisStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error {
if sessionID == "" {
Expand Down
6 changes: 6 additions & 0 deletions pkg/store/store_valkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ func (vs *valkeyStore) ListInactiveSandboxes(ctx context.Context, before time.Ti
return vs.loadSandboxesBySessionIDs(ctx, ids)
}

// Close closes the store's client (vs.cli) and always returns nil.
func (vs *valkeyStore) Close() error {
// No result of the client's Close call is inspected, so this method never
// surfaces a close failure to its caller.
// NOTE(review): presumably the valkey client's Close has no return value,
// which is why nil is returned unconditionally — confirm.
vs.cli.Close()
return nil
}

// UpdateSessionLastActivity updates the last-activity index for the given session
func (vs *valkeyStore) UpdateSessionLastActivity(ctx context.Context, sessionID string, at time.Time) error {
if sessionID == "" {
Expand Down
1 change: 1 addition & 0 deletions pkg/workloadmanager/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (f *fakeStore) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int6
func (f *fakeStore) UpdateSessionLastActivity(_ context.Context, _ string, _ time.Time) error {
return nil
}
// Close is a no-op on the fake store; it always reports success.
func (f *fakeStore) Close() error {
	return nil
}

func readySandbox() *sandboxv1alpha1.Sandbox {
return &sandboxv1alpha1.Sandbox{
Expand Down
53 changes: 40 additions & 13 deletions pkg/workloadmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -40,6 +41,7 @@ type Server struct {
tokenCache *TokenCache
informers *Informers
storeClient store.Store
wg sync.WaitGroup
}

type Config struct {
Expand Down Expand Up @@ -130,7 +132,7 @@ func (s *Server) Start(ctx context.Context) error {

// Create HTTP/2 server for better performance
h2s := &http2.Server{}

// Wrap handler with h2c for HTTP/2 cleartext support
h2cHandler := h2c.NewHandler(s.router, h2s)

Expand All @@ -141,21 +143,14 @@ func (s *Server) Start(ctx context.Context) error {
IdleTimeout: 90 * time.Second, // golang http default transport's idletimeout is 90s
}

// Listen for shutdown signal in goroutine
go func() {
<-ctx.Done()
klog.Info("Shutting down server...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
klog.Errorf("Server shutdown error: %v", err)
}
}()

klog.Infof("Server listening on %s", addr)

gc := newGarbageCollector(s.k8sClient, s.storeClient, 15*time.Second)
go gc.run(ctx.Done())
s.wg.Add(1)
go func() {
defer s.wg.Done()
gc.run(ctx.Done())
}()

// Start HTTP or HTTPS server
if s.config.EnableTLS {
Expand All @@ -168,6 +163,38 @@ func (s *Server) Start(ctx context.Context) error {
return s.httpServer.ListenAndServe()
}

// Shutdown gracefully shuts down the HTTP server by delegating to
// s.httpServer.Shutdown with the supplied context, so ctx bounds how long the
// shutdown may take.
//
// If the HTTP server has not been created yet (s.httpServer is nil) the call
// is a no-op and returns nil. A failure from the underlying shutdown is
// returned wrapped with %w so callers can still match it with
// errors.Is/errors.As. The error is deliberately not logged here: it is
// reported exactly once, by the caller that receives it.
func (s *Server) Shutdown(ctx context.Context) error {
	if s.httpServer == nil {
		klog.Info("HTTP server not initialized, skipping HTTP shutdown")
		return nil
	}

	klog.Info("Shutting down HTTP server...")
	if err := s.httpServer.Shutdown(ctx); err != nil {
		return fmt.Errorf("HTTP server shutdown: %w", err)
	}
	klog.Info("HTTP server stopped")
	return nil
}

// WaitForBackgroundWorkers blocks until every goroutine registered on s.wg
// has called Done. Start registers the garbage collector goroutine there and
// runs it with the Done channel of the context passed to Start.
//
// NOTE(review): presumably the garbage collector only returns once that
// context is cancelled, so cancel it before calling this method or the wait
// may never return — confirm against gc.run.
func (s *Server) WaitForBackgroundWorkers() {
s.wg.Wait()
}

// CloseStore closes the server's store client, releasing whatever resources
// the store holds (e.g. connection pools).
//
// If no store client is set the call is a no-op and returns nil, mirroring
// how Shutdown treats an uninitialized HTTP server. A failure from the
// store's Close is returned wrapped with %w; it is not logged here so that
// the caller reports it exactly once.
func (s *Server) CloseStore() error {
	if s.storeClient == nil {
		klog.Info("Store not initialized, skipping store close")
		return nil
	}

	klog.Info("Closing store connections...")
	if err := s.storeClient.Close(); err != nil {
		return fmt.Errorf("store close: %w", err)
	}
	klog.Info("Store connections closed")
	return nil
}

// loggingMiddleware logs each request (except /health)
func (s *Server) loggingMiddleware(c *gin.Context) {
start := time.Now()
Expand Down
Loading