Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions hw06_pipeline_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
module github.com/fixme_my_friend/hw06_pipeline_execution
module github.com/ezhk/golang-learning/hw06_pipeline_execution

go 1.14

require github.com/stretchr/testify v1.5.1
require (
github.com/stretchr/testify v1.5.1
mvdan.cc/xurls v1.1.0 // indirect
)
2 changes: 2 additions & 0 deletions hw06_pipeline_execution/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
mvdan.cc/xurls v1.1.0 h1:kj0j2lonKseISJCiq1Tfk+iTv65dDGCl0rTbanXJGGc=
mvdan.cc/xurls v1.1.0/go.mod h1:TNWuhvo+IqbUCmtUIb/3LJSQdrzel8loVpgFm0HikbI=
72 changes: 69 additions & 3 deletions hw06_pipeline_execution/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,75 @@ type (
Bi = chan interface{}
)

type Stage func(in In) (out Out)
type (
Stage func(in In) (out Out)
I interface{}
)

func ExecutePipeline(in In, done In, stages ...Stage) Out {
// Place your code here
return nil
outCh := make(Bi)

// border case when in channel is nil
if in == nil {
close(outCh)
return outCh
}

queueChannels := make([]Bi, 0)
for v := range in {
stagesOut := make(Bi)
queueChannels = append(queueChannels, stagesOut)

go processStages(v, done, stagesOut, stages...)
}

// process answers by order
go func(out Bi) {
defer close(out)

for _, ch := range queueChannels {
select {
case <-done:
return
case v, ok := <-ch:
if ok {
out <- v
}
}
}
}(outCh)

return outCh
}

func processStages(value I, done In, out chan<- interface{}, stages ...Stage) {
defer close(out)

for _, stage := range stages {
bufferedInterCh := make(chan interface{})

// goroutine put value to channel for stage
go func() {
defer close(bufferedInterCh)

select {
case <-done:
return
case bufferedInterCh <- value:
}
}()

// read stage result
select {
case <-done:
return
case value = <-stage(bufferedInterCh):
}
}

select {
case <-done:
return
case out <- value:
}
}
30 changes: 30 additions & 0 deletions hw06_pipeline_execution/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,34 @@ func TestPipeline(t *testing.T) {
require.Len(t, result, 0)
require.Less(t, int64(elapsed), int64(abortDur)+int64(fault))
})

t.Run("empty stage", func(t *testing.T) {
stages := []Stage{
g("Empty", func(v I) I { return nil }),
}

in := make(Bi, 1)
in <- 0
close(in)

result := make([]interface{}, 0)
for s := range ExecutePipeline(in, nil, stages...) {
result = append(result, s)
}

require.Equal(t, result, []interface{}{nil})
})

t.Run("nil input channel", func(t *testing.T) {
stages := []Stage{
g("Empty", func(v I) I { return true }),
}

// nil channel: return empty close channel
iterateConter := 0
for _ = range ExecutePipeline(nil, nil, stages...) {
iterateConter++
}
require.Equal(t, iterateConter, 0)
})
}