From 386d7d7a07353e5f2e46d935267fed2e0684bac5 Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sat, 25 Jul 2020 22:29:26 +0300 Subject: [PATCH 1/2] HW6 is completed --- hw06_pipeline_execution/go.mod | 7 ++- hw06_pipeline_execution/go.sum | 2 + hw06_pipeline_execution/pipeline.go | 68 +++++++++++++++++++++++- hw06_pipeline_execution/pipeline_test.go | 30 +++++++++++ 4 files changed, 103 insertions(+), 4 deletions(-) diff --git a/hw06_pipeline_execution/go.mod b/hw06_pipeline_execution/go.mod index fbc0ee1..c4e36ad 100644 --- a/hw06_pipeline_execution/go.mod +++ b/hw06_pipeline_execution/go.mod @@ -1,5 +1,8 @@ -module github.com/fixme_my_friend/hw06_pipeline_execution +module github.com/ezhk/golang-learning/hw06_pipeline_execution go 1.14 -require github.com/stretchr/testify v1.5.1 +require ( + github.com/stretchr/testify v1.5.1 + mvdan.cc/xurls v1.1.0 // indirect +) diff --git a/hw06_pipeline_execution/go.sum b/hw06_pipeline_execution/go.sum index 331fa69..50eb423 100644 --- a/hw06_pipeline_execution/go.sum +++ b/hw06_pipeline_execution/go.sum @@ -9,3 +9,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +mvdan.cc/xurls v1.1.0 h1:kj0j2lonKseISJCiq1Tfk+iTv65dDGCl0rTbanXJGGc= +mvdan.cc/xurls v1.1.0/go.mod h1:TNWuhvo+IqbUCmtUIb/3LJSQdrzel8loVpgFm0HikbI= diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index 79135bc..7485583 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -7,8 +7,72 @@ type ( ) type Stage func(in In) (out Out) +type I interface{} func ExecutePipeline(in In, done In, stages ...Stage) Out { - // Place your code here - return nil + outCh := make(Bi) + + // border case when in channel is nil + if in == nil { + close(outCh) + return outCh + } + + queueChannels := make([]Bi, 0) + for v := range in { + stagesOut := make(Bi) + queueChannels = append(queueChannels, stagesOut) + + go processStages(v, done, stagesOut, stages...) + } + + // process answers by order + go func(out Bi) { + defer close(out) + + for _, ch := range queueChannels { + select { + case <-done: + return + case v, ok := <-ch: + if ok { + out <- v + } + } + } + }(outCh) + + return outCh +} + +func processStages(value I, done In, out chan<- interface{}, stages ...Stage) { + defer close(out) + + for _, stage := range stages { + bufferedInterCh := make(chan interface{}) + + // goroutine put value to channel for stage + go func() { + defer close(bufferedInterCh) + + select { + case <-done: + return + case bufferedInterCh <- value: + } + }() + + // read stage result + select { + case <-done: + return + case value = <-stage(bufferedInterCh): + } + } + + select { + case <-done: + return + case out <- value: + } } diff --git a/hw06_pipeline_execution/pipeline_test.go b/hw06_pipeline_execution/pipeline_test.go index acd9f36..6f3b424 100644 --- a/hw06_pipeline_execution/pipeline_test.go +++ b/hw06_pipeline_execution/pipeline_test.go @@ -90,4 +90,34 @@ func TestPipeline(t *testing.T) { require.Len(t, result, 0) require.Less(t, int64(elapsed), int64(abortDur)+int64(fault)) }) + + t.Run("empty stage", func(t *testing.T) { + stages := []Stage{ + g("Empty", func(v I) I { return nil }), + } + + in := make(Bi, 1) + in <- 0 + close(in) + + result := make([]interface{}, 0) + for s := range ExecutePipeline(in, nil, stages...) { + result = append(result, s) + } + + require.Equal(t, result, []interface{}{nil}) + }) + + t.Run("nil input channel", func(t *testing.T) { + stages := []Stage{ + g("Empty", func(v I) I { return true }), + } + + // nil channel: return empty close channel + iterateConter := 0 + for _ = range ExecutePipeline(nil, nil, stages...) { + iterateConter++ + } + require.Equal(t, iterateConter, 0) + }) } From 8b31d06e7ce5d284d9748b3ef94bc4a48c97640c Mon Sep 17 00:00:00 2001 From: Andrey Kiselev Date: Sat, 25 Jul 2020 22:35:19 +0300 Subject: [PATCH 2/2] gofumpt tidy --- hw06_pipeline_execution/pipeline.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index 7485583..5e13ba2 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -6,8 +6,10 @@ type ( Bi = chan interface{} ) -type Stage func(in In) (out Out) -type I interface{} +type ( + Stage func(in In) (out Out) + I interface{} +) func ExecutePipeline(in In, done In, stages ...Stage) Out { outCh := make(Bi)