Skip to content
This repository was archived by the owner on Sep 10, 2024. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions reactive-core/src/main/scala/reactive/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ object EventStream {
* @see EventSource
*/
trait EventStream[+T] extends Foreachable[T] {
type Runner = ( =>Unit)=>Unit

/**
* Registers a listener function to run whenever
* an event is fired. The function is held with a WeakReference
Expand Down Expand Up @@ -148,6 +150,16 @@ trait EventStream[+T] extends Foreachable[T] {
*/
def nonblocking: EventStream[T]

/**
* Returns a derived event stream in which event propagation does not happen on the thread firing it and block it.
* This is helpful when handling events can be time consuming.
* The implementation delegates propagation to runner function, that can handle events in different context,
* not accessable from EventStream (Android UI thread, thread pools, etc.)
* @param runner function of type ( =>Unit)=>Unit to be passed from outside context
* to run inner function in different context
*/
def withRunner(implicit runner: Runner): EventStream[T]

/**
* Returns an EventStream whose tuple-valued events include a function for testing staleness.
* The events will be of type (T, ()=>Boolean), where T is the type of the
Expand Down Expand Up @@ -446,6 +458,14 @@ class EventSource[T] extends EventStream[T] with Logger {
def throttle(period: Long): EventStream[T] = new Throttled(period)

def nonblocking: EventStream[T] = new ActorEventStream

def withRunner(implicit runner: Runner): EventStream[T] = new ChildEventSource[T, Unit](()) {
override def debugName = "%s.runnedOn(%s)" format (EventSource.this.debugName, runner)
def handler = {
case (parentEvent, _) =>
runner(parentEvent, fire(parentEvent))
}
}

private[reactive] def addListener(f: (T) => Unit): Unit = synchronized {
trace(AddingListener(f))
Expand Down Expand Up @@ -565,6 +585,7 @@ trait EventStreamProxy[T] extends EventStream[T] {
def nonrecursive: EventStream[T] = underlying.nonrecursive
def distinct: EventStream[T] = underlying.distinct
def nonblocking: EventStream[T] = underlying.nonblocking
def withRunner(implicit runner: Runner): EventStream[T] = underlying.withRunner(runner)
def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness
def throttle(period: Long): EventStream[T] = underlying.throttle(period)
private[reactive] def addListener(f: (T) => Unit): Unit = underlying.addListener(f)
Expand Down