Refactor/58 move the conflation algorithm to a separate file #65
Pull request overview
This pull request refactors the conflation pipeline to make it region-aware, eliminating the need to process all regions together. The conflation logic now operates on a per-region basis, improving modularity and resource management. The changes include interface updates, new utility methods for file path manipulation and blob storage existence checks, and simplified return types.
Changes:
- Refactored conflation service methods (`get_fkb_osm_id_relations` and `merge_fkb_osm`) to accept a `region` parameter, making them region-specific
- Simplified return type of `merge_fkb_osm` from `Dict[str, list[gpd.GeoDataFrame]]` to `list[gpd.GeoDataFrame]`
- Added robustness by checking for file existence before executing queries, preventing errors when data files are missing for a region
- Changed SQL `UNION` to `UNION ALL` for performance optimization in queries where duplicates are not expected
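The difference between `UNION` and `UNION ALL` in the last item can be illustrated with a small sketch. It uses Python's built-in `sqlite3` module purely as a stand-in for the project's query engine, and the two result sets `a` and `b` are made up for illustration.

```python
import sqlite3

# Two hypothetical result sets that share one row (id 2).
QUERY_TEMPLATE = """
WITH a AS (SELECT 1 AS id UNION ALL SELECT 2),
     b AS (SELECT 2 AS id UNION ALL SELECT 3)
SELECT id FROM a
{operator}
SELECT id FROM b
ORDER BY id
"""


def run(operator: str) -> list:
    """Run the combined query with the given set operator and return the ids."""
    conn = sqlite3.connect(":memory:")
    try:
        rows = conn.execute(QUERY_TEMPLATE.format(operator=operator)).fetchall()
    finally:
        conn.close()
    return [row[0] for row in rows]


# UNION removes duplicate rows, which requires extra comparison work.
deduplicated = run("UNION")
# UNION ALL simply concatenates both inputs and keeps duplicates.
concatenated = run("UNION ALL")
```

`UNION ALL` is only a safe replacement when the inputs cannot overlap, or when duplicates are acceptable downstream, which is the condition the change description states.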
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/application/contracts/conflation_service_interface.py | Updated interface to add region parameter and simplify return type |
| src/application/contracts/blob_storage_service_interface.py | Added has_files_under_blob_path_base method to check for file existence |
| src/application/contracts/file_path_service_interface.py | Added remove_blob_file_name_from_path utility method for path manipulation |
| src/infra/infrastructure/services/conflation_service.py | Implemented region-aware conflation logic with file existence checks and helper methods for CTE generation |
| src/infra/infrastructure/services/blob_storage_service.py | Implemented blob path existence check method |
| src/infra/infrastructure/services/file_path_service.py | Implemented path manipulation utility to extract directory paths from file paths |
| src/infra/infrastructure/containers.py | Updated dependency injection to pass blob_storage_service to ConflationService |
| src/presentation/entrypoints/release_pipeline.py | Updated pipeline to call conflation per-region within the loop instead of batch processing |
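The per-region flow described in the table can be sketched as follows. This is a minimal illustration, not the PR's code: only the method name `has_files_under_blob_path_base` comes from the PR, while its signature, the `InMemoryBlobStorage` class, the `conflation_inputs_by_region` helper, and the `<release>/<source>/<region>/` path layout are all assumptions.

```python
class InMemoryBlobStorage:
    """Hypothetical stand-in for the blob storage service."""

    def __init__(self, blob_names):
        self._blob_names = list(blob_names)

    def has_files_under_blob_path_base(self, blob_path_base: str) -> bool:
        # Assumed semantics: True if at least one blob name starts with the path.
        return any(name.startswith(blob_path_base) for name in self._blob_names)


def conflation_inputs_by_region(storage, release: str, regions) -> dict:
    """For each region, record whether OSM and FKB files exist (assumed layout)."""
    inputs = {}
    for region in regions:
        has_osm_files = storage.has_files_under_blob_path_base(f"{release}/osm/{region}/")
        has_fkb_files = storage.has_files_under_blob_path_base(f"{release}/fkb/{region}/")
        inputs[region] = (has_osm_files, has_fkb_files)
    return inputs


storage = InMemoryBlobStorage([
    "2026-02/osm/oslo/part_0.parquet",
    "2026-02/fkb/oslo/part_0.parquet",
    "2026-02/osm/bergen/part_0.parquet",
])
inputs = conflation_inputs_by_region(storage, "2026-02", ["oslo", "bergen"])
```

With flags like these available per region, the service can decide for each source whether to read the Parquet files or substitute an empty result set, instead of failing on a missing path.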
    osm_cte, fkb_cte = ConflationService.__create_merge_cte(
        has_osm_files=has_osm_files,
        osm_release=osm_release,
        osm_filter=osm_filter,
        has_fkb_files=has_fkb_files,
        fkb_release=fkb_release,
        fkb_filter=fkb_filter
    )

    query = f'''
        WITH {fkb_cte}, {osm_cte},
Copilot
AI
Feb 3, 2026
The return order from __create_merge_cte is (osm_cte, fkb_cte) (line 412), but the variables are used in reverse order in the query construction (line 251 uses {fkb_cte}, {osm_cte}). This mismatch will cause the osm CTE to be named "fkb" and the fkb CTE to be named "osm", leading to incorrect query results. The return statement should be changed to return fkb_cte, osm_cte to match the usage order.
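Whether this mismatch is real depends on how the call site unpacks the returned tuple and on where the CTE names come from. In the snippet quoted above, the call site unpacks as `osm_cte, fkb_cte = ...`, which is the same order as the reported return, and the FKB branch quoted later in this review embeds the name `fkb` inside the CTE string itself. A minimal sketch of that situation, assuming the OSM string likewise embeds its own name, with a hypothetical `create_merge_cte` helper and `sqlite3` as a stand-in for the project's query engine:

```python
import sqlite3


def create_merge_cte() -> tuple:
    """Hypothetical stand-in: each CTE string carries its own name."""
    osm_cte = "osm AS (SELECT 'osm' AS source)"
    fkb_cte = "fkb AS (SELECT 'fkb' AS source)"
    return osm_cte, fkb_cte


# Unpacking is positional, so this matches the (osm_cte, fkb_cte) return order.
osm_cte, fkb_cte = create_merge_cte()

# The order of CTEs inside the WITH clause differs from the return order,
# but each CTE is still addressed by the name embedded in its own string.
query = f"WITH {fkb_cte}, {osm_cte} SELECT source FROM osm"

conn = sqlite3.connect(":memory:")
osm_source = conn.execute(query).fetchone()[0]
conn.close()
```

Under these assumptions the CTE named `osm` still returns OSM rows. The swap described in the comment would occur if the call site unpacked in the opposite order from the return, or if the names were attached in the query template rather than inside the returned strings, so both are worth checking in the full file before changing the return statement.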
    @staticmethod
    @abstractmethod
    def remove_blob_file_name_from_path(file_path: str, file_name: str, prefix: str | None = None) -> str:
        """
        Removes the file name from a blob path, returning the directory path.
        :param prefix: Prefix to be removed from the path.
        :param file_path: File path including the file name.
        :param file_name: File name to be removed from the path.
        :return: File path without the file name.
        """
Copilot
AI
Feb 3, 2026
The abstract method remove_blob_file_name_from_path is missing the raise NotImplementedError statement in its body. All abstract methods in the interface should raise NotImplementedError to maintain consistency with other methods in this interface (see lines 29, 41, 64).
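A minimal sketch of the suggested pattern follows. The method name and parameters come from the snippet above (with `Optional[str]` written in place of `str | None`, which is equivalent); the `FilePathService` body is a hypothetical implementation for illustration and may differ from the PR's actual behavior, in particular in how the prefix and slashes are handled.

```python
from abc import ABC, abstractmethod
from typing import Optional


class IFilePathService(ABC):
    @staticmethod
    @abstractmethod
    def remove_blob_file_name_from_path(
        file_path: str, file_name: str, prefix: Optional[str] = None
    ) -> str:
        """Removes the file name from a blob path, returning the directory path."""
        # Explicit raise, consistent with the other methods of the interface.
        raise NotImplementedError


class FilePathService(IFilePathService):
    @staticmethod
    def remove_blob_file_name_from_path(
        file_path: str, file_name: str, prefix: Optional[str] = None
    ) -> str:
        # Assumed behavior: strip the trailing file name, then the optional prefix.
        directory = file_path.removesuffix(file_name).rstrip("/")
        if prefix:
            directory = directory.removeprefix(prefix).lstrip("/")
        return directory


without_prefix = FilePathService.remove_blob_file_name_from_path(
    "release/buildings/oslo/part_0.parquet", "part_0.parquet"
)
with_prefix = FilePathService.remove_blob_file_name_from_path(
    "release/buildings/oslo/part_0.parquet", "part_0.parquet", prefix="release"
)
```

Note that `@abstractmethod` already prevents instantiating a subclass that does not override the method, so the explicit `raise` mainly guards against a direct call on the interface such as `IFilePathService.remove_blob_file_name_from_path(...)` and keeps the interface stylistically uniform.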
    has_fkb_files: bool,
    osm_release: str,
    fkb_release: str,
    osm_filter: str,
Copilot
AI
Feb 3, 2026
The parameter order in __create_merge_cte is inconsistent with the call site. The function signature has parameters ordered as (has_osm_files, has_fkb_files, osm_release, fkb_release, osm_filter, fkb_filter), but the call site provides them in a different sequence (has_osm_files, osm_release, osm_filter, has_fkb_files, fkb_release, fkb_filter). While this works due to named parameters, it reduces code readability and maintainability. Consider reordering the function signature to match the natural calling pattern where related parameters are grouped together.
Suggested change:

    - has_fkb_files: bool,
    - osm_release: str,
    - fkb_release: str,
    - osm_filter: str,
    + osm_release: str,
    + osm_filter: str,
    + has_fkb_files: bool,
    + fkb_release: str,
            CAST(NULL AS INTEGER) AS external_id,
            CAST(NULL AS BLOB) AS geometry,
            CAST(NULL AS STRUCT(minx DOUBLE, miny DOUBLE, maxx DOUBLE, maxy DOUBLE)) AS bbox,
            CAST(NULL AS VARCHAR) AS region,
            CAST(NULL AS VARCHAR) AS partition_key,
            CAST(NULL AS VARCHAR) AS building_type,
            CAST(NULL AS INTEGER) AS building_id,
            CAST(NULL AS TIMESTAMP) AS feature_update_time,
            CAST(NULL AS TIMESTAMP) AS feature_capture_time,
            'osm' AS source
        WHERE FALSE
    )
    """

    df["geometry"] = df["geometry"].apply(lambda g: bytes(g) if isinstance(g, (bytearray, memoryview)) else g)
    df["geometry"] = df["geometry"].apply(from_wkb)
    gdf = gpd.GeoDataFrame(df, geometry="geometry", crs=f"EPSG:{EPSGCode.WGS84.value}")
    if has_fkb_files:
        fkb_cte = f"""fkb AS
        (
            SELECT
                external_id,
                ST_AsWKB(geometry) AS geometry,
                bbox,
                region,
                partition_key,
                TRY_CAST(building_type AS VARCHAR) AS building_type,
                TRY_CAST(building_id AS INTEGER) AS building_id,
                NULL AS feature_update_time,
                NULL AS feature_capture_time,
                'fkb' AS source
            FROM read_parquet('{fkb_release}', union_by_name = true)
            WHERE {fkb_filter}
        )
        """
    else:
        fkb_cte = """
        fkb AS (
            SELECT
                CAST(NULL AS VARCHAR) AS external_id,
Copilot
AI
Feb 3, 2026
Type inconsistency in empty CTEs: OSM's external_id is cast to INTEGER (line 362) while FKB's external_id is cast to VARCHAR (line 398) in the empty result sets. Since these CTEs are combined using UNION ALL in the query (line 254-256), this type mismatch could cause SQL errors when one dataset has files but the other doesn't. Both should use the same type, likely VARCHAR to match FKB's actual data type or the common type from the actual parquet files.
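One way to keep the two empty result sets from drifting apart is to generate both from a single column definition. The sketch below only builds SQL text. The column names and most types are taken from the snippets above; the `EMPTY_CTE_COLUMNS` mapping and the `empty_cte` helper are hypothetical, and `VARCHAR` for `external_id` is an assumption that should be checked against the actual Parquet schema.

```python
# Shared schema for the empty placeholder CTEs (order matters for UNION ALL).
EMPTY_CTE_COLUMNS = {
    "external_id": "VARCHAR",  # assumption: matches the real external_id type
    "geometry": "BLOB",
    "bbox": "STRUCT(minx DOUBLE, miny DOUBLE, maxx DOUBLE, maxy DOUBLE)",
    "region": "VARCHAR",
    "partition_key": "VARCHAR",
    "building_type": "VARCHAR",
    "building_id": "INTEGER",
    "feature_update_time": "TIMESTAMP",
    "feature_capture_time": "TIMESTAMP",
}


def empty_cte(name: str) -> str:
    """Build a zero-row CTE called `name` with the shared column types."""
    casts = ",\n".join(
        f"        CAST(NULL AS {sql_type}) AS {column}"
        for column, sql_type in EMPTY_CTE_COLUMNS.items()
    )
    return (
        f"{name} AS (\n"
        f"    SELECT\n{casts},\n"
        f"        '{name}' AS source\n"
        f"    WHERE FALSE\n"
        f")"
    )


empty_osm_cte = empty_cte("osm")
empty_fkb_cte = empty_cte("fkb")
```

Because both strings come from the same mapping, a type change in one place applies to both sources, and the only remaining difference between them is the CTE name and the `source` literal.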
    theme=Theme.BUILDINGS,
    ids=relation_ids,
    region=region
Copilot
AI
Feb 3, 2026
The parameter order in the method call doesn't match the method signature. The call uses (release, theme, ids, region) while the signature defines (release, region, theme, ids). Although this works correctly with named parameters, it reduces code readability. Consider reordering the call to match the signature: release, region, theme, ids.
Suggested change:

    - theme=Theme.BUILDINGS,
    - ids=relation_ids,
    - region=region
    + region=region,
    + theme=Theme.BUILDINGS,
    + ids=relation_ids,
    osm_cte, fkb_cte = ConflationService.__create_relation_cte(
        has_osm_files=has_osm_files,
        has_fkb_files=has_fkb_files,
        osm_release=osm_release,
        fkb_release=fkb_release
    )

    query = f'''
        WITH {fkb_cte}, {osm_cte},
Copilot
AI
Feb 3, 2026
The return order from __create_relation_cte is (osm_cte, fkb_cte) (line 329), but the variables are used in reverse order in the query construction (line 89 uses {fkb_cte}, {osm_cte}). This mismatch will cause the osm CTE to be named "fkb" and the fkb CTE to be named "osm", leading to incorrect query results. The return statement should be changed to return fkb_cte, osm_cte to match the usage order.
This pull request introduces several improvements and refactorings to the conflation pipeline, primarily focused on adding region-awareness to the conflation logic, improving robustness when dealing with missing files, and simplifying interfaces and return types. The changes affect contract definitions, service implementations, dependency injection, and entrypoints.
Conflation logic and region-awareness
- Updated the `IConflationService` interface and its implementation in `conflation_service.py` to require a `region` parameter for both `get_fkb_osm_id_relations` and `merge_fkb_osm`, making conflation operations region-specific. The return type of `merge_fkb_osm` was simplified from a dictionary of lists to a list of partitioned `GeoDataFrame`s.

Blob storage and file path utilities
- Added `has_files_under_blob_path_base` to the `IBlobStorageService` contract and implemented it in `blob_storage_service.py`, allowing checks for the existence of files under a given path.
- Added `remove_blob_file_name_from_path` to the `IFilePathService` contract and implemented it in `file_path_service.py`, enabling extraction of directory paths from blob file paths with optional prefix removal.

Dependency injection and entrypoint updates
- Updated `containers.py` to inject the new `blob_storage_service` into `ConflationService`.

These changes collectively improve the maintainability, correctness, and scalability of the conflation pipeline, especially for region-based processing.