diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod index 89e9922..84435c4 100644 --- a/hw05_parallel_execution/go.mod +++ b/hw05_parallel_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw05_parallel_execution +module github.com/ezhk/golang-learning/hw05_parallel_execution go 1.14 diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 6be37a4..8c12456 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -2,14 +2,56 @@ package hw05_parallel_execution //nolint:golint,stylecheck import ( "errors" + "sync" + "sync/atomic" ) -var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") +var ( + ErrErrorsLimitExceeded = errors.New("errors limit exceeded") + ErrGoroutinesLimitNonPositive = errors.New("goroutines limit contains non positive value") + ErrErrorsAmountNonPositive = errors.New("errors amount contains non positive value") +) type Task func() error -// Run starts tasks in N goroutines and stops its work when receiving M errors from tasks -func Run(tasks []Task, N int, M int) error { - // Place your code here +func Run(tasks []Task, goroutinesLimit, maxErrors int) error { + if goroutinesLimit < 1 { + return ErrGoroutinesLimitNonPositive + } + if maxErrors < 1 { + return ErrErrorsAmountNonPositive + } + + errorsLeft := int64(maxErrors) + concurrentCh := make(chan struct{}, goroutinesLimit) + defer close(concurrentCh) + + var wg sync.WaitGroup + for _, task := range tasks { + if atomic.LoadInt64(&errorsLeft) < 1 { + break + } + + // wait for next step: use channel as task queue + concurrentCh <- struct{}{} + + wg.Add(1) + go func(t Task) { + defer func() { + <-concurrentCh + wg.Done() + }() + + if err := t(); err != nil { + atomic.AddInt64(&errorsLeft, -1) + } + }(task) + } + wg.Wait() + + if errorsLeft < 1 { + return ErrErrorsLimitExceeded + } + return nil } diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index d1157c8..d537183 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -66,4 +66,21 @@ func TestRun(t *testing.T) { require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") }) + + t.Run("non positive argument values", func(t *testing.T) { + for _, test := range []struct { + N int + M int + err error + }{ + {-1, 10, ErrGoroutinesLimitNonPositive}, + {0, 5, ErrGoroutinesLimitNonPositive}, + {-1, -1, ErrGoroutinesLimitNonPositive}, + {10, -1, ErrErrorsAmountNonPositive}, + {5, 0, ErrErrorsAmountNonPositive}, + } { + result := Run([]Task{func() error { return nil }}, test.N, test.M) + require.Equal(t, test.err, result) + } + }) }