diff --git a/hw06_pipeline_execution/go.mod b/hw06_pipeline_execution/go.mod index fbc0ee1..c4e36ad 100644 --- a/hw06_pipeline_execution/go.mod +++ b/hw06_pipeline_execution/go.mod @@ -1,5 +1,8 @@ -module github.com/fixme_my_friend/hw06_pipeline_execution +module github.com/ezhk/golang-learning/hw06_pipeline_execution go 1.14 -require github.com/stretchr/testify v1.5.1 +require ( + github.com/stretchr/testify v1.5.1 + mvdan.cc/xurls v1.1.0 // indirect +) diff --git a/hw06_pipeline_execution/go.sum b/hw06_pipeline_execution/go.sum index 331fa69..50eb423 100644 --- a/hw06_pipeline_execution/go.sum +++ b/hw06_pipeline_execution/go.sum @@ -9,3 +9,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +mvdan.cc/xurls v1.1.0 h1:kj0j2lonKseISJCiq1Tfk+iTv65dDGCl0rTbanXJGGc= +mvdan.cc/xurls v1.1.0/go.mod h1:TNWuhvo+IqbUCmtUIb/3LJSQdrzel8loVpgFm0HikbI= diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index 79135bc..5e13ba2 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -6,9 +6,75 @@ type ( Bi = chan interface{} ) -type Stage func(in In) (out Out) +type ( + Stage func(in In) (out Out) + I interface{} +) func ExecutePipeline(in In, done In, stages ...Stage) Out { - // Place your code here - return nil + outCh := make(Bi) + + // border case when in channel is nil + if in == nil { + close(outCh) + return outCh + } + + queueChannels := make([]Bi, 0) + for v := range in { + stagesOut := make(Bi) + queueChannels = append(queueChannels, stagesOut) + + go processStages(v, done, stagesOut, stages...) + } + + // process answers by order + go func(out Bi) { + defer close(out) + + for _, ch := range queueChannels { + select { + case <-done: + return + case v, ok := <-ch: + if ok { + out <- v + } + } + } + }(outCh) + + return outCh +} + +func processStages(value I, done In, out chan<- interface{}, stages ...Stage) { + defer close(out) + + for _, stage := range stages { + bufferedInterCh := make(chan interface{}) + + // goroutine put value to channel for stage + go func() { + defer close(bufferedInterCh) + + select { + case <-done: + return + case bufferedInterCh <- value: + } + }() + + // read stage result + select { + case <-done: + return + case value = <-stage(bufferedInterCh): + } + } + + select { + case <-done: + return + case out <- value: + } } diff --git a/hw06_pipeline_execution/pipeline_test.go b/hw06_pipeline_execution/pipeline_test.go index acd9f36..6f3b424 100644 --- a/hw06_pipeline_execution/pipeline_test.go +++ b/hw06_pipeline_execution/pipeline_test.go @@ -90,4 +90,34 @@ func TestPipeline(t *testing.T) { require.Len(t, result, 0) require.Less(t, int64(elapsed), int64(abortDur)+int64(fault)) }) + + t.Run("empty stage", func(t *testing.T) { + stages := []Stage{ + g("Empty", func(v I) I { return nil }), + } + + in := make(Bi, 1) + in <- 0 + close(in) + + result := make([]interface{}, 0) + for s := range ExecutePipeline(in, nil, stages...) { + result = append(result, s) + } + + require.Equal(t, result, []interface{}{nil}) + }) + + t.Run("nil input channel", func(t *testing.T) { + stages := []Stage{ + g("Empty", func(v I) I { return true }), + } + + // nil channel: return empty close channel + iterateConter := 0 + for _ = range ExecutePipeline(nil, nil, stages...) { + iterateConter++ + } + require.Equal(t, iterateConter, 0) + }) }