diff --git a/dist/ingestify-0.0.1-py3.9.egg b/dist/ingestify-0.0.1-py3.9.egg new file mode 100644 index 0000000..3e109f2 Binary files /dev/null and b/dist/ingestify-0.0.1-py3.9.egg differ diff --git a/dist/ingestify-0.0.5-py3.10.egg b/dist/ingestify-0.0.5-py3.10.egg new file mode 100644 index 0000000..e02a543 Binary files /dev/null and b/dist/ingestify-0.0.5-py3.10.egg differ diff --git a/dist/ingestify-0.0.5-py3.9.egg b/dist/ingestify-0.0.5-py3.9.egg new file mode 100644 index 0000000..31db817 Binary files /dev/null and b/dist/ingestify-0.0.5-py3.9.egg differ diff --git a/get_pokemons.duckdb b/get_pokemons.duckdb new file mode 100644 index 0000000..01b5838 Binary files /dev/null and b/get_pokemons.duckdb differ diff --git a/ingestify.egg-info/PKG-INFO b/ingestify.egg-info/PKG-INFO new file mode 100644 index 0000000..0c76bb9 --- /dev/null +++ b/ingestify.egg-info/PKG-INFO @@ -0,0 +1,265 @@ +Metadata-Version: 2.4 +Name: ingestify +Version: 0.9.1 +Summary: Data Ingestion Framework +Author: Koen Vossen +Author-email: info@koenvossen.nl +License: AGPL +Description-Content-Type: text/markdown +Requires-Dist: requests<3,>=2.0.0 +Requires-Dist: SQLAlchemy<3,>=2 +Requires-Dist: click>=8 +Requires-Dist: python-dotenv +Requires-Dist: pyaml_env +Requires-Dist: boto3 +Requires-Dist: pydantic>=2.0.0 +Provides-Extra: test +Requires-Dist: pytest<7,>=6.2.5; extra == "test" +Requires-Dist: pytz; extra == "test" +Dynamic: author +Dynamic: author-email +Dynamic: description +Dynamic: description-content-type +Dynamic: license +Dynamic: provides-extra +Dynamic: requires-dist +Dynamic: summary + +# Ingestify + +_Ingest everything – JSON, CSV, tracking ZIPs, even MP4 – keep it version‑safe, sync only what changed, and analyse while you ingest._ + +--- + +## Why Ingestify? + +Football‐data APIs are often **slow**, **rate‑limited** or just **down**. One parsing bug and you’re forced to pull tens of gigabytes again. +Ingestify fixes that by building **your own data lake** of untouched provider files and fetching only what’s new: + +* **Own your lake** – The first time you ask for a match, Ingestify downloads the original files (metadata, line‑ups, events, tracking, video) and stores them untouched in local disk, S3, GCS… every later query hits *your* lake, not the provider. +* **Never re‑fetch the world** – A file‑level checksum / timestamp check moves only changed bundles across the wire. +* **Atomic, complete packages** – A *Dataset* is all‑or‑nothing: + + | Dataset type | Always contains | + |--------------|-----------------| + | **Match Dataset** | metadata + line‑ups + events | + | **Tracking Dataset** | metadata + raw tracking frames | + + You never analyse events v2 with lineups v1, or yesterday’s first half with today’s second half. +* **Query while ingesting** – Datasets stream out of the engine the moment their files land, so notebooks or downstream services can start before the full season is in. + +--- + +## The Ingestify Workflow + + +--- + +## What you gain + +### For football‑analytics practitioners + +| Pain | Ingestify fix | +|------|---------------| +| API slowness / downtime | One request → lake; retries and parallelism happen behind the scenes. | +| Full re‑ingest after a bug | File‑level deltas mean you fetch only the corrected bundles. | +| Partial / drifting data | Dataset is atomic, versioned, and validated before it becomes visible. | +| Waiting hours for a season to sync | Stream each Dataset as soon as it lands; analyse while you ingest. 
|
+| Boilerplate joins | `engine.load_dataset_with_kloppy(dataset)` → analysis‑ready object. |
+
+### For software engineers
+
+| Need | How Ingestify helps |
+|------|---------------------|
+| **Domain‑Driven Design** | `Dataset`, `Revision`, `Selector` plus rich domain events read like the problem space. |
+| **Event‑driven integrations** | Subscribe to `RevisionAdded` and push to Kafka, AWS Lambda, Airflow… |
+| **Pluggable everything** | Swap `Source`, `FetchPolicy`, `DatasetStore` subclasses to add providers, change delta logic, or move storage back‑ends. |
+| **Safety & speed** | Multiprocessing downloader with temp‑dir commits – no half‑written matches; near‑linear I/O speed‑ups. |
+| **Any file type** | JSON, CSV, MP4, proprietary binaries – stored verbatim so you parse / transcode later under version control. |
+
+---
+
+## Quick start
+
+```bash
+pip install ingestify  # or: pip install git+https://github.com/PySport/ingestify.git
+```
+
+### Developing a new Source
+
+When developing a new `Source`, use the `debug_source()` helper for rapid iteration:
+
+```python
+from ingestify import Source, debug_source
+
+class MyCustomSource(Source):
+    provider = "my_provider"
+
+    def __init__(self, name: str, api_key: str):
+        super().__init__(name)
+        self.api_key = api_key
+
+    def find_datasets(self, dataset_type, data_spec_versions, **kwargs):
+        # Your source implementation
+        ...
+
+# Quick debug - runs full ingestion with temp storage
+if __name__ == "__main__":
+    source = MyCustomSource(name="test", api_key="...")
+
+    debug_source(
+        source,
+        dataset_type="match",
+        data_spec_versions={"events": "v1"},
+    )
+```
+
+The `debug_source()` helper:
+- ✅ Creates an ephemeral dev engine with temp storage
+- ✅ Configures logging automatically
+- ✅ Runs the full ingestion cycle
+- ✅ Shows storage location and results
+
+Perfect for testing your source before adding it to your production config!
+
+### Minimal `config.yaml`
+
+```yaml
+main:
+  metadata_url: sqlite:///database/catalog.db  # where revision metadata lives
+  file_url: file://database/files/             # where raw files live
+  default_bucket: main
+
+sources:
+  statsbomb:
+    type: ingestify.statsbomb_github  # open‑data provider
+
+ingestion_plans:
+  - source: statsbomb
+    dataset_type: match
+    # selectors can narrow the scope
+    # selectors:
+    #   - competition_id: 11
+    #     season_id: [90]
+```
+
+### First ingest
+
+When you have configured event subscribers, all domain events are dispatched to them. Publishing the events to
+Kafka, RabbitMQ or any other system becomes trivial (a minimal subscriber sketch is shown below, after the
+*Using the data* example).
+
+```bash
+mkdir -p database
+pip install kloppy
+
+ingestify run  # fills your data lake
+```
+
+---
+
+## Using the data
+
+By default, Ingestify searches your DatasetStore when you request data. You can pass several filters to fetch only what you need.
+
+```python
+from ingestify.main import get_engine
+
+engine = get_engine("config.yaml")
+
+for dataset in engine.iter_datasets(
+        dataset_state="complete",
+        provider="statsbomb",
+        dataset_type="match",
+        competition_id=11,
+        season_id=90):
+    df = (
+        engine
+        .load_dataset_with_kloppy(dataset)
+        .to_df(engine="polars")
+    )
+    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")
+```
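+
+As noted under *First ingest*, every change in the store is also published as a domain event (for example `RevisionAdded`
+when a new revision lands). The snippet below is only a sketch: it assumes a `Subscriber` base class with an `on_event`
+hook importable from `ingestify` — check `ingestify/domain/models/event/subscriber.py` for the actual class and hook
+names, and register the subscriber the way your configuration expects.
+
+```python
+import logging
+
+from ingestify import Subscriber  # assumed import path — see ingestify/domain/models/event/subscriber.py
+
+logger = logging.getLogger("event-forwarder")
+
+
+class LogAndForward(Subscriber):
+    """Sketch of a subscriber: log every domain event; swap the log call for a Kafka/RabbitMQ producer."""
+
+    def on_event(self, event):  # hook name assumed — check the Subscriber base class
+        # `event` is a domain event such as RevisionAdded or DatasetCreated
+        logger.info("domain event: %s", type(event).__name__)
+
+# How the subscriber is registered (a config.yaml entry or an engine call) is setup-specific and not shown here.
+```
+
+#### Auto Ingestion
+
+When you don't want to use an event‑driven architecture but just want to work with the latest data, Ingestify has you
+covered. With the `auto_ingest` option, Ingestify syncs the data in the background when you ask for the data.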
+
+```python
+from ingestify.main import get_engine
+
+engine = get_engine("config.yaml")
+
+for dataset in engine.iter_datasets(
+    # When set to True, Ingestify first does a full sync and then starts yielding datasets
+    auto_ingest=True,
+
+    # With streaming enabled, each Dataset is yielded as soon as it is up-to-date (unchanged, or just refetched)
+    # auto_ingest={"streaming": True}
+
+    dataset_state="complete",
+    provider="statsbomb",
+    dataset_type="match",
+    competition_id=11,
+    season_id=90):
+    df = (
+        engine
+        .load_dataset_with_kloppy(dataset)
+        .to_df(engine="polars")
+    )
+    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")
+```
+
+#### Open data
+
+Ingestify has built-in support for StatsBomb Open Data (more to come).
+
+```shell
+mkdir database_open_data
+pip install kloppy
+```
+
+```python
+import logging, sys
+
+from ingestify.main import get_engine
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+    stream=sys.stderr,
+)
+
+engine = get_engine(
+    metadata_url="sqlite:///database_open_data/catalog.db",
+    file_url="file://database_open_data/files/"
+)
+
+dataset_iter = engine.iter_datasets(
+    # This tells Ingestify to look for an Open Data provider
+    auto_ingest={"use_open_data": True, "streaming": True},
+
+    provider="statsbomb",
+    dataset_type="match",
+    competition_id=43,  # "FIFA World Cup"
+    # season_id=281
+)
+
+for dataset in dataset_iter:
+    kloppy_dataset = engine.load_dataset_with_kloppy(dataset)
+    logging.info(f"Loaded {kloppy_dataset}")
+```
+
+---
+
+## Roadmap
+
+* Workflow orchestration helpers (Airflow, Dagster, Prefect)
+* Built‑in Kafka / Kinesis event emitters
+* Streaming data ingestion
+* Data quality hooks (Great Expectations)
+
+---
+
+**Stop refetching the world. 
Own your data lake, keep it version‑safe, and analyse football faster with Ingestify.** diff --git a/ingestify.egg-info/SOURCES.txt b/ingestify.egg-info/SOURCES.txt new file mode 100644 index 0000000..a112a0c --- /dev/null +++ b/ingestify.egg-info/SOURCES.txt @@ -0,0 +1,93 @@ +README.md +setup.py +ingestify/__init__.py +ingestify/cmdline.py +ingestify/exceptions.py +ingestify/main.py +ingestify/server.py +ingestify/source_base.py +ingestify/utils.py +ingestify.egg-info/PKG-INFO +ingestify.egg-info/SOURCES.txt +ingestify.egg-info/dependency_links.txt +ingestify.egg-info/entry_points.txt +ingestify.egg-info/requires.txt +ingestify.egg-info/top_level.txt +ingestify/application/__init__.py +ingestify/application/dataset_store.py +ingestify/application/ingestion_engine.py +ingestify/application/loader.py +ingestify/application/secrets_manager.py +ingestify/domain/__init__.py +ingestify/domain/models/__init__.py +ingestify/domain/models/base.py +ingestify/domain/models/data_spec_version_collection.py +ingestify/domain/models/fetch_policy.py +ingestify/domain/models/sink.py +ingestify/domain/models/source.py +ingestify/domain/models/timing.py +ingestify/domain/models/dataset/__init__.py +ingestify/domain/models/dataset/collection.py +ingestify/domain/models/dataset/collection_metadata.py +ingestify/domain/models/dataset/dataset.py +ingestify/domain/models/dataset/dataset_repository.py +ingestify/domain/models/dataset/dataset_state.py +ingestify/domain/models/dataset/events.py +ingestify/domain/models/dataset/file.py +ingestify/domain/models/dataset/file_collection.py +ingestify/domain/models/dataset/file_repository.py +ingestify/domain/models/dataset/identifier.py +ingestify/domain/models/dataset/revision.py +ingestify/domain/models/dataset/selector.py +ingestify/domain/models/event/__init__.py +ingestify/domain/models/event/_old_event.py +ingestify/domain/models/event/dispatcher.py +ingestify/domain/models/event/domain_event.py +ingestify/domain/models/event/event_bus.py +ingestify/domain/models/event/publisher.py +ingestify/domain/models/event/subscriber.py +ingestify/domain/models/ingestion/__init__.py +ingestify/domain/models/ingestion/ingestion_job.py +ingestify/domain/models/ingestion/ingestion_job_summary.py +ingestify/domain/models/ingestion/ingestion_plan.py +ingestify/domain/models/resources/__init__.py +ingestify/domain/models/resources/dataset_resource.py +ingestify/domain/models/task/__init__.py +ingestify/domain/models/task/set.py +ingestify/domain/models/task/task.py +ingestify/domain/models/task/task_summary.py +ingestify/domain/services/__init__.py +ingestify/domain/services/identifier_key_transformer.py +ingestify/domain/services/transformers/__init__.py +ingestify/domain/services/transformers/kloppy_to_pandas.py +ingestify/examples/__init__.py +ingestify/examples/dynamodb_example.py +ingestify/infra/__init__.py +ingestify/infra/fetch/__init__.py +ingestify/infra/fetch/http.py +ingestify/infra/serialization/__init__.py +ingestify/infra/sink/__init__.py +ingestify/infra/sink/postgresql.py +ingestify/infra/source/__init__.py +ingestify/infra/source/statsbomb_github.py +ingestify/infra/source/statsbomb/__init__.py +ingestify/infra/source/statsbomb/base.py +ingestify/infra/source/statsbomb/match.py +ingestify/infra/store/__init__.py +ingestify/infra/store/dataset/__init__.py +ingestify/infra/store/dataset/sqlalchemy/__init__.py +ingestify/infra/store/dataset/sqlalchemy/repository.py +ingestify/infra/store/dataset/sqlalchemy/tables.py +ingestify/infra/store/file/__init__.py 
+ingestify/infra/store/file/dummy_file_repository.py +ingestify/infra/store/file/local_file_repository.py +ingestify/infra/store/file/s3_file_repository.py +ingestify/tests/__init__.py +ingestify/tests/conftest.py +ingestify/tests/test_auto_ingest.py +ingestify/tests/test_engine.py +ingestify/tests/test_events.py +ingestify/tests/test_file_cache.py +ingestify/tests/test_pagination.py +ingestify/tests/test_store_version.py +ingestify/tests/test_table_prefix.py \ No newline at end of file diff --git a/ingestify.egg-info/dependency_links.txt b/ingestify.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ingestify.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/ingestify.egg-info/entry_points.txt b/ingestify.egg-info/entry_points.txt new file mode 100644 index 0000000..14a5fd2 --- /dev/null +++ b/ingestify.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +ingestify = ingestify.cmdline:main diff --git a/ingestify.egg-info/requires.txt b/ingestify.egg-info/requires.txt new file mode 100644 index 0000000..6f99266 --- /dev/null +++ b/ingestify.egg-info/requires.txt @@ -0,0 +1,11 @@ +requests<3,>=2.0.0 +SQLAlchemy<3,>=2 +click>=8 +python-dotenv +pyaml_env +boto3 +pydantic>=2.0.0 + +[test] +pytest<7,>=6.2.5 +pytz diff --git a/ingestify.egg-info/top_level.txt b/ingestify.egg-info/top_level.txt new file mode 100644 index 0000000..3e8f905 --- /dev/null +++ b/ingestify.egg-info/top_level.txt @@ -0,0 +1 @@ +ingestify diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 55922cf..66a8fdf 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -383,6 +383,7 @@ def add_revision( files: Dict[str, DraftFile], revision_source: RevisionSource, description: str = "Update", + force_save: bool = False, ): """ Create new revision first, so FileRepository can use @@ -392,7 +393,7 @@ def add_revision( created_at = utcnow() persisted_files_ = self._persist_files(dataset, revision_id, files) - if persisted_files_: + if persisted_files_ or force_save: # It can happen an API tells us data is changed, but it was not changed. In this case # we decide to ignore it. 
# Make sure there are files changed before creating a new revision @@ -487,7 +488,9 @@ def create_dataset( updated_at=now, last_modified_at=None, # Not known at this moment ) - revision = self.add_revision(dataset, files, revision_source, description) + revision = self.add_revision( + dataset, files, revision_source, description, force_save=True + ) self.dispatch(DatasetCreated(dataset=dataset)) return revision diff --git a/ingestify/domain/models/dataset/revision.py b/ingestify/domain/models/dataset/revision.py index c43b16f..29dbd0c 100644 --- a/ingestify/domain/models/dataset/revision.py +++ b/ingestify/domain/models/dataset/revision.py @@ -38,7 +38,9 @@ class Revision(BaseModel): @property def last_modified_at(self): - return max(file.modified_at for file in self.modified_files) + if self.modified_files: + return max(file.modified_at for file in self.modified_files) + return None @property def modified_files_map(self) -> Dict[str, File]: diff --git a/ingestify/domain/models/task/task_summary.py b/ingestify/domain/models/task/task_summary.py index 6c53dc6..e2f54bf 100644 --- a/ingestify/domain/models/task/task_summary.py +++ b/ingestify/domain/models/task/task_summary.py @@ -86,9 +86,7 @@ def set_stats_from_revision(self, revision: Optional["Revision"]): if revision: self.persisted_file_count = len(revision.modified_files) self.bytes_retrieved = sum(file.size for file in revision.modified_files) - self.last_modified = max( - file.modified_at for file in revision.modified_files - ) + self.last_modified = revision.last_modified_at else: self.state = TaskState.FINISHED_IGNORED diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index c816495..1438297 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -375,7 +375,9 @@ def _load_datasets(self, dataset_ids: list[str]) -> list[Dataset]: dataset_ids_cte.c.dataset_id == self.revision_table.c.dataset_id, ) ) - .order_by(self.revision_table.c.dataset_id) + .order_by( + self.revision_table.c.dataset_id, self.revision_table.c.revision_id + ) ) for dataset_id, revisions in itertools.groupby( @@ -560,22 +562,22 @@ def destroy(self, dataset: Dataset): try: # Delete modified files related to the dataset connection.execute( - file_table.delete().where( - file_table.c.dataset_id == dataset.dataset_id + self.file_table.delete().where( + self.file_table.c.dataset_id == dataset.dataset_id ) ) # Delete revisions related to the dataset connection.execute( - revision_table.delete().where( - revision_table.c.dataset_id == dataset.dataset_id + self.revision_table.delete().where( + self.revision_table.c.dataset_id == dataset.dataset_id ) ) # Delete the dataset itself connection.execute( - dataset_table.delete().where( - dataset_table.c.dataset_id == dataset.dataset_id + self.dataset_table.delete().where( + self.dataset_table.c.dataset_id == dataset.dataset_id ) ) diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py index 23b30b7..97da3c6 100644 --- a/ingestify/tests/test_engine.py +++ b/ingestify/tests/test_engine.py @@ -251,6 +251,28 @@ def find_datasets( raise Exception("some failure") +class NoFilesSource(Source): + provider = "fake" + + def find_datasets( + self, + dataset_type: str, + data_spec_versions: DataSpecVersionCollection, + dataset_collection_metadata: DatasetCollectionMetadata, + competition_id, + season_id, + **kwargs, + ): + yield DatasetResource( + 
dataset_resource_id=dict( + competition_id=competition_id, season_id=season_id, match_id=1 + ), + provider="fake", + dataset_type="match", + name="Dataset Without Files", + ) + + def test_engine(config_file): engine = get_engine(config_file, "main") @@ -499,3 +521,17 @@ def test_post_load_files_hook(config_file): engine.load() dataset2 = engine.store.get_dataset_collection().first() assert dataset2.state == DatasetState.COMPLETE + + +def test_force_save_creates_revision(config_file): + """Test that datasets get a revision even when no files are persisted.""" + engine = get_engine(config_file, "main") + add_ingestion_plan( + engine, NoFilesSource("fake-source"), competition_id=1, season_id=2 + ) + + engine.load() + dataset = engine.store.get_dataset_collection().first() + + assert len(dataset.revisions) == 1 + assert len(dataset.current_revision.modified_files) == 0