Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 0390ca5

Browse files
authored
Merge pull request #3 from danslapman/scalaz-reimpl
Abandon FState, use StateT
2 parents 05550cc + 760d7b6 commit 0390ca5

27 files changed

+366
-511
lines changed

build.sbt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
name := "asyncstreams"
22

3-
version := "0.5-SNAPSHOT"
3+
version := "1.0"
44

5-
scalaVersion := "2.11.8"
5+
scalaVersion := "2.12.2"
6+
7+
parallelExecution in ThisBuild := false
68

79
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
810

911
val versions = Map(
10-
"monix" -> "2.2.3"
12+
"monix" -> "2.2.4"
1113
)
1214

1315
libraryDependencies ++= Seq(
14-
"org.scalaz" %% "scalaz-core" % "7.2.9",
15-
"com.twitter" %% "util-core" % "6.41.0" % Test,
16+
"org.scalaz" %% "scalaz-core" % "7.2.11",
1617
"io.monix" %% "monix-eval" % versions("monix") % Test,
1718
"io.monix" %% "monix-scalaz-72" % versions("monix") % Test,
18-
"org.scalatest" %% "scalatest" % "3.0.1" % Test
19+
//"com.twitter" %% "util-core" % "6.43.0" % Test,
20+
//"io.catbird" %% "catbird-util" % "0.14.0" % Test, //cats instances for util-core
21+
//"com.codecommit" %% "shims-core" % "1.0-b0e5152" % Test,
22+
"org.scalatest" %% "scalatest" % "3.0.3" % Test
1923
)

copying.txt

Lines changed: 0 additions & 13 deletions
This file was deleted.

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version = 0.13.13
1+
sbt.version = 0.13.15

project/plugins.sbt

Lines changed: 0 additions & 1 deletion
This file was deleted.

