Commit e510d57

New optional merge_events parameter for stream and scheduled apps

2 parents: 780bead + 46e69be

6 files changed (+223, -6 lines)

CHANGELOG.md
Lines changed: 4 additions & 1 deletion

@@ -5,7 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
-- Added functionality to make retries for the failed HTTP requests.
+
+### Added
+- Optional `merge_events` parameter for the `@stream` and `@scheduled` decorators. Defaults to `False`; if `True`, all incoming events are merged into one.
+- Added functionality to make retries for the failed HTTP requests.
 
 
 # [1.10.0] - 2023-11-08
docs/modules/ROOT/examples/merging/tutorial001.py (new file)
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+from corva import Api, Cache, StreamTimeEvent, stream
+
+
+# Imagine we actually have 3 incoming events with 3 records each.
+@stream(merge_events=True)
+def app(event: StreamTimeEvent, api: Api, cache: Cache):
+    # Since we passed merge_events=True, all 3 incoming events
+    # and their records are merged into a single event with 9 records.
+    assert len(event.records) == 9  # this will not fail
+    return event
docs/modules/ROOT/examples/merging/tutorial002.py (new file)
Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled
+
+
+@scheduled(merge_events=True)
+def app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):
+    return event
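For a sense of how these example apps are exercised, the new test file at the end of this diff invokes them directly. Below is a condensed, pytest-style sketch of that call pattern; the `context` argument is the Lambda-context fixture from the SDK's own test suite, so this only runs inside that suite:

    from corva.configuration import SETTINGS
    from corva.models.stream.log_type import LogType
    from corva.models.stream.raw import (
        RawAppMetadata,
        RawMetadata,
        RawStreamTimeEvent,
        RawTimeRecord,
    )
    from docs.modules.ROOT.examples.merging import tutorial001


    def make_raw_event(start: int) -> dict:
        # One raw stream event with three consecutive time records.
        return RawStreamTimeEvent(
            records=[
                RawTimeRecord(
                    collection="", timestamp=start + i, asset_id=1, company_id=1
                )
                for i in range(3)
            ],
            metadata=RawMetadata(
                app_stream_id=1,
                apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
                log_type=LogType.time,
            ),
        ).dict()


    def test_merged_records(context):  # `context` fixture from the SDK test suite
        # Three raw events in, one merged event out.
        aws_event = [make_raw_event(ts) for ts in (1, 4, 7)]
        result = tutorial001.app(aws_event, context)[0]
        assert len(result.records) == 9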

docs/modules/ROOT/pages/index.adoc
Lines changed: 28 additions & 0 deletions

@@ -541,6 +541,34 @@ and pass corresponding logging handler
 as a keyword argument to the app decorator.
 Use code samples above as the examples.
 
+== Merging incoming events
+
+[TIP]
+====
+Only <<stream,`stream`>>
+and <<scheduled,`scheduled`>>
+apps can use this feature.
+====
+
+Sometimes Corva sends more than one event to <<scheduled,`scheduled`>> and <<stream,`stream`>> apps.
+Optionally, you can ask the SDK to merge them into a single event by passing the `merge_events=True` parameter.
+[source,python]
+----
+include::example$merging/tutorial001.py[]
+----
+[source,python]
+----
+include::example$merging/tutorial002.py[]
+----
+Usually this is done to save I/O by processing data in bigger batches.
+Use this parameter with care: in the worst case you can receive too much data, try to
+process it in one go, and fail with a timeout. Your app is then restarted automatically,
+starts over from the beginning, and fails again.
+Without this parameter, corva-sdk "remembers" each event after it has been processed.
+So, for example, if your app fails at event #5 and is restarted, it resumes processing
+from event #5 (not from event #1, as it would with `merge_events=True`).
+
+
 == Followable apps
 
 [TIP]
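The batching rationale in the new docs section can be made concrete. Here is a minimal sketch of the I/O saving, assuming a hypothetical dataset endpoint; the POST path and the use of `record.data` are illustrative, not part of this commit:

    from corva import Api, Cache, StreamTimeEvent, stream


    @stream(merge_events=True)
    def app(event: StreamTimeEvent, api: Api, cache: Cache):
        # With merge_events=True this is one POST for the whole merged batch,
        # instead of one POST per incoming event.
        # The endpoint below is illustrative only.
        api.post(
            "/api/v1/data/my-provider/my-collection/",
            data=[record.data for record in event.records],
        )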

src/corva/handlers.py
Lines changed: 73 additions & 5 deletions

@@ -1,5 +1,6 @@
 import contextlib
 import functools
+import itertools
 import logging
 import sys
 import warnings
@@ -28,6 +29,7 @@
 from corva.models.merge.raw import RawPartialRerunMergeEvent
 from corva.models.scheduled.raw import RawScheduledEvent
 from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
+from corva.models.scheduled.scheduler_type import SchedulerType
 from corva.models.stream.raw import RawStreamEvent
 from corva.models.stream.stream import StreamEvent
 from corva.models.task import RawTaskEvent, TaskEvent, TaskStatus
@@ -58,6 +60,7 @@ def base_handler(
     func: Callable,
     raw_event_type: Type[RawBaseEvent],
     handler: Optional[logging.Handler],
+    merge_events: bool = False,
 ) -> Callable[[Any, Any], List[Any]]:
     @functools.wraps(func)
     def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
@@ -84,6 +87,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
             url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1
         )
         data_transformation_type = raw_custom_event_type or raw_event_type
+        if merge_events:
+            aws_event = _merge_events(aws_event, data_transformation_type)
         raw_events = data_transformation_type.from_raw_event(event=aws_event)
 
         if (
@@ -120,18 +125,26 @@ def stream(
     func: Optional[Callable[[StreamEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
+    merge_events: bool = False,
 ) -> Callable:
     """Runs stream app.
 
     Arguments:
         handler: logging handler to include in Corva logger.
+        merge_events: if True, merge all incoming events into one before
+            passing them to func.
     """
 
     if func is None:
-        return functools.partial(stream, handler=handler)
+        return functools.partial(stream, handler=handler, merge_events=merge_events)
 
     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawStreamEvent, handler=handler)
+    @functools.partial(
+        base_handler,
+        raw_event_type=RawStreamEvent,
+        handler=handler,
+        merge_events=merge_events,
+    )
     def wrapper(
         event: RawStreamEvent,
         api_key: str,
@@ -221,18 +234,26 @@ def scheduled(
     func: Optional[Callable[[ScheduledEventT, Api, UserRedisSdk], Any]] = None,
     *,
     handler: Optional[logging.Handler] = None,
+    merge_events: bool = False,
 ) -> Callable:
     """Runs scheduled app.
 
     Arguments:
         handler: logging handler to include in Corva logger.
+        merge_events: if True, merge all incoming events into one before
+            passing them to func.
     """
 
     if func is None:
-        return functools.partial(scheduled, handler=handler)
+        return functools.partial(scheduled, handler=handler, merge_events=merge_events)
 
     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawScheduledEvent, handler=handler)
+    @functools.partial(
+        base_handler,
+        raw_event_type=RawScheduledEvent,
+        handler=handler,
+        merge_events=merge_events,
+    )
     def wrapper(
         event: RawScheduledEvent,
         api_key: str,
@@ -441,7 +462,6 @@ def wrapper(
         logging_ctx: LoggingContext,
         redis_client: redis.Redis,
     ) -> Any:
-
         logging_ctx.asset_id = event.data.asset_id
         logging_ctx.app_connection_id = event.data.app_connection_id
 
@@ -544,3 +564,51 @@ def _get_custom_event_type_by_raw_aws_event(
         if events:
             return event_type, handler
     return None, None
+
+
+def _merge_events(
+    aws_event: Any,
+    data_transformation_type: Type[RawBaseEvent],
+) -> Any:
+    """
+    Merges incoming aws_events into one.
+    The merge happens differently depending on the app type.
+    Only "scheduled" and "stream" events can be merged here; for any
+    other type, a warning is logged and the event is returned unchanged.
+    """
+    if data_transformation_type is RawScheduledEvent:
+        # scheduled event
+        if not isinstance(aws_event[0], dict):
+            aws_event = list(itertools.chain(*aws_event))
+        scheduler_type = aws_event[0]["scheduler_type"]
+        if isinstance(scheduler_type, SchedulerType):
+            scheduler_type = scheduler_type.value
+        is_depth = scheduler_type == SchedulerType.data_depth_milestone.value
+        event_start, event_end = (
+            ("top_depth", "bottom_depth")
+            if is_depth
+            else ("schedule_start", "schedule_end")
+        )
+        min_event_start = min(e[event_start] for e in aws_event)
+        max_event_end = max(
+            (e[event_end] for e in aws_event if e.get(event_end) is not None),
+            default=None,
+        )
+        aws_event[0][event_start] = min_event_start
+        if max_event_end:
+            aws_event[0][event_end] = max_event_end
+        aws_event = aws_event[0]
+
+    elif data_transformation_type is RawStreamEvent:
+        # stream event
+        for event in aws_event[1:]:
+            aws_event[0]["records"].extend(event["records"])
+        aws_event = [aws_event[0]]
+
+    else:
+        CORVA_LOGGER.warning(
+            f"{data_transformation_type.__name__} does not support the "
+            "`merge_events` parameter."
+        )
+
+    return aws_event
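Both branches of the new `_merge_events` helper are easy to see with toy payloads. A minimal sketch that calls the (private) helper directly; real events carry many more fields, so this is illustration only:

    from corva.handlers import _merge_events
    from corva.models.scheduled.raw import RawScheduledEvent
    from corva.models.scheduled.scheduler_type import SchedulerType
    from corva.models.stream.raw import RawStreamEvent

    # Stream branch: records from later events are appended onto the first event.
    stream_payload = [{"records": [1, 2]}, {"records": [3, 4]}, {"records": [5, 6]}]
    assert _merge_events(stream_payload, RawStreamEvent) == [
        {"records": [1, 2, 3, 4, 5, 6]}
    ]

    # Scheduled branch: the merged event spans the earliest start and the latest
    # non-None end (None ends are skipped, mirroring max(..., default=None) above).
    scheduled_payload = [
        {"scheduler_type": SchedulerType.data_time, "schedule_start": 61, "schedule_end": None},
        {"scheduler_type": SchedulerType.data_time, "schedule_start": 60, "schedule_end": 120},
    ]
    merged = _merge_events(scheduled_payload, RawScheduledEvent)
    assert merged["schedule_start"] == 60
    assert merged["schedule_end"] == 120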
Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+from typing import Dict, List, Union
+
+import pytest
+
+from corva import ScheduledDataTimeEvent, StreamTimeEvent
+from corva.configuration import SETTINGS
+from corva.models.scheduled.raw import RawScheduledDataTimeEvent
+from corva.models.scheduled.scheduler_type import SchedulerType
+from corva.models.stream.log_type import LogType
+from corva.models.stream.raw import (
+    RawAppMetadata,
+    RawMetadata,
+    RawStreamTimeEvent,
+    RawTimeRecord,
+)
+from docs.modules.ROOT.examples.merging import tutorial001, tutorial002
+
+
+def test_tutorial001(context):
+    """merge_events parameter merges records for "stream" apps.
+
+    When 3 events with 3 records each are sent and the @stream decorator
+    has the optional "merge_events" param set to True, the incoming events
+    are merged into a single event with all their records (9 in our case)
+    combined.
+    """
+    event = []
+    timestamp = 1
+    # generate 3 events with 3 records each
+    for _ in range(3):
+        event.extend(
+            [
+                RawStreamTimeEvent(
+                    records=[
+                        RawTimeRecord(
+                            collection=str(),
+                            timestamp=timestamp,
+                            asset_id=1,
+                            company_id=1,
+                        ),
+                        RawTimeRecord(
+                            collection=str(),
+                            timestamp=timestamp + 1,
+                            asset_id=1,
+                            company_id=1,
+                        ),
+                        RawTimeRecord(
+                            collection=str(),
+                            timestamp=timestamp + 2,
+                            asset_id=1,
+                            company_id=1,
+                        ),
+                    ],
+                    metadata=RawMetadata(
+                        app_stream_id=1,
+                        apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
+                        log_type=LogType.time,
+                    ),
+                ).dict()
+            ]
+        )
+        timestamp += 3
+
+    result_event: StreamTimeEvent = tutorial001.app(event, context)[0]
+    assert len(result_event.records) == 9, "records were not merged into a single event"
+
+
+@pytest.mark.parametrize(
+    "time_ranges, flat",
+    (
+        [((60, 120), (61, None), (62, 122)), True],
+        [((61, None), (60, None), (62, None)), False],
+    ),
+)
+def test_tutorial002(context, time_ranges, flat):
+    event: List[Union[List, Dict]] = []
+    for schedule_start, schedule_end in time_ranges:
+        event.append(
+            RawScheduledDataTimeEvent(
+                asset_id=int(),
+                interval=60,
+                schedule=int(),
+                schedule_start=schedule_start,
+                schedule_end=schedule_end,
+                app_connection=int(),
+                app_stream=int(),
+                company=int(),
+                scheduler_type=SchedulerType.data_time,
+            ).dict(by_alias=True, exclude_unset=True)
+        )
+    if not flat:
+        event = [event]
+
+    result_event: ScheduledDataTimeEvent = tutorial002.app(event, context)[0]
+
+    assert result_event.start_time == 1
+    assert result_event.end_time == 60
+    max_schedule_value = time_ranges[-1][-1]
+    assert result_event.schedule_end == max_schedule_value  # type: ignore[attr-defined]

0 commit comments

Comments
 (0)