
Commit e3adaba

Add optional merge_events parameter to stream and scheduled decorators

1 parent: a4dc307
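
The flag is opt-in on both decorators and defaults to False, so existing apps keep their current behavior. A minimal usage sketch, assuming the SDK's public exports (the handler body and return value are illustrative, not part of this commit):

    from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

    # With merge_events=True, a batch of raw scheduled events is collapsed
    # into a single event before the app function runs, so the body executes
    # once per invocation instead of once per batched event.
    @scheduled(merge_events=True)
    def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
        # start_time and end_time now cover the whole merged window
        return event.start_time, event.end_time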

3 files changed: +182 -5 lines changed

src/corva/handlers.py

Lines changed: 44 additions & 5 deletions
@@ -1,5 +1,6 @@
 import contextlib
 import functools
+import itertools
 import logging
 import sys
 import warnings
@@ -27,7 +28,9 @@
 from corva.models.merge.merge import PartialRerunMergeEvent
 from corva.models.merge.raw import RawPartialRerunMergeEvent
 from corva.models.scheduled.raw import RawScheduledEvent
-from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
+from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent, ScheduledDataTimeEvent, \
+    ScheduledDepthEvent
+from corva.models.scheduled.scheduler_type import SchedulerType
 from corva.models.stream.raw import RawStreamEvent
 from corva.models.stream.stream import StreamEvent
 from corva.models.task import RawTaskEvent, TaskEvent, TaskStatus
@@ -58,6 +61,7 @@ def base_handler(
     func: Callable,
     raw_event_type: Type[RawBaseEvent],
     handler: Optional[logging.Handler],
+    merge_events: Optional[bool] = False,
 ) -> Callable[[Any, Any], List[Any]]:
     @functools.wraps(func)
     def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
@@ -84,6 +88,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
             url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1
         )
         data_transformation_type = raw_custom_event_type or raw_event_type
+        if merge_events:
+            aws_event = _merge_events(aws_event, data_transformation_type)
         raw_events = data_transformation_type.from_raw_event(event=aws_event)

         if (
@@ -120,18 +126,20 @@ def stream(
     func: Optional[Callable[[StreamEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
+    merge_events: Optional[bool] = False,
 ) -> Callable:
     """Runs stream app.

     Arguments:
         handler: logging handler to include in Corva logger.
+        merge_events: if True, merge all incoming events into one before passing them to func.
     """

     if func is None:
-        return functools.partial(stream, handler=handler)
+        return functools.partial(stream, handler=handler, merge_events=merge_events)

     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawStreamEvent, handler=handler)
+    @functools.partial(base_handler, raw_event_type=RawStreamEvent, handler=handler, merge_events=merge_events)
     def wrapper(
         event: RawStreamEvent,
         api_key: str,
@@ -221,6 +229,7 @@ def scheduled(
     func: Optional[Callable[[ScheduledEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
+    merge_events: Optional[bool] = False,
 ) -> Callable:
     """Runs scheduled app.
@@ -229,10 +238,10 @@ def scheduled(
     """

     if func is None:
-        return functools.partial(scheduled, handler=handler)
+        return functools.partial(scheduled, handler=handler, merge_events=merge_events)

     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawScheduledEvent, handler=handler)
+    @functools.partial(base_handler, raw_event_type=RawScheduledEvent, handler=handler, merge_events=merge_events)
     def wrapper(
         event: RawScheduledEvent,
         api_key: str,
@@ -544,3 +553,33 @@ def _get_custom_event_type_by_raw_aws_event(
         if events:
             return event_type, handler
     return None, None
+
+
+def _merge_events(aws_event: Any, data_transformation_type: RawBaseEvent) -> Any:
+    """Merges incoming aws_events into one.
+
+    Merging happens differently depending on the app type.
+    """
+    if data_transformation_type == RawScheduledEvent:
+        if not isinstance(aws_event[0], dict):
+            aws_event: List[dict] = list(itertools.chain(*aws_event))
+        is_depth = aws_event[0]["scheduler_type"] == SchedulerType.data_depth_milestone
+        event_start, event_end = ("top_depth", "bottom_depth") if is_depth else ("schedule_start", "schedule_end")
+        min_event_start, max_event_end = aws_event[0][event_start], aws_event[0].get(event_end)
+        for event in aws_event[1:]:
+            if event[event_start] < min_event_start:
+                min_event_start = event[event_start]
+            if max_event_end and event[event_end] > max_event_end:
+                max_event_end = event[event_end]
+
+        aws_event[0][event_start] = min_event_start
+        if max_event_end:
+            aws_event[0][event_end] = max_event_end
+        return aws_event[0]
+
+    # stream event: concatenate all records into the first event
+    for event in aws_event[1:]:
+        aws_event[0]["records"].extend(event["records"])
+    return [aws_event[0]]
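
To make the scheduled branch of _merge_events concrete, here is a standalone sketch of the same reduction on plain dicts (values invented for illustration; for depth-milestone apps the keys would be top_depth/bottom_depth instead):

    # The first event survives, widened to the minimum window start and the
    # maximum window end found across the whole batch.
    events = [
        {"schedule_start": 61, "schedule_end": 121},
        {"schedule_start": 60, "schedule_end": 122},
        {"schedule_start": 62, "schedule_end": 120},
    ]

    merged = dict(events[0])
    merged["schedule_start"] = min(e["schedule_start"] for e in events)
    merged["schedule_end"] = max(e["schedule_end"] for e in events)

    assert merged == {"schedule_start": 60, "schedule_end": 122}

The stream branch is simpler: every event's records list is concatenated onto the first event and the remaining events are dropped.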

tests/unit/test_scheduled_app.py

Lines changed: 63 additions & 0 deletions
@@ -532,3 +532,66 @@ def scheduled_app(event, api, cache):

     with pytest.raises(redis.exceptions.ConnectionError):
         scheduled_app(event, context)
+
+
+def test_merge_events_scheduled_event(context, requests_mock):
+    @scheduled(merge_events=True)
+    def scheduled_app(event, api, state):
+        return event
+
+    event = [
+        [
+            RawScheduledDataTimeEvent(
+                asset_id=int(),
+                interval=60,
+                schedule=int(),
+                schedule_start=60,
+                schedule_end=120,
+                app_connection=int(),
+                app_stream=int(),
+                company=int(),
+                scheduler_type=SchedulerType.data_time,
+            ).dict(
+                by_alias=True,
+                exclude_unset=True,
+            ),
+            RawScheduledDataTimeEvent(
+                asset_id=int(),
+                interval=60,
+                schedule=int(),
+                schedule_start=61,
+                schedule_end=121,
+                app_connection=int(),
+                app_stream=int(),
+                company=int(),
+                scheduler_type=SchedulerType.data_time,
+            ).dict(
+                by_alias=True,
+                exclude_unset=True,
+            ),
+            RawScheduledDataTimeEvent(
+                asset_id=int(),
+                interval=60,
+                schedule=int(),
+                schedule_start=62,
+                schedule_end=122,
+                app_connection=int(),
+                app_stream=int(),
+                company=int(),
+                scheduler_type=SchedulerType.data_time,
+            ).dict(
+                by_alias=True,
+                exclude_unset=True,
+            ),
+        ]
+    ]
+
+    # patch the POST request that marks the scheduled task as completed;
+    # it matches url paths like /scheduler/123/completed
+    post_mock = requests_mock.post(re.compile(r'/scheduler/\d+/completed'))
+
+    result_event: ScheduledDataTimeEvent = scheduled_app(event, context)[0]
+
+    assert result_event.start_time == 1
+    assert result_event.end_time == 60
+    assert result_event.schedule_end == 122
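
The expected values follow from the merge plus the SDK's existing time derivation for data-time events, which (assuming it is unchanged by this commit) maps the merged schedule_start to end_time and sets start_time = end_time - interval + 1:

    # Recap of the assertion arithmetic, using the three raw events above.
    interval = 60
    end_time = min([60, 61, 62])          # merged schedule_start -> 60
    start_time = end_time - interval + 1  # 60 - 60 + 1 -> 1
    schedule_end = max([120, 121, 122])   # merged schedule_end -> 122

    assert (start_time, end_time, schedule_end) == (1, 60, 122)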

tests/unit/test_stream_app.py

Lines changed: 75 additions & 0 deletions
@@ -615,3 +615,78 @@ def stream_app(event, api, cache):
     result_event: StreamDepthEvent = stream_app(event, context)[0]

     assert result_event.log_identifier == 'log_identifier'
+
+
+def test_merge_events_stream_event(context):
+    """merge_events parameter merges records for "stream" apps.
+
+    When 3 events with 2 records each are sent and the @stream decorator has the
+    optional "merge_events" param set to True, the incoming events should be merged
+    into one event with all records (6 in our case) combined.
+    """
+    @stream(merge_events=True)
+    def stream_app(event, api, cache):
+        return event
+
+    event = [
+        RawStreamTimeEvent(
+            records=[
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=1,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=2,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+            ],
+            metadata=RawMetadata(
+                app_stream_id=1,
+                apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
+                log_type=LogType.time,
+            ),
+        ).dict(),
+        RawStreamTimeEvent(
+            records=[
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=3,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=4,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+            ],
+            metadata=RawMetadata(
+                app_stream_id=1,
+                apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
+                log_type=LogType.time,
+            ),
+        ).dict(),
+        RawStreamTimeEvent(
+            records=[
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=5,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+                RawTimeRecord(
+                    collection=str(),
+                    timestamp=6,
+                    **{"asset_id": 1, "company_id": 1},
+                ),
+            ],
+            metadata=RawMetadata(
+                app_stream_id=1,
+                apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
+                log_type=LogType.time,
+            ),
+        ).dict(),
+    ]
+
+    result_event: StreamEvent = stream_app(event, context)[0]
+    assert len(result_event.records) == 6, "records were not merged into a single event"
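
As a usage note, an app opted into merging sees a single event whose records span every batched payload. A hedged consumer sketch (handler body illustrative, assuming the SDK's public StreamTimeEvent export):

    from corva import Api, Cache, StreamTimeEvent, stream

    # For the test payload above, the six records from three batched events
    # arrive as one event, so a single pass over event.records covers them all.
    @stream(merge_events=True)
    def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
        return [record.timestamp for record in event.records]  # [1, 2, 3, 4, 5, 6]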
