Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions eventkit/ops/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from collections import deque

from ..util import NO_VALUE
from ..util import NO_VALUE, get_event_loop
from .combine import Chain, Concat, Merge, Switch
from .op import Op

Expand Down Expand Up @@ -226,7 +226,7 @@ def __init__(self, func, timeout=0, ordered=True, task_limit=None, source=None):

def on_source(self, *args):
obj = self._func(*args)
if asyncio.iscoroutine(obj):
if hasattr(obj, "__await__"):
# function returns an awaitable
if not self._task_limit or len(self._tasks) < self._task_limit:
# schedule right away
Expand All @@ -245,12 +245,12 @@ def on_source_done(self, source):

self._source = None

def _create_task(self, coro):
def _create_task(self, awaitable):
# schedule a task to be run
if self._timeout:
coro = asyncio.wait_for(coro, self._timeout)
awaitable = asyncio.wait_for(awaitable, self._timeout)

task = asyncio.create_task(coro)
task = asyncio.ensure_future(awaitable, loop=get_event_loop())
task.add_done_callback(self._on_task_done)
self._tasks.append(task)

Expand Down
35 changes: 35 additions & 0 deletions tests/transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@

import numpy as np

import eventkit as ev
from eventkit import Event
from eventkit.util import get_event_loop

array = list(range(20))


def run(*args, **kwargs):
loop = get_event_loop()
return loop.run_until_complete(*args, **kwargs)


class TransformTest(unittest.TestCase):
def test_constant(self):
event = Event.sequence(array).constant(42)
Expand Down Expand Up @@ -151,3 +158,31 @@ def test_switchmap(self):
]
event = Event.range(3).switchmap(lambda v: Event.marble(marbles[v]))
self.assertEqual(event.run(), ["A", "B", "1", "2", "K", "L", "M", "N"])

def test_map_with_future(self):
"""Verify that Map correctly handles functions that return a Future."""
# Create a future that we will complete manually
my_future = asyncio.Future()

# The map function will just return our future
def map_func(x):
return my_future

event = Event.sequence([1]).map(map_func)
result = []
event.connect(result.append)

# Give the event loop a chance to run the map
run(asyncio.sleep(0))

# The event should not have emitted yet, as the future is not done
self.assertEqual(result, [])

# Now, complete the future
my_future.set_result(42)

# Give the event loop a chance to process the completion
run(asyncio.sleep(0))

# The event should now have emitted the future's result
self.assertEqual(result, [42])