diff --git a/reactive-core/src/main/scala/reactive/EventStream.scala b/reactive-core/src/main/scala/reactive/EventStream.scala index acbd48c0..1bb79f69 100644 --- a/reactive-core/src/main/scala/reactive/EventStream.scala +++ b/reactive-core/src/main/scala/reactive/EventStream.scala @@ -42,6 +42,8 @@ object EventStream { * @see EventSource */ trait EventStream[+T] extends Foreachable[T] { + type Runner = ( =>Unit)=>Unit + /** * Registers a listener function to run whenever * an event is fired. The function is held with a WeakReference @@ -148,6 +150,16 @@ trait EventStream[+T] extends Foreachable[T] { */ def nonblocking: EventStream[T] + /** + * Returns a derived event stream in which event propagation does not happen on the thread firing it and block it. + * This is helpful when handling events can be time consuming. + * The implementation delegates propagation to runner function, that can handle events in different context, + * not accessable from EventStream (Android UI thread, thread pools, etc.) + * @param runner function of type ( =>Unit)=>Unit to be passed from outside context + * to run inner function in different context + */ + def withRunner(implicit runner: Runner): EventStream[T] + /** * Returns an EventStream whose tuple-valued events include a function for testing staleness. * The events will be of type (T, ()=>Boolean), where T is the type of the @@ -446,6 +458,14 @@ class EventSource[T] extends EventStream[T] with Logger { def throttle(period: Long): EventStream[T] = new Throttled(period) def nonblocking: EventStream[T] = new ActorEventStream + + def withRunner(implicit runner: Runner): EventStream[T] = new ChildEventSource[T, Unit](()) { + override def debugName = "%s.runnedOn(%s)" format (EventSource.this.debugName, runner) + def handler = { + case (parentEvent, _) => + runner(parentEvent, fire(parentEvent)) + } + } private[reactive] def addListener(f: (T) => Unit): Unit = synchronized { trace(AddingListener(f)) @@ -565,6 +585,7 @@ trait EventStreamProxy[T] extends EventStream[T] { def nonrecursive: EventStream[T] = underlying.nonrecursive def distinct: EventStream[T] = underlying.distinct def nonblocking: EventStream[T] = underlying.nonblocking + def withRunner(implicit runner: Runner): EventStream[T] = underlying.withRunner(runner) def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness def throttle(period: Long): EventStream[T] = underlying.throttle(period) private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f)