
Commit 7d9594e: addressing PR comments

1 parent 8c744e5

File tree

8 files changed: 120 additions, 115 deletions

docs/modules/ROOT/examples/merging/tutorial001.py

Lines changed: 4 additions & 3 deletions
@@ -1,9 +1,10 @@
-from corva import Api, StreamTimeEvent, stream
+from corva import Api, StreamTimeEvent, stream, Cache
 
 
 # imagine we actually have 3 incoming events with 3 records each
 @stream(merge_events=True)
-def app(event: StreamTimeEvent, api: Api):
+def app(event: StreamTimeEvent, api: Api, cache: Cache):
     # since we passed merge_events=True all 3 incoming events
     # and their records will be merged into a single event with 9 records
-    assert len(event.records) == 9 # this will not fail
+    assert len(event.records) == 9  # this will not fail
+    return event
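For context, merge_events=True collapses the incoming raw stream events into one event whose records list is the concatenation of every incoming event's records, so three events with three records each become one event with nine records, as the comment above says. A minimal, self-contained sketch of that idea on plain dicts (merge_stream_records is an illustrative helper, not SDK code):

from typing import Any, Dict, List


def merge_stream_records(raw_events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # illustrative only: keep the first event and concatenate every event's records
    merged = dict(raw_events[0])
    merged["records"] = [record for event in raw_events for record in event["records"]]
    return [merged]


# three events with three records each -> one merged event with nine records
events = [{"records": [1, 2, 3]}, {"records": [4, 5, 6]}, {"records": [7, 8, 9]}]
assert len(merge_stream_records(events)[0]["records"]) == 9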
docs/modules/ROOT/examples/merging/tutorial002.py

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+from corva import Api, ScheduledNaturalTimeEvent, scheduled, Cache
+
+
+@scheduled(merge_events=True)
+def app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):
+    return event

docs/modules/ROOT/pages/index.adoc

Lines changed: 4 additions & 0 deletions
@@ -539,6 +539,10 @@ Optionally we can ask to merge them into one event by providing `merge_events=Tr
 ----
 include::example$merging/tutorial001.py[]
 ----
+[source,python]
+----
+include::example$merging/tutorial002.py[]
+----
 Usually this is needed to save some IO operations by processing data in bigger batches.
 Use this parameter with care, in pessimistic scenario you can receive too much data, try to
 process it in "one go" and fail with timeout. In that case your app will be automatically
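The caution in the last paragraph above, that a merged event may be too large to process in one go before the timeout, can also be addressed inside the app itself. A hedged sketch, assuming the app works through the merged records in bounded chunks; CHUNK_SIZE and process_chunk are illustrative names, not part of this change:

from corva import Api, Cache, StreamTimeEvent, stream

CHUNK_SIZE = 100  # illustrative batch size


def process_chunk(chunk):
    # hypothetical placeholder for the app's own per-chunk work
    pass


@stream(merge_events=True)
def app(event: StreamTimeEvent, api: Api, cache: Cache):
    # iterate over the merged records in bounded chunks instead of "one go",
    # so a single oversized merged event is less likely to hit the timeout
    for start in range(0, len(event.records), CHUNK_SIZE):
        process_chunk(event.records[start:start + CHUNK_SIZE])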

src/corva/handlers.py

Lines changed: 14 additions & 23 deletions
@@ -19,6 +19,7 @@
 
 import pydantic
 import redis
+from typing_extensions import assert_never
 
 from corva.api import Api
 from corva.configuration import SETTINGS
@@ -60,7 +61,7 @@ def base_handler(
     func: Callable,
     raw_event_type: Type[RawBaseEvent],
     handler: Optional[logging.Handler],
-    merge_events: Optional[bool] = False,
+    merge_events: bool = False,
 ) -> Callable[[Any, Any], List[Any]]:
     @functools.wraps(func)
     def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
@@ -125,14 +126,14 @@ def stream(
     func: Optional[Callable[[StreamEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
-    merge_events: Optional[bool] = False,
+    merge_events: bool = False,
 ) -> Callable:
     """Runs stream app.
 
     Arguments:
         handler: logging handler to include in Corva logger.
         merge_events: if True - merge all incoming events into one before
-        passing them to func
+            passing them to func
     """
 
     if func is None:
@@ -234,12 +235,14 @@ def scheduled(
     func: Optional[Callable[[ScheduledEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
-    merge_events: Optional[bool] = False,
+    merge_events: bool = False,
 ) -> Callable:
     """Runs scheduled app.
 
     Arguments:
         handler: logging handler to include in Corva logger.
+        merge_events: if True - merge all incoming events into one before
+            passing them to func
     """
 
     if func is None:
@@ -581,20 +584,11 @@ def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent])
             if is_depth
            else ("schedule_start", "schedule_end")
         )
-        min_event_start, max_event_end = (
-            aws_event[0][event_start],
-            aws_event[0].get(event_end),
+        min_event_start = min(e[event_start] for e in aws_event)
+        max_event_end = max(
+            (e[event_end] for e in aws_event if e.get(event_end) is not None),
+            default=None,
         )
-        for event in aws_event[1:]:
-            if event[event_start] < min_event_start:
-                min_event_start = event[event_start]
-            if (
-                max_event_end
-                and event.get(event_end)
-                and event[event_end] > max_event_end
-            ):
-                max_event_end = event[event_end]
-
         aws_event[0][event_start] = min_event_start
         if max_event_end:
             aws_event[0][event_end] = max_event_end
@@ -608,9 +602,6 @@ def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent])
         aws_event = [aws_event[0]]
         return aws_event
 
-    # unexpected event type, raise an exception
-    raise RuntimeError(
-        "merge_events parameter was passed to app type other than scheduled "
-        "or stream. Merge strategy is not implemented for "
-        f"{data_transformation_type} event types."
-    )
+    else:
+        # unexpected event type, raise an exception
+        assert_never(data_transformation_type)  # type: ignore
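Two notes on the handlers.py changes above. The rewritten _merge_events computes the merged window in a single pass: the earliest event start and the latest non-None event end across all incoming events (schedule_start / schedule_end for time-based events). And assert_never from typing_extensions raises an AssertionError at runtime when it is reached, while letting mypy check that the branch is unreachable, which is why the unit test below now expects AssertionError rather than RuntimeError. A self-contained sketch of the window-merging idea on plain dicts (merge_schedule_window is an illustrative helper, not the SDK's internal function):

from typing import Any, Dict, List, Optional


def merge_schedule_window(raw_events: List[Dict[str, Any]]) -> Dict[str, Any]:
    # mirrors the new logic: earliest start across events, latest non-None end
    merged = dict(raw_events[0])
    merged["schedule_start"] = min(e["schedule_start"] for e in raw_events)
    max_end: Optional[int] = max(
        (e["schedule_end"] for e in raw_events if e.get("schedule_end") is not None),
        default=None,
    )
    if max_end is not None:
        merged["schedule_end"] = max_end
    return merged


events = [
    {"schedule_start": 60, "schedule_end": 120},
    {"schedule_start": 61, "schedule_end": None},
    {"schedule_start": 62, "schedule_end": 122},
]
merged = merge_schedule_window(events)
assert merged["schedule_start"] == 60 and merged["schedule_end"] == 122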
Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+from typing import List, Union, Dict
+
+import pytest
+
+from corva import StreamTimeEvent, ScheduledDataTimeEvent
+from corva.configuration import SETTINGS
+from corva.models.scheduled.raw import RawScheduledDataTimeEvent
+from corva.models.scheduled.scheduler_type import SchedulerType
+from corva.models.stream.log_type import LogType
+from corva.models.stream.raw import RawStreamTimeEvent, RawTimeRecord, RawMetadata, RawAppMetadata
+from docs.modules.ROOT.examples.merging import tutorial001, tutorial002
+
+
+def test_tutorial001(context):
+    """
+    merge_events parameter is merging records for "stream" apps
+
+    When 3 events with 2 records each are sent and @stream decorator
+    has optional "merge_events" param set to True - we're supposed
+    to merge incoming events into one event with all records (6 in our
+    case) combined
+    """
+
+    event = []
+    timestamp = 1
+    # generate 3 events with 2 records each
+    for _ in range(3):
+        event.extend(
+            [
+                RawStreamTimeEvent(
+                    records=[
+                        RawTimeRecord(
+                            collection=str(),
+                            timestamp=timestamp,
+                            asset_id=1,
+                            company_id=1,
+                        ),
+                        RawTimeRecord(
+                            collection=str(),
+                            timestamp=timestamp + 1,
+                            asset_id=1,
+                            company_id=1,
+                        ),
+                    ],
+                    metadata=RawMetadata(
+                        app_stream_id=1,
+                        apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
+                        log_type=LogType.time,
+                    ),
+                ).dict()
+            ]
+        )
+        timestamp += 2
+
+    result_event: StreamTimeEvent = tutorial001.app(event, context)[0]
+    assert len(result_event.records) == 6, "records were not merged into a single event"
+
+
+@pytest.mark.parametrize(
+    "time_ranges, flat",
+    (
+        [((60, 120), (61, None), (62, 122)), True],
+        [((61, None), (60, None), (62, None)), False],
+    ),
+)
+def test_tutorial002(context, time_ranges, flat):
+
+    event: List[Union[List, Dict]] = []
+    for schedule_start, schedule_end in time_ranges:
+        event.append(
+            RawScheduledDataTimeEvent(
+                asset_id=int(),
+                interval=60,
+                schedule=int(),
+                schedule_start=schedule_start,
+                schedule_end=schedule_end,
+                app_connection=int(),
+                app_stream=int(),
+                company=int(),
+                scheduler_type=SchedulerType.data_time,
+            ).dict(by_alias=True, exclude_unset=True)
+        )
+    if not flat:
+        event = [event]
+
+    result_event: ScheduledDataTimeEvent = tutorial002.app(event, context)[0]
+
+    assert result_event.start_time == 1
+    assert result_event.end_time == 60
+    max_schedule_value = time_ranges[-1][-1]
+    assert result_event.schedule_end == max_schedule_value  # type: ignore[attr-defined]
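The flat parameter in the test above reflects that the incoming payload for scheduled apps can arrive either as a flat list of raw event dicts or wrapped in one extra list, and both shapes are merged the same way. A hedged sketch of normalizing such a payload (flatten_scheduled_payload is an illustrative helper, not the SDK's code):

from typing import Any, Dict, List, Union


def flatten_scheduled_payload(
    aws_event: Union[List[Dict[str, Any]], List[List[Dict[str, Any]]]]
) -> List[Dict[str, Any]]:
    # illustrative only: unwrap one level of nesting so downstream merging
    # always sees a flat list of raw event dicts
    flat: List[Dict[str, Any]] = []
    for item in aws_event:
        flat.extend(item if isinstance(item, list) else [item])
    return flat


assert flatten_scheduled_payload([{"schedule_start": 60}]) == [{"schedule_start": 60}]
assert flatten_scheduled_payload([[{"schedule_start": 60}]]) == [{"schedule_start": 60}]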

tests/unit/test_merge_events.py

Lines changed: 1 addition & 1 deletion
@@ -10,5 +10,5 @@ def test_events_not_merged_on_unexpected_event_type():
     passed - fail with RuntimeError
     """
     aws_event = [{"sample": 1}, {"sample2": 2}]
-    with pytest.raises(RuntimeError):
+    with pytest.raises(AssertionError):
         _merge_events(aws_event, RawTaskEvent)

tests/unit/test_scheduled_app.py

Lines changed: 0 additions & 39 deletions
@@ -1,6 +1,5 @@
 import logging
 import re
-from typing import Dict, List, Union
 
 import pytest
 import redis
@@ -533,41 +532,3 @@ def scheduled_app(event, api, cache):
 
     with pytest.raises(redis.exceptions.ConnectionError):
         scheduled_app(event, context)
-
-
-@pytest.mark.parametrize(
-    "time_ranges, flat",
-    (
-        [((60, 120), (61, None), (62, 122)), True],
-        [((61, None), (60, None), (62, None)), False],
-    ),
-)
-def test_merge_events_scheduled_event(context, requests_mock, time_ranges, flat):
-    @scheduled(merge_events=True)
-    def scheduled_app(_event: ScheduledDataTimeEvent, api, state):
-        return _event
-
-    event: List[Union[List, Dict]] = []
-    for schedule_start, schedule_end in time_ranges:
-        event.append(
-            RawScheduledDataTimeEvent(
-                asset_id=int(),
-                interval=60,
-                schedule=int(),
-                schedule_start=schedule_start,
-                schedule_end=schedule_end,
-                app_connection=int(),
-                app_stream=int(),
-                company=int(),
-                scheduler_type=SchedulerType.data_time,
-            ).dict(by_alias=True, exclude_unset=True)
-        )
-    if not flat:
-        event = [event]
-
-    result_event: ScheduledDataTimeEvent = scheduled_app(event, context)[0]
-
-    assert result_event.start_time == 1
-    assert result_event.end_time == 60
-    max_schedule_value = time_ranges[-1][-1]
-    assert result_event.schedule_end == max_schedule_value  # type: ignore[attr-defined]

tests/unit/test_stream_app.py

Lines changed: 0 additions & 49 deletions
@@ -615,52 +615,3 @@ def stream_app(event, api, cache):
     result_event: StreamDepthEvent = stream_app(event, context)[0]
 
     assert result_event.log_identifier == 'log_identifier'
-
-
-def test_merge_events_stream_event(context):
-    """
-    merge_events parameter is merging records for "stream" apps
-
-    When 3 events with 2 records each are sent and @stream decorator
-    has optional "merge_events" param set to True - we're supposed
-    to merge incoming events into one event with all records (6 in our
-    case) combined
-    """
-
-    @stream(merge_events=True)
-    def stream_app(event, api, cache):
-        return event
-
-    event = []
-    timestamp = 1
-    # generate 3 events with 2 records each
-    for _ in range(3):
-        event.extend(
-            [
-                RawStreamTimeEvent(
-                    records=[
-                        RawTimeRecord(
-                            collection=str(),
-                            timestamp=timestamp,
-                            asset_id=1,
-                            company_id=1,
-                        ),
-                        RawTimeRecord(
-                            collection=str(),
-                            timestamp=timestamp + 1,
-                            asset_id=1,
-                            company_id=1,
-                        ),
-                    ],
-                    metadata=RawMetadata(
-                        app_stream_id=1,
-                        apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
-                        log_type=LogType.time,
-                    ),
-                ).dict()
-            ]
-        )
-        timestamp += 2
-
-    result_event: StreamTimeEvent = stream_app(event, context)[0]
-    assert len(result_event.records) == 6, "records were not merged into a single event"
