Skip to content

Commit 32d3a2b

Browse files
committed
Addressed code review comments
1 parent 1df0c6d commit 32d3a2b

File tree

3 files changed

+36
-16
lines changed

3 files changed

+36
-16
lines changed

src/corva/models/stream/raw.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class RawBaseRecord(CorvaBaseEvent, abc.ABC):
1919
company_id: int
2020
collection: str
2121

22-
data: Optional[dict] = {}
22+
data: dict = {}
2323
metadata: dict = {}
2424

2525
@property
@@ -73,7 +73,7 @@ class RawMetadata(CorvaBaseEvent):
7373
RecordsDepth = Sequence[RawDepthRecord]
7474
else:
7575
RecordsBase = pydantic.conlist(RawBaseRecord, min_items=1)
76-
RecordsTime = pydantic.conlist(RawTimeRecord, min_items=1)
76+
RecordsTime = pydantic.conlist(RawTimeRecord, min_items=0)
7777
RecordsDepth = pydantic.conlist(RawDepthRecord, min_items=1)
7878

7979

@@ -105,7 +105,10 @@ def is_completed(self) -> bool:
105105
There can only be 1 completed record always located at the end of the list.
106106
"""
107107

108-
return self.records[-1].collection == 'wits.completed'
108+
if not self.records:
109+
return False
110+
111+
return self.records[-1].collection == "wits.completed"
109112

110113
@property
111114
def max_record_value(self) -> Union[int, float]:
@@ -145,7 +148,7 @@ def filter_records(
145148
) -> List[RawBaseRecord]:
146149
new_records = copy.deepcopy(self.records)
147150

148-
if self.is_completed:
151+
if self.is_completed and new_records:
149152
new_records = new_records[:-1] # remove "completed" record
150153

151154
if old_max_record_value is None:
@@ -165,33 +168,48 @@ def filter_records(
165168
def set_asset_id(cls, values: dict) -> dict:
166169
"""Calculates asset_id field."""
167170

168-
records: List[RawBaseRecord] = values['records']
171+
records: List[RawBaseRecord] = values["records"]
169172

170-
values["asset_id"] = int(records[0].asset_id)
173+
if records:
174+
values["asset_id"] = int(records[0].asset_id)
171175

172176
return values
173177

174178
@pydantic.root_validator(pre=False, skip_on_failure=True)
175179
def set_company_id(cls, values: dict) -> dict:
176180
"""Calculates company_id field."""
177181

178-
records: List[RawBaseRecord] = values['records']
182+
records: List[RawBaseRecord] = values["records"]
179183

180-
values["company_id"] = int(records[0].company_id)
184+
if records:
185+
values["company_id"] = int(records[0].company_id)
181186

182187
return values
183188

189+
@pydantic.validator("records", pre=True)
190+
def validate_records(cls, v):
191+
if isinstance(v, List):
192+
return [
193+
record
194+
for record in v
195+
if (
196+
(isinstance(record, dict) and record.get("data") is not None)
197+
or (hasattr(record, "data") and record.data is not None)
198+
)
199+
]
200+
return v
201+
184202

185203
class RawStreamTimeEvent(RawStreamEvent):
186204
records: RecordsTime
187205
rerun: Optional[RerunTime] = None
188-
_max_record_value_cache_key: ClassVar[str] = 'last_processed_timestamp'
206+
_max_record_value_cache_key: ClassVar[str] = "last_processed_timestamp"
189207

190208

191209
class RawStreamDepthEvent(RawStreamEvent):
192210
records: RecordsDepth
193211
rerun: Optional[RerunDepth] = None
194-
_max_record_value_cache_key: ClassVar[str] = 'last_processed_depth'
212+
_max_record_value_cache_key: ClassVar[str] = "last_processed_depth"
195213
log_identifier: str = None # type: ignore
196214

197215
@pydantic.root_validator(pre=False, skip_on_failure=True)

src/corva/models/stream/stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class StreamTimeRecord(CorvaBaseEvent):
1616
"""
1717

1818
timestamp: int
19-
data: Optional[dict] = {}
19+
data: dict = {}
2020
metadata: dict = {}
2121

2222

tests/unit/test_stream_app.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,10 @@ def test_raw_stream_event_with_none_data_field_returns_expected_result(context):
624624

625625
@stream
626626
def stream_app(event, api, cache):
627-
return event
627+
pytest.fail(
628+
"Stream app call should be skipped "
629+
"because there is no data to build an event"
630+
)
628631

629632
event = [
630633
{
@@ -644,11 +647,10 @@ def stream_app(event, api, cache):
644647
"provider": "corva",
645648
"timestamp": 1688999883,
646649
"version": 1,
647-
}
650+
} # DEVC-627. This record should be filtered out because data is None.
648651
],
649652
}
650653
]
651654

652-
result_event: StreamTimeEvent = stream_app(event, context)[0]
653-
654-
assert result_event is None
655+
_ = stream_app(event, context)[0]
656+
assert True, "App call should be skipped"

0 commit comments

Comments
 (0)