Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit c9efda1

Browse files
committed
Initial implementation
1 parent 93263a0 commit c9efda1

File tree

11 files changed

+185
-0
lines changed

11 files changed

+185
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ project/plugins/project/
1515
# Scala-IDE specific
1616
.scala_dependencies
1717
.worksheet
18+
.idea/

build.sbt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
name := "asyncstreams"
2+
3+
version := "1.0"
4+
5+
scalaVersion := "2.11.8"
6+
7+
libraryDependencies ++= Seq(
8+
"org.scalaz" %% "scalaz-core" % "7.2.6",
9+
"org.scalatest" %% "scalatest" % "3.0.0" % Test
10+
)

copying.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
2+
Version 2, December 2004
3+
4+
Copyright (C) 2016 Daniil Smirnov <danslapman@gmail.com>
5+
6+
Everyone is permitted to copy and distribute verbatim or modified
7+
copies of this license document, and changing it is allowed as long
8+
as the name is changed.
9+
10+
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
11+
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
12+
13+
0. You just DO WHAT THE FUCK YOU WANT TO.

project/build.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version = 0.13.12

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
logLevel := Level.Warn
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package asyncstreams
2+
3+
import scala.concurrent.{ExecutionContext, Future}
4+
import scalaz.std.scalaFuture._
5+
import scalaz.syntax.std.option._
6+
import scalaz.syntax.monad._
7+
import scalaz.OptionT.{optionT => opT}
8+
9+
case class AsyncStream[A](data: Future[Chunk[A, AsyncStream[A]]]) {
10+
import AsyncStream._
11+
12+
def foldLeft[B](start: B)(f: (B, A) => B)(implicit executor: ExecutionContext): Future[B] = {
13+
def impl(d: Future[Chunk[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
14+
d.flatMap(chunk => chunk.map(p => impl(p.second.data, acc map (b => f(b, p.first)))).getOrElse(acc))
15+
16+
impl(data, Future(start))
17+
}
18+
19+
def toList(implicit executor: ExecutionContext): Future[List[A]] =
20+
foldLeft[List[A]](Nil)((list, el) => el :: list) map (_.reverse)
21+
22+
def takeWhile(p: A => Boolean)(implicit executor: ExecutionContext): AsyncStream[A] =
23+
new AsyncStream[A](data map {
24+
case None => None
25+
case Some(pair) if !p(pair.first) => None
26+
case Some(pair) => Some(Pair(pair.first, pair.second.takeWhile(p)))
27+
})
28+
29+
def take(n: Int)(implicit executor: ExecutionContext): AsyncStream[A] =
30+
if (n <= 0) nil
31+
else AsyncStream(opT(data).map(p => Pair(p.first, p.second.take(n - 1))).run)
32+
}
33+
34+
object AsyncStream {
35+
def nil[A](implicit executor: ExecutionContext): AsyncStream[A] = AsyncStream(None.point[Future])
36+
def single[A](item: A)(implicit executor: ExecutionContext): AsyncStream[A] =
37+
AsyncStream(Pair(item, nil[A]).some.point[Future])
38+
39+
def generate[S, A](start: S)(gen: S => Future[Option[(S, A)]])(implicit executor: ExecutionContext): AsyncStream[A] =
40+
AsyncStream(opT(gen(start)).map(p => Pair(p._2, generate(p._1)(gen))).run)
41+
42+
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A])(implicit executor: ExecutionContext): AsyncStream[A] =
43+
new AsyncStream[A](s1.data.flatMap {
44+
case None => s2.data
45+
case Some(p) => Pair(p.first, concat(p.second, s2)).some.point[Future]
46+
})
47+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package asyncstreams
2+
3+
import scala.concurrent.{ExecutionContext, Future}
4+
import scalaz.std.scalaFuture._
5+
import scalaz.syntax.std.option._
6+
import scalaz.syntax.monad._
7+
import scalaz.MonadPlus
8+
import scalaz.OptionT.{optionT => opT}
9+
10+
class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[AsyncStream] {
11+
import AsyncStream._
12+
13+
override def empty[A] = nil[A]
14+
15+
override def point[A](a: => A): AsyncStream[A] = single(a)
16+
17+
override def plus[A](a: AsyncStream[A], b: => AsyncStream[A]) = concat(a, b)
18+
19+
override def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] = {
20+
val resData = opT(ma.data).flatMap(pair =>
21+
opT(f(pair.first).data).map(pair2 =>
22+
Pair(pair2.first, concat(pair2.second, bind(pair.second)(f)))
23+
)
24+
).run
25+
AsyncStream(resData)
26+
}
27+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package asyncstreams
2+
3+
class Pair[A, B](fp: A, sp: => B) {
4+
val first = fp
5+
lazy val second = sp
6+
}
7+
8+
object Pair {
9+
def apply[A, B](first: A, second: => B) = new Pair[A, B](first, second)
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import scala.concurrent.{ExecutionContext, Future}
2+
import scalaz.{Monad, StateT}
3+
4+
package object asyncstreams {
5+
type FState[S, A] = StateT[Future, S, A]
6+
type Chunk[A, B] = Option[Pair[A, B]]
7+
8+
implicit def asyncStreamInstance(implicit executor: ExecutionContext): Monad[AsyncStream] =
9+
new AsyncStreamMonad
10+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package asyncstreams
2+
3+
import scala.concurrent.{Await, Future}
4+
import scala.concurrent.duration._
5+
import scala.concurrent.ExecutionContext.Implicits.global
6+
import asyncstreams.AsyncStream._
7+
8+
import scalaz.std.scalaFuture._
9+
import scalaz.syntax.std.option._
10+
import scalaz.syntax.monad._
11+
import scalaz.syntax.std.boolean._
12+
13+
14+
class AsyncStreamTests extends BaseSuite {
15+
private def makeStream(l: List[Int]) = generate(l)(l => l.nonEmpty.option((l.tail, l.head)).point[Future])
16+
17+
private def makeInfStream = generate(0)(v => Future((v + 1, v).some))
18+
19+
private def wait[T](f: Future[T]): T = Await.result(f, 10.seconds)
20+
21+
test("foldLeft") {
22+
val s2 = makeStream(2 :: 3 :: Nil)
23+
val f = s2.foldLeft(List[Int]())((list, el) => el :: list)
24+
wait(f) shouldBe List(3, 2)
25+
}
26+
27+
test("concat") {
28+
val s1 = makeStream(0 :: 1 :: Nil)
29+
val s2 = makeStream(2 :: 3 :: Nil)
30+
val f = concat(s1, s2)
31+
wait(f.toList) shouldBe List(0, 1, 2, 3)
32+
}
33+
34+
test("working as monad") {
35+
val s1 = makeStream(0 :: 1 :: Nil)
36+
val s2 = makeStream(2 :: 3 :: Nil)
37+
38+
val res = for {
39+
v1 <- s1
40+
v2 <- s2
41+
} yield v1 * v2
42+
43+
wait(res.toList) shouldBe List(0, 0, 2, 3)
44+
}
45+
46+
test("takeWhile") {
47+
val r = makeInfStream.takeWhile(_ < 4)
48+
wait(r.toList) shouldBe List(0, 1, 2, 3)
49+
}
50+
51+
test("take") {
52+
val r = makeInfStream.take(3)
53+
wait(r.toList) shouldBe List(0, 1, 2)
54+
}
55+
56+
test("folding large stream should not crash") {
57+
val r = makeInfStream.takeWhile(_ < 1000000)
58+
wait(r.toList) shouldBe (0 to 999999)
59+
}
60+
}

0 commit comments

Comments
 (0)