Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit bfd8814

Browse files
committed
Test asyncstreams to work with twitter futures
1 parent 6742439 commit bfd8814

File tree

6 files changed

+104
-9
lines changed

6 files changed

+104
-9
lines changed

build.sbt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ libraryDependencies ++= Seq(
1616
"org.scalaz" %% "scalaz-core" % "7.2.15",
1717
"io.monix" %% "monix-eval" % versions("monix") % Test,
1818
"io.monix" %% "monix-scalaz-72" % versions("monix") % Test,
19-
//"com.twitter" %% "util-core" % "6.43.0" % Test,
20-
//"io.catbird" %% "catbird-util" % "0.14.0" % Test, //cats instances for util-core
21-
//"com.codecommit" %% "shims-core" % "1.0-b0e5152" % Test,
19+
"com.twitter" %% "util-core" % "6.45.0" % Test,
20+
"io.catbird" %% "catbird-util" % "0.15.0" % Test,
21+
"me.jeffshaw.harmony" %% "harmony_cats0-9-0_scalaz7-2" % "1.1" % Test,
2222
"org.scalatest" %% "scalatest" % "3.0.3" % Test
2323
)

readme.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
asyncstreams [![Release](https://jitpack.io/v/danslapman/asyncstreams.svg)](https://jitpack.io/#danslapman/asyncstreams)
22
=========
33

4-
**Note: 0.4 release is outdated, use master-SNAPSHOT for now**
5-
64
asyncstreams is a monadic asynchronous stream library. It allows you to write stateful asynchronous algorithms
75
that emits elements into a stream:
86

@@ -22,6 +20,7 @@ See more examples in tests.
2220

2321
asyncstreams is tested to work with:
2422
- standard scala futures
23+
- twitter futures
2524
- monix tasks
2625

2726
asyncstreams is available via jitpack:

src/test/scala/asyncstreams/monixTask/AsyncStreamTests.scala renamed to src/test/scala/asyncstreams/monixTask/AsyncStreamTestsWithMonixTask.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@ import scala.concurrent.Await
1313
import scala.concurrent.duration._
1414
import scalaz.syntax.monadPlus._
1515

16-
class AsyncStreamTests extends FunSuite with Matchers {
16+
class AsyncStreamTestsWithMonixTask extends FunSuite with Matchers {
1717
import TaskZeroError.ze
1818
private implicit val scheduler = Scheduler.fixedPool("monix", 4)
1919
private def makeInfStream = AsyncStream.unfold[Task, Int](0)(_ + 1)
2020
private def wait[T](f: Task[T], d: FiniteDuration = 5.seconds): T = Await.result(f.runAsync, d)
2121

22-
Task.never
23-
2422
test("composition operator") {
2523
val s = 1 ~:: 2 ~:: 3 ~:: AsyncStream.asyncNil[Task, Int]
2624
wait(s.to[List]) shouldBe List(1, 2, 3)

src/test/scala/asyncstreams/stdFuture/AsyncStreamTests.scala renamed to src/test/scala/asyncstreams/stdFuture/AsyncStreamTestsWithStandardFuture.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
1414
import scalaz.std.scalaFuture._
1515
import scalaz.syntax.monadPlus._
1616

17-
class AsyncStreamTests extends FunSuite with Matchers {
17+
class AsyncStreamTestsWithStandardFuture extends FunSuite with Matchers {
1818
private implicit val executor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
1919

2020
private def makeInfStream = AsyncStream.unfold(0)(_ + 1)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package asyncstreams.twitterFuture
2+
3+
import asyncstreams.Utils._
4+
import asyncstreams.{ASImpl, AsyncStream}
5+
import asyncstreams.Implicits.MonadErrorInstances._
6+
import com.twitter.util.{Await, Future}
7+
import io.catbird.util.FutureInstances
8+
import org.scalatest.{FunSuite, Matchers}
9+
import harmony.toscalaz.typeclass.MonadErrorConverter._
10+
11+
import scala.collection.mutable.ArrayBuffer
12+
import scalaz.syntax.monadPlus._
13+
14+
class AsyncStreamTestsWithTwitterFuture extends FunSuite with Matchers with FutureInstances {
15+
import TwFutureZeroError.ze
16+
17+
private def wait[T](f: Future[T]): T = Await.result(f)
18+
private def makeInfStream = AsyncStream.unfold(0)(_ + 1)
19+
20+
test("composition operator") {
21+
val s = 1 ~:: 2 ~:: 3 ~:: AsyncStream.asyncNil[Future, Int]
22+
wait(s.to[List]) shouldBe List(1, 2, 3)
23+
}
24+
25+
test("foldLeft") {
26+
val s2 = List(2, 3).toAS[Future]
27+
val f = implicitly[ASImpl[Future]].collectLeft(s2)(List[Int]())((list, el) => el :: list)
28+
wait(f) shouldBe List(3, 2)
29+
}
30+
31+
test("concatenation") {
32+
val s1 = List(0, 1).toAS
33+
val s2 = List(2, 3).toAS
34+
val f = s1 <+> s2
35+
wait(f.to[List]) shouldBe List(0, 1, 2, 3)
36+
}
37+
38+
test("working as monad") {
39+
val s1 = List(0, 1).toAS
40+
val s2 = List(2, 3).toAS
41+
42+
val res = for {
43+
v1 <- s1
44+
v2 <- s2
45+
} yield v1 * v2
46+
47+
wait(res.to[List]) shouldBe List(0, 0, 2, 3)
48+
}
49+
50+
test("takeWhile") {
51+
val r = makeInfStream.takeWhile(_ < 4)
52+
wait(r.to[List]) shouldBe List(0, 1, 2, 3)
53+
}
54+
55+
test("take") {
56+
val r = makeInfStream.take(3)
57+
wait(r.to[List]) shouldBe List(0, 1, 2)
58+
}
59+
60+
test("folding large stream should not crash") {
61+
val r = makeInfStream.takeWhile(_ < 1000000)
62+
wait(r.to[List]) shouldBe (0 to 999999)
63+
}
64+
65+
test("foreach") {
66+
val stream = makeInfStream.take(10)
67+
val buffer = ArrayBuffer[Int]()
68+
val task = stream.foreach(i => buffer += i)
69+
Await.ready(task)
70+
buffer.to[List] shouldBe 0 :: 1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: 8 :: 9 :: Nil
71+
}
72+
73+
test("foreachF") {
74+
val stream = makeInfStream.take(10)
75+
val buffer = ArrayBuffer[Int]()
76+
val task = stream.foreachF(i => Future(buffer += i))
77+
Await.ready(task)
78+
buffer.to[List] shouldBe 0 :: 1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: 8 :: 9 :: Nil
79+
}
80+
81+
test("flatten") {
82+
val stream = Vector.range(0, 1000000).grouped(10).to[Vector].toAS
83+
val flatStream = stream.flatten
84+
wait(flatStream.to[Vector]) shouldBe Vector.range(0, 1000000)
85+
}
86+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package asyncstreams.twitterFuture
2+
3+
import asyncstreams.ZeroError
4+
import com.twitter.util.Future
5+
import io.catbird.util.FutureInstances
6+
import harmony.toscalaz.typeclass.MonadErrorConverter._
7+
8+
object TwFutureZeroError extends FutureInstances {
9+
implicit val ze = new ZeroError[Throwable, Future] {
10+
override val zeroElement: Throwable = new NoSuchElementException
11+
}
12+
}

0 commit comments

Comments
 (0)