Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 1e28338

Browse files
committed
Introduce ZeroK
1 parent bfd8814 commit 1e28338

File tree

14 files changed

+59
-58
lines changed

14 files changed

+59
-58
lines changed

build.sbt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name := "asyncstreams"
22

33
version := "1.0"
44

5-
scalaVersion := "2.12.3"
5+
scalaVersion := "2.12.4"
66

77
parallelExecution in ThisBuild := false
88

@@ -13,11 +13,11 @@ val versions = Map(
1313
)
1414

1515
libraryDependencies ++= Seq(
16-
"org.scalaz" %% "scalaz-core" % "7.2.15",
16+
"org.scalaz" %% "scalaz-core" % "7.2.16",
1717
"io.monix" %% "monix-eval" % versions("monix") % Test,
1818
"io.monix" %% "monix-scalaz-72" % versions("monix") % Test,
1919
"com.twitter" %% "util-core" % "6.45.0" % Test,
2020
"io.catbird" %% "catbird-util" % "0.15.0" % Test,
2121
"me.jeffshaw.harmony" %% "harmony_cats0-9-0_scalaz7-2" % "1.1" % Test,
22-
"org.scalatest" %% "scalatest" % "3.0.3" % Test
22+
"org.scalatest" %% "scalatest" % "3.0.4" % Test
2323
)

src/main/scala/asyncstreams/ASImpl.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package asyncstreams
22

3+
import asyncstreams.typeclass.ZeroK
4+
35
import scala.language.higherKinds
46
import scalaz.MonadError
57

@@ -11,10 +13,10 @@ trait ASImpl[F[+_]] {
1113
def isEmpty[T](s: AsyncStream[F, T]): F[Boolean]
1214
}
1315

14-
class ASImplForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends ASImpl[F] {
16+
class ASImplForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroK[F]) extends ASImpl[F] {
1517
import scalaz.syntax.monadError._
1618

17-
override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])
19+
override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zero)
1820

1921
override def collectLeft[A, B](s: AsyncStream[F, A])(init: B)(f: (B, A) => B): F[B] = {
2022
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
@@ -24,13 +26,13 @@ class ASImplForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: Zer
2426
}
2527

2628
override def fromIterable[T](it: Iterable[T]): AsyncStream[F, T] = AsyncStream {
27-
if (it.nonEmpty) Step(it.head, fromIterable(it.tail)).point[F] else ze.zeroElement.raiseError[F, AsyncStream[F, T]#SStep]
29+
if (it.nonEmpty) Step(it.head, fromIterable(it.tail)).point[F] else ze.zero
2830
}
2931

3032
override def takeWhile[T](s: AsyncStream[F, T])(p: (T) => Boolean): AsyncStream[F, T] = AsyncStream {
31-
s.data.map {
32-
case step if !p(step.value) => throw ze.zeroElement
33-
case step => Step(step.value, takeWhile(step.rest)(p))
33+
s.data.flatMap {
34+
case step if !p(step.value) => ze.zero
35+
case step => Step(step.value, takeWhile(step.rest)(p)).point[F]
3436
}
3537
}
3638

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package asyncstreams
22

3+
import asyncstreams.typeclass.ZeroK
4+
35
import scala.language.higherKinds
46
import scalaz.syntax.monadError._
57
import scalaz.{MonadError, MonadPlus}
68

7-
class ASMonadPlusForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends MonadPlus[AsyncStream[F, ?]] {
9+
class ASMonadPlusForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], zk: ZeroK[F]) extends MonadPlus[AsyncStream[F, ?]] {
810
override def bind[A, B](fa: AsyncStream[F, A])(f: (A) => AsyncStream[F, B]): AsyncStream[F, B] = AsyncStream {
911
fa.data.flatMap(step => f(step.value).data.map(step2 => Step(step2.value, plus(step2.rest, bind(step.rest)(f)))))
10-
.handleError(_ => fmp.raiseError(ze.zeroElement))
12+
.handleError(_ => zk.zero)
1113
}
1214

1315
override def plus[A](a: AsyncStream[F, A], b: => AsyncStream[F, A]): AsyncStream[F, A] = AsyncStream {
@@ -16,5 +18,5 @@ class ASMonadPlusForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze
1618

1719
override def point[A](a: => A): AsyncStream[F, A] = AsyncStream(Step(a, empty[A]).point[F])
1820

19-
override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])
21+
override def empty[A]: AsyncStream[F, A] = AsyncStream(zk.zero)
2022
}
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package asyncstreams
22

