
Commit 2460c26

Linter&coverage

1 parent e3adaba commit 2460c26

File tree: 3 files changed, +59 -60 lines

  src/corva/handlers.py
  tests/unit/test_scheduled_app.py
  tests/unit/test_stream_app.py

src/corva/handlers.py
Lines changed: 24 additions & 8 deletions

@@ -28,8 +28,7 @@
 from corva.models.merge.merge import PartialRerunMergeEvent
 from corva.models.merge.raw import RawPartialRerunMergeEvent
 from corva.models.scheduled.raw import RawScheduledEvent
-from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent, ScheduledDataTimeEvent, \
-    ScheduledDepthEvent
+from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
 from corva.models.scheduled.scheduler_type import SchedulerType
 from corva.models.stream.raw import RawStreamEvent
 from corva.models.stream.stream import StreamEvent
@@ -132,14 +131,20 @@ def stream(
 
     Arguments:
         handler: logging handler to include in Corva logger.
-        merge_events: if True - merge all incoming events into one before passing them to func
+        merge_events: if True - merge all incoming events into one before
+            passing them to func
     """
 
     if func is None:
         return functools.partial(stream, handler=handler, merge_events=merge_events)
 
     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawStreamEvent, handler=handler, merge_events=merge_events)
+    @functools.partial(
+        base_handler,
+        raw_event_type=RawStreamEvent,
+        handler=handler,
+        merge_events=merge_events
+    )
     def wrapper(
         event: RawStreamEvent,
         api_key: str,
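
A note on the pattern being reformatted here: functools.partial(base_handler, ...) produces a one-argument callable, so applying it with @ is equivalent to calling base_handler(wrapper, raw_event_type=..., handler=..., merge_events=...). A minimal runnable sketch, with a simplified stand-in for base_handler (the real Corva implementation parses and validates the event, wires logging, and more):

import functools


def base_handler(func, *, raw_event_type, handler=None, merge_events=False):
    # Stand-in only: a real handler would parse the AWS event into
    # raw_event_type and optionally merge events before calling func.
    @functools.wraps(func)
    def inner(event, *args, **kwargs):
        return func(event, *args, **kwargs)

    return inner


@functools.partial(base_handler, raw_event_type=dict, merge_events=True)
def wrapper(event):
    return event


assert wrapper({"records": []}) == {"records": []}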
@@ -241,7 +246,12 @@ def scheduled(
         return functools.partial(scheduled, handler=handler, merge_events=merge_events)
 
     @functools.wraps(func)
-    @functools.partial(base_handler, raw_event_type=RawScheduledEvent, handler=handler, merge_events=merge_events)
+    @functools.partial(
+        base_handler,
+        raw_event_type=RawScheduledEvent,
+        handler=handler,
+        merge_events=merge_events
+    )
     def wrapper(
         event: RawScheduledEvent,
         api_key: str,
@@ -564,12 +574,18 @@ def _merge_events(aws_event: Any, data_transformation_type: RawBaseEvent) -> Any
     if not isinstance(aws_event[0], dict):
         aws_event: List[dict] = list(itertools.chain(*aws_event))
     is_depth = aws_event[0]["scheduler_type"] == SchedulerType.data_depth_milestone
-    event_start, event_end = ("top_depth", "bottom_depth") if is_depth else ("schedule_start", "schedule_end")
-    min_event_start, max_event_end = aws_event[0][event_start], aws_event[0].get(event_end)
+    event_start, event_end = ("top_depth", "bottom_depth")\
+        if is_depth else ("schedule_start", "schedule_end")
+    min_event_start, max_event_end = (aws_event[0][event_start],
+                                      aws_event[0].get(event_end))
     for event in aws_event[1:]:
         if event[event_start] < min_event_start:
             min_event_start = event[event_start]
-        if max_event_end and event[event_end] > max_event_end:
+        if (
+            max_event_end
+            and event.get(event_end)
+            and event[event_end] > max_event_end
+        ):
             max_event_end = event[event_end]
 
     aws_event[0][event_start] = min_event_start
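
Beyond reformatting, this last hunk changes behavior: the max-end guard now skips events whose end value is missing or None instead of comparing None against a number. A standalone sketch of the folding logic, assuming a flat list of plain dicts and the time-based field names from the diff:

from typing import List, Optional, Tuple


def fold_schedule_range(events: List[dict]) -> Tuple[int, Optional[int]]:
    """Return (min schedule_start, max schedule_end) across events.

    Events whose schedule_end is None are ignored when computing the
    maximum, mirroring the new event.get(event_end) guard above.
    """
    min_start = events[0]["schedule_start"]
    max_end = events[0].get("schedule_end")
    for event in events[1:]:
        if event["schedule_start"] < min_start:
            min_start = event["schedule_start"]
        end = event.get("schedule_end")
        if max_end is not None and end is not None and end > max_end:
            max_end = end
    return min_start, max_end


# The middle event has no schedule_end and is skipped for the maximum.
assert fold_schedule_range(
    [
        {"schedule_start": 60, "schedule_end": 120},
        {"schedule_start": 61, "schedule_end": None},
        {"schedule_start": 62, "schedule_end": 122},
    ]
) == (60, 122)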

tests/unit/test_scheduled_app.py
Lines changed: 19 additions & 44 deletions

@@ -534,64 +534,39 @@ def scheduled_app(event, api, cache):
     scheduled_app(event, context)
 
 
-def test_merge_events_scheduled_event(context, requests_mock):
+@pytest.mark.parametrize(
+    "time_ranges, flat",
+    (
+        [((60, 120), (61, None), (62, 122)), True],
+        [((61, None), (60, None), (62, None)), False],
+    ),
+)
+def test_merge_events_scheduled_event(context, requests_mock, time_ranges, flat):
     @scheduled(merge_events=True)
     def scheduled_app(event, api, state):
         return event
 
-    event = [
-        [
-            RawScheduledDataTimeEvent(
-                asset_id=int(),
-                interval=60,
-                schedule=int(),
-                schedule_start=60,
-                schedule_end=120,
-                app_connection=int(),
-                app_stream=int(),
-                company=int(),
-                scheduler_type=SchedulerType.data_time,
-            ).dict(
-                by_alias=True,
-                exclude_unset=True,
-            ),
-            RawScheduledDataTimeEvent(
-                asset_id=int(),
-                interval=60,
-                schedule=int(),
-                schedule_start=61,
-                schedule_end=121,
-                app_connection=int(),
-                app_stream=int(),
-                company=int(),
-                scheduler_type=SchedulerType.data_time,
-            ).dict(
-                by_alias=True,
-                exclude_unset=True,
-            ),
+    event = []
+    for schedule_start, schedule_end in time_ranges:
+        event.append(
             RawScheduledDataTimeEvent(
                 asset_id=int(),
                 interval=60,
                 schedule=int(),
-                schedule_start=62,
-                schedule_end=122,
+                schedule_start=schedule_start,
+                schedule_end=schedule_end,
                 app_connection=int(),
                 app_stream=int(),
                 company=int(),
                 scheduler_type=SchedulerType.data_time,
-            ).dict(
-                by_alias=True,
-                exclude_unset=True,
-            )
-        ]
-    ]
-
-    # patch post request, that sets scheduled task as completed
-    # looks for url path like /scheduler/123/completed
-    post_mock = requests_mock.post(re.compile(r'/scheduler/\d+/completed'))
+            ).dict(by_alias=True, exclude_unset=True)
+        )
+    if not flat:
+        event = [event]
 
     result_event: ScheduledDataTimeEvent = scheduled_app(event, context)[0]
 
     assert result_event.start_time == 1
     assert result_event.end_time == 60
-    assert result_event.schedule_end == 122
+    max_schedule_run_value = time_ranges[-1][-1]
+    assert result_event.schedule_end == max_schedule_run_value
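
The new flat parameter exercises both payload shapes _merge_events accepts: a flat list of event dicts, and a nested list that the isinstance check in handlers.py flattens with itertools.chain. A small illustration using plain placeholder dicts:

import itertools

flat_payload = [{"schedule_start": 60}, {"schedule_start": 61}]
nested_payload = [flat_payload]  # what the test builds when flat is False

# Mirrors: if not isinstance(aws_event[0], dict): itertools.chain(*aws_event)
if not isinstance(nested_payload[0], dict):
    nested_payload = list(itertools.chain(*nested_payload))

assert nested_payload == flat_payload

Note that time_ranges[-1][-1] evaluates to 122 in the first parametrize case and to None in the second, so the final assertion covers both a real merged maximum and an all-None schedule_end.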

tests/unit/test_stream_app.py
Lines changed: 16 additions & 8 deletions

@@ -621,8 +621,10 @@ def test_merge_events_stream_event(context):
     """
     merge_events parameter is merging records for "stream" apps
 
-    When 3 events with 2 records each are sent and @stream decorator has optional "merge_events" param set to True -
-    we're supposed to merge incoming events into one event with all records(6 in our case) combined
+    When 3 events with 2 records each are sent and @stream decorator
+    has optional "merge_events" param set to True - we're supposed
+    to merge incoming events into one event with all records(6 in our
+    case) combined
     """
     @stream(merge_events=True)
     def stream_app(event, api, cache):
@@ -634,12 +636,14 @@ def stream_app(event, api, cache):
             RawTimeRecord(
                 collection=str(),
                 timestamp=1,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
             ),
             RawTimeRecord(
                 collection=str(),
                 timestamp=2,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
             ),
         ],
         metadata=RawMetadata(
@@ -653,12 +657,14 @@ def stream_app(event, api, cache):
             RawTimeRecord(
                 collection=str(),
                 timestamp=3,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
             ),
             RawTimeRecord(
                 collection=str(),
                 timestamp=4,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
             )
         ],
         metadata=RawMetadata(
@@ -672,12 +678,14 @@ def stream_app(event, api, cache):
             RawTimeRecord(
                 collection=str(),
                 timestamp=5,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
            ),
             RawTimeRecord(
                 collection=str(),
                 timestamp=6,
-                **{"asset_id": 1, "company_id": 1},
+                asset_id=1,
+                company_id=1,
             )
         ],
         metadata=RawMetadata(
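
As the reflowed docstring describes, merge_events=True collapses three two-record events into one event carrying all six records. A minimal sketch of that concatenation, using plain dicts in place of the RawTimeRecord models:

events = [
    {"records": [{"timestamp": 1}, {"timestamp": 2}]},
    {"records": [{"timestamp": 3}, {"timestamp": 4}]},
    {"records": [{"timestamp": 5}, {"timestamp": 6}]},
]

# Keep the first event's shape and concatenate every event's records into it.
merged = dict(events[0])
merged["records"] = [record for event in events for record in event["records"]]

assert [r["timestamp"] for r in merged["records"]] == [1, 2, 3, 4, 5, 6]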
