Removed dataset creation from pipeline #45
Conversation
Pull request overview
This pull request modernizes the geospatial data ingestion pipeline by replacing file-based workflows with direct reads of pre-processed Parquet files from blob storage for both the FKB and OSM datasets. The changes simplify data loading and improve performance by leveraging pre-processed data.
Changes:
- Replaced raw file processing with direct Parquet file loading from blob storage for FKB and OSM datasets
- Added a new utility method `convert_parquet_bytes_to_gdf` for converting Parquet bytes with WKB geometry to GeoDataFrames (a sketch follows this list)
- Updated SQL queries to use standardized field names (`external_id`, `building_id`) and added optional timestamp fields for better data tracking
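For orientation, a minimal sketch of such a conversion using pandas, GeoPandas, and Shapely is shown below. This is not the repository's implementation: the real interface takes an EPSGCode enum, whereas the sketch accepts a plain EPSG integer, and the WKB column is assumed to be named `geometry`.

```python
from io import BytesIO

import geopandas as gpd
import pandas as pd
from shapely import from_wkb


def convert_parquet_bytes_to_gdf(data: bytes, epsg_code: int) -> gpd.GeoDataFrame:
    """Sketch: convert Parquet bytes with a WKB 'geometry' column to a GeoDataFrame."""
    # Read the Parquet payload from memory, e.g. bytes downloaded from blob storage.
    df = pd.read_parquet(BytesIO(data))
    # Decode the WKB column into shapely geometries and attach the CRS.
    geometry = from_wkb(df["geometry"])
    return gpd.GeoDataFrame(df.drop(columns=["geometry"]), geometry=geometry, crs=f"EPSG:{epsg_code}")
```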
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| src/presentation/entrypoints/release_pipeline.py | Commented out obsolete download and polygon creation steps in the pipeline |
| src/infra/persistence/context/duckdb.py | Added platform-specific Azure transport configuration for Linux compatibility |
| src/infra/persistence/context/azure_blob_storage.py | Removed BOM character from import statement |
| src/infra/infrastructure/services/vector_service.py | Enhanced clipping logic to dynamically handle optional timestamp columns |
| src/infra/infrastructure/services/open_street_map_service.py | Replaced PBF file processing with Parquet file reading from blob storage |
| src/infra/infrastructure/services/fkb_service.py | Replaced Hugging Face download workflow with Parquet file reading from blob storage |
| src/infra/infrastructure/services/conflation_service.py | Standardized SQL queries with external_id and building_id fields and added timestamp columns |
| src/infra/infrastructure/services/bytes_service.py | Implemented new convert_parquet_bytes_to_gdf method for Parquet to GeoDataFrame conversion |
| src/infra/infrastructure/containers.py | Updated dependency injection to provide blob storage and bytes services |
| src/domain/enums/storage_container.py | Added CONTRIBUTION container enum value |
| src/config.py | Improved environment variable loading with find_dotenv() |
| src/application/contracts/bytes_service_interface.py | Added interface definition for convert_parquet_bytes_to_gdf method |
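The `config.py` row above (environment variable loading with `find_dotenv()`) usually amounts to a one-liner with python-dotenv; a minimal illustration, assuming the project keeps its settings in a `.env` file (the actual file may structure this differently):

```python
from dotenv import find_dotenv, load_dotenv

# find_dotenv() walks up the directory tree to locate the nearest .env,
# so loading works regardless of the current working directory.
load_dotenv(find_dotenv())
```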
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.
```python
import geopandas as gpd
from duckdb import DuckDBPyConnection
```
Copilot AI · Jan 20, 2026
The deprecated import at line 1 is not used anywhere in this file. The commented-out method at lines 39-50 doesn't use the @deprecated decorator. Consider removing this unused import.
Suggested change:

```python
import geopandas as gpd
from duckdb import DuckDBPyConnection
```
```python
# building_polygons = fkb_service.create_building_polygons(gdf=fkb_dataset, crs=EPSGCode.WGS84)
return [fkb_dataset]
```
Copilot AI · Jan 20, 2026
Commented-out code should be removed rather than left in the codebase. If this functionality needs to be preserved for reference, it should be documented in version control history. Keeping commented-out code reduces readability and maintainability.
```python
select_parts = [
    "external_id",
    "geometry",
    "building_id",
    "* EXCLUDE(external_id, geometry, building_id)"
```
Copilot AI · Jan 20, 2026
The SELECT clause includes both explicit column references (external_id, geometry, building_id) and then uses EXCLUDE to select all other columns except these same three. This creates redundancy in the query. Consider restructuring to either: 1) Use SELECT * if all columns are needed in their original positions, or 2) Only select the specific columns that need repositioning, followed by * EXCLUDE(...) for the remainder.
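For context, listing columns explicitly and then adding `* EXCLUDE(...)` is a DuckDB idiom for moving specific columns to the front of the result while keeping everything else; the self-contained example below (table name and extra column invented for illustration) shows the effect.

```python
import duckdb

con = duckdb.connect()
# Toy schema: one extra column besides the three that get repositioned.
con.execute("CREATE TABLE buildings (area DOUBLE, external_id TEXT, geometry BLOB, building_id INTEGER)")

result = con.execute(
    "SELECT external_id, geometry, building_id, "
    "* EXCLUDE (external_id, geometry, building_id) FROM buildings"
)
print([col[0] for col in result.description])
# ['external_id', 'geometry', 'building_id', 'area']
```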
```python
@staticmethod
@abstractmethod
def convert_parquet_bytes_to_gdf(data: bytes, epsg_code: EPSGCode) -> gpd.GeoDataFrame:
    """
    Converts a byte array to a GeoPandas GeoDataFrame. This assumes that the file is a parquet file and that
    there is a geometry column with geometries represented as WKB.

    :param data: Byte array of the parquet file. Often downloaded from blob storage.
    :param epsg_code: EPSG code for the coordinate reference system (CRS).
    :return: GeoDataFrame representation of the byte array.
    :rtype: gpd.GeoDataFrame
    :raises NotImplementedError: This method must be implemented by subclasses.
```
Copilot AI · Jan 20, 2026
The method body is missing a raise NotImplementedError statement, which is inconsistent with the other abstract methods in this interface (e.g., line 18). Abstract methods should raise NotImplementedError to ensure proper interface contract enforcement.
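The convention referred to here would look roughly like the sketch below (illustrative only, not the repository's code; the real signature takes the EPSGCode enum rather than an int):

```python
from abc import ABC, abstractmethod

import geopandas as gpd


class IBytesService(ABC):
    @staticmethod
    @abstractmethod
    def convert_parquet_bytes_to_gdf(data: bytes, epsg_code: int) -> gpd.GeoDataFrame:
        """Convert Parquet bytes with WKB geometry to a GeoDataFrame."""
        # Explicitly raising keeps the contract enforced even if the
        # abstract method is called directly on the base class.
        raise NotImplementedError
```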
```
@@ -1,4 +1,6 @@
import duckdb
import platform
```
Copilot AI · Jan 20, 2026
The os module is imported but not used in this file. Only the platform module is needed for the platform check. Consider removing the unused import.
Suggested change:

```python
import platform
```
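For reference, the kind of platform-specific transport configuration described for duckdb.py could look like the sketch below. The `azure_transport_option_type` setting belongs to DuckDB's Azure extension; whether the repository uses exactly this option is an assumption.

```python
import platform

import duckdb

con = duckdb.connect()
con.execute("INSTALL azure; LOAD azure;")
if platform.system() == "Linux":
    # Assumption: switch to the curl-based transport on Linux for compatibility.
    con.execute("SET azure_transport_option_type = 'curl';")
```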
```python
)
```
Copilot AI · Jan 20, 2026
The logger and Config imports are no longer used in the active code - they only appear in the commented-out implementation. If the commented code is removed (as recommended), these imports should also be removed to keep the imports clean.
```python
from shapely import from_wkb

from src import Config
from src.application.common import logger
from application.contracts import IBlobStorageService
from domain.enums import StorageContainer
from src.application.contracts import (
    IFKBService, IFKBFileService, IZipService, IBytesService
)
```
Copilot AI · Jan 20, 2026
The logger and Config imports (lines 10-11) are no longer used in the active code - they only appear in the commented-out implementation. If the commented code is removed (as recommended), these imports should also be removed to keep the imports clean.
Suggested change:

```python
from shapely import from_wkb
from application.contracts import IBlobStorageService
from domain.enums import StorageContainer
```
```sql
partition_key,
TRY_CAST(bygningstype AS VARCHAR) AS type,
TRY_CAST(bygningsnummer AS INTEGER) AS building_id,
TRY_CAST(building_type AS VARCHAR) AS building_type,
```
Copilot AI · Jan 20, 2026
There is a spelling error in the column alias: "buildling_type" should be "building_type".
Suggested change:

```sql
TRY_CAST(building_type AS VARCHAR) AS building_type,
```
This pull request introduces significant changes to how geospatial datasets (FKB and OSM) are loaded and processed, moving from file-based workflows to reading pre-processed Parquet files from blob storage. It also introduces new utility methods for handling Parquet bytes as GeoDataFrames, updates dependency injection wiring, and refines the SQL queries and data handling for improved consistency and extensibility.
Major workflow changes:
- The FKB and OSM services now read pre-processed Parquet files (`fkb.parquet`, `osm.parquet`) directly from blob storage, replacing previous workflows that relied on downloading and processing raw files. This simplifies and speeds up data ingestion.

New and updated utility methods:
- Added a `convert_parquet_bytes_to_gdf` method to `IBytesService` and its implementation, enabling direct conversion of Parquet byte arrays (with WKB geometry) to GeoPandas GeoDataFrames.
- Updated dependency injection in `containers.py` to provide `blob_storage_service` and `bytes_service` to the services that now require them.

SQL and schema improvements:
- Updated SQL queries in `conflation_service.py` to use `external_id` and `building_id` fields, and added new fields (`feature_update_time`, `feature_capture_time`, `building_type`) for better data tracking and compatibility.
- Enhanced `clip_dataframes_to_wkb` to dynamically include optional columns if present, improving robustness and extensibility (see the sketch at the end of this overview).

Configuration and enum updates:
- Added `CONTRIBUTION` to the `StorageContainer` enum to support the new blob storage workflow.
- Improved environment variable loading with `find_dotenv()` in `config.py` for more reliable configuration.

Platform-specific and minor improvements:
- Configured DuckDB's Azure transport to use `curl` on Linux for compatibility.

These changes collectively modernize the data ingestion pipeline, making it more maintainable and performant by leveraging pre-processed data and improving code modularity.
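As a hypothetical illustration of the "optional columns" approach in the clipping logic (the sketch below is not the actual implementation; column names are taken from the fields mentioned above):

```python
import geopandas as gpd

# Columns included only when the source data actually provides them.
OPTIONAL_COLUMNS = ["feature_update_time", "feature_capture_time", "building_type"]


def select_columns(gdf: gpd.GeoDataFrame) -> list[str]:
    base = ["external_id", "geometry", "building_id"]
    return base + [column for column in OPTIONAL_COLUMNS if column in gdf.columns]
```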