3+
import asyncstreams.typeclass.ZeroK
4+
35
import scala.concurrent.{ExecutionContext, Future}
46
import scala.language.higherKinds
57
import scalaz.{Monad, MonadError, MonadPlus}
68

79
object Implicits {
810
object MonadErrorInstances {
9-
implicit def streamMonadPlus[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : λ[`x[+_]` => ZeroError[Throwable, x]]]: MonadPlus[AsyncStream[F, ?]] = new ASMonadPlusForMonadError[F]
10-
implicit def impl[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : λ[`x[+_]` => ZeroError[Throwable, x]]]: ASImpl[F] = new ASImplForMonadError[F]
11+
implicit def streamMonadPlus[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : ZeroK]: MonadPlus[AsyncStream[F, ?]] = new ASMonadPlusForMonadError[F]
12+
implicit def impl[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : ZeroK]: ASImpl[F] = new ASImplForMonadError[F]
1113
}
1214

1315
def asStateTOps[F[+_]: Monad](implicit methods: ASImpl[F]) = new ASStateTOps[F]
1416

1517
object ScalaFuture {
16-
implicit def scalaFutureZero(implicit ec: ExecutionContext): ZeroError[Throwable, Future] = new FutureZeroError()
18+
implicit def scalaFutureZero(implicit ec: ExecutionContext): ZeroK[Future] = new ZeroK[Future] {
19+
override def zero[A]: Future[A] = Future.failed(new NoSuchElementException)
20+
}
1721
}
1822
}

src/main/scala/asyncstreams/MonadFilterForMonadError.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package asyncstreams
22

3+
import asyncstreams.typeclass.ZeroK
4+
35
import scala.language.higherKinds
4-
import scalaz.syntax.monadError._
56
import scalaz.{MonadError, MonadPlus}
67

78
/**
89
* This class doesn't fully implement MonadPlus
910
* it is usable only for filtering
1011
*/
11-
class MonadFilterForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends MonadPlus[F] {
12+
class MonadFilterForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], zk: ZeroK[F]) extends MonadPlus[F] {
1213
override def point[A](a: => A): F[A] = fmp.point(a)
1314

14-
override def empty[A]: F[A] = ze.zeroElement.raiseError
15+
override def empty[A]: F[A] = zk.zero
1516

1617
override def bind[A, B](fa: F[A])(f: (A) => F[B]): F[B] = fmp.bind(fa)(f)
1718

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package asyncstreams
22

3+
import asyncstreams.typeclass.ZeroK
4+
35
import scala.language.higherKinds
46
import scalaz.{Monad, MonadError, MonadPlus}
57

@@ -8,6 +10,6 @@ object Utils {
810
def toAS[F[+_]: Monad](implicit methods: ASImpl[F]): AsyncStream[F, T] = methods.fromIterable(it)
911
}
1012

11-
def monadErrorFilter[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : λ[`x[+_]` => ZeroError[Throwable, x]]]: MonadPlus[F] =
13+
def monadErrorFilter[F[+_]: λ[`x[+_]` => MonadError[x, Throwable]] : ZeroK]: MonadPlus[F] =
1214
new MonadFilterForMonadError[F]
1315
}

src/main/scala/asyncstreams/ZeroError.scala

Lines changed: 0 additions & 14 deletions
This file was deleted.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package asyncstreams.typeclass
2+
3+
import scala.language.higherKinds
4+
5+
trait ZeroK[F[+_]] {
6+
def zero[A]: F[A]
7+
}

src/test/scala/asyncstreams/monixTask/AsyncStreamTestsWithMonixTask.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import scala.concurrent.duration._
1414
import scalaz.syntax.monadPlus._
1515

1616
class AsyncStreamTestsWithMonixTask extends FunSuite with Matchers {
17-
import TaskZeroError.ze
17+
import TaskZeroK.taskZeroK
1818
private implicit val scheduler = Scheduler.fixedPool("monix", 4)
1919
private def makeInfStream = AsyncStream.unfold[Task, Int](0)(_ + 1)
2020
private def wait[T](f: Task[T], d: FiniteDuration = 5.seconds): T = Await.result(f.runAsync, d)

src/test/scala/asyncstreams/monixTask/TaskZeroError.scala

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)