Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 9df9e70

Browse files
committed
Add AsyncStreamMonadFunctions
1 parent dbe2aac commit 9df9e70

File tree

5 files changed

+42
-13
lines changed

5 files changed

+42
-13
lines changed

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
1111
def foldLeft[B](start: B)(f: (B, A) => B)(implicit executor: ExecutionContext): Future[B] = {
1212
def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
1313
d.flatMap {
14-
case null => acc
14+
case END => acc
1515
case pair => impl(pair.second.data, acc map (b => f(b, pair.first)))
1616
}
1717

@@ -24,7 +24,7 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
2424

2525
def takeWhile(p: A => Boolean)(implicit executor: ExecutionContext): AsyncStream[A] =
2626
new AsyncStream[A](data map {
27-
case null => END
27+
case END => END
2828
case pair if !p(pair.first) => END
2929
case pair => Pair(pair.first, pair.second.takeWhile(p))
3030
})
@@ -33,7 +33,7 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
3333
def take(n: Int)(implicit executor: ExecutionContext): AsyncStream[A] =
3434
if (n <= 0) nil
3535
else AsyncStream(data.map {
36-
case null => END
36+
case END => END
3737
case p => Pair(p.first, p.second.take(n - 1))
3838
})
3939
}
@@ -46,13 +46,13 @@ object AsyncStream {
4646

4747
def generate[S, A](start: S)(gen: S => Future[(A, S)])(implicit executor: ExecutionContext): AsyncStream[A] =
4848
AsyncStream(gen(start).map {
49-
case null => END
49+
case END => END
5050
case (el, rest) => Pair(el, generate(rest)(gen))
5151
})
5252

5353
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A])(implicit executor: ExecutionContext): AsyncStream[A] =
5454
new AsyncStream[A](s1.data.flatMap {
55-
case null => s2.data
55+
case END => s2.data
5656
case p => Future(Pair(p.first, concat(p.second, s2)))
5757
})
5858
}

src/main/scala/asyncstreams/AsyncStreamMonad.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,37 @@ class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[As
1515
override def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
1616
AsyncStream(
1717
ma.data.flatMap {
18-
case null => Future(null)
18+
case END => ENDF
1919
case pair => f(pair.first).data.map { pair2 =>
2020
Pair(pair2.first, concat(pair2.second, bind(pair.second)(f)))
2121
}
2222
}
2323
)
2424
}
25+
26+
trait AsyncStreamMonadFunctions {
27+
def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, Any])
28+
(implicit ex: ExecutionContext): FState[S, Unit] =
29+
FState(s => {
30+
stream.foldLeft(Future(s))((futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2)))
31+
.flatMap(f => f).map(((), _))
32+
})
33+
34+
def isEmpty[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, Boolean] =
35+
FState(s => stream.data.map(pair => (pair eq END, s)))
36+
37+
def isEmpty[A, S : FStateMonad](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
38+
fsm.fconds((s: S) => isEmpty(f(s)))
39+
40+
def notEmpty[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, Boolean] =
41+
FState(s => stream.data map (pair => (!(pair eq END), s)))
42+
43+
def notEmpty[A, S](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
44+
fsm.fconds(s => notEmpty(f(s)))
45+
46+
def get[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, (A, AsyncStream[A])] =
47+
FState(s => stream.data.map(pair => ((pair.first, pair.second), s)))
48+
49+
def generate[S,A](start: S)(gen: FState[S, A])(implicit ex: ExecutionContext) =
50+
AsyncStream.generate(start)(gen.func)
51+
}

src/main/scala/asyncstreams/FState.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package asyncstreams
22

33
import scala.concurrent.{ExecutionContext, Future}
44

5-
class FState[S, A](func: S => Future[(A, S)]) extends ((S) => Future[(A, S)]) {
5+
class FState[S, A](val func: S => Future[(A, S)]) extends ((S) => Future[(A, S)]) {
66
def apply(s: S) = func(s)
77

88
def flatMap[B](f: A => FState[S, B])(implicit ex: ExecutionContext): FState[S, B] = FState[S, B](

src/main/scala/asyncstreams/FStateMonad.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class FStateMonad[S](implicit ex: ExecutionContext)
1313

1414
override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =
1515
FState((s: S) => m(s) flatMap {
16-
case null => ENDF
16+
case END => ENDF
1717
case (fst, snd) => f(fst)(snd)
1818
})
1919

@@ -31,12 +31,14 @@ trait FStateMonadFunctions {
3131
def gets[S](implicit ex: ExecutionContext): FState[S, S] = FState((s: S) => Future((s, s)))
3232
def puts[S](news: S)(implicit ex: ExecutionContext): FState[S, S] = FState((_: S) => Future((news, news)))
3333

34-
def conds[S](f: S => Boolean)(implicit m: FStateMonad[S]): FState[S, Boolean] =
34+
def conds[S](f: S => Boolean)(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] =
3535
m.conds(f)
3636

37-
def fconds[S](f: S => FState[S, Boolean])(implicit m: FStateMonad[S]): FState[S, Boolean] =
37+
/*
38+
def fconds[S](f: S => FState[S, Boolean])(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] =
3839
m.fconds(f)
40+
*/
3941

40-
def mods[S : FStateMonad](f: S => S)(implicit m: FStateMonad[S]): FState[S, S] =
42+
def mods[S : FStateMonad](f: S => S)(implicit m: FStateMonad[S]): FStateMonad[S]#F[S] =
4143
m.mods(f)
42-
}
44+
}

src/main/scala/asyncstreams/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ package object asyncstreams {
1010

1111
def fStateInstance[S](implicit executor: ExecutionContext) = new FStateMonad[S]
1212

13-
object monadops extends FStateMonadFunctions
13+
object monadops extends FStateMonadFunctions with AsyncStreamMonadFunctions
1414
}

0 commit comments

Comments
 (0)