readme.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
asyncstreams [![Release](https://jitpack.io/v/danslapman/asyncstreams.svg)](https://jitpack.io/#danslapman/asyncstreams)
22
=========
33

4+
**Note: 0.4 release is outdated, use master-SNAPSHOT for now**
5+
46
asyncstreams is a monadic asynchronous stream library. It allows you to write stateful asynchronous algorithms
57
that emits elements into a stream:
68

79
```scala
8-
val stream = generateS(0) {
10+
val stream = genS(0) {
911
for {
1012
s <- getS[Int]
1113
if s < 3
@@ -20,15 +22,14 @@ See more examples in tests.
2022

2123
asyncstreams is tested to work with:
2224
- standard scala futures
23-
- twitter futures (with some [instances](https://github.com/danslapman/asyncstreams/blob/master/src/test/scala/asyncstreams/twitterFutures/TwitterInstances.scala))
24-
- monix tasks
25+
- monix tasks (WIP, there are some issues)
2526

2627
asyncstreams is available via jitpack:
2728

2829
```
2930
resolvers += "jitpack" at "https://jitpack.io"
3031
31-
libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "0.4"
32+
libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "master-SNAPSHOT"
3233
```
3334

34-
asyncstreams is based on [scala-async](https://github.com/iboltaev/scala-async) ideas.
35+
asyncstreams initially based on [scala-async](https://github.com/iboltaev/scala-async) ideas.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package asyncstreams
2+
3+
import scala.language.higherKinds
4+
import scalaz.MonadError
5+
6+
trait ASImpl[F[+_]] {
7+
def empty[A]: AsyncStream[F, A]
8+
def collectLeft[A, B](s: AsyncStream[F, A])(init: B)(f: (B, A) => B): F[B]
9+
def fromIterable[T](it: Iterable[T]): AsyncStream[F, T]
10+
def takeWhile[T](s: AsyncStream[F, T])(p: T => Boolean): AsyncStream[F, T]
11+
def isEmpty[T](s: AsyncStream[F, T]): F[Boolean]
12+
}
13+
14+
class ASImplForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends ASImpl[F] {
15+
import scalaz.syntax.monadError._
16+
17+
override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])
18+
19+
override def collectLeft[A, B](s: AsyncStream[F, A])(init: B)(f: (B, A) => B): F[B] = {
20+
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
21+
d.flatMap(step => impl(step.rest.data, acc.map(b => f(b, step.value)))).handleError(_ => acc)
22+
23+
impl(s.data, init.point[F])
24+
}
25+
26+
override def fromIterable[T](it: Iterable[T]): AsyncStream[F, T] = AsyncStream {
27+
if (it.nonEmpty) Step(it.head, fromIterable(it.tail)).point[F] else ze.zeroElement.raiseError[F, AsyncStream[F, T]#SStep]
28+
}
29+
30+
override def takeWhile[T](s: AsyncStream[F, T])(p: (T) => Boolean): AsyncStream[F, T] = AsyncStream {
31+
s.data.map {
32+
case step if !p(step.value) => throw ze.zeroElement
33+
case step => Step(step.value, takeWhile(step.rest)(p))
34+
}
35+
}
36+
37+
override def isEmpty[T](s: AsyncStream[F, T]): F[Boolean] = s.data.map(_ => false).handleError(_ => true.point[F])
38+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package asyncstreams
2+
3+
import scala.language.higherKinds
4+
import scalaz.syntax.monadError._
5+
import scalaz.{MonadError, MonadPlus}
6+
7+
class ASMonadPlusForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends MonadPlus[AsyncStream[F, ?]] {
8+
override def bind[A, B](fa: AsyncStream[F, A])(f: (A) => AsyncStream[F, B]): AsyncStream[F, B] = AsyncStream {
9+
fa.data.flatMap(step => f(step.value).data.map(step2 => Step(step2.value, plus(step2.rest, bind(step.rest)(f)))))
10+
.handleError(_ => fmp.raiseError(ze.zeroElement))
11+
}
12+
13+
override def plus[A](a: AsyncStream[F, A], b: => AsyncStream[F, A]): AsyncStream[F, A] = AsyncStream {
14+
a.data.map(step => Step(step.value, plus(step.rest, b))).handleError(_ => b.data)
15+
}
16+
17+
override def point[A](a: => A): AsyncStream[F, A] = AsyncStream(Step(a, empty[A]).point[F])
18+
19+
override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])
20+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package asyncstreams
2+
3+
import scala.language.higherKinds
4+
import scalaz.syntax.monad._
5+
import scalaz.{IndexedStateT, Monad, MonadPlus, MonadState, StateT}
6+
7+
class ASStateTOps[F[+_]: Monad](implicit methods: ASImpl[F]) {
8+
def foreach[A, S](stream: AsyncStream[F, A])(f: A => StateT[F, S, _]): StateT[F, S, Unit] = StateT { s =>
9+
methods.collectLeft(stream)(s.point[F])((fS, a) => fS.flatMap(s2 => f(a)(s2).map(_._1)))
10+
.flatMap(identity).map((_, ()))
11+
}
12+
13+
def isEmpty[A, S](stream: AsyncStream[F, A]): StateT[F, S, Boolean] = StateT { s =>
14+
stream.isEmpty.map((s, _))
15+
}
16+
17+
def isEmpty[A, S](f: S => AsyncStream[F, A])(implicit ms: MonadState[IndexedStateT[F, S, S, ?], S]): StateT[F, S, Boolean] = {
18+
ms.get >>= ((s: S) => isEmpty(f(s)))
19+
}
20+
21+
def notEmpty[A, S](stream: AsyncStream[F, A]): StateT[F, S, Boolean] = StateT { s =>
22+
stream.nonEmpty.map((s, _))
23+
}
24+
25+
def notEmpty[A, S](f: S => AsyncStream[F, A])(implicit ms: MonadState[IndexedStateT[F, S, S, ?], S]): StateT[F, S, Boolean] = {
26+
ms.get >>= ((s: S) => notEmpty(f(s)))
27+
}
28+
29+
def get[A, S](stream: AsyncStream[F, A]): StateT[F, S, (AsyncStream[F, A], A)] = StateT { s =>
30+
stream.data.map(step => (s, (step.rest, step.value)))
31+
}
32+
33+
def genS[S, A](start: S)(gen: StateT[F, S, A])(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] =
34+
AsyncStream.generate(start)(gen.run)
35+
}

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 34 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,73 +4,52 @@ import scala.annotation.unchecked.{uncheckedVariance => uV}
44
import scala.collection.GenIterable
55
import scala.collection.generic.CanBuildFrom
66
import scala.language.higherKinds
7-
import scalaz.Monad
8-
import scalaz.syntax.monad._
7+
import scalaz.syntax.monadPlus._
8+
import scalaz.{Monad, MonadPlus}
99

10-
case class AsyncStream[F[+_]: Monad, A](data: F[Step[A, AsyncStream[F, A]]]) {
11-
import AsyncStream._
10+
class AsyncStream[F[+_]: Monad, A](val data: F[Step[A, AsyncStream[F, A]]]) {
11+
type SStep = Step[A, AsyncStream[F, A]]
1212

13-
def foldLeft[B](start: B)(f: (B, A) => B): F[B] = {
14-
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
15-
d.flatMap {
16-
case END => acc
17-
case step => impl(step.rest.data, acc map (b => f(b, step.value)))
18-
}
19-
20-
impl(data, start.point[F])
21-
}
22-
23-
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, A, Col[A @uV]]): F[Col[A]] =
24-
foldLeft(cbf())((col, el) => col += el).map(_.result())
25-
26-
27-
def takeWhile(p: A => Boolean): AsyncStream[F, A] =
28-
new AsyncStream[F, A](data map {
29-
case END => END
30-
case step if !p(step.value) => END
31-
case step => Step(step.value, step.rest.takeWhile(p))
32-
})
13+
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, A, Col[A @uV]], methods: ASImpl[F]): F[Col[A]] =
14+
methods.collectLeft(this)(cbf())((col, el) => col += el).map(_.result())
3315

16+
def takeWhile(p: A => Boolean)(implicit impl: ASImpl[F]): AsyncStream[F, A] = impl.takeWhile(this)(p)
3417

35-
def take(n: Int): AsyncStream[F, A] =
36-
if (n <= 0) nil
37-
else AsyncStream(data.map {
38-
case END => END
39-
case p => Step(p.value, p.rest.take(n - 1))
40-
})
18+
def take(n: Int)(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] =
19+
if (n <= 0) smp.empty
20+
else AsyncStream {
21+
data.map(p => Step(p.value, p.rest.take(n - 1)))
22+
}
4123

42-
def foreach[U](f: (A) => U): F[Unit] =
43-
foldLeft(())((_: Unit, a: A) => {f(a); ()})
24+
def foreach[U](f: (A) => U)(implicit methods: ASImpl[F]): F[Unit] =
25+
methods.collectLeft(this)(())((_: Unit, a: A) => {f(a); ()})
4426

45-
def foreachF[U](f: (A) => F[U]): F[Unit] =
46-
foldLeft(().point[F])((fu: F[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
27+
def foreachF[U](f: (A) => F[U])(implicit impl: ASImpl[F]): F[Unit] =
28+
impl.collectLeft(this)(().point[F])((fu: F[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
4729

48-
def flatten[B](implicit asIterable: A => GenIterable[B]): AsyncStream[F, B] = {
49-
val streamChunk = (p: Step[A, AsyncStream[F, A]]) =>
50-
concat(generate(asIterable(p.value))(it => if (it.nonEmpty) (it.head, it.tail).point[F] else ENDF[F]), p.rest.flatten)
30+
def flatten[B](implicit asIterable: A => GenIterable[B], smp: MonadPlus[AsyncStream[F, ?]], impl: ASImpl[F]): AsyncStream[F, B] = {
31+
def streamChunk(step: AsyncStream[F, A]#SStep): AsyncStream[F, B] =
32+
impl.fromIterable(asIterable(step.value).seq) <+> step.rest.flatten
5133

52-
AsyncStream(data.flatMap {
53-
case END => ENDF[F]
54-
case step => streamChunk(step).data
55-
})
34+
AsyncStream(data.flatMap(step => streamChunk(step).data))
5635
}
36+
37+
def isEmpty(implicit impl: ASImpl[F]): F[Boolean] = impl.isEmpty(this)
38+
def nonEmpty(implicit impl: ASImpl[F]): F[Boolean] = impl.isEmpty(this).map(!_)
5739
}
5840

5941
object AsyncStream {
60-
def nil[F[+_]: Monad, A]: AsyncStream[F, A] = AsyncStream(ENDF[F])
61-
def single[F[+_]: Monad, A](item: A): AsyncStream[F, A] =
62-
AsyncStream(Step(item, nil[F, A]).point[F])
42+
def apply[F[+_]: Monad, A](data: => F[Step[A, AsyncStream[F, A]]]): AsyncStream[F, A] = new AsyncStream(data)
43+
def asyncNil[F[+_]: Monad, A](implicit impl: ASImpl[F]): AsyncStream[F, A] = impl.empty
6344

64-
def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(A, S)]): AsyncStream[F, A] =
65-
AsyncStream(gen(start).map {
66-
case END => END
67-
case (el, rest) => Step(el, generate(rest)(gen))
68-
})
45+
private[asyncstreams] def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(S, A)])(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] = AsyncStream {
46+
gen(start).map((stateEl: (S, A)) => Step(stateEl._2, generate(stateEl._1)(gen)))
47+
}
6948

70-
def concat[F[+_]: Monad, A](s1: AsyncStream[F, A], s2: AsyncStream[F, A]): AsyncStream[F, A] =
71-
new AsyncStream[F, A](s1.data.flatMap {
72-
case END => s2.data
73-
case step => Step(step.value, concat(step.rest, s2)).point[F]
74-
})
75-
}
49+
def unfold[F[+_]: Monad, T](start: T)(makeNext: T => T)(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, T] =
50+
generate(start)(s => (makeNext(s), s).point[F])
7651

52+
implicit class AsyncStreamOps[F[+_]: Monad, A](stream: => AsyncStream[F, A]) {
53+
def ~::(el: A) = AsyncStream(Step(el, stream).point[F])
54+
}
55+
}

src/main/scala/asyncstreams/AsyncStreamMonad.scala

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)