diff --git a/fuku.yaml b/fuku.yaml index d9979f0..76b15b4 100644 --- a/fuku.yaml +++ b/fuku.yaml @@ -54,3 +54,6 @@ profiles: logging: level: debug format: console + +concurrency: + workers: 10 diff --git a/internal/app/errors/errors.go b/internal/app/errors/errors.go index b760089..bbbe679 100644 --- a/internal/app/errors/errors.go +++ b/internal/app/errors/errors.go @@ -5,9 +5,10 @@ import ( ) var ( - ErrFailedToReadConfig = errors.New("failed to read config file") - ErrFailedToParseConfig = errors.New("failed to parse config file") - ErrInvalidConfig = errors.New("invalid configuration") + ErrFailedToReadConfig = errors.New("failed to read config file") + ErrFailedToParseConfig = errors.New("failed to parse config file") + ErrInvalidConfig = errors.New("invalid configuration") + ErrInvalidConcurrencyWorkers = errors.New("concurrency workers must be greater than 0") ErrProfileNotFound = errors.New("profile not found") ErrUnsupportedProfileFormat = errors.New("unsupported profile format") @@ -50,5 +51,6 @@ var ( var ( As = errors.As + Is = errors.Is New = errors.New ) diff --git a/internal/app/runner/workerpool.go b/internal/app/runner/workerpool.go index 4be3975..75a6c2b 100644 --- a/internal/app/runner/workerpool.go +++ b/internal/app/runner/workerpool.go @@ -17,10 +17,10 @@ type workerPool struct { sem chan struct{} } -// NewWorkerPool creates a new worker pool with the specified maximum workers -func NewWorkerPool() WorkerPool { +// NewWorkerPool creates a new worker pool with the configured maximum workers +func NewWorkerPool(cfg *config.Config) WorkerPool { return &workerPool{ - sem: make(chan struct{}, config.MaxWorkers), + sem: make(chan struct{}, cfg.Concurrency.Workers), } } diff --git a/internal/app/runner/workerpool_test.go b/internal/app/runner/workerpool_test.go index e67af9c..e0b9f2d 100644 --- a/internal/app/runner/workerpool_test.go +++ b/internal/app/runner/workerpool_test.go @@ -13,10 +13,11 @@ import ( ) func Test_AcquireRelease(t *testing.T) { - pool := NewWorkerPool() ctx := context.Background() + cfg := config.DefaultConfig() + pool := NewWorkerPool(cfg) - for i := 0; i < config.MaxWorkers; i++ { + for i := 0; i < cfg.Concurrency.Workers; i++ { err := pool.Acquire(ctx) require.NoError(t, err) } @@ -44,14 +45,15 @@ func Test_AcquireRelease(t *testing.T) { t.Fatal("Should have acquired worker slot after release") } - for i := 0; i < config.MaxWorkers; i++ { + for i := 0; i < cfg.Concurrency.Workers; i++ { pool.Release() } } func Test_ConcurrentWorkers(t *testing.T) { ctx := context.Background() - pool := NewWorkerPool() + cfg := config.DefaultConfig() + pool := NewWorkerPool(cfg) var ( activeWorkers int @@ -93,15 +95,16 @@ func Test_ConcurrentWorkers(t *testing.T) { wg.Wait() assert.Equal(t, 0, activeWorkers) - assert.LessOrEqual(t, maxActive, config.MaxWorkers) + assert.LessOrEqual(t, maxActive, cfg.Concurrency.Workers) assert.Greater(t, maxActive, 0) } func Test_AcquireContextCancelled(t *testing.T) { ctx := context.Background() - pool := NewWorkerPool() + cfg := config.DefaultConfig() + pool := NewWorkerPool(cfg) - for i := 0; i < config.MaxWorkers; i++ { + for i := 0; i < cfg.Concurrency.Workers; i++ { err := pool.Acquire(ctx) require.NoError(t, err) } @@ -125,7 +128,7 @@ func Test_AcquireContextCancelled(t *testing.T) { t.Fatal("Should have received context cancellation error") } - for i := 0; i < config.MaxWorkers; i++ { + for i := 0; i < cfg.Concurrency.Workers; i++ { pool.Release() } } diff --git a/internal/config/config.go b/internal/config/config.go index 006c744..24a7a5f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -23,6 +23,9 @@ type Config struct { Level string `yaml:"level"` Format string `yaml:"format"` } + Concurrency struct { + Workers int `yaml:"workers"` + } Version int } @@ -64,9 +67,12 @@ func DefaultConfig() *Config { Version: 1, } + cfg.Profiles[Default] = "*" + cfg.Logging.Level = LogLevel cfg.Logging.Format = LogFormat - cfg.Profiles[Default] = "*" + + cfg.Concurrency.Workers = MaxWorkers return cfg } @@ -244,6 +250,10 @@ func parseTierOrder(data []byte) (*Topology, error) { // Validate validates the configuration func (c *Config) Validate() error { + if err := c.validateConcurrency(); err != nil { + return err + } + for name, service := range c.Services { if err := service.validateReadiness(); err != nil { return fmt.Errorf("service %s: %w", name, err) @@ -253,6 +263,15 @@ func (c *Config) Validate() error { return nil } +// validateConcurrency validates concurrency settings +func (c *Config) validateConcurrency() error { + if c.Concurrency.Workers <= 0 { + return errors.ErrInvalidConcurrencyWorkers + } + + return nil +} + // validateReadiness validates the readiness configuration func (s *Service) validateReadiness() error { if s.Readiness == nil { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5f9581d..79c8f37 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -16,6 +16,7 @@ func Test_DefaultConfig(t *testing.T) { assert.NotNil(t, cfg.Profiles) assert.Equal(t, LogLevel, cfg.Logging.Level) assert.Equal(t, LogFormat, cfg.Logging.Format) + assert.Equal(t, MaxWorkers, cfg.Concurrency.Workers) assert.Equal(t, 1, cfg.Version) } @@ -67,6 +68,46 @@ logging: }, error: nil, }, + { + name: "valid config file with concurrency", + setupFunc: func() func() { + content := `version: 1 +services: + test-service: + dir: ./test +concurrency: + workers: 10 +` + + err := os.WriteFile("fuku.yaml", []byte(content), 0644) + if err != nil { + t.Fatal(err) + } + + return func() { os.Remove("fuku.yaml") } + }, + error: nil, + }, + { + name: "invalid concurrency workers zero", + setupFunc: func() func() { + content := `version: 1 +services: + test-service: + dir: ./test +concurrency: + workers: 0 +` + + err := os.WriteFile("fuku.yaml", []byte(content), 0644) + if err != nil { + t.Fatal(err) + } + + return func() { os.Remove("fuku.yaml") } + }, + error: errors.ErrInvalidConfig, + }, { name: "invalid yaml structure for unmarshal", setupFunc: func() func() { @@ -114,7 +155,7 @@ services: "this should be a map not a string" if tt.error != nil { assert.Error(t, err) - assert.Equal(t, tt.error, err) + assert.True(t, errors.Is(err, tt.error), "expected error %v, got %v", tt.error, err) assert.Nil(t, cfg) assert.Nil(t, topology) } else { @@ -126,6 +167,48 @@ services: "this should be a map not a string" } } +func Test_LoadConcurrencyConfig(t *testing.T) { + tests := []struct { + name string + yaml string + expectedWorkers int + }{ + { + name: "default workers when not specified", + yaml: `version: 1`, + expectedWorkers: MaxWorkers, + }, + { + name: "custom workers value", + yaml: `version: 1 +concurrency: + workers: 10`, + expectedWorkers: 10, + }, + { + name: "workers value of 1", + yaml: `version: 1 +concurrency: + workers: 1`, + expectedWorkers: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := os.WriteFile("fuku.yaml", []byte(tt.yaml), 0644) + if err != nil { + t.Fatal(err) + } + defer os.Remove("fuku.yaml") + + cfg, _, err := Load() + assert.NoError(t, err) + assert.Equal(t, tt.expectedWorkers, cfg.Concurrency.Workers) + }) + } +} + func Test_ApplyDefaults(t *testing.T) { tests := []struct { name string @@ -183,71 +266,124 @@ func Test_Validate(t *testing.T) { expectError bool errorMsg string }{ + { + name: "valid configuration with default workers", + config: DefaultConfig(), + expectError: false, + }, + { + name: "valid configuration with custom workers", + config: func() *Config { + cfg := DefaultConfig() + cfg.Concurrency.Workers = 10 + + return cfg + }(), + expectError: false, + }, + { + name: "invalid workers zero", + config: func() *Config { + cfg := DefaultConfig() + cfg.Concurrency.Workers = 0 + + return cfg + }(), + expectError: true, + errorMsg: "concurrency workers must be greater than 0", + }, + { + name: "invalid workers negative", + config: func() *Config { + cfg := DefaultConfig() + cfg.Concurrency.Workers = -1 + + return cfg + }(), + expectError: true, + errorMsg: "concurrency workers must be greater than 0", + }, { name: "valid configuration with standard tiers", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Tier: "foundation"}, "web": {Dir: "web", Tier: "platform"}, - }, - }, + } + + return cfg + }(), expectError: false, }, { name: "valid configuration with custom tier", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Tier: "custom-tier"}, - }, - }, + } + + return cfg + }(), expectError: false, }, { name: "valid configuration with mixed tiers", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Tier: "foundation"}, "custom": {Dir: "custom", Tier: "middleware"}, "another": {Dir: "another", Tier: "services"}, - }, - }, + } + + return cfg + }(), expectError: false, }, { name: "service with invalid readiness type", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Readiness: &Readiness{Type: "invalid"}}, - }, - }, + } + + return cfg + }(), expectError: true, errorMsg: "service api", }, { name: "service with http readiness missing url", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Readiness: &Readiness{Type: TypeHTTP}}, - }, - }, + } + + return cfg + }(), expectError: true, errorMsg: "service api", }, { name: "service with log readiness missing pattern", - config: &Config{ - Services: map[string]*Service{ + config: func() *Config { + cfg := DefaultConfig() + cfg.Services = map[string]*Service{ "api": {Dir: "api", Readiness: &Readiness{Type: TypeLog}}, - }, - }, + } + + return cfg + }(), expectError: true, errorMsg: "service api", }, { - name: "empty services map", - config: &Config{ - Services: map[string]*Service{}, - }, + name: "empty services map", + config: DefaultConfig(), expectError: false, }, }