Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ingestify/domain/models/ingestion/ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def run(self):
with TaskSummary.update(
self.task_id, dataset_identifier=dataset_identifier
) as task_summary:

files = {
file_id: task_summary.record_load_file(
lambda: load_file(file_resource, dataset=self.dataset),
Expand All @@ -138,6 +137,8 @@ def run(self):
for file_id, file_resource in self.dataset_resource.files.items()
}

self.dataset_resource.run_post_load_files(files)

try:
revision = self.store.update_dataset(
dataset=self.dataset,
Expand Down Expand Up @@ -181,6 +182,9 @@ def run(self):
)
for file_id, file_resource in self.dataset_resource.files.items()
}

self.dataset_resource.run_post_load_files(files)

try:
revision = self.store.create_dataset(
dataset_type=self.dataset_resource.dataset_type,
Expand Down
14 changes: 13 additions & 1 deletion ingestify/domain/models/resources/dataset_resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Optional, Callable, Any, Protocol, TYPE_CHECKING # noqa
from typing import Optional, Callable, Any, Protocol, TYPE_CHECKING, Dict # noqa
from pydantic import Field

from ingestify.domain.models.base import BaseModel
Expand Down Expand Up @@ -50,6 +50,18 @@ class DatasetResource(BaseModel):
metadata: dict = Field(default_factory=dict)
state: DatasetState = Field(default_factory=lambda: DatasetState.COMPLETE)
files: dict[str, FileResource] = Field(default_factory=dict)
post_load_files: Optional[
Callable[["DatasetResource", Dict[str, DraftFile]], None]
] = None

def run_post_load_files(self, files: Dict[str, "DraftFile"]) -> None:
    """Invoke the optional post-load hook with the freshly loaded files.

    The hook lets a source adjust dataset attributes from loaded file
    content — e.g. keep state=SCHEDULED while files contain '{}' and
    switch to COMPLETE once real data arrives.
    """
    hook = self.post_load_files
    if not hook:
        # No hook configured on this resource; nothing to do.
        return
    hook(self, files)

def add_file(
self,
Expand Down
6 changes: 3 additions & 3 deletions ingestify/infra/fetch/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def retrieve_http(
)
# else:
# print(f"{current_file.modified_at=} {last_modified=}")
# headers["if-modified-since"] = (
# format_datetime(current_file.modified_at, usegmt=True),
# )
headers["if-modified-since"] = (
format_datetime(current_file.modified_at, usegmt=True),
)
headers["if-none-match"] = current_file.tag

http_kwargs = {}
Expand Down
64 changes: 64 additions & 0 deletions ingestify/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DataSpecVersionCollection,
DraftFile,
Dataset,
DatasetState,
DatasetCreated,
)
from ingestify.domain.models.dataset.collection_metadata import (
Expand Down Expand Up @@ -435,3 +436,66 @@ def test_dev_engine():
datasets = engine.store.get_dataset_collection()
assert len(datasets) == 1
assert datasets.first().name == "Test Dataset"


def post_load_hook(dataset_resource: "DatasetResource", files: "dict[str, DraftFile]"):
    """Mark the dataset COMPLETE as soon as any loaded file holds real content.

    A bare '{}' payload is exactly 2 bytes, so size > 2 means actual data.
    """
    if any(draft.size > 2 for draft in files.values()):
        dataset_resource.state = DatasetState.COMPLETE


def file_loader_with_hook(file_resource, current_file):
    """Return empty JSON on the first load, real data once a current file exists."""
    payload = '{"data": "value"}' if current_file else "{}"
    return DraftFile.from_input(payload, data_feed_key="file1")


class SourceWithHook(Source):
    """Test source whose single dataset resource carries a post_load_files hook."""

    provider = "test"

    def find_datasets(
        self,
        dataset_type: str,
        data_spec_versions: DataSpecVersionCollection,
        dataset_collection_metadata,
        competition_id,
        season_id,
        **kwargs,
    ):
        now = datetime.now(pytz.utc)

        # One match resource, starting SCHEDULED; post_load_hook may promote
        # it to COMPLETE once the loaded file contains real data.
        resource = DatasetResource(
            dataset_resource_id=dict(
                competition_id=competition_id, season_id=season_id, match_id=1
            ),
            provider="test",
            dataset_type="match",
            name="Test Dataset",
            state=DatasetState.SCHEDULED,
            post_load_files=post_load_hook,
        )
        # Yield whatever add_file returns, mirroring the fluent call chain.
        yield resource.add_file(
            last_modified=now,
            data_feed_key="file1",
            data_spec_version="v1",
            file_loader=file_loader_with_hook,
        )


def test_post_load_files_hook(config_file):
    """The post_load_files hook promotes SCHEDULED -> COMPLETE once real data arrives."""
    engine = get_engine(config_file, "main")
    add_ingestion_plan(engine, SourceWithHook("test"), competition_id=1, season_id=2)

    # Run 1: the loader yields '{}' (2 bytes), so the hook keeps state SCHEDULED.
    engine.load()
    assert engine.store.get_dataset_collection().first().state == DatasetState.SCHEDULED

    # Run 2: a current file now exists, the loader yields real JSON,
    # and the hook flips the state to COMPLETE.
    engine.load()
    assert engine.store.get_dataset_collection().first().state == DatasetState.COMPLETE