Skip to content

Commit 5ae03ca

Browse files
authored
Merge pull request #79 from corva-ai/feature/DRO-178_adding_merging_events_handler
DRO-178 | Adding merging events handler
2 parents 9d1ba6f + 8d11a78 commit 5ae03ca

File tree

12 files changed

+525
-32
lines changed

12 files changed

+525
-32
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99

10+
## [1.10.0] - 2023-11-08
11+
12+
### Added
13+
- New handler to process partial rerun merge events
14+
15+
1016
## [1.9.2] - 2023-10-25
1117

1218
### Fixed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from corva import ( # <1>
2+
Api,
3+
Cache,
4+
PartialRerunMergeEvent,
5+
StreamTimeEvent,
6+
partial_rerun_merge,
7+
stream,
8+
)
9+
10+
11+
@stream
12+
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
13+
return "Handling stream event..."
14+
15+
16+
@partial_rerun_merge # <3>
17+
def partial_rerun_app(
18+
event: PartialRerunMergeEvent,
19+
api: Api,
20+
asset_cache: Cache,
21+
rerun_asset_cache: Cache,
22+
): # <2>
23+
return "Hello, World!" # <4>

docs/modules/ROOT/pages/index.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ There are three app types that you can build:
4848
(e.g., once a minute, once every 3 ft.).
4949
3. <<task,`task`>> - works with data on-demand.
5050

51+
Each app type can have an optional <<partial_merge_rerun,`handler`>> to process partial rerun merge events.
5152
TIP: Use type hints like in examples below
5253
for better support from editors and tools.
5354

@@ -140,6 +141,21 @@ The arguments serve as building blocks for your app.
140141
<.> {app-step3} `task`.
141142
<.> {app-step4}
142143

