From 7f91ed1ed316c6509fe5feb9d98c13ee28958b39 Mon Sep 17 00:00:00 2001 From: Yaroslav Geryatovich Date: Mon, 28 Jan 2013 18:46:06 +0200 Subject: [PATCH 1/5] add takeUntil functionality --- .../src/main/scala/reactive/EventStream.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index acbd48c0..e0bc9c68 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -78,6 +78,10 @@ trait EventStream[+T] extends Foreachable[T] { * be fired by the new EventStream. */ def filter(f: T => Boolean): EventStream[T] + + def once: EventStream[T] + + def takeUntil(stream: EventStream[Any])(implicit observing: Observing): EventStream[T] /** * Filter and map in one step. Takes a PartialFunction. * Whenever an event is received, if the PartialFunction @@ -277,6 +281,19 @@ class EventSource[T] extends EventStream[T] with Logger { } } + class TakeUntil(stream: EventStream[Any], obs: Observing) extends ChildEventSource[T, Boolean](true) { + implicit val observing = obs + override def debugName = "%s.takeUntil" format (EventSource.this.debugName) + stream.once.foreach(_ => state = false) + def handler = (event, _) => { + if (state) + fire(event) + else + EventSource.this.removeListener(listener) + state + } + } + class Collected[U](pf: PartialFunction[T, U]) extends ChildEventSource[U, Unit] { override def debugName = "%s.collect(%s)" format (EventSource.this.debugName, pf) private val pf0 = pf @@ -355,6 +372,9 @@ class EventSource[T] extends EventStream[T] with Logger { } def collect[U](pf: PartialFunction[T, U]): EventStream[U] = new Collected(pf) + + def once: EventStream[T] = new Once + def takeUntil(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = new TakeUntil(es, observing) def map[U](f: T => U): EventStream[U] = { new ChildEventSource[U, Unit] { @@ -565,6 +585,8 @@ trait EventStreamProxy[T] extends EventStream[T] { def nonrecursive: EventStream[T] = underlying.nonrecursive def distinct: EventStream[T] = underlying.distinct def nonblocking: EventStream[T] = underlying.nonblocking + def once: EventStream[T] = underlying.once + def takeUntil(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = underlying.takeUntil(es)(observing) def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness def throttle(period: Long): EventStream[T] = underlying.throttle(period) private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f) From 0d9fc70206d4e90cfd5a0cd410fdc27635959c4d Mon Sep 17 00:00:00 2001 From: Yaroslav Geryatovich Date: Tue, 29 Jan 2013 13:06:56 +0200 Subject: [PATCH 2/5] add documentation --- .../src/main/scala/reactive/EventStream.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index e0bc9c68..a26226b5 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -78,9 +78,16 @@ trait EventStream[+T] extends Foreachable[T] { * be fired by the new EventStream. */ def filter(f: T => Boolean): EventStream[T] - + /** + * Return a new EventStream that will propagate only one event from + * this EventStream + */ def once: EventStream[T] - + /** + * Returns a new EventStream that propagates this EventStream's events + * until the provided EventStream stream fires first event. + * @param stream the stream, that will shut down this EventStream + */ def takeUntil(stream: EventStream[Any])(implicit observing: Observing): EventStream[T] /** * Filter and map in one step. Takes a PartialFunction. From cfb096f415d8641c97c06e1b0cb684cc8e493ff3 Mon Sep 17 00:00:00 2001 From: Yaroslav Geryatovich Date: Tue, 29 Jan 2013 14:02:32 +0200 Subject: [PATCH 3/5] use provided observing value --- reactive-core/src/main/scala/reactive/EventStream.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index a26226b5..9225c7e3 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -288,10 +288,9 @@ class EventSource[T] extends EventStream[T] with Logger { } } - class TakeUntil(stream: EventStream[Any], obs: Observing) extends ChildEventSource[T, Boolean](true) { - implicit val observing = obs + class TakeUntil(stream: EventStream[Any], observing: Observing) extends ChildEventSource[T, Boolean](true) { override def debugName = "%s.takeUntil" format (EventSource.this.debugName) - stream.once.foreach(_ => state = false) + stream.once.foreach(_ => state = false)(observing) def handler = (event, _) => { if (state) fire(event) From 127ba7996fd6190ce6821ccd94b67788775666fe Mon Sep 17 00:00:00 2001 From: Yaroslav Geryatovich Date: Tue, 29 Jan 2013 14:02:32 +0200 Subject: [PATCH 4/5] add take(n) method, rename takeUntil to until, add tests --- .../src/main/scala/reactive/EventStream.scala | 37 ++++++++++++++---- .../scala/reactive/EventStreamTests.scala | 39 +++++++++++++++++++ 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index 9225c7e3..b5b11525 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -79,8 +79,14 @@ trait EventStream[+T] extends Foreachable[T] { */ def filter(f: T => Boolean): EventStream[T] /** - * Return a new EventStream that will propagate only one event from - * this EventStream + * Returns a new EventStream that propagates only first n events + * of this EventStream + * @param n number of first events to propagate + */ + def take(n: Int): EventStream[T] + /** + * Returns a new EventStream that propagates only first event + * of this EventStream */ def once: EventStream[T] /** @@ -88,7 +94,7 @@ trait EventStream[+T] extends Foreachable[T] { * until the provided EventStream stream fires first event. * @param stream the stream, that will shut down this EventStream */ - def takeUntil(stream: EventStream[Any])(implicit observing: Observing): EventStream[T] + def until(stream: EventStream[Any])(implicit observing: Observing): EventStream[T] /** * Filter and map in one step. Takes a PartialFunction. * Whenever an event is received, if the PartialFunction @@ -287,9 +293,20 @@ class EventSource[T] extends EventStream[T] with Logger { next } } + + class Take(n: Int) extends ChildEventSource[T, Int](0) { + override def debugName = "%s.take(%d)" format (EventSource.this.debugName, n) + def handler = (event, last) => { + if (last < n) + fire(event) + else + EventSource.this.removeListener(listener) + last + 1 + } + } - class TakeUntil(stream: EventStream[Any], observing: Observing) extends ChildEventSource[T, Boolean](true) { - override def debugName = "%s.takeUntil" format (EventSource.this.debugName) + class Until(stream: EventStream[Any], observing: Observing) extends ChildEventSource[T, Boolean](true) { + override def debugName = "%s.until(%s)" format (EventSource.this.debugName, stream) stream.once.foreach(_ => state = false)(observing) def handler = (event, _) => { if (state) @@ -379,8 +396,11 @@ class EventSource[T] extends EventStream[T] with Logger { def collect[U](pf: PartialFunction[T, U]): EventStream[U] = new Collected(pf) - def once: EventStream[T] = new Once - def takeUntil(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = new TakeUntil(es, observing) + def once: EventStream[T] = new Take(1) + + def take(n: Int) = new Take(n) + + def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = new Until(es, observing) def map[U](f: T => U): EventStream[U] = { new ChildEventSource[U, Unit] { @@ -592,7 +612,8 @@ trait EventStreamProxy[T] extends EventStream[T] { def distinct: EventStream[T] = underlying.distinct def nonblocking: EventStream[T] = underlying.nonblocking def once: EventStream[T] = underlying.once - def takeUntil(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = underlying.takeUntil(es)(observing) + def take(n: Int): EventStream[T] = underlying.take(n) + def until(es: EventStream[Any])(implicit observing: Observing): EventStream[T] = underlying.until(es)(observing) def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness def throttle(period: Long): EventStream[T] = underlying.throttle(period) private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f) diff --git a/reactive-core/src/test/scala/reactive/EventStreamTests.scala b/reactive-core/src/test/scala/reactive/EventStreamTests.scala index d52b8680..8c643f74 100644 --- a/reactive-core/src/test/scala/reactive/EventStreamTests.scala +++ b/reactive-core/src/test/scala/reactive/EventStreamTests.scala @@ -113,6 +113,45 @@ class EventStreamTests extends FunSuite with ShouldMatchers with CollectEvents w es fire 2 } should equal (List(2, 1)) } + + test("take") { + val es = new EventSource[Int] {} + val n = 3 + val take = es take n + collecting(take){ + es fire 2 + es fire 1 + es fire 4 + es fire 2 + es fire 42 + } should equal (List(2, 1, 4)) + } + + test("once") { + val es = new EventSource[Int] {} + val onced = es.once + collecting(onced){ + es fire 2 + es fire 1 + es fire 4 + es fire 2 + es fire 42 + } should equal (List(2)) + } + + test("until") { + val es = new EventSource[Int] {} + val killer = new EventSource[Unit] {} + val res = es.until(killer) + collecting(res){ + es fire 2 + es fire 1 + es fire 4 + killer fire() + es fire 2 + es fire 42 + } should equal (List(2, 1, 4)) + } test("foldLeft") { val es = new EventSource[Int] {} From 7f0e034b94e7d93fda4c42e2742a3e3784fad351 Mon Sep 17 00:00:00 2001 From: Yaroslav Geryatovich Date: Wed, 30 Jan 2013 14:17:54 +0200 Subject: [PATCH 5/5] add description about take and until methods in web-demo --- .../src/main/webapp/core/EventStream.html | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/reactive-web-demo/src/main/webapp/core/EventStream.html b/reactive-web-demo/src/main/webapp/core/EventStream.html index 0c6dbcba..196f0016 100644 --- a/reactive-web-demo/src/main/webapp/core/EventStream.html +++ b/reactive-web-demo/src/main/webapp/core/EventStream.html @@ -104,6 +104,23 @@

Finer-grained lifetime control: takeWhile

+

take and once

+

When you just want to receive only first n events +you can use take method. It takes n as parameter +and return you EventStream. You should think about this method +as the similar one in collection framework.

+

once is special case when you only need first event +of the EventStream and it actually equals take(1)

+ +

until

+

In some cases you need to shut down one EventStream +by some event from another EventStream. +So lets think about it that way: You want to take all events until some other event happens. +In terms of reactive those events are provided by two EventStreams. +So for that case you can use one.until(another) method, where another +is EventStream that shut down one after its first event. +

+

What's the point?

At this point you may be wondering why go through all these hoops? Why not use the regular listener pattern directly, like Swing