From a84ca85433677d4ef260351f7dfc5edaeb15590b Mon Sep 17 00:00:00 2001 From: Arseniy Zhizhelev Date: Thu, 29 Jun 2023 09:06:24 +0300 Subject: [PATCH] Add initial fstream implementation --- fstream/common_test.go | 40 ++++ fstream/construct.go | 18 ++ fstream/execution.go | 107 +++++++++++ fstream/fstream.go | 399 ++++++++++++++++++++++++++++++++++++++++ fstream/fstream_test.go | 70 +++++++ fstream/lengthy.go | 91 +++++++++ fstream/lengthy_test.go | 20 ++ 7 files changed, 745 insertions(+) create mode 100644 fstream/common_test.go create mode 100644 fstream/construct.go create mode 100644 fstream/execution.go create mode 100644 fstream/fstream.go create mode 100644 fstream/fstream_test.go create mode 100644 fstream/lengthy.go create mode 100644 fstream/lengthy_test.go diff --git a/fstream/common_test.go b/fstream/common_test.go new file mode 100644 index 0000000..4b19908 --- /dev/null +++ b/fstream/common_test.go @@ -0,0 +1,40 @@ +package fstream_test + +import ( + "testing" + + "github.com/primetalk/goio/fstream" + "github.com/primetalk/goio/io" + "github.com/stretchr/testify/assert" +) + +var nats = fstream.Nats() +var nats10 = fstream.Take(nats, 10) +var nats10Values = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + +var Mul2 = fstream.MapPipe(func(i int) int { return i * 2 }) + +func UnsafeStreamToSlice[A any](t *testing.T, stm fstream.Stream[A]) []A { + return UnsafeIO(t, fstream.ToSlice(stm)) +} + +func UnsafeIOStreamToSlice[A any](t *testing.T, stm fstream.IOStream[A]) []A { + return UnsafeIO(t, io.FlatMap(io.IO[fstream.Stream[A]](stm), fstream.ToSlice[A])) +} + +func UnsafeIO[A any](t *testing.T, ioa io.IO[A]) A { + res, err1 := io.UnsafeRunSync(ioa) + assert.NoError(t, err1) + return res +} + +func UnsafeIOExpectError[A any](t *testing.T, expected error, ioa io.IO[A]) { + _, err1 := io.UnsafeRunSync(ioa) + if assert.Error(t, err1) { + assert.Equal(t, expected, err1) + } +} + +func TestNats(t *testing.T) { + assert.ElementsMatch(t, nats10Values, UnsafeIOStreamToSlice(t, nats10)) +} diff --git a/fstream/construct.go b/fstream/construct.go new file mode 100644 index 0000000..7c2f462 --- /dev/null +++ b/fstream/construct.go @@ -0,0 +1,18 @@ +package fstream + +import "github.com/primetalk/goio/stream" + +// Nats returns an infinite stream of ints starting from 1. +func Nats() Stream[int] { + return LiftStream(stream.Nats()) +} + +// Generate constructs an infinite stream of values using the production function. +func Generate[A any, S any](zero S, f func(s S) (S, A)) Stream[A] { + return LiftStream(stream.Generate(zero, f)) +} + +// Unfold constructs an infinite stream of values using the production function. +func Unfold[A any](zero A, f func(A) A) Stream[A] { + return LiftStream(stream.Unfold(zero, f)) +} diff --git a/fstream/execution.go b/fstream/execution.go new file mode 100644 index 0000000..47a61e1 --- /dev/null +++ b/fstream/execution.go @@ -0,0 +1,107 @@ +package fstream + +import ( + "github.com/primetalk/goio/fun" + "github.com/primetalk/goio/io" + "github.com/primetalk/goio/option" + "github.com/primetalk/goio/stream" +) + +// Collect collects all element from the stream and for each element invokes +// the provided function +func Collect[A any](stm Stream[A], collector func(A) error) io.IOUnit { + return io.AndThen( + io.Finally(stream.Collect(stm.init.sa, collector), stm.init.finalizer), + option.Match(stm.cont, + func(iosa2 IOStream[A]) io.IOUnit { + return io.FlatMap(io.IO[Stream[A]](iosa2), func(sa2 Stream[A]) io.IOUnit { + return Collect(sa2, collector) + }) + }, + fun.Delay(io.IOUnit1), + ), + ) +} + +func returnNilError[A any](collector func(A)) func(A) error { + return func(a A) error { + collector(a) + return nil + } +} + +// ForEach invokes a simple function for each element of the stream. +func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit] { + return Collect(stm, returnNilError(collector)) +} + +// DrainAll executes the stream and throws away all values. +func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit] { + return Collect(stm, fun.Const[A, error](nil)) +} + +// AppendToSlice executes the stream and appends it's results to the slice. +func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A] { + appendToStart := func(a A) error { + start = append(start, a) + return nil + } + return io.AndThen( + Collect(stm, appendToStart), + io.Delay(func() io.IO[[]A] { + return io.Lift(start) + }), + ) +} + +// ToSlice executes the stream and collects all results to a slice. +func ToSlice[A any](stm Stream[A]) io.IO[[]A] { + return AppendToSlice(stm, []A{}) +} + +// Head takes the first element and returns it. +// It'll fail if the stream is empty. +func Head[A any](stm Stream[A]) io.IO[A] { + res := StreamMatch[A, A]( + stm, + /*onFinish*/ func() io.IO[A] { + return io.Fail[A](stream.ErrHeadOfEmptyStream) + }, + /*onValue */ func(a A, tail Stream[A]) io.IO[A] { + return io.AndThen(tail.init.finalizer, io.Lift(a)) + }, + /*onEmpty */ func(tail Stream[A]) io.IO[A] { + return Head(tail) + }, + /*onError */ func(err error) io.IO[A] { + return io.Fail[A](err) + }, + ) + return res +} + +// IOHead takes the first element of io-stream and returns it. +// It'll fail if the stream is empty. +func IOHead[A any](stm IOStream[A]) io.IO[A] { + return io.FlatMap[Stream[A]](io.IO[Stream[A]](stm), Head[A]) +} + +// Last keeps track of the current element of the stream +// and returns it when the stream completes. +func Last[A any](stm Stream[A]) io.IO[A] { + ea := EmptyIO[A]() + return IOHead(StateFlatMapWithFinish[A, A, option.Option[A]]( + stm, + /*zero*/ option.None[A](), + /*prefix*/ EmptyIO[A](), + /*f*/ func(a A, s option.Option[A]) (option.Option[A], IOStream[A]) { + return option.Some(a), ea + }, + /*onFinish*/ func(s option.Option[A]) IOStream[A] { + return option.Match(s, + LiftIO[A], + EmptyIO[A], + ) + }, + )) +} diff --git a/fstream/fstream.go b/fstream/fstream.go new file mode 100644 index 0000000..9f32ce9 --- /dev/null +++ b/fstream/fstream.go @@ -0,0 +1,399 @@ +package fstream + +import ( + "log" + + "github.com/primetalk/goio/fun" + "github.com/primetalk/goio/io" + "github.com/primetalk/goio/option" + "github.com/primetalk/goio/stream" +) + +type Scope struct { + Finalizer io.IOUnit +} + +type ScopedStream[A any] struct { + sa stream.Stream[A] + finalizer io.IOUnit +} + +func NewScopedStream[A any](sa stream.Stream[A], finalizer io.IOUnit) ScopedStream[A] { + return ScopedStream[A]{ + sa: sa, + finalizer: finalizer, + } +} + +func (stm ScopedStream[A]) Close() io.IOUnit { + return stm.finalizer +} + +func ScopedStreamMatch[A any, B any](stm ScopedStream[A], + onFinish func() io.IO[B], + onValue func(a A, tail ScopedStream[A]) io.IO[B], + onEmpty func(tail ScopedStream[A]) io.IO[B], + onError func(err error) io.IO[B], +) io.IO[B] { + return stream.StreamMatch(stm.sa, + func() io.IO[B] { + return io.AndThen(stm.finalizer, onFinish()) + }, + func(a A, tail stream.Stream[A]) io.IO[B] { + return onValue(a, NewScopedStream(tail, stm.finalizer)) + }, + func(tail stream.Stream[A]) io.IO[B] { + return onEmpty(NewScopedStream(tail, stm.finalizer)) + }, + func(err error) io.IO[B] { + return io.Fold(stm.finalizer, + func(u fun.Unit) io.IO[B] { + return onError(err) + }, + func(err2 error) io.IO[B] { + log.Printf("double error during fstream.ScopedStreamMatch: %+v", err2) + return onError(err) + }) + }, + ) +} + +// fstream.Stream is similar to stream.Stream, but it keeps track of +// a finalizer for the stream +// - an action that needs to be executed after the stream is finished. +// We maintain finalizer in all operations. +// Finalizer is executed regardless of the stream failure. +// After finalizer, some tail stream could be returned. +// Finalizer is the very first operation of the second stream. +type Stream[A any] struct { + init ScopedStream[A] + cont option.Option[IOStream[A]] +} + +type NotYetOpenedStream[A any] io.IO[Stream[A]] +type IOStream[A any] io.IO[Stream[A]] + +func IOStreamFlatMap[A any, B any](ios IOStream[A], f func(Stream[A]) io.IO[B]) io.IO[B] { + return io.FlatMap(io.IO[Stream[A]](ios), f) +} + +type Pipe[A any, B any] func(Stream[A]) Stream[B] +type IOPipe[A any, B any] func(Stream[A]) io.IO[Stream[B]] + +// NewStreamWithConfinuation constructs a fresh instance of stream. +func NewStream[A any](init ScopedStream[A], last option.Option[IOStream[A]]) Stream[A] { + return Stream[A]{ + init: init, + cont: last, + } +} + +// NewStreamWithConfinuation constructs a fresh instance of stream. +func NewStreamWithConfinuation[A any](init stream.Stream[A], finalizer io.IOUnit, last IOStream[A]) Stream[A] { + return NewStream(NewScopedStream(init, finalizer), option.Some(last)) +} + +// NewStreamWithoutConfinuation constructs a fresh instance of stream. +func NewStreamWithoutConfinuation[A any](init stream.Stream[A], finalizer io.IOUnit) Stream[A] { + return NewStream(NewScopedStream(init, finalizer), option.None[IOStream[A]]()) +} + +func EmptyIO[A any]() IOStream[A] { + return IOStream[A](io.Delay(func() io.IO[Stream[A]] { return io.Lift(Empty[A]()) })) +} + +func Empty[A any]() Stream[A] { + return NewStreamWithoutConfinuation(stream.Empty[A](), io.IOUnit1) +} + +// Lift converts ordinary stream to fstream.Stream. +func Lift[A any](a A) Stream[A] { + return LiftStream(stream.Lift(a)) +} + +// LiftIO returns an io-stream of a single element a. +func LiftIO[A any](a A) IOStream[A] { + return Lift(a).ToIOStream() +} + +// Lift converts ordinary stream to fstream.Stream. +func LiftMany[A any](as ...A) Stream[A] { + return LiftStream(stream.LiftMany(as...)) +} + +// Lift converts ordinary stream to fstream.Stream. +func LiftStream[A any](sa stream.Stream[A]) Stream[A] { + return NewStreamWithoutConfinuation(sa, io.IOUnit1) +} + +// LiftStreamIO converts ordinary stream to fstream.Stream. +func LiftStreamIO[A any](sa stream.Stream[A]) IOStream[A] { + return IOStream[A](io.Delay(func() io.IO[Stream[A]] { return io.Lift(LiftStream(sa)) })) +} + +func (sa Stream[A]) ToIOStream() IOStream[A] { + return IOStream[A](io.Lift(sa)) +} + +func (iosa IOStream[A]) ToStream() Stream[A] { + return NewStream(NewScopedStream(stream.Empty[A](), io.IOUnit1), option.Some(iosa)) +} + +// ConcatPlainStreams concanenates a pair of ordinary streams. +func ConcatPlainStreams[A any](sa1 stream.Stream[A], sa2 stream.Stream[A]) (res Stream[A]) { + return NewStreamWithConfinuation(sa1, io.IOUnit1, LiftStreamIO(sa2)) +} + +// StreamMatch performs arbitrary processing of a stream's single step result. +// in case of errors, it'll first run finalizer and then the user function. +func StreamMatch[A any, B any]( + stm Stream[A], + onFinish func() io.IO[B], + onValue func(a A, tail Stream[A]) io.IO[B], + onEmpty func(tail Stream[A]) io.IO[B], + onError func(err error) io.IO[B], +) io.IO[B] { + return stream.StreamMatch(stm.init.sa, + func() io.IO[B] { + afterFinalizer := option.Match(stm.cont, + func(cont IOStream[A]) io.IO[B] { + return io.FlatMap(io.IO[Stream[A]](cont), func(stm2 Stream[A]) io.IO[B] { + return StreamMatch(stm2, onFinish, onValue, onEmpty, onError) + }) + }, + onFinish, + ) + return io.AndThen(stm.init.finalizer, afterFinalizer) + }, + func(a A, tail stream.Stream[A]) io.IO[B] { + return onValue(a, NewStream(NewScopedStream(tail, stm.init.finalizer), stm.cont)) + }, + func(tail stream.Stream[A]) io.IO[B] { + return onEmpty(NewStream(NewScopedStream(tail, stm.init.finalizer), stm.cont)) + }, + func(err error) io.IO[B] { + // from the finalizer stream we only need the first side effect. + return io.AndThen(stm.init.finalizer, onError(err)) + }, + ) +} + +func IOStreamMatch[A any, B any]( + stm Stream[A], + onFinish func() IOStream[B], + onValue func(a A, tail Stream[A]) IOStream[B], + onEmpty func(tail Stream[A]) IOStream[B], + onError func(err error) IOStream[B], +) IOStream[B] { + return IOStream[B](StreamMatch(stm, + func() io.IO[Stream[B]] { return io.IO[Stream[B]](onFinish()) }, + func(a A, tail Stream[A]) io.IO[Stream[B]] { return io.IO[Stream[B]](onValue(a, tail)) }, + func(tail Stream[A]) io.IO[Stream[B]] { return io.IO[Stream[B]](onEmpty(tail)) }, + func(err error) io.IO[Stream[B]] { return io.IO[Stream[B]](onError(err)) }, + )) +} + +// MapEval maps the values of the stream. The provided function returns an IO. +func MapEval[A any, B any](stm Stream[A], f func(a A) io.IO[B]) Stream[B] { + return NewStream(NewScopedStream(stream.MapEval(stm.init.sa, f), stm.init.finalizer), + option.Option[IOStream[B]](option.Map(stm.cont, func(iosa IOStream[A]) IOStream[B] { + return IOStream[B](io.Map(io.IO[Stream[A]](iosa), func(sa Stream[A]) Stream[B] { + return MapEval(sa, f) + })) + })), + ) +} + +// Map converts values of the stream. +func Map[A any, B any](stm Stream[A], f func(a A) B) Stream[B] { + return NewStream(NewScopedStream(stream.Map(stm.init.sa, f), stm.init.finalizer), + option.Map(stm.cont, func(iosa IOStream[A]) IOStream[B] { + return IOStream[B](io.Map(io.IO[Stream[A]](iosa), func(sa Stream[A]) Stream[B] { + return Map(sa, f) + })) + }), + ) +} + +// MapPipe creates a pipe that maps one stream through the provided function. +func MapPipe[A any, B any](f func(a A) B) Pipe[A, B] { + return func(sa Stream[A]) Stream[B] { + return Map(sa, f) + } +} + +// AndThen appends another stream after the end of the first one. +// Deprecated. The second stream won't be finalized. +func AndThen[A any](stm1 Stream[A], stm2 Stream[A]) Stream[A] { + return AndThenLazy(stm1, IOStream[A](io.Lift(stm2))) +} + +// AndThenLazy appends another stream. The other stream is constructed lazily. +// If the second stream has not constructed, it won't be finalized as well. +func AndThenLazy[A any](stm1 Stream[A], stm2 IOStream[A]) Stream[A] { + return option.Match(stm1.cont, + func(iosa IOStream[A]) Stream[A] { + return NewStream(stm1.init, option.Some( + IOStream[A](io.Map(io.IO[Stream[A]](iosa), func(sa Stream[A]) Stream[A] { + return AndThenLazy(sa, stm2) + })), + )) + }, + func() Stream[A] { + return NewStream(stm1.init, option.Some(stm2)) + }, + ) +} + +// IOAndThen concatenates a pair of io streams. +func IOAndThen[A any](stm1 IOStream[A], stm2 IOStream[A]) IOStream[A] { + return IOStream[A]( + io.FlatMap( + io.IO[Stream[A]](stm1), + func(stm Stream[A]) io.IO[Stream[A]] { + return io.Lift(AndThenLazy(stm, stm2)) + }), + ) +} + +// FlatMap constructs a stream of streams. +func FlatMap[A any, B any](stm Stream[A], f func(a A) IOStream[B]) IOStream[B] { + return IOStream[B](StreamMatch(stm, + func() io.IO[Stream[B]] { + return io.IO[Stream[B]](EmptyIO[B]()) + }, + func(a A, tail Stream[A]) io.IO[Stream[B]] { + return io.Map(io.IO[Stream[B]](f(a)), func(sb Stream[B]) Stream[B] { + return AndThenLazy(sb, + IOStream[B](io.Delay(func() io.IO[Stream[B]] { + return io.IO[Stream[B]](FlatMap(tail, f)) + })), + ) + }) + }, + func(tail Stream[A]) io.IO[Stream[B]] { + return io.IO[Stream[B]](FlatMap(tail, f)) + }, + func(err error) io.IO[Stream[B]] { + return io.Fail[Stream[B]](err) + }, + )) +} + +// FlatMapPipe creates a pipe that flatmaps one stream through the provided function. +func FlatMapPipe[A any, B any](f func(a A) IOStream[B]) IOPipe[A, B] { + return func(sa Stream[A]) io.IO[Stream[B]] { + return io.IO[Stream[B]](FlatMap(sa, f)) + } +} + +// Flatten simplifies a stream of streams to just the stream of values by concatenating all +// inner streams. +func Flatten[A any](stm Stream[IOStream[A]]) IOStream[A] { + return FlatMap(stm, fun.Identity[IOStream[A]]) +} + +// StateFlatMap maintains state along the way. +func StateFlatMap[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) (S, IOStream[B])) IOStream[B] { + esb := EmptyIO[B]() + onFinish := func(S) IOStream[B] { return esb } + return StateFlatMapWithFinish(stm, zero, esb, f, onFinish) +} + +// StateFlatMapWithFinish maintains state along the way. +// When the source stream finishes, it invokes onFinish with the last state. +func StateFlatMapWithFinish[A any, B any, S any]( + stm Stream[A], + zero S, + prefix IOStream[B], + f func(a A, s S) (S, IOStream[B]), + onFinish func(s S) IOStream[B], +) IOStream[B] { + res := StreamMatch[A, Stream[B]]( + stm, + /*onFinish*/ func() io.IO[Stream[B]] { + return io.IO[Stream[B]](IOAndThen(prefix, onFinish(zero))) + }, + /*onValue */ func(a A, tail Stream[A]) io.IO[Stream[B]] { + zero2, stm2 := f(a, zero) + return io.IO[Stream[B]](StateFlatMapWithFinish(tail, zero2, IOAndThen(prefix, stm2), f, onFinish)) + }, + /*onEmpty */ func(tail Stream[A]) io.IO[Stream[B]] { + return io.IO[Stream[B]](StateFlatMapWithFinish(tail, zero, prefix, f, onFinish)) + }, + /*onError */ func(err error) io.IO[Stream[B]] { + return io.Fail[Stream[B]](err) + }, + ) + return IOStream[B](res) +} + +// Filter leaves in the stream only the elements that satisfy the given predicate. +func Filter[A any](stm Stream[A], predicate func(A) bool) IOStream[A] { + res := StreamMatch[A, Stream[A]]( + stm, + /*onFinish*/ func() io.IO[Stream[A]] { + return io.IO[Stream[A]](EmptyIO[A]()) + }, + /*onValue */ func(a A, tail Stream[A]) io.IO[Stream[A]] { + if predicate(a) { + return io.Lift(AndThen(Lift(a), tail)) + } else { + return io.Lift(tail) + } + }, + /*onEmpty */ func(tail Stream[A]) io.IO[Stream[A]] { + return io.Lift(tail) + }, + /*onError */ func(err error) io.IO[Stream[A]] { + return io.Fail[Stream[A]](err) + }, + ) + return IOStream[A](res) +} + +// Sum is a pipe that returns a stream of 1 element that is +// the sum of all elements of the original stream. +func Sum[A fun.Number](sa Stream[A]) IOStream[A] { + h := stream.Head(stream.Sum(sa.init.sa)) + hf := io.Finally(h, sa.init.finalizer) + hfs := stream.Eval(hf) + return option.Match(sa.cont, + func(sa2 IOStream[A]) IOStream[A] { + iosa2 := io.IO[Stream[A]](sa2) + return IOStream[A](io.FlatMap(hf, + func(sum1 A) io.IO[Stream[A]] { + return io.FlatMap(iosa2, + func(s2 Stream[A]) io.IO[Stream[A]] { + sum2 := io.IO[Stream[A]](Sum(s2)) + return io.Map(sum2, func(stm3 Stream[A]) Stream[A] { + return Map(stm3, func(el A) A { + return sum1 + el + }) + }) + }) + })) + }, + func() IOStream[A] { return LiftStreamIO(hfs) }, + ) +} + +// Sum2 is a pipe that returns a stream of 1 element that is sum of all elements of the original stream. +// It's another implementation of the same logic as in Sum. +func Sum2[A fun.Number](sa Stream[A]) IOStream[A] { + var zero A + emptyIOA := EmptyIO[A]() + return StateFlatMapWithFinish(sa, + zero, + emptyIOA, + func(a A, s A) (A, IOStream[A]) { + return s + a, emptyIOA + }, + LiftIO[A], + ) +} + +// Len is a pipe that returns a stream of 1 element that is the count of elements of the original stream. +func Len[A any](sa Stream[A]) IOStream[int] { + return Sum(Map(sa, fun.Const[A](1))) +} diff --git a/fstream/fstream_test.go b/fstream/fstream_test.go new file mode 100644 index 0000000..624837b --- /dev/null +++ b/fstream/fstream_test.go @@ -0,0 +1,70 @@ +package fstream_test + +import ( + "testing" + + "github.com/primetalk/goio/fstream" + "github.com/primetalk/goio/io" + "github.com/stretchr/testify/assert" +) + +func TestStream(t *testing.T) { + empty := fstream.Empty[int]() + _ = UnsafeIO(t, fstream.DrainAll(empty)) + stream10_12 := fstream.LiftMany(10, 11, 12) + stream20_24 := Mul2(stream10_12) + res := UnsafeIO(t, fstream.ToSlice(stream20_24)) + assert.Equal(t, []int{20, 22, 24}, res) +} + +func TestGenerate(t *testing.T) { + powers2 := fstream.Unfold(1, func(s int) int { + return s * 2 + }) + + res := UnsafeIO(t, fstream.Head(powers2)) + assert.Equal(t, 2, res) + + powers2_10 := fstream.Drop(powers2, 9) + res = UnsafeIO(t, fstream.IOStreamFlatMap(powers2_10, fstream.Head[int])) + assert.Equal(t, 1024, res) + + res = UnsafeIO(t, fstream.IOStreamFlatMap(fstream.Take(powers2, 10), fstream.Last[int])) + assert.Equal(t, 1024, res) +} + +func TestDrainAll(t *testing.T) { + results := []int{} + natsAppend := fstream.MapEval( + fstream.Take(fstream.Repeat(nats10.ToStream()), 10).ToStream(), + func(a int) io.IO[int] { + return io.Eval(func() (int, error) { + results = append(results, a) + return a, nil + }) + }) + _ = UnsafeIO(t, fstream.DrainAll(natsAppend)) + assert.ElementsMatch(t, results, nats10Values) +} + +func TestSum(t *testing.T) { + sumStream := fstream.Sum(nats10.ToStream()) + ioSum := fstream.IOHead(sumStream) + sum := UnsafeIO(t, ioSum) + assert.Equal(t, 55, sum) +} + +func TestStateFlatMap(t *testing.T) { + sumStream := fstream.Sum2(nats10.ToStream()) + ioSum := fstream.IOHead(sumStream) + sum := UnsafeIO(t, ioSum) + assert.Equal(t, 55, sum) +} + +func isEven(i int) bool { + return i%2 == 0 +} + +func plus(b int, a int) int { + return a + b +} diff --git a/fstream/lengthy.go b/fstream/lengthy.go new file mode 100644 index 0000000..014d5eb --- /dev/null +++ b/fstream/lengthy.go @@ -0,0 +1,91 @@ +package fstream + +import ( + "github.com/primetalk/goio/io" +) + +// Repeat appends the same stream infinitely. +func Repeat[A any](stm Stream[A]) Stream[A] { + return AndThenLazy(stm, IOStream[A](io.Pure(func() Stream[A] { return Repeat(stm) }))) +} + +// Take cuts the stream after n elements. +func Take[A any](stm Stream[A], n int) IOStream[A] { + if n <= 0 { + return IOStream[A](EmptyIO[A]()) + } else { + return IOStream[A](StreamMatch(stm, + func() io.IO[Stream[A]] { return io.IO[Stream[A]](EmptyIO[A]()) }, + func(a A, tail Stream[A]) io.IO[Stream[A]] { + return io.IO[Stream[A]](AndThenLazy(Lift(a), Take(tail, n-1)).ToIOStream()) + }, + func(tail Stream[A]) io.IO[Stream[A]] { + return io.IO[Stream[A]](Take(tail, n)) + }, + func(err error) io.IO[Stream[A]] { + return io.IO[Stream[A]](io.Fail[Stream[A]](err)) + }, + )) + } +} + +// Drop skips n elements in the stream. +func Drop[A any](stm Stream[A], n int) IOStream[A] { + if n <= 0 { + return stm.ToIOStream() + } else { + return IOStream[A](StreamMatch(stm, + func() io.IO[Stream[A]] { return io.IO[Stream[A]](EmptyIO[A]()) }, + func(a A, tail Stream[A]) io.IO[Stream[A]] { + return io.IO[Stream[A]](Drop(tail, n-1)) + }, + func(tail Stream[A]) io.IO[Stream[A]] { + return io.IO[Stream[A]](Drop(tail, n)) + }, + func(err error) io.IO[Stream[A]] { + return io.IO[Stream[A]](io.Fail[Stream[A]](err)) + }, + )) + } +} + +// TakeWhile returns the beginning of the stream such that all elements satisfy the predicate. +func TakeWhile[A any](stm Stream[A], predicate func(A) bool) IOStream[A] { + return IOStreamMatch(stm, + EmptyIO[A], + func(a A, tail Stream[A]) IOStream[A] { + if predicate(a) { + return AndThenLazy(Lift(a), TakeWhile(tail, predicate)).ToIOStream() + } else { + return EmptyIO[A]() + } + }, + func(tail Stream[A]) IOStream[A] { + return TakeWhile(tail, predicate) + }, + func(err error) IOStream[A] { + return IOStream[A](io.Fail[Stream[A]](err)) + }, + ) +} + +// DropWhile removes the beginning of the stream so that the new stream starts with an element +// that falsifies the predicate. +func DropWhile[A any](stm Stream[A], predicate func(A) bool) IOStream[A] { + return IOStreamMatch(stm, + EmptyIO[A], + func(a A, tail Stream[A]) IOStream[A] { + if predicate(a) { + return DropWhile(tail, predicate) + } else { + return AndThen(Lift(a), tail).ToIOStream() + } + }, + func(tail Stream[A]) IOStream[A] { + return DropWhile(tail, predicate) + }, + func(err error) IOStream[A] { + return IOStream[A](io.Fail[Stream[A]](err)) + }, + ) +} diff --git a/fstream/lengthy_test.go b/fstream/lengthy_test.go new file mode 100644 index 0000000..b2aa263 --- /dev/null +++ b/fstream/lengthy_test.go @@ -0,0 +1,20 @@ +package fstream_test + +import ( + "testing" + + "github.com/primetalk/goio/fstream" + "github.com/stretchr/testify/assert" +) + +func TestTakeWhile(t *testing.T) { + nats1112 := fstream.TakeWhile( + fstream.DropWhile( + nats, + func(i int) bool { return i < 10 }, + ).ToStream(), + func(i int) bool { return i < 12 }, + ) + res := UnsafeIOStreamToSlice(t, nats1112) + assert.ElementsMatch(t, res, []int{10, 11}) +}