39 changes: 30 additions & 9 deletions cmd/api/main.go
@@ -130,16 +130,28 @@ func run() error {
defer db.Close()
defer func() { _ = rdb.Close() }()

compute, storage, network, lbProxy, err := initBackends(deps, cfg, logger, db, rdb)
rawCompute, rawStorage, rawNetwork, rawLBProxy, err := initBackends(deps, cfg, logger, db, rdb)
if err != nil {
logger.Error("backend initialization failed", "error", err)
return err
}

// Wrap raw backends with resilience decorators (circuit breaker, bulkhead, timeouts).
compute := platform.NewResilientCompute(rawCompute, logger, platform.ResilientComputeOpts{})
storage := platform.NewResilientStorage(rawStorage, logger, platform.ResilientStorageOpts{})
network := platform.NewResilientNetwork(rawNetwork, logger, platform.ResilientNetworkOpts{})
lbProxy := platform.NewResilientLB(rawLBProxy, logger, platform.ResilientLBOpts{})
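
// --- Illustrative sketch (not part of this PR's diff) ---
// Rough shape a resilience decorator such as platform.NewResilientCompute could take.
// The real ports.ComputeBackend and ResilientComputeOpts are not shown in this diff,
// so the one-method interface, option fields, and thresholds below are assumptions.
// Imports (context, errors, log/slog, sync, time) are omitted for brevity.

type computeBackend interface {
	CreateInstance(ctx context.Context, spec string) (string, error)
}

type resilientCompute struct {
	inner     computeBackend
	logger    *slog.Logger
	timeout   time.Duration // per-call timeout
	mu        sync.Mutex
	failures  int // consecutive failures seen so far
	threshold int // failures after which the circuit opens
}

func (r *resilientCompute) CreateInstance(ctx context.Context, spec string) (string, error) {
	r.mu.Lock()
	open := r.failures >= r.threshold
	r.mu.Unlock()
	if open {
		// Fail fast instead of piling more load onto an unhealthy backend.
		return "", errors.New("compute backend circuit open")
	}
	ctx, cancel := context.WithTimeout(ctx, r.timeout)
	defer cancel()
	id, err := r.inner.CreateInstance(ctx, spec)
	r.mu.Lock()
	if err != nil {
		r.failures++
		r.logger.Warn("compute call failed", "error", err)
	} else {
		r.failures = 0 // any success closes the circuit again
	}
	r.mu.Unlock()
	return id, err
}
// --- end sketch ---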

repos := deps.InitRepositories(db, rdb)

// Create leader elector for singleton worker coordination.
// When multiple worker replicas run, only one will hold leadership per key.
leaderElector := postgres.NewPgLeaderElector(db, logger)
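
// --- Illustrative sketch (not part of this PR's diff) ---
// PgLeaderElector's internals are not shown here. A common Postgres technique is a
// session-scoped advisory lock: pg_try_advisory_lock(key) returns true for exactly one
// session at a time, so only one replica "leads" per key. A string key such as
// "singleton:lb" would typically be hashed to an int64 first. The function names below
// are assumptions for illustration; imports (context, database/sql) are omitted.

func tryAcquireLeadership(ctx context.Context, db *sql.DB, key int64) (bool, error) {
	var got bool
	// Non-blocking: returns false immediately if another session already holds the lock.
	err := db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&got)
	return got, err
}

func releaseLeadership(ctx context.Context, db *sql.DB, key int64) error {
	_, err := db.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", key)
	return err
}
// --- end sketch ---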

svcs, workers, err := deps.InitServices(setup.ServiceConfig{
Config: cfg, Repos: repos, Compute: compute, Storage: storage,
Network: network, LBProxy: lbProxy, DB: db, RDB: rdb, Logger: logger,
LeaderElector: leaderElector,
})
if err != nil {
logger.Error("service initialization failed", "error", err)
@@ -159,47 +171,56 @@ func run() error {
}

func runApplication(deps AppDeps, cfg *platform.Config, logger *slog.Logger, r *gin.Engine, workers *setup.Workers) {
role := os.Getenv("APP_ROLE")
role := os.Getenv("ROLE")
if role == "" {
role = "all"
}

validRoles := map[string]bool{"api": true, "worker": true, "all": true}
if !validRoles[role] {
logger.Error("invalid ROLE value, must be one of: api, worker, all", "role", role)
return
}
logger.Info("starting with role", "role", role)

wg := &sync.WaitGroup{}
workerCtx, workerCancel := context.WithCancel(context.Background())

if role == "worker" || role == "all" {
runWorkers(workerCtx, wg, workers)
}

srv := deps.NewHTTPServer(":"+cfg.Port, r)

var srv *http.Server
if role == "api" || role == "all" {
srv = deps.NewHTTPServer(":"+cfg.Port, r)
go func() {
logger.Info("starting compute-api", "port", cfg.Port)
if err := deps.StartHTTPServer(srv); err != nil && !stdlib_errors.Is(err, http.ErrServerClosed) {
logger.Error("failed to start server", "error", err)
}
}()
} else {
logger.Info("running in worker-only mode")
logger.Info("running in worker-only mode, HTTP server disabled")
}

quit := make(chan os.Signal, 1)
deps.NotifySignals(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

logger.Info("shutting down server...")
logger.Info("shutting down...")

ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

if err := deps.ShutdownHTTPServer(ctx, srv); err != nil {
logger.Error("server forced to shutdown", "error", err)
if srv != nil {
if err := deps.ShutdownHTTPServer(ctx, srv); err != nil {
logger.Error("server forced to shutdown", "error", err)
}
}

workerCancel()
wg.Wait()
logger.Info("server exited")
logger.Info("shutdown complete")
}

type runner interface {
91 changes: 91 additions & 0 deletions cmd/api/main_test.go
@@ -167,6 +167,97 @@ func TestRunApplicationApiRoleStartsAndShutsDown(t *testing.T) {
}
}

func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")

deps := DefaultDeps()

deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}

runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
// If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
}

func TestRunApplicationDefaultsToAllRole(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "") // Explicitly empty to verify default

started := make(chan struct{})
shutdownCalled := make(chan struct{})
deps := DefaultDeps()

deps.NewHTTPServer = func(addr string, handler http.Handler) *http.Server {
return &http.Server{
Addr: addr,
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
}
}
deps.StartHTTPServer = func(*http.Server) error {
close(started)
return http.ErrServerClosed
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
close(shutdownCalled)
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
<-started
c <- syscall.SIGTERM
}()
}

runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})

select {
case <-shutdownCalled:
case <-time.After(time.Second):
t.Fatalf("expected server shutdown to be called when ROLE defaults to 'all'")
}
}

func TestRunApplicationInvalidRoleReturnsEarly(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "invalid")

deps := DefaultDeps()

deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called for invalid role")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called for invalid role")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
t.Fatalf("NotifySignals should not be called for invalid role")
}

// Should return immediately without starting anything
runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
}

// Stub helpers below keep main.go testable without altering production behavior.

type stubDB struct{ closed bool }
120 changes: 84 additions & 36 deletions internal/api/setup/dependencies.go
@@ -2,8 +2,10 @@
package setup

import (
"context"
"fmt"
"log/slog"
"sync"

"strings"

@@ -54,6 +56,8 @@ type Repositories struct {
AutoScaling ports.AutoScalingRepository
Accounting ports.AccountingRepository
TaskQueue ports.TaskQueue
DurableQueue ports.DurableTaskQueue
Ledger ports.ExecutionLedger
Image ports.ImageRepository
Cluster ports.ClusterRepository
Lifecycle ports.LifecycleRepository
@@ -99,6 +103,8 @@ func InitRepositories(db postgres.DB, rdb *redisv9.Client) *Repositories {
AutoScaling: postgres.NewAutoScalingRepo(db),
Accounting: postgres.NewAccountingRepository(db),
TaskQueue: redis.NewRedisTaskQueue(rdb),
DurableQueue: redis.NewDurableTaskQueue(rdb),
Ledger: postgres.NewExecutionLedger(db),
Image: postgres.NewImageRepository(db),
Cluster: postgres.NewClusterRepository(db),
Lifecycle: postgres.NewLifecycleRepository(db),
@@ -160,35 +166,46 @@ type Services struct {
VPCPeering ports.VPCPeeringService
}

// Workers struct to return background workers
// Runner is the interface that all background workers implement.
type Runner interface {
Run(context.Context, *sync.WaitGroup)
}

// Workers struct to return background workers.
// Singleton workers are typed as Runner so they can be wrapped with LeaderGuard.
// Parallel consumers retain concrete types for direct configuration access.
type Workers struct {
LB *services.LBWorker
AutoScaling *services.AutoScalingWorker
Cron *services.CronWorker
Container *services.ContainerWorker
Pipeline *workers.PipelineWorker
Provision *workers.ProvisionWorker
Accounting *workers.AccountingWorker
Cluster *workers.ClusterWorker
Lifecycle *workers.LifecycleWorker
ReplicaMonitor *workers.ReplicaMonitor
ClusterReconciler *workers.ClusterReconciler
Healing *workers.HealingWorker
DatabaseFailover *workers.DatabaseFailoverWorker
Log *workers.LogWorker
// Singleton workers (must run on exactly one node via leader election)
LB Runner
AutoScaling Runner
Cron Runner
Container Runner
Accounting Runner
Lifecycle Runner
ReplicaMonitor Runner
ClusterReconciler Runner
Healing Runner
DatabaseFailover Runner
Log Runner

// Parallel consumer workers (safe to run on multiple nodes)
Pipeline *workers.PipelineWorker
Provision *workers.ProvisionWorker
Cluster *workers.ClusterWorker
}

// ServiceConfig holds the dependencies required to initialize services
type ServiceConfig struct {
Config *platform.Config
Repos *Repositories
Compute ports.ComputeBackend
Storage ports.StorageBackend
Network ports.NetworkBackend
LBProxy ports.LBProxyAdapter
DB postgres.DB
RDB *redisv9.Client
Logger *slog.Logger
Config *platform.Config
Repos *Repositories
Compute ports.ComputeBackend
Storage ports.StorageBackend
Network ports.NetworkBackend
LBProxy ports.LBProxyAdapter
DB postgres.DB
RDB *redisv9.Client
Logger *slog.Logger
LeaderElector ports.LeaderElector // nil disables leader election (single-instance mode)
}

// InitServices constructs core services and background workers.
@@ -216,8 +233,10 @@ func InitServices(c ServiceConfig) (*Services, *Workers, error) {
if err != nil {
return nil, nil, fmt.Errorf("failed to init powerdns backend: %w", err)
}
// Wrap DNS backend with resilience (circuit breaker + timeout).
resilientDNS := platform.NewResilientDNS(pdnsBackend, c.Logger, platform.ResilientDNSOpts{})
dnsSvc := services.NewDNSService(services.DNSServiceParams{
Repo: c.Repos.DNS, Backend: pdnsBackend, VpcRepo: c.Repos.Vpc,
Repo: c.Repos.DNS, Backend: resilientDNS, VpcRepo: c.Repos.Vpc,
AuditSvc: auditSvc, EventSvc: eventSvc, Logger: c.Logger,
})

@@ -293,7 +312,7 @@ func InitServices(c ServiceConfig) (*Services, *Workers, error) {
accountingWorker := workers.NewAccountingWorker(accountingSvc, c.Logger)
imageSvc := services.NewImageService(c.Repos.Image, fileStore, c.Logger)
iamSvc := services.NewIAMService(c.Repos.IAM, auditSvc, eventSvc, c.Logger)
provisionWorker := workers.NewProvisionWorker(instSvcConcrete, c.Repos.TaskQueue, c.Logger)
provisionWorker := workers.NewProvisionWorker(instSvcConcrete, c.Repos.DurableQueue, c.Repos.Ledger, c.Logger)
healingWorker := workers.NewHealingWorker(instSvcConcrete, c.Repos.Instance, c.Logger)

clusterSvc, clusterProvisioner, err := initClusterServices(c, vpcSvc, instSvcConcrete, secretSvc, storageSvc, lbSvc, sgSvc)
@@ -333,17 +352,46 @@ func InitServices(c ServiceConfig) (*Services, *Workers, error) {
// 7. High Availability & Monitoring
replicaMonitor := initReplicaMonitor(c)

// Helper: wrap a singleton worker with LeaderGuard if leader election is enabled.
// Note: the nil check below only catches a nil Runner interface value; callers must
// explicitly pass a nil Runner (not a typed nil pointer) when a worker should be skipped.
guardSingleton := func(key string, w Runner) Runner {
if w == nil || c.LeaderElector == nil {
return w
}
return workers.NewLeaderGuard(c.LeaderElector, key, w, c.Logger)
}
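
// --- Illustrative sketch (not part of this PR's diff) ---
// workers.NewLeaderGuard is not shown here. The usual shape is a Runner decorator that
// delegates to the wrapped worker only once leadership for its key has been acquired,
// retrying on an interval otherwise. The TryAcquire method on ports.LeaderElector, the
// retry interval, and the WaitGroup handling are assumptions for illustration.

type leaderGuard struct {
	elector interface {
		TryAcquire(ctx context.Context, key string) (bool, error)
	}
	key    string
	inner  Runner
	logger *slog.Logger
}

func (g *leaderGuard) Run(ctx context.Context, wg *sync.WaitGroup) {
	ticker := time.NewTicker(15 * time.Second) // leadership retry interval (assumed)
	defer ticker.Stop()
	for {
		ok, err := g.elector.TryAcquire(ctx, g.key)
		if err == nil && ok {
			g.logger.Info("leadership acquired", "key", g.key)
			g.inner.Run(ctx, wg) // the wrapped singleton worker runs only on the leader
			return
		}
		select {
		case <-ctx.Done():
			return // shutting down before ever becoming leader
		case <-ticker.C:
			// Not the leader yet; try again on the next tick.
		}
	}
}
// --- end sketch ---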

lifecycleWorker := workers.NewLifecycleWorker(c.Repos.Lifecycle, storageSvc, c.Repos.Storage, c.Logger)
clusterReconciler := workers.NewClusterReconciler(c.Repos.Cluster, clusterProvisioner, c.Logger)
dbFailoverWorker := workers.NewDatabaseFailoverWorker(databaseSvc, c.Repos.Database, c.Logger)
logWorker := workers.NewLogWorker(logSvc, c.Logger)

// For replicaMonitor, we must convert nil *ReplicaMonitor to nil Runner to avoid
// a non-nil interface wrapping a nil pointer.
var replicaMonitorRunner Runner
if replicaMonitor != nil {
replicaMonitorRunner = replicaMonitor
}
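
// Illustrative note (not part of this PR's diff): the pitfall the conversion above avoids.
// An interface holding a typed nil pointer compares as non-nil, so guardSingleton's
// nil check would not skip a nil *workers.ReplicaMonitor assigned to a Runner directly.
func typedNilPitfall() {
	var rm *struct{ _ int } // stands in for a nil *workers.ReplicaMonitor
	var w any = rm          // w is a non-nil interface wrapping a nil pointer
	fmt.Println(w == nil)   // prints false
}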

workersCollection := &Workers{
LB: lbWorker, AutoScaling: asgWorker, Cron: cronWorker, Container: containerWorker,
Pipeline: workers.NewPipelineWorker(c.Repos.Pipeline, c.Repos.TaskQueue, c.Compute, c.Logger),
Provision: provisionWorker, Accounting: accountingWorker,
Cluster: workers.NewClusterWorker(c.Repos.Cluster, clusterProvisioner, c.Repos.TaskQueue, c.Logger),
Lifecycle: workers.NewLifecycleWorker(c.Repos.Lifecycle, storageSvc, c.Repos.Storage, c.Logger),
ReplicaMonitor: replicaMonitor,
ClusterReconciler: workers.NewClusterReconciler(c.Repos.Cluster, clusterProvisioner, c.Logger),
Healing: healingWorker,
DatabaseFailover: workers.NewDatabaseFailoverWorker(databaseSvc, c.Repos.Database, c.Logger),
Log: workers.NewLogWorker(logSvc, c.Logger),
// Singleton workers — wrapped with leader election
LB: guardSingleton("singleton:lb", lbWorker),
AutoScaling: guardSingleton("singleton:autoscaling", asgWorker),
Cron: guardSingleton("singleton:cron", cronWorker),
Container: guardSingleton("singleton:container", containerWorker),
Accounting: guardSingleton("singleton:accounting", accountingWorker),
Lifecycle: guardSingleton("singleton:lifecycle", lifecycleWorker),
ReplicaMonitor: guardSingleton("singleton:replica-monitor", replicaMonitorRunner),
ClusterReconciler: guardSingleton("singleton:cluster-reconciler", clusterReconciler),
Healing: guardSingleton("singleton:healing", healingWorker),
DatabaseFailover: guardSingleton("singleton:db-failover", dbFailoverWorker),
Log: guardSingleton("singleton:log", logWorker),

// Parallel consumer workers — no leader election needed
Pipeline: workers.NewPipelineWorker(c.Repos.Pipeline, c.Repos.DurableQueue, c.Repos.Ledger, c.Compute, c.Logger),
Provision: provisionWorker,
Cluster: workers.NewClusterWorker(c.Repos.Cluster, clusterProvisioner, c.Repos.DurableQueue, c.Repos.Ledger, c.Logger),
}

return svcs, workersCollection, nil
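
// --- Illustrative sketch (not part of this PR's diff) ---
// ports.DurableTaskQueue and ports.ExecutionLedger are not defined in this diff. A
// typical pairing: the durable queue redelivers tasks that were never acknowledged,
// and the ledger records completed task IDs so redeliveries are handled idempotently.
// All interface and method names below are assumptions for illustration.

type queuedTask struct{ ID, Payload string }

type durableQueue interface {
	Dequeue(ctx context.Context) (queuedTask, error)
	Ack(ctx context.Context, id string) error
}

type executionLedger interface {
	AlreadyDone(ctx context.Context, id string) (bool, error)
	MarkDone(ctx context.Context, id string) error
}

// consumeOnce processes a single task; at-least-once delivery is made effectively
// idempotent by the ledger check before doing any work.
func consumeOnce(ctx context.Context, q durableQueue, l executionLedger, handle func(queuedTask) error) error {
	t, err := q.Dequeue(ctx)
	if err != nil {
		return err
	}
	done, err := l.AlreadyDone(ctx, t.ID)
	if err != nil {
		return err
	}
	if !done {
		if err := handle(t); err != nil {
			return err // left unacked, so the queue will redeliver it
		}
		if err := l.MarkDone(ctx, t.ID); err != nil {
			return err
		}
	}
	return q.Ack(ctx, t.ID) // safe to ack: either newly completed or a duplicate delivery
}
// --- end sketch ---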