Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 2a2cf9e

Browse files
committed
Add & test monadic FState-based AsyncStream generators. Refactor few methods and fix flatMap in FState
1 parent 9df9e70 commit 2a2cf9e

File tree

5 files changed

+116
-34
lines changed

5 files changed

+116
-34
lines changed

src/main/scala/asyncstreams/AsyncStreamMonad.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[As
2424
}
2525

2626
trait AsyncStreamMonadFunctions {
27-
def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, Any])
27+
def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, _])
2828
(implicit ex: ExecutionContext): FState[S, Unit] =
2929
FState(s => {
3030
stream.foldLeft(Future(s))((futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2)))
@@ -35,17 +35,17 @@ trait AsyncStreamMonadFunctions {
3535
FState(s => stream.data.map(pair => (pair eq END, s)))
3636

3737
def isEmpty[A, S : FStateMonad](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
38-
fsm.fconds((s: S) => isEmpty(f(s)))
38+
fsm.fcondS((s: S) => isEmpty(f(s)))
3939

4040
def notEmpty[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, Boolean] =
4141
FState(s => stream.data map (pair => (!(pair eq END), s)))
4242

4343
def notEmpty[A, S](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
44-
fsm.fconds(s => notEmpty(f(s)))
44+
fsm.fcondS(s => notEmpty(f(s)))
4545

4646
def get[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, (A, AsyncStream[A])] =
4747
FState(s => stream.data.map(pair => ((pair.first, pair.second), s)))
4848

49-
def generate[S,A](start: S)(gen: FState[S, A])(implicit ex: ExecutionContext) =
49+
def generateS[S,A](start: S)(gen: FState[S, A])(implicit ex: ExecutionContext) =
5050
AsyncStream.generate(start)(gen.func)
5151
}

src/main/scala/asyncstreams/FState.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,35 @@ package asyncstreams
22

33
import scala.concurrent.{ExecutionContext, Future}
44

5-
class FState[S, A](val func: S => Future[(A, S)]) extends ((S) => Future[(A, S)]) {
5+
class FState[S, A](val func: S => Future[(A, S)]) {
6+
import FState._
7+
68
def apply(s: S) = func(s)
79

810
def flatMap[B](f: A => FState[S, B])(implicit ex: ExecutionContext): FState[S, B] = FState[S, B](
9-
(s: S) => func(s) flatMap ((fst: A, snd: S) => f(fst)(snd)).tupled
11+
(s: S) => func(s).flatMap {
12+
case END => ENDF
13+
case (fst, snd) => f(fst)(snd)
14+
}
1015
)
1116

1217
def map[B](f: A => B)(implicit ex: ExecutionContext): FState[S, B] =
1318
flatMap((a: A) => FState.unit(f(a)))
19+
20+
def bind[B](f: A => FState[S, B])(implicit ex: ExecutionContext): FState[S, B] =
21+
FState((s: S) => func(s) flatMap {
22+
case END => ENDF
23+
case (fst, snd) => f(fst)(snd)
24+
})
25+
26+
def filter(p: A => Boolean)(implicit executor: ExecutionContext): FState[S, A] =
27+
bind(a => if (p(a)) unit(a) else empty[S, A])
28+
29+
def withFilter(p: A => Boolean)(implicit executor: ExecutionContext): FState[S, A] = filter(p)
1430
}
1531

1632
object FState {
1733
def apply[S, A](f: S => Future[(A, S)]) = new FState[S, A](f)
1834
def unit[S, A](a: => A)(implicit ex: ExecutionContext) = FState[S, A]((s: S) => Future((a, s)))
35+
def empty[S, A](implicit ex: ExecutionContext) = FState[S, A]((s: S) => ENDF)
1936
}

src/main/scala/asyncstreams/FStateMonad.scala

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,30 @@ class FStateMonad[S](implicit ex: ExecutionContext)
77
extends MonadPlus[({ type f[X] = FState[S, X]})#f] with FStateMonadFunctions {
88
type F[X] = FState[S, X]
99

10-
override def empty[A]: F[A] = FState((s: S) => ENDF)
10+
override def empty[A]: F[A] = FState.empty[S, A]
1111

1212
override def point[A](a: => A): F[A] = FState.unit(a)
1313

14-
override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =
15-
FState((s: S) => m(s) flatMap {
16-
case END => ENDF
17-
case (fst, snd) => f(fst)(snd)
18-
})
14+
override def bind[A, B](m: F[A])(f: A => F[B]): F[B] = m.bind(f)
1915

2016
override def plus[A](a: F[A],b: => F[A]): F[A] = bind(a)(_ => b)
2117

22-
def conds(f: S => Boolean): F[Boolean] = bind(gets[S])(vs => point(f(vs)))
23-
def fconds(f: S => F[Boolean]): F[Boolean] = bind(gets[S])(f)
24-
def mods(f: S => S): F[S] = bind(gets[S])(vs => puts(f(vs)))
18+
def condS(f: S => Boolean): F[Boolean] = bind(getS[S])(vs => point(f(vs)))
19+
def fcondS(f: S => F[Boolean]): F[Boolean] = bind(getS[S])(f)
20+
def modS(f: S => S): F[S] = bind(getS[S])(vs => putS(f(vs)))
2521

2622
def forM_[A](cond: S => Boolean, mod: S => S)(action: => F[A]): F[Unit] =
27-
whileM_(conds(cond), bind(action)(va => mods(mod)))
23+
whileM_(condS(cond), bind(action)(va => modS(mod)))
2824
}
2925

3026
trait FStateMonadFunctions {
31-
def gets[S](implicit ex: ExecutionContext): FState[S, S] = FState((s: S) => Future((s, s)))
32-
def puts[S](news: S)(implicit ex: ExecutionContext): FState[S, S] = FState((_: S) => Future((news, news)))
27+
def getS[S](implicit ex: ExecutionContext): FState[S, S] = FState((s: S) => Future((s, s)))
3328

34-
def conds[S](f: S => Boolean)(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] =
35-
m.conds(f)
29+
def putS[S](news: S)(implicit ex: ExecutionContext): FState[S, S] = FState((_: S) => Future((news, news)))
3630

37-
/*
38-
def fconds[S](f: S => FState[S, Boolean])(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] =
39-
m.fconds(f)
40-
*/
31+
def condS[S](f: S => Boolean)(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] = m.condS(f)
4132

42-
def mods[S : FStateMonad](f: S => S)(implicit m: FStateMonad[S]): FStateMonad[S]#F[S] =
43-
m.mods(f)
33+
def fconds[S](f: S => FState[S, Boolean])(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] = m.fconds(f)
34+
35+
def modS[S](f: S => S)(implicit m: FStateMonad[S]): FStateMonad[S]#F[S] = m.modS(f)
4436
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package asyncstreams
2+
3+
import monadops._
4+
import AsyncStream._
5+
6+
import scala.concurrent.duration._
7+
import scala.concurrent.{Await, Future}
8+
import scala.concurrent.ExecutionContext.Implicits.global
9+
10+
class AsyncStreamMonadicOperationsTests extends BaseSuite {
11+
def makeStream(l: List[Int]) = generate(l)(l => if (l.isEmpty) ENDF else Future((l.head, l.tail)))
12+
13+
private def wait[T](f: Future[T]): T = Await.result(f, 10.seconds)
14+
15+
test("foreach") {
16+
implicit val fsm = fStateInstance[Int]
17+
18+
val fstate = for {
19+
_ <- foreach(makeStream(0 :: 1 :: 2 :: Nil)) {
20+
v => modS[Int](_ + 1)
21+
}
22+
v2 <- getS[Int]
23+
} yield v2
24+
25+
wait(fstate(0)) shouldBe (3, 3)
26+
}
27+
28+
test("get, isEmpty") {
29+
case class State(counter: Int, stream: AsyncStream[Int])
30+
implicit val fsm = fStateInstance[State]
31+
32+
val stream = makeStream(0 :: 1 :: 2 :: 3 :: Nil)
33+
34+
val fstate = for {
35+
_ <- fsm.whileM_(notEmpty(_.stream), for {
36+
s <- getS[State]
37+
(el, newStream) <- get[Int, State](s.stream)
38+
_ <- putS[State](State(s.counter + el, newStream))
39+
} yield ())
40+
v <- getS[State]
41+
} yield v.counter
42+
43+
wait(fstate(State(0, stream)))._1 shouldBe 6
44+
}
45+
46+
test("FState as generator") {
47+
implicit val fsm = fStateInstance[Int]
48+
49+
val stream = generateS(0) {
50+
for {
51+
s <- getS[Int]
52+
_ <- putS[Int](s + 1)
53+
} yield s
54+
} take 3
55+
56+
wait(stream.to[List]) shouldBe (0 :: 1 :: 2 :: Nil)
57+
}
58+
59+
test("Generate finite stream") {
60+
implicit val fsm = fStateInstance[Int]
61+
62+
val stream = generateS(0) {
63+
for {
64+
s <- getS[Int]
65+
if s < 3
66+
_ <- putS(s + 1)
67+
} yield s
68+
}
69+
70+
wait(stream.to[List]) shouldBe (0 :: 1 :: 2 :: Nil)
71+
}
72+
}

src/test/scala/asyncstreams/FStateTests.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,23 @@ class FStateTests extends BaseSuite {
2424
val fsm = fStateInstance[Int]
2525

2626
val t = for {
27-
_ <- fsm.whileM_(gets[Int] map (_ < 10), for {
28-
i <- gets[Int]
29-
_ <- puts(i + 1)
30-
} yield (()))
31-
v1 <- gets[Int]
27+
_ <- fsm.whileM_(getS[Int] map (_ < 10), for {
28+
i <- getS[Int]
29+
_ <- putS(i + 1)
30+
} yield ())
31+
v1 <- getS[Int]
3232
} yield v1
3333

3434
wait(t(0)) shouldBe (10, 10)
3535
}
3636

3737
test("conds & mods") {
3838
implicit val fsm = fStateInstance[Int]
39+
import fsm.whileM_
3940

4041
val t = for {
41-
_ <- fsm.whileM_(conds(_ < 10), mods(_ + 1))
42-
v1 <- gets[Int]
42+
_ <- whileM_(condS(_ < 10), modS(_ + 1))
43+
v1 <- getS[Int]
4344
} yield v1
4445

4546
wait(t(0)) shouldBe (10, 10)
@@ -52,7 +53,7 @@ class FStateTests extends BaseSuite {
5253
_ <- fsm.forM_(_ < 10, _ + 1) {
5354
fsm.point("AAAAAA")
5455
}
55-
v1 <- gets[Int]
56+
v1 <- getS[Int]
5657
} yield v1
5758

5859
wait(t(0)) shouldBe (10, 10)

0 commit comments

Comments
 (0)