Skip to content

Commit 38d94fd

Browse files
committed
Fix coverage, linter
1 parent 2460c26 commit 38d94fd

File tree

3 files changed

+27
-20
lines changed

3 files changed

+27
-20
lines changed

src/corva/handlers.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def base_handler(
6060
func: Callable,
6161
raw_event_type: Type[RawBaseEvent],
6262
handler: Optional[logging.Handler],
63-
merge_events: Optional[bool] = False
63+
merge_events: Optional[bool] = False,
6464
) -> Callable[[Any, Any], List[Any]]:
6565
@functools.wraps(func)
6666
def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
@@ -125,7 +125,7 @@ def stream(
125125
func: Optional[Callable[[StreamEventT, Api, UserRedisSdk], Any]] = None,
126126
*,
127127
handler: Optional[logging.Handler] = None,
128-
merge_events: Optional[bool] = False
128+
merge_events: Optional[bool] = False,
129129
) -> Callable:
130130
"""Runs stream app.
131131
@@ -143,7 +143,7 @@ def stream(
143143
base_handler,
144144
raw_event_type=RawStreamEvent,
145145
handler=handler,
146-
merge_events=merge_events
146+
merge_events=merge_events,
147147
)
148148
def wrapper(
149149
event: RawStreamEvent,
@@ -234,7 +234,7 @@ def scheduled(
234234
func: Optional[Callable[[ScheduledEventT, Api, UserRedisSdk], Any]] = None,
235235
*,
236236
handler: Optional[logging.Handler] = None,
237-
merge_events: Optional[bool] = False
237+
merge_events: Optional[bool] = False,
238238
) -> Callable:
239239
"""Runs scheduled app.
240240
@@ -250,7 +250,7 @@ def scheduled(
250250
base_handler,
251251
raw_event_type=RawScheduledEvent,
252252
handler=handler,
253-
merge_events=merge_events
253+
merge_events=merge_events,
254254
)
255255
def wrapper(
256256
event: RawScheduledEvent,
@@ -565,19 +565,24 @@ def _get_custom_event_type_by_raw_aws_event(
565565
return None, None
566566

567567

568-
def _merge_events(aws_event: Any, data_transformation_type: RawBaseEvent) -> Any:
568+
def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent]) -> Any:
569569
"""
570570
Merges incoming aws_events into one.
571571
Merge happens differently, depending on app type.
572572
"""
573573
if data_transformation_type == RawScheduledEvent:
574574
if not isinstance(aws_event[0], dict):
575-
aws_event: List[dict] = list(itertools.chain(*aws_event))
575+
aws_event = list(itertools.chain(*aws_event))
576576
is_depth = aws_event[0]["scheduler_type"] == SchedulerType.data_depth_milestone
577-
event_start, event_end = ("top_depth", "bottom_depth")\
578-
if is_depth else ("schedule_start", "schedule_end")
579-
min_event_start, max_event_end = (aws_event[0][event_start],
580-
aws_event[0].get(event_end))
577+
event_start, event_end = (
578+
("top_depth", "bottom_depth")
579+
if is_depth
580+
else ("schedule_start", "schedule_end")
581+
)
582+
min_event_start, max_event_end = (
583+
aws_event[0][event_start],
584+
aws_event[0].get(event_end),
585+
)
581586
for event in aws_event[1:]:
582587
if event[event_start] < min_event_start:
583588
min_event_start = event[event_start]

tests/unit/test_scheduled_app.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import re
3+
from typing import Dict, List, Union
34

45
import pytest
56
import redis
@@ -543,10 +544,10 @@ def scheduled_app(event, api, cache):
543544
)
544545
def test_merge_events_scheduled_event(context, requests_mock, time_ranges, flat):
545546
@scheduled(merge_events=True)
546-
def scheduled_app(event, api, state):
547-
return event
547+
def scheduled_app(_event: ScheduledDataTimeEvent, api, state):
548+
return _event
548549

549-
event = []
550+
event: List[Union[List, Dict]] = []
550551
for schedule_start, schedule_end in time_ranges:
551552
event.append(
552553
RawScheduledDataTimeEvent(
@@ -568,5 +569,5 @@ def scheduled_app(event, api, state):
568569

569570
assert result_event.start_time == 1
570571
assert result_event.end_time == 60
571-
max_schedule_run_value = time_ranges[-1][-1]
572-
assert result_event.schedule_end == max_schedule_run_value
572+
max_schedule_value = time_ranges[-1][-1]
573+
assert result_event.schedule_end == max_schedule_value # type: ignore[attr-defined]

tests/unit/test_stream_app.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ def test_merge_events_stream_event(context):
626626
to merge incoming events into one event with all records(6 in our
627627
case) combined
628628
"""
629+
629630
@stream(merge_events=True)
630631
def stream_app(event, api, cache):
631632
return event
@@ -665,7 +666,7 @@ def stream_app(event, api, cache):
665666
timestamp=4,
666667
asset_id=1,
667668
company_id=1,
668-
)
669+
),
669670
],
670671
metadata=RawMetadata(
671672
app_stream_id=1,
@@ -686,15 +687,15 @@ def stream_app(event, api, cache):
686687
timestamp=6,
687688
asset_id=1,
688689
company_id=1,
689-
)
690+
),
690691
],
691692
metadata=RawMetadata(
692693
app_stream_id=1,
693694
apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)},
694695
log_type=LogType.time,
695696
),
696-
).dict()
697+
).dict(),
697698
]
698699

699-
result_event: StreamEvent = stream_app(event, context)[0]
700+
result_event: StreamTimeEvent = stream_app(event, context)[0]
700701
assert len(result_event.records) == 6, "records were not merged into a single event"

0 commit comments

Comments (0)