11package asyncstreams
22
3+ import cats .kernel .Monoid
4+ import cats .{Alternative , Applicative , Monad }
5+ import cats .syntax .applicative ._
6+ import cats .syntax .flatMap ._
7+ import cats .syntax .functor ._
8+ import cats .syntax .semigroupk ._
9+
310import scala .annotation .unchecked .{uncheckedVariance => uV }
411import scala .collection .GenIterable
512import scala .collection .generic .CanBuildFrom
613import scala .language .higherKinds
7- import scalaz .syntax .monadPlus ._
8- import scalaz .{Monad , MonadPlus }
9-
10- class AsyncStream [F [+ _]: Monad , A ](val data : F [Step [A , AsyncStream [F , A ]]]) {
11- type SStep = Step [A , AsyncStream [F , A ]]
1214
13- def to [Col [_]](implicit cbf : CanBuildFrom [Nothing , A , Col [A @ uV]], methods : ASImpl [F ]): F [Col [A ]] =
14- methods.collectLeft(this )(cbf())((col, el) => col += el).map(_.result())
15+ class AsyncStream [F [+ _]: Monad , + A ](private [asyncstreams] val data : F [Step [A , AsyncStream [F , A ]]]) {
16+ def to [Col [+ _]](implicit cbf : CanBuildFrom [Nothing , A , Col [A @ uV]], impl : ASImpl [F ]): F [Col [A ]] =
17+ impl.collectLeft(this )(cbf())((col, el) => col += el).map(_.result())
1518
1619 def takeWhile (p : A => Boolean )(implicit impl : ASImpl [F ]): AsyncStream [F , A ] = impl.takeWhile(this )(p)
1720
18- def take (n : Int )(implicit smp : MonadPlus [AsyncStream [F , ? ]]): AsyncStream [F , A ] =
19- if (n <= 0 ) smp .empty
21+ def take (n : Int )(implicit alt : Alternative [AsyncStream [F , ? ]]): AsyncStream [F , A ] =
22+ if (n <= 0 ) alt .empty
2023 else AsyncStream {
2124 data.map(p => Step (p.value, p.rest.take(n - 1 )))
2225 }
2326
24- def foreach [U ](f : (A ) => U )(implicit methods : ASImpl [F ]): F [Unit ] =
25- methods.collectLeft(this )(())((_ : Unit , a : A ) => {f(a); ()})
27+ def drop (n : Int ): AsyncStream [F , A ] =
28+ if (n <= 0 ) this
29+ else AsyncStream {
30+ data.flatMap(p => p.rest.drop(n - 1 ).data)
31+ }
32+
33+ def foreach [U ](f : (A ) => U )(implicit impl : ASImpl [F ]): F [Unit ] =
34+ impl.collectLeft(this )(())((_ : Unit , a : A ) => {f(a); ()})
2635
2736 def foreachF [U ](f : (A ) => F [U ])(implicit impl : ASImpl [F ]): F [Unit ] =
28- impl.collectLeft(this )(().point [F ])((fu : F [Unit ], a : A ) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
37+ impl.collectLeft(this )(().pure [F ])((fu : F [Unit ], a : A ) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
2938
30- def flatten [B ](implicit asIterable : A => GenIterable [B ], smp : MonadPlus [AsyncStream [F , ? ]], impl : ASImpl [F ]): AsyncStream [F , B ] = {
31- def streamChunk (step : AsyncStream [F , A ]# SStep ): AsyncStream [F , B ] =
39+ def flatten [B ](implicit asIterable : A => GenIterable [B ], alt : Alternative [AsyncStream [F , ? ]], impl : ASImpl [F ]): AsyncStream [F , B ] = {
40+ def streamChunk (step : Step [ A , AsyncStream [F , A ]] ): AsyncStream [F , B ] =
3241 impl.fromIterable(asIterable(step.value).seq) <+> step.rest.flatten
3342
3443 AsyncStream (data.flatMap(step => streamChunk(step).data))
3544 }
3645
3746 def isEmpty (implicit impl : ASImpl [F ]): F [Boolean ] = impl.isEmpty(this )
3847 def nonEmpty (implicit impl : ASImpl [F ]): F [Boolean ] = impl.isEmpty(this ).map(! _)
48+
49+ def map [B ](f : A => B ): AsyncStream [F , B ] = AsyncStream {
50+ data.map(s => Step (f(s.value), s.rest.map(f)))
51+ }
52+
53+ def mapF [B ](f : A => F [B ]): AsyncStream [F , B ] = AsyncStream {
54+ data.flatMap(s => f(s.value).map(nv => Step (nv, s.rest.mapF(f))))
55+ }
56+
57+ def flatMap [B ](f : A => AsyncStream [F , B ])(implicit alt : Alternative [AsyncStream [F , ? ]]): AsyncStream [F , B ] = AsyncStream {
58+ data.flatMap(s => (f(s.value) <+> s.rest.flatMap(f)).data)
59+ }
60+
61+ def filter (p : A => Boolean ): AsyncStream [F , A ] = AsyncStream {
62+ data.flatMap { s =>
63+ if (p(s.value)) Step (s.value, s.rest.filter(p)).pure[F ]
64+ else s.rest.filter(p).data
65+ }
66+ }
67+
68+ def withFilter (p : A => Boolean ): AsyncStream [F , A ] = filter(p)
69+
70+ def find (p : A => Boolean )(implicit impl : ASImpl [F ]): F [Option [A ]] = impl.find(this , p)
71+ def findF (p : A => F [Boolean ])(implicit impl : ASImpl [F ]): F [Option [A ]] = impl.findF(this , p)
72+
73+ def partition (p : A => Boolean ): (AsyncStream [F , A ], AsyncStream [F , A ]) = (filter(p), filter(p.andThen(! _)))
74+
75+ def foldMap [B ](f : A => B )(implicit impl : ASImpl [F ], mb : Monoid [B ]): F [B ] = {
76+ impl.collectLeft(this )(mb.empty)((b, a) => mb.combine(b, f(a)))
77+ }
78+
79+ def zip [B ](sb : AsyncStream [F , B ]): AsyncStream [F , (A , B )] = AsyncStream {
80+ for {
81+ stepA <- data
82+ stepB <- sb.data
83+ } yield Step ((stepA.value, stepB.value), stepA.rest zip stepB.rest)
84+ }
85+
86+ def zipWithIndex (implicit app : Applicative [AsyncStream [F , ? ]]): AsyncStream [F , (A , Int )] =
87+ zip(AsyncStream .unfold(0 )(_ + 1 ))
88+
89+ def foldLeft [B ](init : B )(fold : (B , A ) => B )(implicit impl : ASImpl [F ]): F [B ] =
90+ impl.collectLeft(this )(init)(fold)
3991}
4092
4193object AsyncStream {
42- def apply [F [+ _]: Monad , A ](data : => F [Step [A , AsyncStream [F , A ]]]): AsyncStream [F , A ] = new AsyncStream (data)
94+ private [asyncstreams] def apply [F [+ _]: Monad , A ](data : => F [Step [A , AsyncStream [F , A ]]]): AsyncStream [F , A ] = new AsyncStream (data)
4395 def asyncNil [F [+ _]: Monad , A ](implicit impl : ASImpl [F ]): AsyncStream [F , A ] = impl.empty
4496
45- private [asyncstreams] def generate [F [+ _]: Monad , S , A ](start : S )(gen : S => F [(S , A )])(implicit smp : MonadPlus [AsyncStream [F , ? ]]): AsyncStream [F , A ] = AsyncStream {
97+ private [asyncstreams] def generate [F [+ _]: Monad , S , A ](start : S )(gen : S => F [(S , A )])(implicit app : Applicative [AsyncStream [F , ? ]]): AsyncStream [F , A ] = AsyncStream {
4698 gen(start).map((stateEl : (S , A )) => Step (stateEl._2, generate(stateEl._1)(gen)))
4799 }
48100
49- def unfold [F [+ _]: Monad , T ](start : T )(makeNext : T => T )(implicit smp : MonadPlus [AsyncStream [F , ? ]]): AsyncStream [F , T ] =
50- generate(start)(s => (makeNext(s), s).point[F ])
101+ def unfold [F [+ _]: Monad , T ](start : T )(makeNext : T => T )(implicit app : Applicative [AsyncStream [F , ? ]]): AsyncStream [F , T ] =
102+ generate(start)(s => (makeNext(s), s).pure[F ])
103+
104+ def unfoldM [F [+ _]: Monad , T ](start : T )(makeNext : T => F [T ])(implicit app : Applicative [AsyncStream [F , ? ]]): AsyncStream [F , T ] =
105+ generate(start)(s => makeNext(s).map(n => (n, s)))
51106
52- implicit class AsyncStreamOps [F [+ _]: Monad , A ]( stream : => AsyncStream [F , A ]) {
53- def ~:: ( el : A ) = AsyncStream ( Step (el, stream).point[ F ] )
107+ def unfoldMM [F [+ _]: Monad , T ]( start : F [ T ])( makeNext : T => F [ T ])( implicit app : Applicative [ AsyncStream [F , ? ]]) : AsyncStream [ F , T ] = AsyncStream {
108+ start.flatMap(initial => generate(initial)(s => makeNext(s).map(n => (n, s))).data )
54109 }
55110}
0 commit comments