Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions fstream/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package fstream_test

import (
"testing"

"github.com/primetalk/goio/fstream"
"github.com/primetalk/goio/io"
"github.com/stretchr/testify/assert"
)

var nats = fstream.Nats()
var nats10 = fstream.Take(nats, 10)
var nats10Values = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

var Mul2 = fstream.MapPipe(func(i int) int { return i * 2 })

func UnsafeStreamToSlice[A any](t *testing.T, stm fstream.Stream[A]) []A {
return UnsafeIO(t, fstream.ToSlice(stm))
}

func UnsafeIOStreamToSlice[A any](t *testing.T, stm fstream.IOStream[A]) []A {
return UnsafeIO(t, io.FlatMap(io.IO[fstream.Stream[A]](stm), fstream.ToSlice[A]))
}

func UnsafeIO[A any](t *testing.T, ioa io.IO[A]) A {
res, err1 := io.UnsafeRunSync(ioa)
assert.NoError(t, err1)
return res
}

func UnsafeIOExpectError[A any](t *testing.T, expected error, ioa io.IO[A]) {
_, err1 := io.UnsafeRunSync(ioa)
if assert.Error(t, err1) {
assert.Equal(t, expected, err1)
}
}

func TestNats(t *testing.T) {
assert.ElementsMatch(t, nats10Values, UnsafeIOStreamToSlice(t, nats10))
}
18 changes: 18 additions & 0 deletions fstream/construct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package fstream

import "github.com/primetalk/goio/stream"

// Nats returns an infinite stream of ints starting from 1.
func Nats() Stream[int] {
return LiftStream(stream.Nats())
}

// Generate constructs an infinite stream of values using the production function.
func Generate[A any, S any](zero S, f func(s S) (S, A)) Stream[A] {
return LiftStream(stream.Generate(zero, f))
}

// Unfold constructs an infinite stream of values using the production function.
func Unfold[A any](zero A, f func(A) A) Stream[A] {
return LiftStream(stream.Unfold(zero, f))
}
107 changes: 107 additions & 0 deletions fstream/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package fstream

import (
"github.com/primetalk/goio/fun"
"github.com/primetalk/goio/io"
"github.com/primetalk/goio/option"
"github.com/primetalk/goio/stream"
)

// Collect collects all element from the stream and for each element invokes
// the provided function
func Collect[A any](stm Stream[A], collector func(A) error) io.IOUnit {
return io.AndThen(
io.Finally(stream.Collect(stm.init.sa, collector), stm.init.finalizer),
option.Match(stm.cont,
func(iosa2 IOStream[A]) io.IOUnit {
return io.FlatMap(io.IO[Stream[A]](iosa2), func(sa2 Stream[A]) io.IOUnit {
return Collect(sa2, collector)
})
},
fun.Delay(io.IOUnit1),
),
)
}

func returnNilError[A any](collector func(A)) func(A) error {
return func(a A) error {
collector(a)
return nil
}
}

// ForEach invokes a simple function for each element of the stream.
func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit] {
return Collect(stm, returnNilError(collector))
}

// DrainAll executes the stream and throws away all values.
func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit] {
return Collect(stm, fun.Const[A, error](nil))
}

// AppendToSlice executes the stream and appends it's results to the slice.
func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A] {
appendToStart := func(a A) error {
start = append(start, a)
return nil
}
return io.AndThen(
Collect(stm, appendToStart),
io.Delay(func() io.IO[[]A] {
return io.Lift(start)
}),
)
}

// ToSlice executes the stream and collects all results to a slice.
func ToSlice[A any](stm Stream[A]) io.IO[[]A] {
return AppendToSlice(stm, []A{})
}

// Head takes the first element and returns it.
// It'll fail if the stream is empty.
func Head[A any](stm Stream[A]) io.IO[A] {
res := StreamMatch[A, A](
stm,
/*onFinish*/ func() io.IO[A] {
return io.Fail[A](stream.ErrHeadOfEmptyStream)
},
/*onValue */ func(a A, tail Stream[A]) io.IO[A] {
return io.AndThen(tail.init.finalizer, io.Lift(a))
},
/*onEmpty */ func(tail Stream[A]) io.IO[A] {
return Head(tail)
},
/*onError */ func(err error) io.IO[A] {
return io.Fail[A](err)
},
)
return res
}

// IOHead takes the first element of io-stream and returns it.
// It'll fail if the stream is empty.
func IOHead[A any](stm IOStream[A]) io.IO[A] {
return io.FlatMap[Stream[A]](io.IO[Stream[A]](stm), Head[A])
}

// Last keeps track of the current element of the stream
// and returns it when the stream completes.
func Last[A any](stm Stream[A]) io.IO[A] {
ea := EmptyIO[A]()
return IOHead(StateFlatMapWithFinish[A, A, option.Option[A]](
stm,
/*zero*/ option.None[A](),
/*prefix*/ EmptyIO[A](),
/*f*/ func(a A, s option.Option[A]) (option.Option[A], IOStream[A]) {
return option.Some(a), ea
},
/*onFinish*/ func(s option.Option[A]) IOStream[A] {
return option.Match(s,
LiftIO[A],
EmptyIO[A],
)
},
))
}
Loading