Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fuku.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ profiles:
logging:
level: debug
format: console

concurrency:
workers: 10
8 changes: 5 additions & 3 deletions internal/app/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
)

var (
ErrFailedToReadConfig = errors.New("failed to read config file")
ErrFailedToParseConfig = errors.New("failed to parse config file")
ErrInvalidConfig = errors.New("invalid configuration")
ErrFailedToReadConfig = errors.New("failed to read config file")
ErrFailedToParseConfig = errors.New("failed to parse config file")
ErrInvalidConfig = errors.New("invalid configuration")
ErrInvalidConcurrencyWorkers = errors.New("concurrency workers must be greater than 0")

ErrProfileNotFound = errors.New("profile not found")
ErrUnsupportedProfileFormat = errors.New("unsupported profile format")
Expand Down Expand Up @@ -50,5 +51,6 @@ var (

var (
As = errors.As
Is = errors.Is
New = errors.New
)
6 changes: 3 additions & 3 deletions internal/app/runner/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type workerPool struct {
sem chan struct{}
}

// NewWorkerPool creates a new worker pool with the specified maximum workers
func NewWorkerPool() WorkerPool {
// NewWorkerPool creates a new worker pool with the configured maximum workers
func NewWorkerPool(cfg *config.Config) WorkerPool {
return &workerPool{
sem: make(chan struct{}, config.MaxWorkers),
sem: make(chan struct{}, cfg.Concurrency.Workers),
}
}

Expand Down
19 changes: 11 additions & 8 deletions internal/app/runner/workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
)

func Test_AcquireRelease(t *testing.T) {
pool := NewWorkerPool()
ctx := context.Background()
cfg := config.DefaultConfig()
pool := NewWorkerPool(cfg)

for i := 0; i < config.MaxWorkers; i++ {
for i := 0; i < cfg.Concurrency.Workers; i++ {
err := pool.Acquire(ctx)
require.NoError(t, err)
}
Expand Down Expand Up @@ -44,14 +45,15 @@ func Test_AcquireRelease(t *testing.T) {
t.Fatal("Should have acquired worker slot after release")
}

for i := 0; i < config.MaxWorkers; i++ {
for i := 0; i < cfg.Concurrency.Workers; i++ {
pool.Release()
}
}

func Test_ConcurrentWorkers(t *testing.T) {
ctx := context.Background()
pool := NewWorkerPool()
cfg := config.DefaultConfig()
pool := NewWorkerPool(cfg)

var (
activeWorkers int
Expand Down Expand Up @@ -93,15 +95,16 @@ func Test_ConcurrentWorkers(t *testing.T) {
wg.Wait()

assert.Equal(t, 0, activeWorkers)
assert.LessOrEqual(t, maxActive, config.MaxWorkers)
assert.LessOrEqual(t, maxActive, cfg.Concurrency.Workers)
assert.Greater(t, maxActive, 0)
}

func Test_AcquireContextCancelled(t *testing.T) {
ctx := context.Background()
pool := NewWorkerPool()
cfg := config.DefaultConfig()
pool := NewWorkerPool(cfg)

for i := 0; i < config.MaxWorkers; i++ {
for i := 0; i < cfg.Concurrency.Workers; i++ {
err := pool.Acquire(ctx)
require.NoError(t, err)
}
Expand All @@ -125,7 +128,7 @@ func Test_AcquireContextCancelled(t *testing.T) {
t.Fatal("Should have received context cancellation error")
}

for i := 0; i < config.MaxWorkers; i++ {
for i := 0; i < cfg.Concurrency.Workers; i++ {
pool.Release()
}
}
21 changes: 20 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Config struct {
Level string `yaml:"level"`
Format string `yaml:"format"`
}
Concurrency struct {
Workers int `yaml:"workers"`
}
Version int
}

Expand Down Expand Up @@ -64,9 +67,12 @@ func DefaultConfig() *Config {
Version: 1,
}

cfg.Profiles[Default] = "*"

cfg.Logging.Level = LogLevel
cfg.Logging.Format = LogFormat
cfg.Profiles[Default] = "*"

cfg.Concurrency.Workers = MaxWorkers

return cfg
}
Expand Down Expand Up @@ -244,6 +250,10 @@ func parseTierOrder(data []byte) (*Topology, error) {

// Validate validates the configuration
func (c *Config) Validate() error {
if err := c.validateConcurrency(); err != nil {
return err
}

for name, service := range c.Services {
if err := service.validateReadiness(); err != nil {
return fmt.Errorf("service %s: %w", name, err)
Expand All @@ -253,6 +263,15 @@ func (c *Config) Validate() error {
return nil
}

// validateConcurrency validates concurrency settings
func (c *Config) validateConcurrency() error {
if c.Concurrency.Workers <= 0 {
return errors.ErrInvalidConcurrencyWorkers
}

return nil
}

// validateReadiness validates the readiness configuration
func (s *Service) validateReadiness() error {
if s.Readiness == nil {
Expand Down
Loading