144+
[#partial_merge_rerun]
145+
=== Partial Rerun Merge Handler (Optional)
146+
147+
[source,python]
148+
----
149+
include::example$app_types/tutorial007.py[]
150+
----
151+
<.> {app-step1}
152+
<.> Define your function.
153+
It must receive 4 arguments:
154+
<<event,event>>, <<api,api>>, <<cache,asset_cache>>, <<cache,rerun_asset_cache>>.
155+
The arguments serve as building blocks for your app.
156+
<.> {app-step3} `partial_rerun_merge`.
157+
<.> {app-step4}
158+
143159
[#event]
144160
== Event
145161
Event is an object that contains essential data for the app.

src/corva/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .api import Api
2-
from .handlers import scheduled, stream, task
2+
from .handlers import scheduled, stream, task, partial_rerun_merge
33
from .logger import CORVA_LOGGER as Logger
44
from .models.rerun import RerunDepth, RerunDepthRange, RerunTime, RerunTimeRange
55
from .models.scheduled.scheduled import (
@@ -13,6 +13,7 @@
1313
StreamTimeEvent,
1414
StreamTimeRecord,
1515
)
16+
from .models.merge.merge import PartialRerunMergeEvent
1617
from .models.task import TaskEvent
1718
from .service.cache_sdk import UserRedisSdk as Cache
1819
from .shared import SECRETS as secrets

src/corva/handlers.py

Lines changed: 182 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,31 @@
1+
import contextlib
12
import functools
23
import logging
34
import sys
45
import warnings
5-
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, cast
6-
6+
from typing import (
7+
Any,
8+
Callable,
9+
Dict,
10+
List,
11+
Optional,
12+
Tuple,
13+
Type,
14+
TypeVar,
15+
Union,
16+
cast,
17+
)
18+
19+
import pydantic
720
import redis
821

922
from corva.api import Api
1023
from corva.configuration import SETTINGS
1124
from corva.logger import CORVA_LOGGER, CorvaLoggerHandler, LoggingContext
1225
from corva.models.base import RawBaseEvent
1326
from corva.models.context import CorvaContext
27+
from corva.models.merge.merge import PartialRerunMergeEvent
28+
from corva.models.merge.raw import RawPartialRerunMergeEvent
1429
from corva.models.scheduled.raw import RawScheduledEvent
1530
from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
1631
from corva.models.stream.raw import RawStreamEvent
@@ -20,8 +35,10 @@
2035
from corva.service.api_sdk import CachingApiSdk, CorvaApiSdk
2136
from corva.service.cache_sdk import FakeInternalCacheSdk, InternalRedisSdk, UserRedisSdk
2237

23-
StreamEventT = TypeVar('StreamEventT', bound=StreamEvent)
24-
ScheduledEventT = TypeVar('ScheduledEventT', bound=ScheduledEvent)
38+
StreamEventT = TypeVar("StreamEventT", bound=StreamEvent)
39+
ScheduledEventT = TypeVar("ScheduledEventT", bound=ScheduledEvent)
40+
HANDLERS: Dict[Type[RawBaseEvent], Callable] = {}
41+
GENERIC_APP_EVENT_TYPES = [RawStreamEvent, RawScheduledEvent, RawTaskEvent]
2542

2643

2744
def get_cache_key(
@@ -32,8 +49,8 @@ def get_cache_key(
3249
app_connection_id: int,
3350
) -> str:
3451
return (
35-
f'{provider}/well/{asset_id}/stream/{app_stream_id}/'
36-
f'{app_key}/{app_connection_id}'
52+
f"{provider}/well/{asset_id}/stream/{app_stream_id}/"
53+
f"{app_key}/{app_connection_id}"
3754
)
3855

3956

@@ -52,6 +69,12 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
5269
user_handler=handler,
5370
logger=CORVA_LOGGER,
5471
) as logging_ctx:
72+
(
73+
raw_custom_event_type,
74+
custom_handler,
75+
) = _get_custom_event_type_by_raw_aws_event(aws_event)
76+
specific_callable = custom_handler or func
77+
5578
try:
5679
context = CorvaContext.from_aws(
5780
aws_event=aws_event, aws_context=aws_context
@@ -60,10 +83,21 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
6083
redis_client = redis.Redis.from_url(
6184
url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1
6285
)
63-
raw_events = raw_event_type.from_raw_event(event=aws_event)
86+
data_transformation_type = raw_custom_event_type or raw_event_type
87+
raw_events = data_transformation_type.from_raw_event(event=aws_event)
88+
89+
if (
90+
custom_handler is None
91+
and data_transformation_type not in GENERIC_APP_EVENT_TYPES
92+
):
93+
CORVA_LOGGER.warning(
94+
f"Handler for {data_transformation_type.__name__!r} "
95+
f"event not found. Skipping..."
96+
)
97+
return []
6498

6599
results = [
66-
func(
100+
specific_callable(
67101
raw_event,
68102
context.api_key,
69103
context.aws_request_id,
@@ -76,7 +110,7 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
76110
return results
77111

78112
except Exception:
79-
CORVA_LOGGER.exception('The app failed to execute.')
113+
CORVA_LOGGER.exception("The app failed to execute.")
80114
raise
81115

82116
return wrapper
@@ -141,7 +175,7 @@ def wrapper(
141175
return
142176

143177
app_event = event.metadata.log_type.event.parse_obj(
144-
event.copy(update={'records': records}, deep=True)
178+
event.copy(update={"records": records}, deep=True)
145179
)
146180
with LoggingContext(
147181
aws_request_id=aws_request_id,
@@ -151,7 +185,7 @@ def wrapper(
151185
max_message_size=SETTINGS.LOG_THRESHOLD_MESSAGE_SIZE,
152186
max_message_count=SETTINGS.LOG_THRESHOLD_MESSAGE_COUNT,
153187
logger=CORVA_LOGGER,
154-
placeholder=' ...',
188+
placeholder=" ...",
155189
),
156190
user_handler=handler,
157191
logger=CORVA_LOGGER,
@@ -176,7 +210,7 @@ def wrapper(
176210
event.set_cached_max_record_value(cache=user_cache_sdk)
177211
except Exception as e:
178212
# lambda succeeds if we're unable to cache the value
179-
CORVA_LOGGER.warning(f'Could not save data to cache. Details: {str(e)}.')
213+
CORVA_LOGGER.warning(f"Could not save data to cache. Details: {str(e)}.")
180214

181215
return result
182216

@@ -243,7 +277,7 @@ def wrapper(
243277
max_message_size=SETTINGS.LOG_THRESHOLD_MESSAGE_SIZE,
244278
max_message_count=SETTINGS.LOG_THRESHOLD_MESSAGE_COUNT,
245279
logger=CORVA_LOGGER,
246-
placeholder=' ...',
280+
placeholder=" ...",
247281
),
248282
user_handler=handler,
249283
logger=CORVA_LOGGER,
@@ -281,7 +315,7 @@ def set_schedule_as_completed(event: RawScheduledEvent, api: Api) -> None:
281315
event.set_schedule_as_completed(api=api)
282316
except Exception as e:
283317
# lambda succeeds if we're unable to set completed status
284-
CORVA_LOGGER.warning(f'Could not set schedule as completed. Details: {str(e)}.')
318+
CORVA_LOGGER.warning(f"Could not set schedule as completed. Details: {str(e)}.")
285319

286320

287321
def task(
@@ -332,7 +366,7 @@ def wrapper(
332366
max_message_size=SETTINGS.LOG_THRESHOLD_MESSAGE_SIZE,
333367
max_message_count=SETTINGS.LOG_THRESHOLD_MESSAGE_COUNT,
334368
logger=CORVA_LOGGER,
335-
placeholder=' ...',
369+
placeholder=" ...",
336370
),
337371
user_handler=handler,
338372
logger=CORVA_LOGGER,
@@ -358,15 +392,15 @@ def wrapper(
358392
FutureWarning,
359393
)
360394

361-
data['payload'] = result
395+
data["payload"] = result
362396

363397
status = TaskStatus.success
364398

365399
return result
366400

367401
except Exception as exc:
368-
CORVA_LOGGER.exception('Task app failed to execute.')
369-
data = {'fail_reason': str(exc)}
402+
CORVA_LOGGER.exception("Task app failed to execute.")
403+
data = {"fail_reason": str(exc)}
370404
raise
371405

372406
finally:
@@ -378,6 +412,135 @@ def wrapper(
378412
).raise_for_status()
379413
except Exception as e:
380414
# lambda succeeds if we're unable to update task data
381-
CORVA_LOGGER.warning(f'Could not update task data. Details: {str(e)}.')
415+
CORVA_LOGGER.warning(f"Could not update task data. Details: {str(e)}.")
382416

383417
return wrapper
418+
419+
420+
def partial_rerun_merge(
421+
func: Optional[
422+
Callable[[PartialRerunMergeEvent, Api, UserRedisSdk, UserRedisSdk], Any]
423+
] = None,
424+
*,
425+
handler: Optional[logging.Handler] = None,
426+
) -> Callable:
427+
"""Runs partial merge app.
428+
429+
Arguments:
430+
handler: logging handler to include in Corva logger.
431+
"""
432+
433+
if func is None:
434+
return functools.partial(partial_rerun_merge, handler=handler)
435+
436+
@functools.wraps(func)
437+
def wrapper(
438+
event: RawPartialRerunMergeEvent,
439+
api_key: str,
440+
aws_request_id: str,
441+
logging_ctx: LoggingContext,
442+
redis_client: redis.Redis,
443+
) -> Any:
444+
445+
logging_ctx.asset_id = event.data.asset_id
446+
logging_ctx.app_connection_id = event.data.app_connection_id
447+
448+
api = Api(
449+
api_url=SETTINGS.API_ROOT_URL,
450+
data_api_url=SETTINGS.DATA_API_ROOT_URL,
451+
api_key=api_key,
452+
app_key=SETTINGS.APP_KEY,
453+
)
454+
455+
asset_cache_hash_name = get_cache_key(
456+
provider=SETTINGS.PROVIDER,
457+
asset_id=event.data.asset_id,
458+
app_stream_id=event.data.app_stream_id,
459+
app_key=SETTINGS.APP_KEY,
460+
app_connection_id=event.data.app_connection_id,
461+
)
462+
rerun_asset_cache_hash_name = get_cache_key(
463+
provider=SETTINGS.PROVIDER,
464+
asset_id=event.data.rerun_asset_id,
465+
app_stream_id=event.data.rerun_app_stream_id,
466+
app_key=SETTINGS.APP_KEY,
467+
app_connection_id=event.data.rerun_app_connection_id,
468+
)
469+
internal_cache_hash_name = get_cache_key(
470+
provider=SETTINGS.PROVIDER,
471+
asset_id=event.data.rerun_asset_id,
472+
app_stream_id=event.data.rerun_app_stream_id,
473+
app_key=SETTINGS.APP_KEY,
474+
app_connection_id=event.data.app_connection_id,
475+
)
476+
477+
asset_cache = UserRedisSdk(
478+
hash_name=asset_cache_hash_name,
479+
redis_dsn=SETTINGS.CACHE_URL,
480+
redis_client=redis_client,
481+
)
482+
rerun_asset_cache = UserRedisSdk(
483+
hash_name=rerun_asset_cache_hash_name,
484+
redis_dsn=SETTINGS.CACHE_URL,
485+
redis_client=redis_client,
486+
)
487+
app_event = PartialRerunMergeEvent(
488+
**event.data.dict(), event_type=event.event_type
489+
)
490+
491+
with LoggingContext(
492+
aws_request_id=aws_request_id,
493+
asset_id=event.data.asset_id,
494+
app_connection_id=event.data.app_connection_id,
495+
handler=CorvaLoggerHandler(
496+
max_message_size=SETTINGS.LOG_THRESHOLD_MESSAGE_SIZE,
497+
max_message_count=SETTINGS.LOG_THRESHOLD_MESSAGE_COUNT,
498+
logger=CORVA_LOGGER,
499+
placeholder=" ...",
500+
),
501+
user_handler=handler,
502+
logger=CORVA_LOGGER,
503+
):
504+
result = service.run_app(
505+
has_secrets=event.has_secrets,
506+
app_key=SETTINGS.APP_KEY,
507+
api_sdk=CachingApiSdk(
508+
api_sdk=CorvaApiSdk(api_adapter=api),
509+
ttl=SETTINGS.SECRETS_CACHE_TTL,
510+
),
511+
cache_sdk=InternalRedisSdk(
512+
hash_name=internal_cache_hash_name, redis_client=redis_client
513+
),
514+
app=functools.partial(
515+
cast(
516+
Callable[
517+
[PartialRerunMergeEvent, Api, UserRedisSdk, UserRedisSdk],
518+
Any,
519+
],
520+
func,
521+
),
522+
app_event,
523+
api,
524+
asset_cache,
525+
rerun_asset_cache,
526+
),
527+
)
528+
529+
return result
530+
531+
HANDLERS[RawPartialRerunMergeEvent] = wrapper
532+
return wrapper
533+
534+
535+
def _get_custom_event_type_by_raw_aws_event(
    aws_event: Any,
) -> Union[Tuple[Type[RawBaseEvent], Callable], Tuple[None, None]]:
    """Match a raw AWS event against the registered custom event types.

    Returns the ``(raw_event_type, handler)`` pair whose schema parses the
    event, or ``(None, None)`` when no registered type matches.
    """
    # Future custom event schemas are unknown ahead of time, so each
    # registered custom type is simply attempted in turn.
    for candidate_type, registered_handler in HANDLERS.items():
        parsed = None
        with contextlib.suppress(pydantic.ValidationError):
            parsed = candidate_type.from_raw_event(aws_event)
        if parsed:
            return candidate_type, registered_handler
    return None, None

src/corva/models/merge/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)