From 9141163846b0d9910f85d2209bd4181d5527ad70 Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 16:37:35 +0200 Subject: [PATCH 1/7] feat(csv): split GeoJSON and PMTiles into low-priority RQ jobs Geo export runs as chained tasks on the low queue; GeoJSON is generated from the PostgreSQL parsing table when CSV_TO_DB is enabled. RQ entrypoint uses context for exception-queue routing. Made-with: Cursor --- tests/test_analysis/test_analysis_csv.py | 101 ++++++- udata_hydra/analysis/csv.py | 57 ++-- udata_hydra/analysis/geojson.py | 325 ++++++++++++++++++++--- udata_hydra/analysis/helpers.py | 23 +- udata_hydra/analysis/resource.py | 5 +- 5 files changed, 454 insertions(+), 57 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 4ed38d11..0ea5ef45 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -14,7 +14,11 @@ from tests.conftest import RESOURCE_ID, RESOURCE_URL from udata_hydra.analysis.csv import analyse_csv, csv_to_db -from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, csv_to_geojson_from_db +from udata_hydra.analysis.geojson import ( + csv_to_geojson_and_pmtiles, + csv_to_geojson_from_db, + task_csv_to_geojson, +) from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -1043,3 +1047,98 @@ async def test_file_with_nan( # inf does for max, mean and std assert all(profile["c"][method] is None for method in ["max", "mean", "std"]) assert profile["c"]["min"] is not None + + +async def test_analyse_csv_split_geo_rq_chain( + setup_catalog, + rmock, + db, + fake_check, + produce_mock, + mocker, +): + """Chained geo tasks (from the parsing table) set geojson and pmtiles URLs on the check.""" + check = await fake_check() + rmock.get( + check["url"], + status=200, + body=b"lat,long,name\n48.8,2.3,a\n48.9,2.4,b\n", + ) + mocker.patch("udata_hydra.config.MINIO_URL", "my.minio.fr") + mocked_minio = MagicMock() + mocked_minio.fput_object.return_value = None + mocked_minio.bucket_exists.return_value = True + + async def fake_download(dl_url: str, suffix: str = "") -> str: + p = Path(f"{RESOURCE_ID}.geojson") + assert p.is_file() + return str(p.resolve()) + + with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio): + geo_client = MinIOClient(bucket="geojson_bucket", folder="geojson_folder") + pmtiles_client = MinIOClient(bucket="pmtiles_bucket", folder="pmtiles_folder") + + with ( + patch("udata_hydra.config.CSV_TO_GEOJSON", True), + patch("udata_hydra.config.DB_TO_GEOJSON", True), + patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), + patch("udata_hydra.analysis.geojson.minio_client_geojson", new=geo_client), + patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=pmtiles_client), + patch("udata_hydra.analysis.helpers.download_url_to_tempfile", new=fake_download), + ): + await analyse_csv(check=check) + + res = await Check.get_by_id(check["id"]) + assert res is not None + assert res["parsing_error"] is None + assert res["geojson_url"].endswith(f"{RESOURCE_ID}.geojson") + assert res["pmtiles_url"].endswith(f"{RESOURCE_ID}.pmtiles") + + for ext in ("geojson", "pmtiles"): + p = Path(f"{RESOURCE_ID}.{ext}") + if p.is_file(): + p.unlink() + + +async def test_analyse_csv_skips_geo_rq_when_csv_to_db_disabled( + setup_catalog, + rmock, + db, + fake_check, + produce_mock, + mocker, +): + check = await fake_check() + url = check["url"] + 
rmock.get(url, status=200, body="a,b\n1,2\n") + enq = mocker.patch("udata_hydra.analysis.csv.queue.enqueue") + with ( + patch("udata_hydra.config.CSV_TO_GEOJSON", True), + patch("udata_hydra.config.CSV_TO_DB", False), + ): + await analyse_csv(check=check) + geo_enqueues = [c for c in enq.call_args_list if c[0] and c[0][0] is task_csv_to_geojson] + assert len(geo_enqueues) == 0 + + +async def test_split_geo_chain_skips_pmtiles_without_geo_columns( + setup_catalog, + rmock, + db, + fake_check, + produce_mock, + mocker, +): + """PMTiles job must not run when the parsing table has no geographical columns.""" + check = await fake_check() + url = check["url"] + rmock.get(url, status=200, body="a,b,c\n1,2,3\n") + to_pmtiles = mocker.patch("udata_hydra.analysis.geojson.task_geojson_to_pmtiles") + + with ( + patch("udata_hydra.config.CSV_TO_GEOJSON", True), + patch("udata_hydra.config.DB_TO_GEOJSON", True), + ): + await analyse_csv(check=check) + + to_pmtiles.assert_not_called() diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index 511bef3a..2bdab66e 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -33,7 +33,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers -from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles +from udata_hydra.analysis.geojson import task_csv_to_geojson from udata_hydra.db import RESERVED_COLS, compute_insert_query from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -44,6 +44,7 @@ Timer, detect_tabular_from_headers, handle_parse_exception, + queue, remove_remainders, ) from udata_hydra.utils.casting import generate_records @@ -77,12 +78,32 @@ minio_client = MinIOClient(bucket=config.MINIO_PARQUET_BUCKET, folder=config.MINIO_PARQUET_FOLDER) +async def task_analyse_csv( + check: Record | dict, + file_path: str | None = None, + debug_insert: bool = False, + worker_exception: bool = False, +) -> None: + """RQ task: CSV analysis; forwards worker_exception to chained task_csv_to_geojson / task_geojson_to_pmtiles.""" + await analyse_csv( + check, + file_path, + debug_insert, + worker_exception, + ) + + async def analyse_csv( check: Record | dict, file_path: str | None = None, debug_insert: bool = False, + worker_exception: bool = False, ) -> None: - """Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any""" + """Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any. + + worker_exception: when True (RQ crawl path for resources_exceptions), chained geo jobs use the same RQ + exception timeout as this run. + """ if not config.CSV_ANALYSIS: log.debug("CSV_ANALYSIS turned off, skipping.") return @@ -181,24 +202,20 @@ async def analyse_csv( check_id=check["id"], ) from e - try: - await csv_to_geojson_and_pmtiles( - file_path=tmp_file.name, - inspection=csv_inspection, - resource_id=resource_id, - check_id=check["id"], - timer=timer, - table_name=table_name if config.CSV_TO_DB else None, - ) - except Exception as e: - remove_remainders(resource_id, ["geojson", "pmtiles", "pmtiles-journal"]) - raise ParseException( - message=str(e), - step="geojson_export", - resource_id=resource_id, - url=url, - check_id=check["id"], - ) from e + if config.CSV_TO_GEOJSON or config.DB_TO_GEOJSON: + if not config.CSV_TO_DB: + log.debug( + "Skipping GeoJSON/PMTiles RQ jobs: CSV_TO_DB is false " + "(geo export requires the parsing table in PostgreSQL)." 
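+                # NB: task_csv_to_geojson reads the features back from that
+                # parsing table via csv_to_geojson_from_db, so there is
+                # nothing to export without it.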
+ ) + else: + queue.enqueue( + task_csv_to_geojson, + check["id"], + worker_exception=worker_exception, + _priority="low", + _exception=worker_exception, + ) check = await Check.update( # type: ignore[assignment] check["id"], diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index e7e20b48..faba0341 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any, Iterator +import aiohttp import tippecanoe from asyncpg import Record from json_stream import streamable_list @@ -20,6 +21,7 @@ ParseException, Timer, handle_parse_exception, + queue, remove_remainders, ) from udata_hydra.utils.casting import generate_records @@ -425,33 +427,33 @@ async def geojson_to_pmtiles( return pmtiles_size, pmtiles_url -async def csv_to_geojson_and_pmtiles( - file_path: str, +async def export_geojson_for_csv_analysis( + *, + resource_id: str | None, + check_id: int | None, inspection: dict, - resource_id: str | None = None, - check_id: int | None = None, + geojson_filepath: Path, + file_path: str | None, + table_name: str | None, timer: Timer | None = None, - table_name: str | None = None, -) -> tuple[Path, int, str | None, Path, int, str | None] | None: +) -> tuple[int, str | None] | None: + """Build GeoJSON from the parsing table (Hydra pipeline) or from a CSV file (CLI/tests). + + When ``table_name`` is set, reads features from PostgreSQL via ``csv_to_geojson_from_db``. + Otherwise requires ``file_path`` for ``csv_to_geojson``. + + Returns ``(geojson_size, geojson_url)`` or ``None`` when geo export is disabled or + there are no geographical columns. + """ if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: - log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") return None - log.debug( - f"Converting to geojson and PMtiles if relevant for {resource_id} and sending to MinIO." 
- ) - if resource_id: - geojson_filepath = Path(f"{resource_id}.geojson") - pmtiles_filepath = Path(f"{resource_id}.pmtiles") # Update resource status for GeoJSON conversion await Resource.update(resource_id, {"status": "CONVERTING_TO_GEOJSON"}) - else: - geojson_filepath = DEFAULT_GEOJSON_FILEPATH - pmtiles_filepath = DEFAULT_PMTILES_FILEPATH # Convert to GeoJSON — from DB if available and enabled, otherwise from CSV file - if config.DB_TO_GEOJSON and table_name: + if table_name: result = await csv_to_geojson_from_db( table_name, inspection, @@ -459,40 +461,297 @@ async def csv_to_geojson_and_pmtiles( upload_to_minio=True, ) else: + if not file_path: + raise ValueError("file_path is required when no parsing table name is provided") result = await csv_to_geojson(file_path, inspection, geojson_filepath, upload_to_minio=True) + if result is None: return None geojson_size, geojson_url = result if timer: timer.mark("csv-to-geojson") + if check_id is not None: + await Check.update( + check_id, + {"geojson_url": geojson_url, "geojson_size": geojson_size}, + ) + return geojson_size, geojson_url - await Check.update( - check_id, - { - "geojson_url": geojson_url, - "geojson_size": geojson_size, - }, - ) - # Update resource status for PMTiles conversion +async def export_pmtiles_from_local_geojson( + *, + resource_id: str | None, + check_id: int | None, + geojson_filepath: Path, + pmtiles_filepath: Path, + timer: Timer | None = None, + unlink_geojson_after: bool | None = None, +) -> tuple[int, str | None]: + """Run tippecanoe on a local GeoJSON path, upload PMTiles, update the check.""" if resource_id: + # Update resource status for PMTiles conversion await Resource.update(resource_id, {"status": "CONVERTING_TO_PMTILES"}) # Convert GeoJSON to PMTiles pmtiles_size, pmtiles_url = await geojson_to_pmtiles(geojson_filepath, pmtiles_filepath) if timer: timer.mark("geojson-to-pmtiles") + if check_id is not None: + await Check.update( + check_id, + {"pmtiles_url": pmtiles_url, "pmtiles_size": pmtiles_size}, + ) + do_unlink = ( + config.REMOVE_GENERATED_FILES if unlink_geojson_after is None else unlink_geojson_after + ) + if do_unlink and geojson_filepath.is_file(): + geojson_filepath.unlink() + return pmtiles_size, pmtiles_url + - await Check.update( - check_id, - { - "pmtiles_url": pmtiles_url, - "pmtiles_size": pmtiles_size, - }, +async def csv_to_geojson_and_pmtiles( + file_path: str, + inspection: dict, + resource_id: str | None = None, + check_id: int | None = None, + timer: Timer | None = None, + table_name: str | None = None, +) -> tuple[Path, int, str | None, Path, int, str | None] | None: + if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: + log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") + return None + + log.debug( + f"Converting to geojson and PMtiles if relevant for {resource_id} and sending to MinIO." 
) - if config.REMOVE_GENERATED_FILES: - geojson_filepath.unlink() + if resource_id: + geojson_filepath = Path(f"{resource_id}.geojson") + pmtiles_filepath = Path(f"{resource_id}.pmtiles") + else: + geojson_filepath = DEFAULT_GEOJSON_FILEPATH + pmtiles_filepath = DEFAULT_PMTILES_FILEPATH + + geo_result = await export_geojson_for_csv_analysis( + resource_id=resource_id, + check_id=check_id, + inspection=inspection, + geojson_filepath=geojson_filepath, + file_path=file_path, + table_name=table_name, + timer=timer, + ) + if geo_result is None: + return None + geojson_size, geojson_url = geo_result + + pmtiles_size, pmtiles_url = await export_pmtiles_from_local_geojson( + resource_id=resource_id, + check_id=check_id, + geojson_filepath=geojson_filepath, + pmtiles_filepath=pmtiles_filepath, + timer=timer, + unlink_geojson_after=config.REMOVE_GENERATED_FILES, + ) # returning only for tests purposes return geojson_filepath, geojson_size, geojson_url, pmtiles_filepath, pmtiles_size, pmtiles_url + + +async def _load_csv_inspection_for_table(resource_id: str, parsing_table: str) -> dict | None: + db = await context.pool("csv") + row = await db.fetchrow( + "SELECT csv_detective FROM tables_index WHERE resource_id = $1 AND parsing_table = $2 " + "ORDER BY created_at DESC LIMIT 1", + resource_id, + parsing_table, + ) + if not row: + return None + return json.loads(row["csv_detective"]) + + +async def task_csv_to_geojson( + check_id: int, + worker_exception: bool = False, +) -> None: + """Generate GeoJSON from the CSV parsing table and enqueue PMTiles conversion.""" + if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: + log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geo job.") + return + + record = await Check.get_by_id(check_id, with_deleted=True) + if not record: + log.error("task_csv_to_geojson: check %s not found", check_id) + return + + check: dict = dict(record) + resource_id = str(check["resource_id"]) + url = check["url"] + resource: Record | None = await Resource.get(resource_id) + parsing_table = check.get("parsing_table") + + if not parsing_table: + log.warning("task_csv_to_geojson: no parsing_table for check %s", check_id) + return + + inspection = await _load_csv_inspection_for_table(resource_id, parsing_table) + if not inspection: + log.error( + "task_csv_to_geojson: no tables_index row for resource=%s table=%s", + resource_id, + parsing_table, + ) + return + + geojson_path = Path(f"{resource_id}.geojson") + chained_pmtiles = False + + try: + try: + result = await export_geojson_for_csv_analysis( + resource_id=resource_id, + check_id=check_id, + inspection=inspection, + geojson_filepath=geojson_path, + file_path=None, + table_name=parsing_table, + timer=None, + ) + except Exception as e: + remove_remainders(resource_id, ["geojson", "pmtiles", "pmtiles-journal"]) + raise ParseException( + message=str(e), + step="geojson_export", + resource_id=resource_id, + url=url, + check_id=check_id, + ) from e + + if result is None: + log.debug("No geographical columns for %s, skipping geojson/PMTiles", resource_id) + return + + check = dict((await Check.get_by_id(check_id, with_deleted=True)) or check) + + # Update resource status for PMTiles conversion + await Resource.update(resource_id, {"status": "CONVERTING_TO_PMTILES"}) + queue.enqueue( + task_geojson_to_pmtiles, + check_id, + _priority="low", + _exception=worker_exception, + ) + chained_pmtiles = True + + if resource is not None: + await helpers.notify_udata(resource, check) + + except (ParseException, OSError, 
ValueError) as e: + if isinstance(e, ParseException): + await handle_parse_exception(e, parsing_table, check) + else: + await handle_parse_exception( + ParseException( + message=str(e), + step="geojson_export", + resource_id=resource_id, + url=url, + check_id=check_id, + ), + parsing_table, + check, + ) + if resource is not None: + refreshed = await Check.get_by_id(check_id, with_deleted=True) + if refreshed: + await helpers.notify_udata(resource, dict(refreshed)) + finally: + if not chained_pmtiles: + await Resource.update(resource_id, {"status": None}) + if config.REMOVE_GENERATED_FILES and geojson_path.is_file(): + try: + geojson_path.unlink() + except OSError: + log.warning("Could not remove %s", geojson_path) + + +async def task_geojson_to_pmtiles(check_id: int) -> None: + """Convert GeoJSON (from MinIO) to PMTiles and persist URLs on the check.""" + record = await Check.get_by_id(check_id, with_deleted=True) + if not record: + log.error("task_geojson_to_pmtiles: check %s not found", check_id) + return + + check: dict = dict(record) + resource_id = str(check["resource_id"]) + url = check["url"] + resource: Record | None = await Resource.get(resource_id) + geojson_url = check.get("geojson_url") + + if not geojson_url: + log.warning("task_geojson_to_pmtiles: no geojson_url for check %s", check_id) + await Resource.update(resource_id, {"status": None}) + return + + # Update resource status for PMTiles conversion + await Resource.update(resource_id, {"status": "CONVERTING_TO_PMTILES"}) + pmtiles_path = Path(f"{resource_id}.pmtiles") + tmp_geo: str | None = None + + try: + try: + tmp_geo = await helpers.download_url_to_tempfile(geojson_url, suffix=".geojson") + await export_pmtiles_from_local_geojson( + resource_id=resource_id, + check_id=check_id, + geojson_filepath=Path(tmp_geo), + pmtiles_filepath=pmtiles_path, + timer=None, + unlink_geojson_after=False, + ) + except Exception as e: + remove_remainders(resource_id, ["pmtiles", "pmtiles-journal"]) + raise ParseException( + message=str(e), + step="pmtiles_export", + resource_id=resource_id, + url=url, + check_id=check_id, + ) from e + + refreshed = await Check.get_by_id(check_id, with_deleted=True) + if resource is not None and refreshed: + await helpers.notify_udata(resource, dict(refreshed)) + + except (ParseException, OSError, aiohttp.ClientError) as e: + if isinstance(e, ParseException): + await handle_parse_exception(e, check.get("parsing_table"), check) + else: + await handle_parse_exception( + ParseException( + message=str(e), + step="pmtiles_export", + resource_id=resource_id, + url=url, + check_id=check_id, + ), + check.get("parsing_table"), + check, + ) + if resource is not None: + refreshed = await Check.get_by_id(check_id, with_deleted=True) + if refreshed: + await helpers.notify_udata(resource, dict(refreshed)) + finally: + await Resource.update(resource_id, {"status": None}) + if tmp_geo is not None: + try: + os.remove(tmp_geo) + except OSError: + log.warning("Could not remove temp geojson %s", tmp_geo) + if pmtiles_path.is_file() and config.REMOVE_GENERATED_FILES: + try: + pmtiles_path.unlink() + except OSError: + log.warning("Could not remove %s", pmtiles_path) diff --git a/udata_hydra/analysis/helpers.py b/udata_hydra/analysis/helpers.py index da4e4922..6e79be45 100644 --- a/udata_hydra/analysis/helpers.py +++ b/udata_hydra/analysis/helpers.py @@ -1,11 +1,17 @@ import json +import logging +import os +import tempfile from typing import IO +import aiohttp from asyncpg import Record from udata_hydra import config from 
udata_hydra.utils import IOException, UdataPayload, download_resource, queue, send +log = logging.getLogger("udata-hydra") + def get_python_type(column: dict) -> str: """Outsourcing the distinction of aware datetimes""" @@ -42,6 +48,21 @@ async def read_or_download_file( return tmp_file +async def download_url_to_tempfile(url: str, suffix: str = "") -> str: + """Download a URL to a named temporary file and return its filesystem path.""" + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + resp.raise_for_status() + body = await resp.read() + fd, path = tempfile.mkstemp(suffix=suffix) + try: + os.write(fd, body) + finally: + os.close(fd) + log.debug("Downloaded %s to %s (%s bytes)", url, path, len(body)) + return path + + async def notify_udata(resource: Record | None, check: Record | dict | None) -> None: """Notify udata of the result of a parsing""" if check is None or resource is None: @@ -67,7 +88,7 @@ async def notify_udata(resource: Record | None, check: Record | dict | None) -> if config.GEOJSON_TO_PMTILES and check.get("pmtiles_url"): payload["document"]["analysis:parsing:pmtiles_url"] = check.get("pmtiles_url") payload["document"]["analysis:parsing:pmtiles_size"] = check.get("pmtiles_size") - if config.CSV_TO_GEOJSON and check.get("geojson_url"): + if (config.CSV_TO_GEOJSON or config.DB_TO_GEOJSON) and check.get("geojson_url"): payload["document"]["analysis:parsing:geojson_url"] = check.get("geojson_url") payload["document"]["analysis:parsing:geojson_size"] = check.get("geojson_size") if config.OGC_ANALYSIS_ENABLED and check.get("ogc_metadata"): diff --git a/udata_hydra/analysis/resource.py b/udata_hydra/analysis/resource.py index 13c878e9..9b026842 100644 --- a/udata_hydra/analysis/resource.py +++ b/udata_hydra/analysis/resource.py @@ -9,7 +9,7 @@ from dateparser import parse as date_parser from udata_hydra import config, context -from udata_hydra.analysis.csv import analyse_csv +from udata_hydra.analysis.csv import task_analyse_csv from udata_hydra.analysis.geojson import analyse_geojson from udata_hydra.analysis.ogc import analyse_ogc from udata_hydra.analysis.parquet import analyse_parquet @@ -166,9 +166,10 @@ async def analyse_resource( await Resource.update(resource_id, data={"status": "TO_ANALYSE_CSV"}) # Analyse CSV and create a table in the CSV database queue.enqueue( - analyse_csv, + task_analyse_csv, check=check, file_path=tmp_file.name, + worker_exception=bool(exception), _priority="high" if worker_priority == "high" else "default", _exception=bool(exception), ) From 01d2ffb739d5a530f21eec152abe8eaa292fe799 Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 16:53:03 +0200 Subject: [PATCH 2/7] clean: remove CSV_TO_GEOJSON setting --- tests/test_analysis/test_analysis_csv.py | 10 +++------- udata_hydra/analysis/csv.py | 2 +- udata_hydra/analysis/geojson.py | 12 ++++++------ udata_hydra/analysis/helpers.py | 2 +- udata_hydra/config_default.toml | 1 - udata_hydra/routes/status.py | 1 - 6 files changed, 11 insertions(+), 17 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 0ea5ef45..b4de4fbb 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -646,7 +646,7 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker): assert expected_formats[col] in inspection["columns"][col]["format"] with ( - patch("udata_hydra.config.CSV_TO_GEOJSON", patched_config), + 
patch("udata_hydra.config.DB_TO_GEOJSON", patched_config), patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), ): if not patched_config or expected_formats is None: @@ -1079,7 +1079,6 @@ async def fake_download(dl_url: str, suffix: str = "") -> str: pmtiles_client = MinIOClient(bucket="pmtiles_bucket", folder="pmtiles_folder") with ( - patch("udata_hydra.config.CSV_TO_GEOJSON", True), patch("udata_hydra.config.DB_TO_GEOJSON", True), patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), patch("udata_hydra.analysis.geojson.minio_client_geojson", new=geo_client), @@ -1113,7 +1112,7 @@ async def test_analyse_csv_skips_geo_rq_when_csv_to_db_disabled( rmock.get(url, status=200, body="a,b\n1,2\n") enq = mocker.patch("udata_hydra.analysis.csv.queue.enqueue") with ( - patch("udata_hydra.config.CSV_TO_GEOJSON", True), + patch("udata_hydra.config.DB_TO_GEOJSON", True), patch("udata_hydra.config.CSV_TO_DB", False), ): await analyse_csv(check=check) @@ -1135,10 +1134,7 @@ async def test_split_geo_chain_skips_pmtiles_without_geo_columns( rmock.get(url, status=200, body="a,b,c\n1,2,3\n") to_pmtiles = mocker.patch("udata_hydra.analysis.geojson.task_geojson_to_pmtiles") - with ( - patch("udata_hydra.config.CSV_TO_GEOJSON", True), - patch("udata_hydra.config.DB_TO_GEOJSON", True), - ): + with patch("udata_hydra.config.DB_TO_GEOJSON", True): await analyse_csv(check=check) to_pmtiles.assert_not_called() diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index 2bdab66e..9cbd4a60 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -202,7 +202,7 @@ async def analyse_csv( check_id=check["id"], ) from e - if config.CSV_TO_GEOJSON or config.DB_TO_GEOJSON: + if config.DB_TO_GEOJSON: if not config.CSV_TO_DB: log.debug( "Skipping GeoJSON/PMTiles RQ jobs: CSV_TO_DB is false " diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index faba0341..4fc879ff 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -442,10 +442,10 @@ async def export_geojson_for_csv_analysis( When ``table_name`` is set, reads features from PostgreSQL via ``csv_to_geojson_from_db``. Otherwise requires ``file_path`` for ``csv_to_geojson``. - Returns ``(geojson_size, geojson_url)`` or ``None`` when geo export is disabled or + Returns ``(geojson_size, geojson_url)`` or ``None`` when ``DB_TO_GEOJSON`` is false or there are no geographical columns. 
""" - if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: + if not config.DB_TO_GEOJSON: return None if resource_id: @@ -517,8 +517,8 @@ async def csv_to_geojson_and_pmtiles( timer: Timer | None = None, table_name: str | None = None, ) -> tuple[Path, int, str | None, Path, int, str | None] | None: - if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: - log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") + if not config.DB_TO_GEOJSON: + log.debug("DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") return None log.debug( @@ -576,8 +576,8 @@ async def task_csv_to_geojson( worker_exception: bool = False, ) -> None: """Generate GeoJSON from the CSV parsing table and enqueue PMTiles conversion.""" - if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: - log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geo job.") + if not config.DB_TO_GEOJSON: + log.debug("DB_TO_GEOJSON turned off, skipping geo job.") return record = await Check.get_by_id(check_id, with_deleted=True) diff --git a/udata_hydra/analysis/helpers.py b/udata_hydra/analysis/helpers.py index 6e79be45..fdff71b4 100644 --- a/udata_hydra/analysis/helpers.py +++ b/udata_hydra/analysis/helpers.py @@ -88,7 +88,7 @@ async def notify_udata(resource: Record | None, check: Record | dict | None) -> if config.GEOJSON_TO_PMTILES and check.get("pmtiles_url"): payload["document"]["analysis:parsing:pmtiles_url"] = check.get("pmtiles_url") payload["document"]["analysis:parsing:pmtiles_size"] = check.get("pmtiles_size") - if (config.CSV_TO_GEOJSON or config.DB_TO_GEOJSON) and check.get("geojson_url"): + if config.DB_TO_GEOJSON and check.get("geojson_url"): payload["document"]["analysis:parsing:geojson_url"] = check.get("geojson_url") payload["document"]["analysis:parsing:geojson_size"] = check.get("geojson_size") if config.OGC_ANALYSIS_ENABLED and check.get("ogc_metadata"): diff --git a/udata_hydra/config_default.toml b/udata_hydra/config_default.toml index f2c83987..dd68e1c8 100644 --- a/udata_hydra/config_default.toml +++ b/udata_hydra/config_default.toml @@ -103,7 +103,6 @@ MINIO_PMTILES_BUCKET = "" MINIO_PMTILES_FOLDER = "" # no trailing slash # -- Geojson conversion settings -- # -CSV_TO_GEOJSON = false DB_TO_GEOJSON = false MINIO_GEOJSON_BUCKET = "" MINIO_GEOJSON_FOLDER = "" # no trailing slash diff --git a/udata_hydra/routes/status.py b/udata_hydra/routes/status.py index 0d8f7991..36714922 100644 --- a/udata_hydra/routes/status.py +++ b/udata_hydra/routes/status.py @@ -168,7 +168,6 @@ async def get_health(request: web.Request) -> web.Response: "csv_to_parquet": config.CSV_TO_PARQUET, "db_to_parquet": config.DB_TO_PARQUET, "geojson_to_pmtiles": config.GEOJSON_TO_PMTILES, - "csv_to_geojson": config.CSV_TO_GEOJSON, "db_to_geojson": config.DB_TO_GEOJSON, "parquet_to_db": config.PARQUET_TO_DB, } From f5b527a9d4944e2f2a4d3d3f4881421b5248fc6b Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 16:59:47 +0200 Subject: [PATCH 3/7] clean: better string interpolation --- udata_hydra/analysis/geojson.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 4fc879ff..7eaaacac 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -582,7 +582,7 @@ async def task_csv_to_geojson( record = await Check.get_by_id(check_id, with_deleted=True) if not record: - log.error("task_csv_to_geojson: check %s not found", check_id) + 
log.error(f"task_csv_to_geojson: check {check_id} not found") return check: dict = dict(record) @@ -592,13 +592,13 @@ async def task_csv_to_geojson( parsing_table = check.get("parsing_table") if not parsing_table: - log.warning("task_csv_to_geojson: no parsing_table for check %s", check_id) + log.warning(f"task_csv_to_geojson: no parsing_table for check {check_id}") return inspection = await _load_csv_inspection_for_table(resource_id, parsing_table) if not inspection: log.error( - "task_csv_to_geojson: no tables_index row for resource=%s table=%s", + f"task_csv_to_geojson: no tables_index row for resource {resource_id} table {parsing_table}", resource_id, parsing_table, ) @@ -629,7 +629,7 @@ async def task_csv_to_geojson( ) from e if result is None: - log.debug("No geographical columns for %s, skipping geojson/PMTiles", resource_id) + log.debug(f"No geographical columns for {resource_id}, skipping geojson/PMTiles") return check = dict((await Check.get_by_id(check_id, with_deleted=True)) or check) @@ -673,14 +673,14 @@ async def task_csv_to_geojson( try: geojson_path.unlink() except OSError: - log.warning("Could not remove %s", geojson_path) + log.warning(f"Could not remove {geojson_path}") async def task_geojson_to_pmtiles(check_id: int) -> None: """Convert GeoJSON (from MinIO) to PMTiles and persist URLs on the check.""" record = await Check.get_by_id(check_id, with_deleted=True) if not record: - log.error("task_geojson_to_pmtiles: check %s not found", check_id) + log.error(f"task_geojson_to_pmtiles: check{check_id} not found") return check: dict = dict(record) @@ -690,7 +690,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: geojson_url = check.get("geojson_url") if not geojson_url: - log.warning("task_geojson_to_pmtiles: no geojson_url for check %s", check_id) + log.warning(f"task_geojson_to_pmtiles: no geojson_url for check {check_id}") await Resource.update(resource_id, {"status": None}) return @@ -749,9 +749,9 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: try: os.remove(tmp_geo) except OSError: - log.warning("Could not remove temp geojson %s", tmp_geo) + log.warning(f"Could not remove temp geojson {tmp_geo}") if pmtiles_path.is_file() and config.REMOVE_GENERATED_FILES: try: pmtiles_path.unlink() except OSError: - log.warning("Could not remove %s", pmtiles_path) + log.warning(f"Could not remove {pmtiles_path}") From 0bce2335f34e8a976a89ccc3a3f3d366798f4543 Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 17:29:23 +0200 Subject: [PATCH 4/7] clean: use Path objects --- tests/test_analysis/test_analysis_csv.py | 4 ++-- udata_hydra/analysis/geojson.py | 8 ++++---- udata_hydra/analysis/helpers.py | 10 ++++++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index b4de4fbb..66d900d4 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -1069,10 +1069,10 @@ async def test_analyse_csv_split_geo_rq_chain( mocked_minio.fput_object.return_value = None mocked_minio.bucket_exists.return_value = True - async def fake_download(dl_url: str, suffix: str = "") -> str: + async def fake_download(dl_url: str, suffix: str = "") -> Path: p = Path(f"{RESOURCE_ID}.geojson") assert p.is_file() - return str(p.resolve()) + return p.resolve() with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio): geo_client = MinIOClient(bucket="geojson_bucket", folder="geojson_folder") diff --git 
a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 7eaaacac..7e34950e 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -680,7 +680,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: """Convert GeoJSON (from MinIO) to PMTiles and persist URLs on the check.""" record = await Check.get_by_id(check_id, with_deleted=True) if not record: - log.error(f"task_geojson_to_pmtiles: check{check_id} not found") + log.error(f"task_geojson_to_pmtiles: check {check_id} not found") return check: dict = dict(record) @@ -697,7 +697,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: # Update resource status for PMTiles conversion await Resource.update(resource_id, {"status": "CONVERTING_TO_PMTILES"}) pmtiles_path = Path(f"{resource_id}.pmtiles") - tmp_geo: str | None = None + tmp_geo: Path | None = None try: try: @@ -705,7 +705,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: await export_pmtiles_from_local_geojson( resource_id=resource_id, check_id=check_id, - geojson_filepath=Path(tmp_geo), + geojson_filepath=tmp_geo, pmtiles_filepath=pmtiles_path, timer=None, unlink_geojson_after=False, @@ -747,7 +747,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None: await Resource.update(resource_id, {"status": None}) if tmp_geo is not None: try: - os.remove(tmp_geo) + tmp_geo.unlink(missing_ok=True) except OSError: log.warning(f"Could not remove temp geojson {tmp_geo}") if pmtiles_path.is_file() and config.REMOVE_GENERATED_FILES: diff --git a/udata_hydra/analysis/helpers.py b/udata_hydra/analysis/helpers.py index fdff71b4..d58dd438 100644 --- a/udata_hydra/analysis/helpers.py +++ b/udata_hydra/analysis/helpers.py @@ -2,6 +2,7 @@ import logging import os import tempfile +from pathlib import Path from typing import IO import aiohttp @@ -48,18 +49,19 @@ async def read_or_download_file( return tmp_file -async def download_url_to_tempfile(url: str, suffix: str = "") -> str: - """Download a URL to a named temporary file and return its filesystem path.""" +async def download_url_to_tempfile(url: str, suffix: str = "") -> Path: + """Download a URL to a named temporary file and return its path.""" async with aiohttp.ClientSession() as session: async with session.get(url) as resp: resp.raise_for_status() body = await resp.read() - fd, path = tempfile.mkstemp(suffix=suffix) + fd, raw_path = tempfile.mkstemp(suffix=suffix) + path = Path(raw_path) try: os.write(fd, body) finally: os.close(fd) - log.debug("Downloaded %s to %s (%s bytes)", url, path, len(body)) + log.debug(f"Downloaded {url} to {path} ({len(body)} bytes)") return path From 23fd847df42c63e771e39ca64230e2e953f711eb Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 18:05:48 +0200 Subject: [PATCH 5/7] refactor: refactor geojson.py into several files # Conflicts: # udata_hydra/analysis/geojson.py --- tests/test_analysis/test_analysis_csv.py | 8 +- tests/test_analysis/test_geojson.py | 6 +- udata_hydra/analysis/csv_geojson.py | 283 ++++++++++++++++++ udata_hydra/analysis/geojson.py | 355 ++--------------------- udata_hydra/analysis/pmtiles.py | 63 ++++ 5 files changed, 379 insertions(+), 336 deletions(-) create mode 100644 udata_hydra/analysis/csv_geojson.py create mode 100644 udata_hydra/analysis/pmtiles.py diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 66d900d4..b9577664 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -674,11 +674,11 
@@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker): ) with ( patch( - "udata_hydra.analysis.geojson.minio_client_geojson", + "udata_hydra.analysis.csv_geojson.minio_client_geojson", new=mocked_minio_client_geojson, ), patch( - "udata_hydra.analysis.geojson.minio_client_pmtiles", + "udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=mocked_minio_client_pmtiles, ), ): @@ -1081,8 +1081,8 @@ async def fake_download(dl_url: str, suffix: str = "") -> Path: with ( patch("udata_hydra.config.DB_TO_GEOJSON", True), patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), - patch("udata_hydra.analysis.geojson.minio_client_geojson", new=geo_client), - patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=pmtiles_client), + patch("udata_hydra.analysis.csv_geojson.minio_client_geojson", new=geo_client), + patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=pmtiles_client), patch("udata_hydra.analysis.helpers.download_url_to_tempfile", new=fake_download), ): await analyse_csv(check=check) diff --git a/tests/test_analysis/test_geojson.py b/tests/test_analysis/test_geojson.py index 1a368121..7929390d 100644 --- a/tests/test_analysis/test_geojson.py +++ b/tests/test_analysis/test_geojson.py @@ -56,7 +56,7 @@ async def test_geojson_to_pmtiles_valid_geometry(mocker): with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio): mocked_minio_client = MinIOClient(bucket=bucket, folder=folder) with ( - patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client), + patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=mocked_minio_client), patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), ): mock_os = mocker.patch("udata_hydra.utils.minio.os") @@ -124,7 +124,7 @@ async def test_csv_to_geojson_big_file( with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio): mocked_minio_client = MinIOClient(bucket=bucket, folder=folder) - with patch("udata_hydra.analysis.geojson.minio_client_geojson", new=mocked_minio_client): + with patch("udata_hydra.analysis.csv_geojson.minio_client_geojson", new=mocked_minio_client): mock_os = mocker.patch("udata_hydra.utils.minio.os") mock_os.path = os.path mock_os.remove.return_value = None @@ -211,7 +211,7 @@ async def test_geojson_to_pmtiles_big_file(mocker, input_file: str | None): mocked_minio_client = MinIOClient(bucket=bucket, folder=folder) with ( - patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client), + patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=mocked_minio_client), patch("udata_hydra.config.REMOVE_GENERATED_FILES", False), ): mock_os = mocker.patch("udata_hydra.utils.minio.os") diff --git a/udata_hydra/analysis/csv_geojson.py b/udata_hydra/analysis/csv_geojson.py new file mode 100644 index 00000000..924c46a2 --- /dev/null +++ b/udata_hydra/analysis/csv_geojson.py @@ -0,0 +1,283 @@ +import json +import logging +import os +from pathlib import Path +from typing import Any, Iterator + +from json_stream import streamable_list + +from udata_hydra import config, context +from udata_hydra.db import RESERVED_COLS +from udata_hydra.utils.casting import generate_records +from udata_hydra.utils.minio import MinIOClient + +DEFAULT_GEOJSON_FILEPATH = Path("converted_from_csv.geojson") + +log = logging.getLogger("udata-hydra") + +minio_client_geojson = MinIOClient( + bucket=config.MINIO_GEOJSON_BUCKET, folder=config.MINIO_GEOJSON_FOLDER +) + + +def _cast_latlon(latlon: str) -> list[float]: + # Detection already validated the format; 
values may look like "[48.8566, 2.3522]" + # or "48.8566 , 2.3522". Strip spaces and brackets, then split on comma. + lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") + # GeoJSON standard: longitude before latitude + return [float(lon), float(lat)] + + +def _quote_ident(name: str) -> str: + """Escape a PostgreSQL identifier to prevent SQL injection.""" + return '"' + name.replace('"', '""') + '"' + + +def _clean_pair_sql(col: str) -> str: + """SQL version — used when reading from PostgreSQL.""" + return f"replace(replace(replace({_quote_ident(col)}, ' ', ''), '[', ''), ']', '')" + + +async def csv_to_geojson( + file_path: str, + inspection: dict, + output_file_path: Path, + upload_to_minio: bool = True, +) -> tuple[int, str | None] | None: + """ + Convert a CSV file to GeoJSON format and optionally upload to MinIO. + + Detects geographical columns (geometry, latlon, lonlat, or lat/lon) and converts + CSV data to GeoJSON features. Rows with NaN values in geographical columns are skipped. + + Args: + file_path: Target CSV file to convert. + inspection: CSV detective analysis results with column format detection. + output_file_path: Path where the GeoJSON file should be saved. + upload_to_minio: Whether to upload to MinIO (default: True). + + Returns: + geojson_size: Size of the GeoJSON file in bytes. + geojson_url: URL of the GeoJSON file on MinIO. None if it was not uploaded to MinIO. + """ + + def get_features( + file_path: str, inspection: dict, geo: dict[str, Any] + ) -> Iterator[dict[str, Any]]: + for row in generate_records(file_path, inspection, cast_json=False, as_dict=True): + if "geojson" in geo: + yield { + "type": "Feature", + # empty geometry cells can happen, we keep them but they won't be displayable + "geometry": ( + json.loads(row[geo["geojson"]]) if row[geo["geojson"]] is not None else None + ), + "properties": {col: row[col] for col in row.keys() if col != geo["geojson"]}, # type: ignore[union-attr] + } + + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + pair_col = geo[pair_key] + if row[pair_col] is None: + continue + coords = _cast_latlon(row[pair_col]) + # latlon already returns [lon, lat]; lonlat needs inversion + if pair_key == "lonlat": + coords = coords[::-1] + yield { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": coords, + }, + "properties": {col: row[col] for col in row.keys() if col != pair_col}, # type: ignore[union-attr] + } + + else: + # skipping row if lat or lon is NaN + if any(coord is None for coord in (row[geo["longitude"]], row[geo["latitude"]])): + continue + yield { + "type": "Feature", + "geometry": { + "type": "Point", + # these columns are precast by csv-detective + "coordinates": [row[geo["longitude"]], row[geo["latitude"]]], + }, + "properties": { + col: row[col] + for col in row.keys() # type: ignore[union-attr] + if col not in [geo["longitude"], geo["latitude"]] + }, + } + + geo = _detect_geo_columns(inspection) + if geo is None: + log.debug("No geographical columns found, skipping") + return None + + template = {"type": "FeatureCollection"} + + template["features"] = streamable_list(get_features(file_path, inspection, geo)) + + with output_file_path.open("w") as f: + json.dump(template, f, indent=4, ensure_ascii=False, default=str) + + geojson_size: int = os.path.getsize(output_file_path) + + if upload_to_minio: + log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") + geojson_url = 
minio_client_geojson.send_file(str(output_file_path), delete_source=False) + else: + geojson_url = None + + return geojson_size, geojson_url + + +def _detect_geo_columns(inspection: dict) -> dict[str, str] | None: + """Detect geographical columns from csv-detective inspection results. + + Returns a dict like {"latitude": "col_name", "longitude": "col_name"} + or {"geojson": "col_name"} etc, or None if no geo columns found. + """ + geo = {} + for column, detection in inspection["columns"].items(): + # see csv-detective's geo formats: + # https://github.com/datagouv/csv-detective/tree/main/csv_detective/formats + # geo looks like {fmt: (col, score), ...} + for fmt in ["geojson", "latlon", "lonlat", "latitude", "longitude"]: + # loop through the columns, for each geo format store the column that scored the highest + if fmt in detection["format"]: + if not geo.get(fmt): + geo[fmt] = (column, detection["score"]) + elif geo[fmt][1] < detection["score"]: + geo[fmt] = (column, detection["score"]) + # priority is given to geojson, then latlon, then lonlat, then latitude + longitude + if "geojson" in geo: + return {"geojson": geo["geojson"][0]} + elif "latlon" in geo: + return {"latlon": geo["latlon"][0]} + elif "lonlat" in geo: + return {"lonlat": geo["lonlat"][0]} + elif "latitude" in geo and "longitude" in geo: + return {"latitude": geo["latitude"][0], "longitude": geo["longitude"][0]} + return None + + +def _db_col_name(col: str) -> str: + """Map a CSV column name to its actual PostgreSQL column name.""" + return f"{col}__hydra_renamed" if col.lower() in RESERVED_COLS else col + + +def _build_feature_sql( + table_name: str, geo: dict[str, str], columns: list[str] +) -> tuple[str, list[str]]: + """Build a SQL query that generates GeoJSON features directly in PostgreSQL. + + Column names in `columns` and `geo` are the original CSV names. They are + mapped to their actual DB names (handling RESERVED_COLS renaming) for the + SQL identifiers, while the original names are passed as query parameters + for the JSON keys so the GeoJSON output matches what the CSV path produces. + + Returns (query, params) where params are the original column names used as + JSON keys via $1, $2… placeholders. + """ + property_cols = [c for c in columns if c not in geo.values()] + params: list[str] = [] + properties_fragments = [] + for col in property_cols: + params.append(col) + placeholder = f"${len(params)}::text" + properties_fragments.append(f"{placeholder}, {_quote_ident(_db_col_name(col))}") + + # PostgreSQL's json_build_object accepts max 100 arguments (50 key-value pairs). + # Split into chunks and merge with || when needed. 
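+    # Worked example (hypothetical count): 120 property columns split into
+    # chunks of 50 + 50 + 20 pairs, i.e. three jsonb_build_object(...) calls
+    # joined with '||' below and cast back to ::json.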
+ max_pairs = 50 + chunks = [ + properties_fragments[i : i + max_pairs] + for i in range(0, len(properties_fragments), max_pairs) + ] + if len(chunks) <= 1: + properties_sql = f"json_build_object({', '.join(properties_fragments)})" + else: + parts = [f"jsonb_build_object({', '.join(chunk)})" for chunk in chunks] + properties_sql = f"({' || '.join(parts)})::json" + + if "geojson" in geo: + col = _db_col_name(geo["geojson"]) + geometry_sql = f"({_quote_ident(col)})::json" + where = "" + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + col = _db_col_name(geo[pair_key]) + # latlon = "lat,lon" → GeoJSON needs [lon, lat] so swap indices + # lonlat = "lon,lat" → already in GeoJSON order + lon_idx, lat_idx = ("2", "1") if pair_key == "latlon" else ("1", "2") + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array( + (split_part({_clean_pair_sql(col)}, ',', {lon_idx}))::float, + (split_part({_clean_pair_sql(col)}, ',', {lat_idx}))::float + ) + )""" + where = f"WHERE {_quote_ident(col)} IS NOT NULL" + else: + lon_col = _db_col_name(geo["longitude"]) + lat_col = _db_col_name(geo["latitude"]) + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array({_quote_ident(lon_col)}, {_quote_ident(lat_col)}) + )""" + where = f"WHERE {_quote_ident(lat_col)} IS NOT NULL AND {_quote_ident(lon_col)} IS NOT NULL" + + query = f""" + SELECT json_build_object( + 'type', 'Feature', + 'geometry', {geometry_sql}, + 'properties', {properties_sql} + )::text + FROM {_quote_ident(table_name)} + {where} + """ + return query, params + + +async def csv_to_geojson_from_db( + table_name: str, + inspection: dict, + output_file_path: Path, + upload_to_minio: bool = True, +) -> tuple[int, str | None] | None: + """Generate a GeoJSON file by streaming features directly from PostgreSQL.""" + geo = _detect_geo_columns(inspection) + if geo is None: + log.debug("No geographical columns found, skipping") + return None + + columns = list(inspection["columns"].keys()) + query, params = _build_feature_sql(table_name, geo, columns) + + db = await context.pool("csv") + async with db.acquire() as conn: + async with conn.transaction(): + cursor = conn.cursor(query, *params) + + with output_file_path.open("w") as f: + f.write('{"type": "FeatureCollection", "features": [\n') + first = True + async for row in cursor: + if not first: + f.write(",\n") + f.write(row[0]) + first = False + f.write("\n]}") + + geojson_size: int = os.path.getsize(output_file_path) + + if upload_to_minio: + log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") + geojson_url = minio_client_geojson.send_file(str(output_file_path), delete_source=False) + else: + geojson_url = None + + return geojson_size, geojson_url diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 7e34950e..46940294 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -3,16 +3,23 @@ import os from datetime import datetime, timezone from pathlib import Path -from typing import Any, Iterator import aiohttp -import tippecanoe from asyncpg import Record -from json_stream import streamable_list from udata_hydra import config, context from udata_hydra.analysis import helpers -from udata_hydra.db import RESERVED_COLS +from udata_hydra.analysis.csv_geojson import ( + DEFAULT_GEOJSON_FILEPATH, + csv_to_geojson, + csv_to_geojson_from_db, + minio_client_geojson, +) +from udata_hydra.analysis.pmtiles import ( + 
DEFAULT_PMTILES_FILEPATH, + geojson_to_pmtiles, + minio_client_pmtiles, +) from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -24,42 +31,24 @@ queue, remove_remainders, ) -from udata_hydra.utils.casting import generate_records -from udata_hydra.utils.minio import MinIOClient - -DEFAULT_GEOJSON_FILEPATH = Path("converted_from_csv.geojson") -DEFAULT_PMTILES_FILEPATH = Path("converted_from_geojson.pmtiles") log = logging.getLogger("udata-hydra") - -# latlon/lonlat columns can contain values like "[48.8566, 2.3522]" or "48.8566 , 2.3522" -# Both versions below strip spaces and brackets, then split on comma. - - -def _cast_latlon(latlon: str) -> list[float]: - """Python version — used when reading from CSV.""" - lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") - # GeoJSON standard: longitude before latitude - return [float(lon), float(lat)] - - -def _quote_ident(name: str) -> str: - """Escape a PostgreSQL identifier to prevent SQL injection.""" - return '"' + name.replace('"', '""') + '"' - - -def _clean_pair_sql(col: str) -> str: - """SQL version — used when reading from PostgreSQL.""" - return f"replace(replace(replace({_quote_ident(col)}, ' ', ''), '[', ''), ']', '')" - - -minio_client_pmtiles = MinIOClient( - bucket=config.MINIO_PMTILES_BUCKET, folder=config.MINIO_PMTILES_FOLDER -) -minio_client_geojson = MinIOClient( - bucket=config.MINIO_GEOJSON_BUCKET, folder=config.MINIO_GEOJSON_FOLDER -) +__all__ = [ + "DEFAULT_GEOJSON_FILEPATH", + "DEFAULT_PMTILES_FILEPATH", + "analyse_geojson", + "csv_to_geojson", + "csv_to_geojson_and_pmtiles", + "csv_to_geojson_from_db", + "export_geojson_for_csv_analysis", + "export_pmtiles_from_local_geojson", + "geojson_to_pmtiles", + "minio_client_geojson", + "minio_client_pmtiles", + "task_csv_to_geojson", + "task_geojson_to_pmtiles", +] async def analyse_geojson( @@ -135,298 +124,6 @@ async def analyse_geojson( await Resource.update(resource_id, {"status": None}) -async def csv_to_geojson( - file_path: str, - inspection: dict, - output_file_path: Path, - upload_to_minio: bool = True, -) -> tuple[int, str | None] | None: - """ - Convert a CSV file to GeoJSON format and optionally upload to MinIO. - - Detects geographical columns (geometry, latlon, lonlat, or lat/lon) and converts - CSV data to GeoJSON features. Rows with NaN values in geographical columns are skipped. - - Args: - file_path: Target CSV file to convert. - inspection: CSV detective analysis results with column format detection. - output_file_path: Path where the GeoJSON file should be saved. - upload_to_minio: Whether to upload to MinIO (default: True). - - Returns: - geojson_size: Size of the GeoJSON file in bytes. - geojson_url: URL of the GeoJSON file on MinIO. None if it was not uploaded to MinIO. 
- """ - - def get_features( - file_path: str, inspection: dict, geo: dict[str, Any] - ) -> Iterator[dict[str, Any]]: - for row in generate_records(file_path, inspection, cast_json=False, as_dict=True): - if "geojson" in geo: - yield { - "type": "Feature", - # empty geometry cells can happen, we keep them but they won't be displayable - "geometry": ( - json.loads(row[geo["geojson"]]) if row[geo["geojson"]] is not None else None - ), - "properties": {col: row[col] for col in row.keys() if col != geo["geojson"]}, # type: ignore[union-attr] - } - - elif "latlon" in geo or "lonlat" in geo: - pair_key = "latlon" if "latlon" in geo else "lonlat" - pair_col = geo[pair_key] - if row[pair_col] is None: - continue - coords = _cast_latlon(row[pair_col]) - # latlon already returns [lon, lat]; lonlat needs inversion - if pair_key == "lonlat": - coords = coords[::-1] - yield { - "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": coords, - }, - "properties": {col: row[col] for col in row.keys() if col != pair_col}, # type: ignore[union-attr] - } - - else: - # skipping row if lat or lon is NaN - if any(coord is None for coord in (row[geo["longitude"]], row[geo["latitude"]])): - continue - yield { - "type": "Feature", - "geometry": { - "type": "Point", - # these columns are precast by csv-detective - "coordinates": [row[geo["longitude"]], row[geo["latitude"]]], - }, - "properties": { - col: row[col] - for col in row.keys() # type: ignore[union-attr] - if col not in [geo["longitude"], geo["latitude"]] - }, - } - - geo = _detect_geo_columns(inspection) - if geo is None: - log.debug("No geographical columns found, skipping") - return None - - template = {"type": "FeatureCollection"} - - template["features"] = streamable_list(get_features(file_path, inspection, geo)) - - with output_file_path.open("w") as f: - json.dump(template, f, indent=4, ensure_ascii=False, default=str) - - geojson_size: int = os.path.getsize(output_file_path) - - if upload_to_minio: - log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") - geojson_url = minio_client_geojson.send_file(str(output_file_path), delete_source=False) - else: - geojson_url = None - - return geojson_size, geojson_url - - -def _detect_geo_columns(inspection: dict) -> dict[str, str] | None: - """Detect geographical columns from csv-detective inspection results. - - Returns a dict like {"latitude": "col_name", "longitude": "col_name"} - or {"geojson": "col_name"} etc, or None if no geo columns found. 
- """ - geo = {} - for column, detection in inspection["columns"].items(): - # see csv-detective's geo formats: - # https://github.com/datagouv/csv-detective/tree/main/csv_detective/formats - # geo looks like {fmt: (col, score), ...} - for fmt in ["geojson", "latlon", "lonlat", "latitude", "longitude"]: - # loop through the columns, for each geo format store the column that scored the highest - if fmt in detection["format"]: - if not geo.get(fmt): - geo[fmt] = (column, detection["score"]) - elif geo[fmt][1] < detection["score"]: - geo[fmt] = (column, detection["score"]) - # priority is given to geojson, then latlon, then lonlat, then latitude + longitude - if "geojson" in geo: - return {"geojson": geo["geojson"][0]} - elif "latlon" in geo: - return {"latlon": geo["latlon"][0]} - elif "lonlat" in geo: - return {"lonlat": geo["lonlat"][0]} - elif "latitude" in geo and "longitude" in geo: - return {"latitude": geo["latitude"][0], "longitude": geo["longitude"][0]} - return None - - -def _db_col_name(col: str) -> str: - """Map a CSV column name to its actual PostgreSQL column name.""" - return f"{col}__hydra_renamed" if col.lower() in RESERVED_COLS else col - - -def _build_feature_sql( - table_name: str, geo: dict[str, str], columns: list[str] -) -> tuple[str, list[str]]: - """Build a SQL query that generates GeoJSON features directly in PostgreSQL. - - Column names in `columns` and `geo` are the original CSV names. They are - mapped to their actual DB names (handling RESERVED_COLS renaming) for the - SQL identifiers, while the original names are passed as query parameters - for the JSON keys so the GeoJSON output matches what the CSV path produces. - - Returns (query, params) where params are the original column names used as - JSON keys via $1, $2… placeholders. - """ - property_cols = [c for c in columns if c not in geo.values()] - params: list[str] = [] - properties_fragments = [] - for col in property_cols: - params.append(col) - placeholder = f"${len(params)}::text" - properties_fragments.append(f"{placeholder}, {_quote_ident(_db_col_name(col))}") - - # PostgreSQL's json_build_object accepts max 100 arguments (50 key-value pairs). - # Split into chunks and merge with || when needed. 
- max_pairs = 50 - chunks = [ - properties_fragments[i : i + max_pairs] - for i in range(0, len(properties_fragments), max_pairs) - ] - if len(chunks) <= 1: - properties_sql = f"json_build_object({', '.join(properties_fragments)})" - else: - parts = [f"jsonb_build_object({', '.join(chunk)})" for chunk in chunks] - properties_sql = f"({' || '.join(parts)})::json" - - if "geojson" in geo: - col = _db_col_name(geo["geojson"]) - geometry_sql = f"({_quote_ident(col)})::json" - where = "" - elif "latlon" in geo or "lonlat" in geo: - pair_key = "latlon" if "latlon" in geo else "lonlat" - col = _db_col_name(geo[pair_key]) - # latlon = "lat,lon" → GeoJSON needs [lon, lat] so swap indices - # lonlat = "lon,lat" → already in GeoJSON order - lon_idx, lat_idx = ("2", "1") if pair_key == "latlon" else ("1", "2") - geometry_sql = f"""json_build_object( - 'type', 'Point', - 'coordinates', json_build_array( - (split_part({_clean_pair_sql(col)}, ',', {lon_idx}))::float, - (split_part({_clean_pair_sql(col)}, ',', {lat_idx}))::float - ) - )""" - where = f"WHERE {_quote_ident(col)} IS NOT NULL" - else: - lon_col = _db_col_name(geo["longitude"]) - lat_col = _db_col_name(geo["latitude"]) - geometry_sql = f"""json_build_object( - 'type', 'Point', - 'coordinates', json_build_array({_quote_ident(lon_col)}, {_quote_ident(lat_col)}) - )""" - where = f"WHERE {_quote_ident(lat_col)} IS NOT NULL AND {_quote_ident(lon_col)} IS NOT NULL" - - query = f""" - SELECT json_build_object( - 'type', 'Feature', - 'geometry', {geometry_sql}, - 'properties', {properties_sql} - )::text - FROM {_quote_ident(table_name)} - {where} - """ - return query, params - - -async def csv_to_geojson_from_db( - table_name: str, - inspection: dict, - output_file_path: Path, - upload_to_minio: bool = True, -) -> tuple[int, str | None] | None: - """Generate a GeoJSON file by streaming features directly from PostgreSQL.""" - geo = _detect_geo_columns(inspection) - if geo is None: - log.debug("No geographical columns found, skipping") - return None - - columns = list(inspection["columns"].keys()) - query, params = _build_feature_sql(table_name, geo, columns) - - db = await context.pool("csv") - async with db.acquire() as conn: - async with conn.transaction(): - cursor = conn.cursor(query, *params) - - with output_file_path.open("w") as f: - f.write('{"type": "FeatureCollection", "features": [\n') - first = True - async for row in cursor: - if not first: - f.write(",\n") - f.write(row[0]) - first = False - f.write("\n]}") - - geojson_size: int = os.path.getsize(output_file_path) - - if upload_to_minio: - log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") - geojson_url = minio_client_geojson.send_file(str(output_file_path), delete_source=False) - else: - geojson_url = None - - return geojson_size, geojson_url - - -async def geojson_to_pmtiles( - input_file_path: Path, - output_file_path: Path, - upload_to_minio: bool = True, -) -> tuple[int, str | None]: - """ - Convert a GeoJSON file to PMTiles file and optionally upload to MinIO. - - Args: - input_file_path: GeoJSON file path to convert. - output_file_path: Path where the PMTiles file should be saved. - upload_to_minio: Whether to upload to MinIO (default: True). - - Returns: - pmtiles_size: size of the PMTiles file. - pmtiles_url: URL of the PMTiles file on MinIO. None if it was not uploaded to MinIO. 
-    """
-
-    log.debug(f"Converting GeoJSON file '{input_file_path}' to PMTiles file '{output_file_path}'")
-
-    command = [
-        "--maximum-zoom=g",  # guess
-        "-o",
-        str(output_file_path),
-        "--force",  # don't crash if output file already exists, override it
-        "--coalesce-densest-as-needed",
-        "--extend-zooms-if-still-dropping",
-        str(input_file_path),
-    ]
-    exit_code = tippecanoe._program("tippecanoe", *command)
-    if exit_code:
-        raise ValueError(f"GeoJSON to PMTiles conversion failed with exit code {exit_code}")
-    log.debug(f"Successfully converted {input_file_path} to {output_file_path}")
-
-    pmtiles_size: int = os.path.getsize(output_file_path)
-
-    if upload_to_minio:
-        log.debug(f"Sending PMTiles file {output_file_path} to MinIO")
-        pmtiles_url = minio_client_pmtiles.send_file(
-            str(output_file_path), delete_source=config.REMOVE_GENERATED_FILES
-        )
-    else:
-        pmtiles_url = None
-
-    return pmtiles_size, pmtiles_url
-
-
 async def export_geojson_for_csv_analysis(
     *,
     resource_id: str | None,
diff --git a/udata_hydra/analysis/pmtiles.py b/udata_hydra/analysis/pmtiles.py
new file mode 100644
index 00000000..0261470f
--- /dev/null
+++ b/udata_hydra/analysis/pmtiles.py
@@ -0,0 +1,63 @@
+import logging
+import os
+from pathlib import Path
+
+import tippecanoe
+
+from udata_hydra import config
+from udata_hydra.utils.minio import MinIOClient
+
+DEFAULT_PMTILES_FILEPATH = Path("converted_from_geojson.pmtiles")
+
+log = logging.getLogger("udata-hydra")
+
+minio_client_pmtiles = MinIOClient(
+    bucket=config.MINIO_PMTILES_BUCKET, folder=config.MINIO_PMTILES_FOLDER
+)
+
+
+async def geojson_to_pmtiles(
+    input_file_path: Path,
+    output_file_path: Path,
+    upload_to_minio: bool = True,
+) -> tuple[int, str | None]:
+    """
+    Convert a GeoJSON file to a PMTiles file and optionally upload it to MinIO.
+
+    Args:
+        input_file_path: GeoJSON file path to convert.
+        output_file_path: Path where the PMTiles file should be saved.
+        upload_to_minio: Whether to upload to MinIO (default: True).
+
+    Returns:
+        pmtiles_size: size of the PMTiles file.
+        pmtiles_url: URL of the PMTiles file on MinIO. None if it was not uploaded to MinIO.
+    """
+
+    log.debug(f"Converting GeoJSON file '{input_file_path}' to PMTiles file '{output_file_path}'")
+
+    command = [
+        "--maximum-zoom=g",  # guess
+        "-o",
+        str(output_file_path),
+        "--force",  # don't crash if the output file already exists, overwrite it
+        "--coalesce-densest-as-needed",
+        "--extend-zooms-if-still-dropping",
+        str(input_file_path),
+    ]
+    exit_code = tippecanoe._program("tippecanoe", *command)
+    if exit_code:
+        raise ValueError(f"GeoJSON to PMTiles conversion failed with exit code {exit_code}")
+    log.debug(f"Successfully converted {input_file_path} to {output_file_path}")
+
+    pmtiles_size: int = os.path.getsize(output_file_path)
+
+    if upload_to_minio:
+        log.debug(f"Sending PMTiles file {output_file_path} to MinIO")
+        pmtiles_url = minio_client_pmtiles.send_file(
+            str(output_file_path), delete_source=config.REMOVE_GENERATED_FILES
+        )
+    else:
+        pmtiles_url = None
+
+    return pmtiles_size, pmtiles_url

From e2b379c932a3a3ec6d927ddb131aeb63dbf58ab5 Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 7 Apr 2026 13:38:43 +0200
Subject: [PATCH 6/7] clean: update udata_hydra/analysis/geojson.py

Co-authored-by: Thibaud Ollagnier
---
 udata_hydra/analysis/geojson.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index 46940294..e3e6e282 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -296,8 +296,6 @@ async def task_csv_to_geojson(
     if not inspection:
         log.error(
             f"task_csv_to_geojson: no tables_index row for resource {resource_id} table {parsing_table}",
-            resource_id,
-            parsing_table,
         )
         return
 
From c309aedbd869a9e051f815d7ebdfaaf346c47b1a Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 7 Apr 2026 14:40:19 +0200
Subject: [PATCH 7/7] refactor(utils): unify streaming HTTP download and
 relocate download_url_to_tempfile
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add _http_get_to_temp_path (Path, total_bytes cap, shared session options)
- Factor download_resource and download_url_to_tempfile through it
- GeoJSON→PMTiles: headers, max_size_allowed, IOException handling
- download_file: BinaryIO + HTTP_DOWNLOAD_CHUNK_SIZE; remove_remainders uses Path.unlink

Made-with: Cursor
---
 tests/test_analysis/test_analysis_csv.py |   2 +-
 udata_hydra/analysis/geojson.py          |  10 +-
 udata_hydra/analysis/helpers.py          |  20 ----
 udata_hydra/utils/__init__.py            |   2 +
 udata_hydra/utils/file.py                | 114 ++++++++++++++++-------
 5 files changed, 90 insertions(+), 58 deletions(-)

diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py
index b9577664..1f95d825 100644
--- a/tests/test_analysis/test_analysis_csv.py
+++ b/tests/test_analysis/test_analysis_csv.py
@@ -1083,7 +1083,7 @@ async def fake_download(dl_url: str, suffix: str = "") -> Path:
         patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
         patch("udata_hydra.analysis.csv_geojson.minio_client_geojson", new=geo_client),
         patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=pmtiles_client),
-        patch("udata_hydra.analysis.helpers.download_url_to_tempfile", new=fake_download),
+        patch("udata_hydra.utils.file.download_url_to_tempfile", new=fake_download),
     ):
         await analyse_csv(check=check)
 
diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index e3e6e282..501af680 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -27,6 +27,7 @@
     IOException,
     ParseException,
     Timer,
+    download_url_to_tempfile,
     handle_parse_exception,
     queue,
     remove_remainders,
@@ -396,7 +397,12 @@ async def task_geojson_to_pmtiles(check_id: int) -> None:
 
     try:
         try:
-            tmp_geo = await helpers.download_url_to_tempfile(geojson_url, suffix=".geojson")
+            tmp_geo = await download_url_to_tempfile(
+                geojson_url,
+                suffix=".geojson",
+                headers=json.loads(check.get("headers") or "{}"),
+                max_size_allowed=int(config.MAX_FILESIZE_ALLOWED.get("geojson", config.MAX_FILESIZE_ALLOWED["csv"])),
+            )
             await export_pmtiles_from_local_geojson(
                 resource_id=resource_id,
                 check_id=check_id,
@@ -419,7 +425,7 @@ async def task_geojson_to_pmtiles(check_id: int) -> None:
             if resource is not None and refreshed:
                 await helpers.notify_udata(resource, dict(refreshed))
 
-    except (ParseException, OSError, aiohttp.ClientError) as e:
+    except (ParseException, OSError, aiohttp.ClientError, IOException) as e:
         if isinstance(e, ParseException):
             await handle_parse_exception(e, check.get("parsing_table"), check)
         else:
diff --git a/udata_hydra/analysis/helpers.py b/udata_hydra/analysis/helpers.py
index d58dd438..4ccfe3d7 100644
--- a/udata_hydra/analysis/helpers.py
+++ b/udata_hydra/analysis/helpers.py
@@ -1,11 +1,7 @@
 import json
 import logging
-import os
-import tempfile
-from pathlib import Path
 from typing import IO
 
-import aiohttp
 from asyncpg import Record
 
 from udata_hydra import config
@@ -49,22 +45,6 @@ async def read_or_download_file(
     return tmp_file
 
 
-async def download_url_to_tempfile(url: str, suffix: str = "") -> Path:
-    """Download a URL to a named temporary file and return its path."""
-    async with aiohttp.ClientSession() as session:
-        async with session.get(url) as resp:
-            resp.raise_for_status()
-            body = await resp.read()
-    fd, raw_path = tempfile.mkstemp(suffix=suffix)
-    path = Path(raw_path)
-    try:
-        os.write(fd, body)
-    finally:
-        os.close(fd)
-    log.debug(f"Downloaded {url} to {path} ({len(body)} bytes)")
-    return path
-
-
 async def notify_udata(resource: Record | None, check: Record | dict | None) -> None:
     """Notify udata of the result of a parsing"""
     if check is None or resource is None:
diff --git a/udata_hydra/utils/__init__.py b/udata_hydra/utils/__init__.py
index 6496ac07..ca5c9d9d 100644
--- a/udata_hydra/utils/__init__.py
+++ b/udata_hydra/utils/__init__.py
@@ -5,6 +5,7 @@
     compute_checksum_from_file,
     download_file,
     download_resource,
+    download_url_to_tempfile,
     extract_gzip,
     remove_remainders,
 )
@@ -24,6 +25,7 @@
     "compute_checksum_from_file",
     "download_file",
     "download_resource",
+    "download_url_to_tempfile",
     "extract_gzip",
     "remove_remainders",
     "detect_geojson_from_headers",
diff --git a/udata_hydra/utils/file.py b/udata_hydra/utils/file.py
index 2005ed47..81fe7bed 100644
--- a/udata_hydra/utils/file.py
+++ b/udata_hydra/utils/file.py
@@ -2,10 +2,10 @@
 import hashlib
 import logging
 import mimetypes
-import os
 import re
 import tempfile
-from typing import IO
+from pathlib import Path
+from typing import IO, BinaryIO
 
 import aiohttp
 import magic
@@ -15,6 +15,8 @@
 
 log = logging.getLogger("udata-hydra")
 
+HTTP_DOWNLOAD_CHUNK_SIZE = 1024
+
 
 def compute_checksum_from_file(filename: str) -> str:
     """Compute sha1 in blocks"""
@@ -27,7 +29,7 @@ def compute_checksum_from_file(filename: str) -> str:
     return sha1sum.hexdigest()
 
 
-def extract_gzip(file_path: str) -> IO[bytes]:
+def extract_gzip(file_path: str | Path) -> IO[bytes]:
     with gzip.open(file_path, "rb") as gz_file:
         with tempfile.NamedTemporaryFile(
             dir=config.TEMPORARY_DOWNLOAD_FOLDER or None, mode="wb", delete=False
         ) as temp_file:
             temp_file.write(gz_file.read())
             return temp_file
 
 
-async def download_resource(
+async def _http_get_to_temp_path(
     url: str,
-    headers: dict | None = None,
-    max_size_allowed: int | None = None,
-) -> tuple[IO[bytes], str]:
-    """
-    Attempts downloading a resource from a given url.
-    Returns a tuple of (downloaded_file_object, detected_extension).
-    Raises custom IOException if the resource is too large or if the URL is unreachable.
-    """
+    headers: dict | None,
+    max_size_allowed: int | None,
+    suffix: str,
+    io_error_message: str,
+) -> tuple[Path, int]:
+    """Stream a GET response to a named temp file. Returns (path, total_bytes)."""
     if (
         headers
         and max_size_allowed is not None
@@ -53,69 +53,114 @@
     ):
         raise IOException("File too large to download", url=url)
 
-    tmp_file = tempfile.NamedTemporaryFile(
-        dir=config.TEMPORARY_DOWNLOAD_FOLDER or None, delete=False
+    tmp_file: IO[bytes] = tempfile.NamedTemporaryFile(
+        dir=config.TEMPORARY_DOWNLOAD_FOLDER or None,
+        delete=False,
+        suffix=suffix,
     )
-
-    chunk_size = 1024
-    i = 0
-    too_large, download_error = False, None
+    path = Path(tmp_file.name)
+    total_bytes: int = 0
+    too_large: bool = False
+    download_error: aiohttp.ClientResponseError | None = None
     try:
         async with aiohttp.ClientSession(
             headers={"user-agent": config.USER_AGENT_FULL},
             raise_for_status=True,
         ) as session:
             async with session.get(url, allow_redirects=True) as response:
-                async for chunk in response.content.iter_chunked(chunk_size):
-                    if max_size_allowed is None or i * chunk_size < max_size_allowed:
+                async for chunk in response.content.iter_chunked(HTTP_DOWNLOAD_CHUNK_SIZE):
+                    if max_size_allowed is None or total_bytes + len(chunk) <= max_size_allowed:
                         tmp_file.write(chunk)
+                        total_bytes += len(chunk)
                     else:
                         too_large = True
                         break
-                    i += 1
     except aiohttp.ClientResponseError as e:
         download_error = e
     finally:
         tmp_file.close()
 
+    if too_large or download_error:
+        path.unlink(missing_ok=True)
     if too_large:
-        os.remove(tmp_file.name)
         raise IOException("File too large to download", url=url)
     if download_error:
-        os.remove(tmp_file.name)
-        raise IOException("Error downloading CSV", url=url) from download_error
+        raise IOException(io_error_message, url=url) from download_error
+
+    return path, total_bytes
+
+
+async def download_resource(
+    url: str,
+    headers: dict | None = None,
+    max_size_allowed: int | None = None,
+) -> tuple[IO[bytes], str]:
+    """
+    Attempts downloading a resource from a given url.
+    Returns a tuple of (downloaded_file_object, detected_extension).
+    Raises custom IOException if the resource is too large or if the URL is unreachable.
+    """
+    path, _ = await _http_get_to_temp_path(
+        url,
+        headers,
+        max_size_allowed,
+        "",
+        "Error downloading CSV",
+    )
 
-    detected_extension = ""
+    detected_extension: str = ""
 
-    if magic.from_file(tmp_file.name, mime=True) in [
+    if magic.from_file(str(path), mime=True) in [
         "application/x-gzip",
         "application/gzip",
     ]:
         # It's compressed - extract and determine extension from URL
-        gzip_tmp_file_name = tmp_file.name
-        tmp_file = extract_gzip(tmp_file.name)
+        tmp_file: IO[bytes] = extract_gzip(path)
         # Remove the gzip original temporary file
-        os.remove(gzip_tmp_file_name)
+        path.unlink()
         # Extract any extension before .gz using regex
-        match = re.search(r"\.([^.]+)\.gz$", url)
+        match: re.Match[str] | None = re.search(r"\.([^.]+)\.gz$", url)
         if match:
             detected_extension = f".{match.group(1)}"
         else:
             detected_extension = ""
     else:
         # Not compressed - use magic to detect type
-        mime_type = magic.from_file(tmp_file.name, mime=True)
+        mime_type: str = magic.from_file(str(path), mime=True)
         detected_extension = mimetypes.guess_extension(mime_type) or ""
+        tmp_file = open(path, "rb")
 
     tmp_file.close()
     return tmp_file, detected_extension
 
 
+async def download_url_to_tempfile(
+    url: str,
+    suffix: str = "",
+    headers: dict | None = None,
+    max_size_allowed: int | None = None,
+) -> Path:
+    """Download a URL to a named temporary file and return its path.
+
+    Streams the response body to disk to avoid loading large files into memory.
+    """
+    path, total_bytes = await _http_get_to_temp_path(
+        url,
+        headers,
+        max_size_allowed,
+        suffix,
+        "Error downloading file",
+    )
+    log.debug(f"Downloaded {url} to {path} ({total_bytes} bytes)")
+    return path
+
+
-async def download_file(url: str, fd):
+async def download_file(url: str, fd: BinaryIO) -> None:
     """Download a file from URL to a file descriptor"""
     async with aiohttp.ClientSession() as session:
         async with session.get(url) as resp:
             while True:
-                chunk = await resp.content.read(1024)
+                chunk: bytes = await resp.content.read(HTTP_DOWNLOAD_CHUNK_SIZE)
                 if not chunk:
                     break
                 fd.write(chunk)
@@ -124,5 +169,4 @@ def remove_remainders(resource_id: str, extensions: list[str]) -> None:
     """Delete potential remainders from process that crashed"""
     for ext in extensions:
-        if os.path.exists(f"{resource_id}.{ext}"):
-            os.remove(f"{resource_id}.{ext}")
+        Path(f"{resource_id}.{ext}").unlink(missing_ok=True)
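
Usage sketch (illustrative only, not part of this series): how the relocated
download_url_to_tempfile is expected to be driven by callers such as
task_geojson_to_pmtiles. The URL and the 100 MiB cap are made-up values, and
this assumes IOException is importable from udata_hydra.utils alongside the
helper, as the geojson.py import block above suggests.

    import asyncio

    from udata_hydra.utils import IOException, download_url_to_tempfile


    async def main() -> None:
        try:
            # Streams the body to a named temp file; the cap is enforced
            # chunk by chunk while downloading, not only via Content-Length.
            path = await download_url_to_tempfile(
                "https://example.org/data.geojson",  # hypothetical URL
                suffix=".geojson",
                max_size_allowed=100 * 1024 * 1024,  # made-up 100 MiB cap
            )
        except IOException:
            # Raised when the cap is exceeded or the GET fails; the partial
            # temp file has already been unlinked by _http_get_to_temp_path.
            raise
        print(path)  # e.g. /tmp/tmpab12cd34.geojson
        path.unlink()  # the caller owns, and must clean up, the temp file


    asyncio.run(main())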