diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py
index 959480e..6b3b0b5 100644
--- a/ingestify/domain/models/ingestion/ingestion_job.py
+++ b/ingestify/domain/models/ingestion/ingestion_job.py
@@ -129,7 +129,6 @@ def run(self):
         with TaskSummary.update(
             self.task_id, dataset_identifier=dataset_identifier
         ) as task_summary:
-
             files = {
                 file_id: task_summary.record_load_file(
                     lambda: load_file(file_resource, dataset=self.dataset),
@@ -138,6 +137,8 @@ def run(self):
                 for file_id, file_resource in self.dataset_resource.files.items()
             }
 
+            self.dataset_resource.run_post_load_files(files)
+
             try:
                 revision = self.store.update_dataset(
                     dataset=self.dataset,
@@ -181,6 +182,9 @@ def run(self):
                 )
                 for file_id, file_resource in self.dataset_resource.files.items()
             }
+
+            self.dataset_resource.run_post_load_files(files)
+
             try:
                 revision = self.store.create_dataset(
                     dataset_type=self.dataset_resource.dataset_type,
diff --git a/ingestify/domain/models/resources/dataset_resource.py b/ingestify/domain/models/resources/dataset_resource.py
index 7473bf5..9b00f72 100644
--- a/ingestify/domain/models/resources/dataset_resource.py
+++ b/ingestify/domain/models/resources/dataset_resource.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Optional, Callable, Any, Protocol, TYPE_CHECKING  # noqa
+from typing import Optional, Callable, Any, Protocol, TYPE_CHECKING, Dict  # noqa
 from pydantic import Field
 
 from ingestify.domain.models.base import BaseModel
@@ -50,6 +50,18 @@ class DatasetResource(BaseModel):
     metadata: dict = Field(default_factory=dict)
     state: DatasetState = Field(default_factory=lambda: DatasetState.COMPLETE)
     files: dict[str, FileResource] = Field(default_factory=dict)
+    post_load_files: Optional[
+        Callable[["DatasetResource", Dict[str, DraftFile]], None]
+    ] = None
+
+    def run_post_load_files(self, files: Dict[str, DraftFile]):
+        """Hook to modify dataset attributes based on loaded file content.
+
+        Useful for setting state based on file content, e.g., keep state=SCHEDULED
+        when files contain '{}', change to COMPLETE when they contain actual data.
+        """
+        if self.post_load_files:
+            self.post_load_files(self, files)
 
     def add_file(
         self,
diff --git a/ingestify/infra/fetch/http.py b/ingestify/infra/fetch/http.py
index bfee22a..7ec418f 100644
--- a/ingestify/infra/fetch/http.py
+++ b/ingestify/infra/fetch/http.py
@@ -58,9 +58,9 @@ def retrieve_http(
             )
         # else:
         #     print(f"{current_file.modified_at=} {last_modified=}")
-        # headers["if-modified-since"] = (
-        #     format_datetime(current_file.modified_at, usegmt=True),
-        # )
+        headers["if-modified-since"] = format_datetime(
+            current_file.modified_at, usegmt=True
+        )
         headers["if-none-match"] = current_file.tag
 
     http_kwargs = {}
diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py
index e26feda..23b30b7 100644
--- a/ingestify/tests/test_engine.py
+++ b/ingestify/tests/test_engine.py
@@ -10,6 +10,7 @@
     DataSpecVersionCollection,
     DraftFile,
     Dataset,
+    DatasetState,
     DatasetCreated,
 )
 from ingestify.domain.models.dataset.collection_metadata import (
@@ -435,3 +436,66 @@ def test_dev_engine():
     datasets = engine.store.get_dataset_collection()
     assert len(datasets) == 1
     assert datasets.first().name == "Test Dataset"
+
+
+def post_load_hook(dataset_resource: DatasetResource, files: dict[str, DraftFile]):
+    # Change state to COMPLETE if file content is not '{}'
+    for file in files.values():
+        if file.size > 2:
+            dataset_resource.state = DatasetState.COMPLETE
+            break
+
+
+def file_loader_with_hook(file_resource, current_file):
+    # First run: empty JSON, second run: actual data
+    content = "{}" if not current_file else '{"data": "value"}'
+    return DraftFile.from_input(content, data_feed_key="file1")
+
+
+class SourceWithHook(Source):
+    provider = "test"
+
+    def find_datasets(
+        self,
+        dataset_type: str,
+        data_spec_versions: DataSpecVersionCollection,
+        dataset_collection_metadata,
+        competition_id,
+        season_id,
+        **kwargs,
+    ):
+        last_modified = datetime.now(pytz.utc)
+
+        yield (
+            DatasetResource(
+                dataset_resource_id=dict(
+                    competition_id=competition_id, season_id=season_id, match_id=1
+                ),
+                provider="test",
+                dataset_type="match",
+                name="Test Dataset",
+                state=DatasetState.SCHEDULED,
+                post_load_files=post_load_hook,
+            ).add_file(
+                last_modified=last_modified,
+                data_feed_key="file1",
+                data_spec_version="v1",
+                file_loader=file_loader_with_hook,
+            )
+        )
+
+
+def test_post_load_files_hook(config_file):
+    """Test that post_load_files hook changes state from SCHEDULED to COMPLETE when content is not empty."""
+    engine = get_engine(config_file, "main")
+    add_ingestion_plan(engine, SourceWithHook("test"), competition_id=1, season_id=2)
+
+    # First run: file contains '{}', state should remain SCHEDULED
+    engine.load()
+    dataset1 = engine.store.get_dataset_collection().first()
+    assert dataset1.state == DatasetState.SCHEDULED
+
+    # Second run: file contains actual data, state should change to COMPLETE
+    engine.load()
+    dataset2 = engine.store.get_dataset_collection().first()
+    assert dataset2.state == DatasetState.COMPLETE