Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 624f5fd

Browse files
committed
Use null as an "EndOfStream" indicator, reimplement AsyncStream without monad transformers
1 parent 72d7e35 commit 624f5fd

File tree

4 files changed

+41
-34
lines changed

4 files changed

+41
-34
lines changed
Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,61 @@
11
package asyncstreams
22

3+
import scala.annotation.unchecked.{uncheckedVariance => uV}
34
import scala.collection.generic.CanBuildFrom
4-
import scala.annotation.unchecked.{ uncheckedVariance => uV }
55
import scala.concurrent.{ExecutionContext, Future}
66
import scala.language.higherKinds
77
import scalaz.std.scalaFuture._
8-
import scalaz.syntax.std.option._
98
import scalaz.syntax.monad._
10-
import scalaz.OptionT.{optionT => opT}
119

12-
case class AsyncStream[A](data: Future[Chunk[A, AsyncStream[A]]]) {
10+
case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
1311
import AsyncStream._
1412

1513
def foldLeft[B](start: B)(f: (B, A) => B)(implicit executor: ExecutionContext): Future[B] = {
16-
def impl(d: Future[Chunk[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
17-
d.flatMap(chunk => chunk.map(p => impl(p.second.data, acc map (b => f(b, p.first)))).getOrElse(acc))
14+
def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
15+
d.flatMap {
16+
case null => acc
17+
case pair => impl(pair.second.data, acc map (b => f(b, pair.first)))
18+
}
1819

1920
impl(data, Future(start))
2021
}
2122

2223
def to[Col[_]](implicit executor: ExecutionContext, cbf: CanBuildFrom[Nothing, A, Col[A @uV]]): Future[Col[A]] =
2324
foldLeft(cbf())((col, el) => col += el).map(_.result())
2425

26+
2527
def takeWhile(p: A => Boolean)(implicit executor: ExecutionContext): AsyncStream[A] =
2628
new AsyncStream[A](data map {
27-
case None => None
28-
case Some(pair) if !p(pair.first) => None
29-
case Some(pair) => Some(Pair(pair.first, pair.second.takeWhile(p)))
29+
case null => END
30+
case pair if !p(pair.first) => END
31+
case pair => Pair(pair.first, pair.second.takeWhile(p))
3032
})
3133

34+
3235
def take(n: Int)(implicit executor: ExecutionContext): AsyncStream[A] =
3336
if (n <= 0) nil
34-
else AsyncStream(opT(data).map(p => Pair(p.first, p.second.take(n - 1))).run)
37+
else AsyncStream(data.map {
38+
case null => END
39+
case p => Pair(p.first, p.second.take(n - 1))
40+
})
3541
}
3642

43+
3744
object AsyncStream {
38-
def nil[A](implicit executor: ExecutionContext): AsyncStream[A] = AsyncStream(None.point[Future])
45+
def nil[A](implicit executor: ExecutionContext): AsyncStream[A] = AsyncStream(ENDF)
3946
def single[A](item: A)(implicit executor: ExecutionContext): AsyncStream[A] =
40-
AsyncStream(Pair(item, nil[A]).some.point[Future])
47+
AsyncStream(Pair(item, nil[A]).point[Future])
4148

42-
def generate[S, A](start: S)(gen: S => Future[Option[(S, A)]])(implicit executor: ExecutionContext): AsyncStream[A] =
43-
AsyncStream(opT(gen(start)).map(p => Pair(p._2, generate(p._1)(gen))).run)
49+
def generate[S, A](start: S)(gen: S => Future[(A, S)])(implicit executor: ExecutionContext): AsyncStream[A] =
50+
AsyncStream(gen(start).map {
51+
case null => END
52+
case (el, rest) => Pair(el, generate(rest)(gen))
53+
})
4454

4555
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A])(implicit executor: ExecutionContext): AsyncStream[A] =
4656
new AsyncStream[A](s1.data.flatMap {
47-
case None => s2.data
48-
case Some(p) => Pair(p.first, concat(p.second, s2)).some.point[Future]
57+
case null => s2.data
58+
case p => Pair(p.first, concat(p.second, s2)).point[Future]
4959
})
5060
}
61+
Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package asyncstreams
22

33
import scala.concurrent.{ExecutionContext, Future}
4-
import scalaz.std.scalaFuture._
5-
import scalaz.syntax.std.option._
6-
import scalaz.syntax.monad._
74
import scalaz.MonadPlus
8-
import scalaz.OptionT.{optionT => opT}
95

106
class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[AsyncStream] {
117
import AsyncStream._
@@ -16,12 +12,13 @@ class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[As
1612

1713
override def plus[A](a: AsyncStream[A], b: => AsyncStream[A]) = concat(a, b)
1814

19-
override def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] = {
20-
val resData = opT(ma.data).flatMap(pair =>
21-
opT(f(pair.first).data).map(pair2 =>
22-
Pair(pair2.first, concat(pair2.second, bind(pair.second)(f)))
23-
)
24-
).run
25-
AsyncStream(resData)
26-
}
15+
override def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
16+
AsyncStream(
17+
ma.data.flatMap {
18+
case null => Future(null)
19+
case pair => f(pair.first).data.map { pair2 =>
20+
Pair(pair2.first, concat(pair2.second, bind(pair.second)(f)))
21+
}
22+
}
23+
)
2724
}

src/main/scala/asyncstreams/package.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import scala.concurrent.{ExecutionContext, Future}
2-
import scalaz.{Monad, StateT}
2+
import scalaz.Monad
33

44
package object asyncstreams {
5-
type FState[S, A] = StateT[Future, S, A]
6-
type Chunk[A, B] = Option[Pair[A, B]]
5+
final val END: Null = null
6+
final def ENDF(implicit executor: ExecutionContext): Future[Null] = Future(END)
77

88
implicit def asyncStreamInstance(implicit executor: ExecutionContext): Monad[AsyncStream] =
99
new AsyncStreamMonad

src/test/scala/asyncstreams/AsyncStreamTests.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@ import scala.concurrent.ExecutionContext.Implicits.global
66
import asyncstreams.AsyncStream._
77

88
import scalaz.std.scalaFuture._
9-
import scalaz.syntax.std.option._
109
import scalaz.syntax.monad._
1110
import scalaz.syntax.std.boolean._
1211

1312

1413
class AsyncStreamTests extends BaseSuite {
15-
private def makeStream(l: List[Int]) = generate(l)(l => l.nonEmpty.option((l.tail, l.head)).point[Future])
14+
private def makeStream(l: List[Int]) = generate(l)(l => ((l.nonEmpty)?(l.head, l.tail)|END).point[Future])
1615

17-
private def makeInfStream = generate(0)(v => Future((v + 1, v).some))
16+
private def makeInfStream = generate(0)(v => Future((v, v + 1)))
1817

1918
private def wait[T](f: Future[T]): T = Await.result(f, 10.seconds)
2019

0 commit comments

Comments
 (0)