Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 21abd2e

Browse files
committed
Add zipWithIndex, some minor cleanup
1 parent 1ccbadb commit 21abd2e

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package asyncstreams
22

33
import cats.kernel.Monoid
4-
import cats.{Alternative, Monad}
4+
import cats.{Alternative, Applicative, Monad}
55
import cats.syntax.applicative._
66
import cats.syntax.flatMap._
77
import cats.syntax.functor._
@@ -24,7 +24,7 @@ class AsyncStream[F[+_]: Monad, +A](private[asyncstreams] val data: F[Step[A, As
2424
data.map(p => Step(p.value, p.rest.take(n - 1)))
2525
}
2626

27-
def drop(n: Int)(implicit smp: Alternative[AsyncStream[F, ?]]): AsyncStream[F, A] =
27+
def drop(n: Int): AsyncStream[F, A] =
2828
if (n <= 0) this
2929
else AsyncStream {
3030
data.flatMap(p => p.rest.drop(n - 1).data)
@@ -82,23 +82,26 @@ class AsyncStream[F[+_]: Monad, +A](private[asyncstreams] val data: F[Step[A, As
8282
stepB <- sb.data
8383
} yield Step((stepA.value, stepB.value), stepA.rest zip stepB.rest)
8484
}
85+
86+
def zipWithIndex(implicit app: Applicative[AsyncStream[F, ?]]): AsyncStream[F, (A, Int)] =
87+
zip(AsyncStream.unfold(0)(_ + 1))
8588
}
8689

8790
object AsyncStream {
8891
private[asyncstreams] def apply[F[+_]: Monad, A](data: => F[Step[A, AsyncStream[F, A]]]): AsyncStream[F, A] = new AsyncStream(data)
8992
def asyncNil[F[+_]: Monad, A](implicit impl: ASImpl[F]): AsyncStream[F, A] = impl.empty
9093

91-
private[asyncstreams] def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(S, A)])(implicit smp: Alternative[AsyncStream[F, ?]]): AsyncStream[F, A] = AsyncStream {
94+
private[asyncstreams] def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(S, A)])(implicit smp: Applicative[AsyncStream[F, ?]]): AsyncStream[F, A] = AsyncStream {
9295
gen(start).map((stateEl: (S, A)) => Step(stateEl._2, generate(stateEl._1)(gen)))
9396
}
9497

95-
def unfold[F[+_]: Monad, T](start: T)(makeNext: T => T)(implicit smp: Alternative[AsyncStream[F, ?]]): AsyncStream[F, T] =
98+
def unfold[F[+_]: Monad, T](start: T)(makeNext: T => T)(implicit smp: Applicative[AsyncStream[F, ?]]): AsyncStream[F, T] =
9699
generate(start)(s => (makeNext(s), s).pure[F])
97100

98-
def unfoldM[F[+_]: Monad, T](start: T)(makeNext: T => F[T])(implicit alt: Alternative[AsyncStream[F, ?]]): AsyncStream[F, T] =
101+
def unfoldM[F[+_]: Monad, T](start: T)(makeNext: T => F[T])(implicit alt: Applicative[AsyncStream[F, ?]]): AsyncStream[F, T] =
99102
generate(start)(s => makeNext(s).map(n => (n, s)))
100103

101-
def unfoldMM[F[+_]: Monad, T](start: F[T])(makeNext: T => F[T])(implicit alt: Alternative[AsyncStream[F, ?]]): AsyncStream[F, T] = AsyncStream {
104+
def unfoldMM[F[+_]: Monad, T](start: F[T])(makeNext: T => F[T])(implicit alt: Applicative[AsyncStream[F, ?]]): AsyncStream[F, T] = AsyncStream {
102105
start.flatMap(initial => generate(initial)(s => makeNext(s).map(n => (n, s))).data)
103106
}
104107
}

src/test/scala/asyncstreams/stdFuture/AsyncStreamOperations.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,10 @@ class AsyncStreamOperations extends FunSuite with Matchers {
131131

132132
await(res.to[List]) shouldBe (1, 4) :: (2, 5) :: Nil
133133
}
134+
135+
test("zipWithIndex") {
136+
val res = stream.zipWithIndex.to[Vector]
137+
138+
await(res) shouldBe (0 to 30).zipWithIndex
139+
}
134140
}

0 commit comments

Comments
 (0)