From 79f894e245768bd63ea44168ec9556ea0696acb6 Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sat, 4 Jul 2020 22:53:44 +0300 Subject: [PATCH 1/7] HW5 is completed --- hw05_parallel_execution/go.mod | 2 +- hw05_parallel_execution/run.go | 33 +++++++++++++++++++++++++++-- hw05_parallel_execution/run_test.go | 17 +++++++++++++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod index 89e9922..84435c4 100644 --- a/hw05_parallel_execution/go.mod +++ b/hw05_parallel_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw05_parallel_execution +module github.com/ezhk/golang-learning/hw05_parallel_execution go 1.14 diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 6be37a4..120df1d 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -5,11 +5,40 @@ import ( ) var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") +var ErrGoroutinesLimitNotPositive = errors.New("goroutines limit set to negative value") type Task func() error // Run starts tasks in N goroutines and stops its work when receiving M errors from tasks -func Run(tasks []Task, N int, M int) error { - // Place your code here +func Run(tasks []Task, concurrentGoroutinesLimit int, allowedErrorsAmount int) error { + if concurrentGoroutinesLimit < 1 { + return ErrGoroutinesLimitNotPositive + } + + concurrentGoroutines := make(chan struct{}, concurrentGoroutinesLimit) + defer close(concurrentGoroutines) + + for _, t := range tasks { + if allowedErrorsAmount < 1 { + return ErrErrorsLimitExceeded + } + + concurrentGoroutines <- struct{}{} + go func(t Task) { + if err := t(); err != nil { + allowedErrorsAmount-- + } + <-concurrentGoroutines + }(t) + } + + // wait completed tasks + for len(concurrentGoroutines) > 0 { + } + + if allowedErrorsAmount < 1 { + return ErrErrorsLimitExceeded + } + return nil } diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index d1157c8..40a2954 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -66,4 +66,21 @@ func TestRun(t *testing.T) { require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") }) + + t.Run("non positive argument values", func(t *testing.T) { + for _, test := range []struct { + N int + M int + err error + }{ + {-1, 10, ErrGoroutinesLimitNotPositive}, + {0, 5, ErrGoroutinesLimitNotPositive}, + {-1, -1, ErrGoroutinesLimitNotPositive}, + {10, -1, ErrErrorsLimitExceeded}, + {5, 0, ErrErrorsLimitExceeded}, + } { + result := Run([]Task{func() error { return nil }}, test.N, test.M) + require.Equal(t, test.err, result) + } + }) } From 758d8bddb7c1882baf6eb32c71e50f293972098b Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sat, 4 Jul 2020 22:59:19 +0300 Subject: [PATCH 2/7] remove unused comment --- hw05_parallel_execution/run.go | 1 - 1 file changed, 1 deletion(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 120df1d..b02bbcb 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -9,7 +9,6 @@ var ErrGoroutinesLimitNotPositive = errors.New("goroutines limit set to negative type Task func() error -// Run starts tasks in N goroutines and stops its work when receiving M errors from tasks func Run(tasks []Task, concurrentGoroutinesLimit int, allowedErrorsAmount int) error { if concurrentGoroutinesLimit < 1 { return ErrGoroutinesLimitNotPositive From 15078991757393bfed7bac9daa7f9319a4f02d6d Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sun, 5 Jul 2020 22:48:37 +0300 Subject: [PATCH 3/7] atomic decrement errors --- hw05_parallel_execution/run.go | 37 +++++++++++++++++------------ hw05_parallel_execution/run_test.go | 10 ++++---- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index b02bbcb..50caa7a 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -2,40 +2,47 @@ package hw05_parallel_execution //nolint:golint,stylecheck import ( "errors" + "sync/atomic" ) var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") -var ErrGoroutinesLimitNotPositive = errors.New("goroutines limit set to negative value") +var ErrGoroutinesLimitNonPositive = errors.New("goroutines limit contains non positive value") +var ErrErrorsAmountNonPositive = errors.New("errors amount contains non positive value") type Task func() error -func Run(tasks []Task, concurrentGoroutinesLimit int, allowedErrorsAmount int) error { - if concurrentGoroutinesLimit < 1 { - return ErrGoroutinesLimitNotPositive +func Run(tasks []Task, goroutinesLimit int, allowedErrors int) error { + if goroutinesLimit < 1 { + return ErrGoroutinesLimitNonPositive + } + if allowedErrors < 1 { + return ErrErrorsAmountNonPositive } - concurrentGoroutines := make(chan struct{}, concurrentGoroutinesLimit) - defer close(concurrentGoroutines) + errorsLeft := int64(allowedErrors) + concurrentCh := make(chan struct{}, goroutinesLimit) - for _, t := range tasks { - if allowedErrorsAmount < 1 { + for idx, task := range tasks { + if errorsLeft < 1 { return ErrErrorsLimitExceeded } - concurrentGoroutines <- struct{}{} + concurrentCh <- struct{}{} go func(t Task) { + defer func() { <-concurrentCh }() + if err := t(); err != nil { - allowedErrorsAmount-- + atomic.AddInt64(&errorsLeft, -1) } - <-concurrentGoroutines - }(t) + }(task) } - // wait completed tasks - for len(concurrentGoroutines) > 0 { + // wait completed tasks and close chan + for len(concurrentCh) > 0 { } + close(concurrentCh) - if allowedErrorsAmount < 1 { + if errorsLeft < 1 { return ErrErrorsLimitExceeded } diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index 40a2954..d537183 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -73,11 +73,11 @@ func TestRun(t *testing.T) { M int err error }{ - {-1, 10, ErrGoroutinesLimitNotPositive}, - {0, 5, ErrGoroutinesLimitNotPositive}, - {-1, -1, ErrGoroutinesLimitNotPositive}, - {10, -1, ErrErrorsLimitExceeded}, - {5, 0, ErrErrorsLimitExceeded}, + {-1, 10, ErrGoroutinesLimitNonPositive}, + {0, 5, ErrGoroutinesLimitNonPositive}, + {-1, -1, ErrGoroutinesLimitNonPositive}, + {10, -1, ErrErrorsAmountNonPositive}, + {5, 0, ErrErrorsAmountNonPositive}, } { result := Run([]Task{func() error { return nil }}, test.N, test.M) require.Equal(t, test.err, result) From bb3ff5e6b204699bbda24e4b763709152391d8ae Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sun, 5 Jul 2020 22:49:46 +0300 Subject: [PATCH 4/7] atomic decrement errors: remove unused variable --- hw05_parallel_execution/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 50caa7a..a8b34c1 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -22,7 +22,7 @@ func Run(tasks []Task, goroutinesLimit int, allowedErrors int) error { errorsLeft := int64(allowedErrors) concurrentCh := make(chan struct{}, goroutinesLimit) - for idx, task := range tasks { + for _, task := range tasks { if errorsLeft < 1 { return ErrErrorsLimitExceeded } From aa30d9f0efe9b87d6b4ee0568be9a60d028d27c9 Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Mon, 6 Jul 2020 12:44:52 +0300 Subject: [PATCH 5/7] update code style --- hw05_parallel_execution/run.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index a8b34c1..3953375 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -5,22 +5,25 @@ import ( "sync/atomic" ) -var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") -var ErrGoroutinesLimitNonPositive = errors.New("goroutines limit contains non positive value") -var ErrErrorsAmountNonPositive = errors.New("errors amount contains non positive value") +var ( + ErrErrorsLimitExceeded = errors.New("errors limit exceeded") + ErrGoroutinesLimitNonPositive = errors.New("goroutines limit contains non positive value") + ErrErrorsAmountNonPositive = errors.New("errors amount contains non positive value") +) type Task func() error -func Run(tasks []Task, goroutinesLimit int, allowedErrors int) error { +func Run(tasks []Task, goroutinesLimit int, maxErrors int) error { if goroutinesLimit < 1 { return ErrGoroutinesLimitNonPositive } - if allowedErrors < 1 { + if maxErrors < 1 { return ErrErrorsAmountNonPositive } - errorsLeft := int64(allowedErrors) + errorsLeft := int64(maxErrors) concurrentCh := make(chan struct{}, goroutinesLimit) + defer close(concurrentCh) for _, task := range tasks { if errorsLeft < 1 { @@ -40,7 +43,6 @@ func Run(tasks []Task, goroutinesLimit int, allowedErrors int) error { // wait completed tasks and close chan for len(concurrentCh) > 0 { } - close(concurrentCh) if errorsLeft < 1 { return ErrErrorsLimitExceeded From a02193e8eaf419b6b0ca5aac17709113a0570acf Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Mon, 6 Jul 2020 13:37:06 +0300 Subject: [PATCH 6/7] race fix: use WaitGroup --- hw05_parallel_execution/run.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 3953375..aed40ed 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -2,6 +2,7 @@ package hw05_parallel_execution //nolint:golint,stylecheck import ( "errors" + "sync" "sync/atomic" ) @@ -25,24 +26,28 @@ func Run(tasks []Task, goroutinesLimit int, maxErrors int) error { concurrentCh := make(chan struct{}, goroutinesLimit) defer close(concurrentCh) + var wg sync.WaitGroup for _, task := range tasks { - if errorsLeft < 1 { - return ErrErrorsLimitExceeded + if atomic.LoadInt64(&errorsLeft) < 1 { + break } + // wait for next step: use channel as task queue concurrentCh <- struct{}{} + + wg.Add(1) go func(t Task) { - defer func() { <-concurrentCh }() + defer func() { + <-concurrentCh + wg.Done() + }() if err := t(); err != nil { atomic.AddInt64(&errorsLeft, -1) } }(task) } - - // wait completed tasks and close chan - for len(concurrentCh) > 0 { - } + wg.Wait() if errorsLeft < 1 { return ErrErrorsLimitExceeded From 4d30fa52c39f54a93f2085dd70c68fea3f28e832 Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Mon, 6 Jul 2020 17:26:18 +0300 Subject: [PATCH 7/7] update with gofumpt --- hw05_parallel_execution/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index aed40ed..8c12456 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -14,7 +14,7 @@ var ( type Task func() error -func Run(tasks []Task, goroutinesLimit int, maxErrors int) error { +func Run(tasks []Task, goroutinesLimit, maxErrors int) error { if goroutinesLimit < 1 { return ErrGoroutinesLimitNonPositive }