Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hw05_parallel_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/fixme_my_friend/hw05_parallel_execution
module github.com/ezhk/golang-learning/hw05_parallel_execution

go 1.14

Expand Down
50 changes: 46 additions & 4 deletions hw05_parallel_execution/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,56 @@ package hw05_parallel_execution //nolint:golint,stylecheck

import (
"errors"
"sync"
"sync/atomic"
)

var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
var (
ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
ErrGoroutinesLimitNonPositive = errors.New("goroutines limit contains non positive value")
ErrErrorsAmountNonPositive = errors.New("errors amount contains non positive value")
)

type Task func() error

// Run starts tasks in N goroutines and stops its work when receiving M errors from tasks
func Run(tasks []Task, N int, M int) error {
// Place your code here
func Run(tasks []Task, goroutinesLimit, maxErrors int) error {
if goroutinesLimit < 1 {
return ErrGoroutinesLimitNonPositive
}
if maxErrors < 1 {
return ErrErrorsAmountNonPositive
}

errorsLeft := int64(maxErrors)
concurrentCh := make(chan struct{}, goroutinesLimit)
defer close(concurrentCh)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем?


var wg sync.WaitGroup
for _, task := range tasks {
if atomic.LoadInt64(&errorsLeft) < 1 {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: в https://github.com/uber-go/atomic есть API поприятнее

break
}

// wait for next step: use channel as task queue
concurrentCh <- struct{}{}

wg.Add(1)
go func(t Task) {
defer func() {
<-concurrentCh
wg.Done()
}()

if err := t(); err != nil {
atomic.AddInt64(&errorsLeft, -1)
}
}(task)
}
wg.Wait()

if errorsLeft < 1 {
return ErrErrorsLimitExceeded
}

return nil
}
17 changes: 17 additions & 0 deletions hw05_parallel_execution/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,21 @@ func TestRun(t *testing.T) {
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
})

t.Run("non positive argument values", func(t *testing.T) {
for _, test := range []struct {
N int
M int
err error
}{
{-1, 10, ErrGoroutinesLimitNonPositive},
{0, 5, ErrGoroutinesLimitNonPositive},
{-1, -1, ErrGoroutinesLimitNonPositive},
{10, -1, ErrErrorsAmountNonPositive},
{5, 0, ErrErrorsAmountNonPositive},
} {
result := Run([]Task{func() error { return nil }}, test.N, test.M)
require.Equal(t, test.err, result)
}
})
}