From 3181c12458f75308cb9a6db9020bdc2f54128187 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 12:38:27 +0200 Subject: [PATCH 01/12] feat: generate GeoJSON from PostgreSQL instead of re-reading CSV Stream GeoJSON features directly from the database using a cursor, avoiding a third read of the CSV file. PostgreSQL builds the JSON with json_build_object, so no Python-level casting is needed. Also extract _detect_geo_columns to deduplicate the geo column detection logic, and add detailed timer marks for the geojson and pmtiles conversion steps. --- udata_hydra/analysis/csv.py | 1 + udata_hydra/analysis/geojson.py | 168 +++++++++++++++++++++++++++----- 2 files changed, 146 insertions(+), 23 deletions(-) diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index bb575957..f9eb39bb 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -188,6 +188,7 @@ async def analyse_csv( resource_id=resource_id, check_id=check["id"], timer=timer, + table_name=table_name if config.CSV_TO_DB else None, ) except Exception as e: remove_remainders(resource_id, ["geojson", "pmtiles", "pmtiles-journal"]) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 72b0cdbc..a4d54694 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -9,7 +9,7 @@ from asyncpg import Record from json_stream import streamable_list -from udata_hydra import config +from udata_hydra import config, context from udata_hydra.analysis import helpers from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -29,6 +29,28 @@ log = logging.getLogger("udata-hydra") + +# latlon/lonlat columns can contain values like "[48.8566, 2.3522]" or "48.8566 , 2.3522" +# Both versions below strip spaces and brackets, then split on comma. + + +def _cast_latlon(latlon: str) -> list[float]: + """Python version — used when reading from CSV.""" + lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") + # GeoJSON standard: longitude before latitude + return [float(lon), float(lat)] + + +def _quote_ident(name: str) -> str: + """Escape a PostgreSQL identifier to prevent SQL injection.""" + return '"' + name.replace('"', '""') + '"' + + +def _clean_pair_sql(col: str) -> str: + """SQL version — used when reading from PostgreSQL.""" + return f"replace(replace(replace({_quote_ident(col)}, ' ', ''), '[', ''), ']', '')" + + minio_client_pmtiles = MinIOClient( bucket=config.MINIO_PMTILES_BUCKET, folder=config.MINIO_PMTILES_FOLDER ) @@ -133,13 +155,6 @@ async def csv_to_geojson( geojson_url: URL of the GeoJSON file on MinIO. None if it was not uploaded to MinIO. """ - def cast_latlon(latlon: str) -> list[float]: - # we can safely do this as the detection was successful - # removing potential blank and brackets - lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") - # using the geojson standard: longitude before latitude - return [float(lon), float(lat)] - def get_features( file_path: str, inspection: dict, geo: dict[str, Any] ) -> Iterator[dict[str, Any]]: @@ -163,7 +178,7 @@ def get_features( "type": "Feature", "geometry": { "type": "Point", - "coordinates": cast_latlon(row[geo["latlon"]]), + "coordinates": _cast_latlon(row[geo["latlon"]]), }, "properties": {col: row[col] for col in row.keys() if col != geo["latlon"]}, # type: ignore[union-attr] } @@ -178,7 +193,7 @@ def get_features( "geometry": { "type": "Point", # inverting lon and lat to match the standard - "coordinates": cast_latlon(row[geo["lonlat"]])[::-1], + "coordinates": _cast_latlon(row[geo["lonlat"]])[::-1], }, "properties": {col: row[col] for col in row.keys() if col != geo["lonlat"]}, # type: ignore[union-attr] } @@ -201,6 +216,35 @@ def get_features( }, } + geo = _detect_geo_columns(inspection) + if geo is None: + log.debug("No geographical columns found, skipping") + return None + + template = {"type": "FeatureCollection"} + + template["features"] = streamable_list(get_features(file_path, inspection, geo)) + + with output_file_path.open("w") as f: + json.dump(template, f, indent=4, ensure_ascii=False, default=str) + + geojson_size: int = os.path.getsize(output_file_path) + + if upload_to_minio: + log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") + geojson_url = minio_client_geojson.send_file(str(output_file_path), delete_source=False) + else: + geojson_url = None + + return geojson_size, geojson_url + + +def _detect_geo_columns(inspection: dict) -> dict[str, str] | None: + """Detect geographical columns from csv-detective inspection results. + + Returns a dict like {"latitude": "col_name", "longitude": "col_name"} + or {"geojson": "col_name"} etc, or None if no geo columns found. + """ geo = {} for column, detection in inspection["columns"].items(): # see csv-detective's geo formats: @@ -215,26 +259,95 @@ def get_features( geo[fmt] = (column, detection["score"]) # priority is given to geojson, then latlon, then lonlat, then latitude + longitude if "geojson" in geo: - geo = {"geojson": geo["geojson"][0]} + return {"geojson": geo["geojson"][0]} elif "latlon" in geo: - geo = {"latlon": geo["latlon"][0]} + return {"latlon": geo["latlon"][0]} elif "lonlat" in geo: - geo = {"lonlat": geo["lonlat"][0]} + return {"lonlat": geo["lonlat"][0]} elif "latitude" in geo and "longitude" in geo: - geo = { - "latitude": geo["latitude"][0], - "longitude": geo["longitude"][0], - } + return {"latitude": geo["latitude"][0], "longitude": geo["longitude"][0]} + return None + + +def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) -> str: + """Build a SQL query that generates GeoJSON features directly in PostgreSQL.""" + property_cols = [c for c in columns if c not in geo.values()] + properties_args = ", ".join(f"'{col}', {_quote_ident(col)}" for col in property_cols) + + if "geojson" in geo: + col = geo["geojson"] + geometry_sql = f"({_quote_ident(col)})::json" + where = "" + elif "latlon" in geo: + # latlon = "lat,lon" → GeoJSON needs [lon, lat] + col = geo["latlon"] + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array( + (split_part({_clean_pair_sql(col)}, ',', 2))::float, + (split_part({_clean_pair_sql(col)}, ',', 1))::float + ) + )""" + where = f"WHERE {_quote_ident(col)} IS NOT NULL" + elif "lonlat" in geo: + col = geo["lonlat"] + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array( + (split_part({_clean_pair_sql(col)}, ',', 1))::float, + (split_part({_clean_pair_sql(col)}, ',', 2))::float + ) + )""" + where = f"WHERE {_quote_ident(col)} IS NOT NULL" else: + lon_col = geo["longitude"] + lat_col = geo["latitude"] + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array({_quote_ident(lon_col)}, {_quote_ident(lat_col)}) + )""" + where = f"WHERE {_quote_ident(lat_col)} IS NOT NULL AND {_quote_ident(lon_col)} IS NOT NULL" + + return f""" + SELECT json_build_object( + 'type', 'Feature', + 'geometry', {geometry_sql}, + 'properties', json_build_object({properties_args}) + )::text + FROM {_quote_ident(table_name)} + {where} + """ + + +async def csv_to_geojson_from_db( + table_name: str, + inspection: dict, + output_file_path: Path, + upload_to_minio: bool = True, +) -> tuple[int, str | None] | None: + """Generate a GeoJSON file by streaming features directly from PostgreSQL.""" + geo = _detect_geo_columns(inspection) + if geo is None: log.debug("No geographical columns found, skipping") return None - template = {"type": "FeatureCollection"} + columns = list(inspection["columns"].keys()) + query = _build_feature_sql(table_name, geo, columns) - template["features"] = streamable_list(get_features(file_path, inspection, geo)) + db = await context.pool("csv") + async with db.acquire() as conn: + async with conn.transaction(): + cursor = conn.cursor(query) - with output_file_path.open("w") as f: - json.dump(template, f, indent=4, ensure_ascii=False, default=str) + with output_file_path.open("w") as f: + f.write('{"type": "FeatureCollection", "features": [\n') + first = True + async for row in cursor: + if not first: + f.write(",\n") + f.write(row[0]) + first = False + f.write("\n]}") geojson_size: int = os.path.getsize(output_file_path) @@ -300,6 +413,7 @@ async def csv_to_geojson_and_pmtiles( resource_id: str | None = None, check_id: int | None = None, timer: Timer | None = None, + table_name: str | None = None, ) -> tuple[Path, int, str | None, Path, int, str | None] | None: if not config.CSV_TO_GEOJSON: log.debug("CSV_TO_GEOJSON turned off, skipping geojson/PMtiles export.") @@ -318,8 +432,16 @@ async def csv_to_geojson_and_pmtiles( geojson_filepath = DEFAULT_GEOJSON_FILEPATH pmtiles_filepath = DEFAULT_PMTILES_FILEPATH - # Convert CSV to GeoJSON - result = await csv_to_geojson(file_path, inspection, geojson_filepath, upload_to_minio=True) + # Convert to GeoJSON — from DB if available, otherwise from CSV file + if table_name: + result = await csv_to_geojson_from_db( + table_name, + inspection, + geojson_filepath, + upload_to_minio=True, + ) + else: + result = await csv_to_geojson(file_path, inspection, geojson_filepath, upload_to_minio=True) if result is None: return None geojson_size, geojson_url = result From 76a7757f5f1f9fe44c506acca1070501978e8bd8 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:25:22 +0200 Subject: [PATCH 02/12] add tests --- tests/test_analysis/test_analysis_csv.py | 79 +++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 04f4874f..dcf9e063 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -14,7 +14,7 @@ from tests.conftest import RESOURCE_ID, RESOURCE_URL from udata_hydra.analysis.csv import analyse_csv, csv_to_db -from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles +from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, csv_to_geojson_from_db from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -735,6 +735,83 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker): pmtiles_filepath.unlink() +@pytest.mark.parametrize( + "geo_columns,expected_geo_key", + ( + # latlon format: "lat,lon" → GeoJSON coordinates should be [lon, lat] + ( + {"coords": [f"{10.0 * k * (-1) ** k},{20.0 * k * (-1) ** k}" for k in range(1, 6)]}, + "latlon", + ), + # separate latitude + longitude columns + ( + { + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + }, + "latitude", + ), + ), +) +async def test_csv_to_geojson_from_db(db, geo_columns, expected_geo_key, clean_db): + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + other_columns = { + "nombre": range(1, 6), + "score": [0.01, 1.2, 34.5, 678.9, 10], + } + sep = ";" + columns = other_columns | geo_columns + file = sep.join(columns) + "\n" + for i in range(len(other_columns["nombre"])): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_from_db" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await csv_to_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + geojson_size, geojson_url = result + assert geojson_url is None + assert geojson_size > 0 + + with open(output_path) as f: + geojson = json.load(f) + + assert geojson["type"] == "FeatureCollection" + assert len(geojson["features"]) == 5 + for feat in geojson["features"]: + assert feat["type"] == "Feature" + assert feat["geometry"]["type"] == "Point" + coords = feat["geometry"]["coordinates"] + assert isinstance(coords[0], (int, float)) + assert isinstance(coords[1], (int, float)) + assert "nombre" in feat["properties"] + assert "score" in feat["properties"] + for geo_col in geo_columns: + assert geo_col not in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + @pytest.mark.parametrize( "params", ( From 8fc9eadee080d913c02b11ffff31d5a947b60d0d Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:30:52 +0200 Subject: [PATCH 03/12] fix bug with reserved column renaming --- tests/test_analysis/test_analysis_csv.py | 50 ++++++++++++++++++++++ tests/test_analysis/test_parquet_export.py | 2 +- udata_hydra/analysis/csv.py | 3 +- udata_hydra/analysis/geojson.py | 29 +++++++++---- udata_hydra/analysis/parquet.py | 4 +- udata_hydra/db/__init__.py | 5 +++ 6 files changed, 79 insertions(+), 14 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index dcf9e063..ae43da0a 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -812,6 +812,56 @@ async def test_csv_to_geojson_from_db(db, geo_columns, expected_geo_key, clean_d await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') +async def test_csv_to_geojson_from_db_with_reserved_column(db, clean_db): + """A CSV with a reserved PG column name (xmin) should still produce valid GeoJSON from DB.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = { + "xmin": range(1, 6), + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_reserved_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await csv_to_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + geojson_size, _ = result + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + expected_xmin_values = list(range(1, 6)) + actual_xmin_values = [feat["properties"]["xmin"] for feat in geojson["features"]] + assert actual_xmin_values == expected_xmin_values + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + @pytest.mark.parametrize( "params", ( diff --git a/tests/test_analysis/test_parquet_export.py b/tests/test_analysis/test_parquet_export.py index 4b25f22e..758c149f 100755 --- a/tests/test_analysis/test_parquet_export.py +++ b/tests/test_analysis/test_parquet_export.py @@ -7,10 +7,10 @@ from tests.conftest import RESOURCE_ID from udata_hydra.analysis.csv import ( - RESERVED_COLS, csv_to_parquet, generate_records, ) +from udata_hydra.db import RESERVED_COLS from udata_hydra.utils.minio import MinIOClient from udata_hydra.utils.parquet import save_as_parquet diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index f9eb39bb..e0bd4765 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -34,7 +34,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles -from udata_hydra.db import compute_insert_query +from udata_hydra.db import RESERVED_COLS, compute_insert_query from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -74,7 +74,6 @@ "binary": LargeBinary, } -RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") minio_client = MinIOClient(bucket=config.MINIO_PARQUET_BUCKET, folder=config.MINIO_PARQUET_FOLDER) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index a4d54694..7beae48d 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -11,6 +11,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers +from udata_hydra.db import RESERVED_COLS from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -269,18 +270,30 @@ def _detect_geo_columns(inspection: dict) -> dict[str, str] | None: return None +def _db_col_name(col: str) -> str: + """Map a CSV column name to its actual PostgreSQL column name.""" + return f"{col}__hydra_renamed" if col.lower() in RESERVED_COLS else col + + def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) -> str: - """Build a SQL query that generates GeoJSON features directly in PostgreSQL.""" + """Build a SQL query that generates GeoJSON features directly in PostgreSQL. + + Column names in `columns` and `geo` are the original CSV names. They are + mapped to their actual DB names (handling RESERVED_COLS renaming) for the + SQL identifiers, while the original names are kept as JSON keys so the + GeoJSON output matches what the CSV path produces. + """ property_cols = [c for c in columns if c not in geo.values()] - properties_args = ", ".join(f"'{col}', {_quote_ident(col)}" for col in property_cols) + properties_args = ", ".join( + f"'{col}', {_quote_ident(_db_col_name(col))}" for col in property_cols + ) if "geojson" in geo: - col = geo["geojson"] + col = _db_col_name(geo["geojson"]) geometry_sql = f"({_quote_ident(col)})::json" where = "" elif "latlon" in geo: - # latlon = "lat,lon" → GeoJSON needs [lon, lat] - col = geo["latlon"] + col = _db_col_name(geo["latlon"]) geometry_sql = f"""json_build_object( 'type', 'Point', 'coordinates', json_build_array( @@ -290,7 +303,7 @@ def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) )""" where = f"WHERE {_quote_ident(col)} IS NOT NULL" elif "lonlat" in geo: - col = geo["lonlat"] + col = _db_col_name(geo["lonlat"]) geometry_sql = f"""json_build_object( 'type', 'Point', 'coordinates', json_build_array( @@ -300,8 +313,8 @@ def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) )""" where = f"WHERE {_quote_ident(col)} IS NOT NULL" else: - lon_col = geo["longitude"] - lat_col = geo["latitude"] + lon_col = _db_col_name(geo["longitude"]) + lat_col = _db_col_name(geo["latitude"]) geometry_sql = f"""json_build_object( 'type', 'Point', 'coordinates', json_build_array({_quote_ident(lon_col)}, {_quote_ident(lat_col)}) diff --git a/udata_hydra/analysis/parquet.py b/udata_hydra/analysis/parquet.py index 32ab50bb..e2fdbe9b 100755 --- a/udata_hydra/analysis/parquet.py +++ b/udata_hydra/analysis/parquet.py @@ -15,7 +15,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers from udata_hydra.analysis.csv import compute_create_table_query, csv_to_db_index -from udata_hydra.db import compute_insert_query +from udata_hydra.db import RESERVED_COLS, compute_insert_query from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -45,8 +45,6 @@ r"^timestamp\[\ws,": "datetime_aware", # the rest of the field depends on the timezone } -RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") - async def analyse_parquet( check: Record | dict, diff --git a/udata_hydra/db/__init__.py b/udata_hydra/db/__init__.py index 78db2ee8..abb72a10 100644 --- a/udata_hydra/db/__init__.py +++ b/udata_hydra/db/__init__.py @@ -4,6 +4,11 @@ from udata_hydra import context +# PostgreSQL system columns and hydra's own __id that must be renamed when +# a user CSV happens to use them as headers. Shared across csv, parquet and +# geojson modules. +RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") + def convert_dict_values_to_json(data: dict) -> dict: """ From 211caaf2b786153d8ac282e5ba33164a8d127632 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:35:10 +0200 Subject: [PATCH 04/12] =?UTF-8?q?prevent=20SQL=C2=A0injections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_analysis/test_analysis_csv.py | 48 ++++++++++++++++++++++++ udata_hydra/analysis/geojson.py | 28 +++++++++----- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index ae43da0a..907d9634 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -862,6 +862,54 @@ async def test_csv_to_geojson_from_db_with_reserved_column(db, clean_db): await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') +async def test_csv_to_geojson_from_db_with_quote_in_column_name(db, clean_db): + """A CSV with a single quote in a column name should not break the SQL query.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = { + "l'adresse": ["10 rue de la Paix", "5 avenue Foch", "3 bd Raspail", "1 place Vendôme", "8 rue Rivoli"], + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_quote_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await csv_to_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for feat in geojson["features"]: + assert "l'adresse" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + @pytest.mark.parametrize( "params", ( diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 7beae48d..f3c91224 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -275,18 +275,27 @@ def _db_col_name(col: str) -> str: return f"{col}__hydra_renamed" if col.lower() in RESERVED_COLS else col -def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) -> str: +def _build_feature_sql( + table_name: str, geo: dict[str, str], columns: list[str] +) -> tuple[str, list[str]]: """Build a SQL query that generates GeoJSON features directly in PostgreSQL. Column names in `columns` and `geo` are the original CSV names. They are mapped to their actual DB names (handling RESERVED_COLS renaming) for the - SQL identifiers, while the original names are kept as JSON keys so the - GeoJSON output matches what the CSV path produces. + SQL identifiers, while the original names are passed as query parameters + for the JSON keys so the GeoJSON output matches what the CSV path produces. + + Returns (query, params) where params are the original column names used as + JSON keys via $1, $2… placeholders. """ property_cols = [c for c in columns if c not in geo.values()] - properties_args = ", ".join( - f"'{col}', {_quote_ident(_db_col_name(col))}" for col in property_cols - ) + params: list[str] = [] + properties_fragments = [] + for col in property_cols: + params.append(col) + placeholder = f"${len(params)}::text" + properties_fragments.append(f"{placeholder}, {_quote_ident(_db_col_name(col))}") + properties_args = ", ".join(properties_fragments) if "geojson" in geo: col = _db_col_name(geo["geojson"]) @@ -321,7 +330,7 @@ def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) )""" where = f"WHERE {_quote_ident(lat_col)} IS NOT NULL AND {_quote_ident(lon_col)} IS NOT NULL" - return f""" + query = f""" SELECT json_build_object( 'type', 'Feature', 'geometry', {geometry_sql}, @@ -330,6 +339,7 @@ def _build_feature_sql(table_name: str, geo: dict[str, str], columns: list[str]) FROM {_quote_ident(table_name)} {where} """ + return query, params async def csv_to_geojson_from_db( @@ -345,12 +355,12 @@ async def csv_to_geojson_from_db( return None columns = list(inspection["columns"].keys()) - query = _build_feature_sql(table_name, geo, columns) + query, params = _build_feature_sql(table_name, geo, columns) db = await context.pool("csv") async with db.acquire() as conn: async with conn.transaction(): - cursor = conn.cursor(query) + cursor = conn.cursor(query, *params) with output_file_path.open("w") as f: f.write('{"type": "FeatureCollection", "features": [\n') From ebd2a9474e55389a96165f6978ec208904e39fca Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:39:06 +0200 Subject: [PATCH 05/12] simplify lat lon and lon lat --- udata_hydra/analysis/geojson.py | 53 +++++++++++---------------------- 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index f3c91224..408242c0 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -170,33 +170,22 @@ def get_features( "properties": {col: row[col] for col in row.keys() if col != geo["geojson"]}, # type: ignore[union-attr] } - elif "latlon" in geo: - # ending up here means we either have the exact lat,lon format, or NaN - # skipping row if NaN - if row[geo["latlon"]] is None: + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + pair_col = geo[pair_key] + if row[pair_col] is None: continue + coords = _cast_latlon(row[pair_col]) + # latlon already returns [lon, lat]; lonlat needs inversion + if pair_key == "lonlat": + coords = coords[::-1] yield { "type": "Feature", "geometry": { "type": "Point", - "coordinates": _cast_latlon(row[geo["latlon"]]), + "coordinates": coords, }, - "properties": {col: row[col] for col in row.keys() if col != geo["latlon"]}, # type: ignore[union-attr] - } - - elif "lonlat" in geo: - # ending up here means we either have the exact lon,lat format, or NaN - # skipping row if NaN - if row[geo["lonlat"]] is None: - continue - yield { - "type": "Feature", - "geometry": { - "type": "Point", - # inverting lon and lat to match the standard - "coordinates": _cast_latlon(row[geo["lonlat"]])[::-1], - }, - "properties": {col: row[col] for col in row.keys() if col != geo["lonlat"]}, # type: ignore[union-attr] + "properties": {col: row[col] for col in row.keys() if col != pair_col}, # type: ignore[union-attr] } else: @@ -301,23 +290,17 @@ def _build_feature_sql( col = _db_col_name(geo["geojson"]) geometry_sql = f"({_quote_ident(col)})::json" where = "" - elif "latlon" in geo: - col = _db_col_name(geo["latlon"]) - geometry_sql = f"""json_build_object( - 'type', 'Point', - 'coordinates', json_build_array( - (split_part({_clean_pair_sql(col)}, ',', 2))::float, - (split_part({_clean_pair_sql(col)}, ',', 1))::float - ) - )""" - where = f"WHERE {_quote_ident(col)} IS NOT NULL" - elif "lonlat" in geo: - col = _db_col_name(geo["lonlat"]) + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + col = _db_col_name(geo[pair_key]) + # latlon = "lat,lon" → GeoJSON needs [lon, lat] so swap indices + # lonlat = "lon,lat" → already in GeoJSON order + lon_idx, lat_idx = ("2", "1") if pair_key == "latlon" else ("1", "2") geometry_sql = f"""json_build_object( 'type', 'Point', 'coordinates', json_build_array( - (split_part({_clean_pair_sql(col)}, ',', 1))::float, - (split_part({_clean_pair_sql(col)}, ',', 2))::float + (split_part({_clean_pair_sql(col)}, ',', {lon_idx}))::float, + (split_part({_clean_pair_sql(col)}, ',', {lat_idx}))::float ) )""" where = f"WHERE {_quote_ident(col)} IS NOT NULL" From 21a2c586815bde22046af8d1a71773f4ff877ac0 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:42:36 +0200 Subject: [PATCH 06/12] lint --- tests/test_analysis/test_analysis_csv.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 907d9634..31f7371d 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -872,7 +872,13 @@ async def test_csv_to_geojson_from_db_with_quote_in_column_name(db, clean_db): sep = ";" columns = { - "l'adresse": ["10 rue de la Paix", "5 avenue Foch", "3 bd Raspail", "1 place Vendôme", "8 rue Rivoli"], + "l'adresse": [ + "10 rue de la Paix", + "5 avenue Foch", + "3 bd Raspail", + "1 place Vendôme", + "8 rue Rivoli", + ], "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], "long": [20.0 * k * (-1) ** k for k in range(1, 6)], } From 67172adf6af50123203b7c9aab19b136d412c4fb Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 2 Apr 2026 13:47:35 +0200 Subject: [PATCH 07/12] add DB_TO_GEOJSON config --- udata_hydra/analysis/geojson.py | 8 ++++---- udata_hydra/config_default.toml | 1 + udata_hydra/routes/status.py | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 408242c0..abc02b7a 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -421,8 +421,8 @@ async def csv_to_geojson_and_pmtiles( timer: Timer | None = None, table_name: str | None = None, ) -> tuple[Path, int, str | None, Path, int, str | None] | None: - if not config.CSV_TO_GEOJSON: - log.debug("CSV_TO_GEOJSON turned off, skipping geojson/PMtiles export.") + if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: + log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") return None log.debug( @@ -438,8 +438,8 @@ async def csv_to_geojson_and_pmtiles( geojson_filepath = DEFAULT_GEOJSON_FILEPATH pmtiles_filepath = DEFAULT_PMTILES_FILEPATH - # Convert to GeoJSON — from DB if available, otherwise from CSV file - if table_name: + # Convert to GeoJSON — from DB if available and enabled, otherwise from CSV file + if config.DB_TO_GEOJSON and table_name: result = await csv_to_geojson_from_db( table_name, inspection, diff --git a/udata_hydra/config_default.toml b/udata_hydra/config_default.toml index 99861d87..94a77427 100644 --- a/udata_hydra/config_default.toml +++ b/udata_hydra/config_default.toml @@ -102,6 +102,7 @@ MINIO_PMTILES_FOLDER = "" # no trailing slash # -- Geojson conversion settings -- # CSV_TO_GEOJSON = false +DB_TO_GEOJSON = false MINIO_GEOJSON_BUCKET = "" MINIO_GEOJSON_FOLDER = "" # no trailing slash diff --git a/udata_hydra/routes/status.py b/udata_hydra/routes/status.py index 9d118798..3b6825b4 100644 --- a/udata_hydra/routes/status.py +++ b/udata_hydra/routes/status.py @@ -168,6 +168,7 @@ async def get_health(request: web.Request) -> web.Response: "csv_to_parquet": config.CSV_TO_PARQUET, "geojson_to_pmtiles": config.GEOJSON_TO_PMTILES, "csv_to_geojson": config.CSV_TO_GEOJSON, + "db_to_geojson": config.DB_TO_GEOJSON, "parquet_to_db": config.PARQUET_TO_DB, } ) From 23cdfd7c18a88b8454ff1a2ff90ee039556bef2c Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Mon, 6 Apr 2026 11:34:01 +0200 Subject: [PATCH 08/12] fix more than 100 arguments to pg function --- udata_hydra/analysis/geojson.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index abc02b7a..e7e20b48 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -284,7 +284,19 @@ def _build_feature_sql( params.append(col) placeholder = f"${len(params)}::text" properties_fragments.append(f"{placeholder}, {_quote_ident(_db_col_name(col))}") - properties_args = ", ".join(properties_fragments) + + # PostgreSQL's json_build_object accepts max 100 arguments (50 key-value pairs). + # Split into chunks and merge with || when needed. + max_pairs = 50 + chunks = [ + properties_fragments[i : i + max_pairs] + for i in range(0, len(properties_fragments), max_pairs) + ] + if len(chunks) <= 1: + properties_sql = f"json_build_object({', '.join(properties_fragments)})" + else: + parts = [f"jsonb_build_object({', '.join(chunk)})" for chunk in chunks] + properties_sql = f"({' || '.join(parts)})::json" if "geojson" in geo: col = _db_col_name(geo["geojson"]) @@ -317,7 +329,7 @@ def _build_feature_sql( SELECT json_build_object( 'type', 'Feature', 'geometry', {geometry_sql}, - 'properties', json_build_object({properties_args}) + 'properties', {properties_sql} )::text FROM {_quote_ident(table_name)} {where} From df12267ae454d1a35cc6f67ed606ef3be2276231 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 16 Apr 2026 17:01:09 +0200 Subject: [PATCH 09/12] apply code review --- tests/test_analysis/test_analysis_csv.py | 14 +-- udata_hydra/analysis/geojson.py | 36 ++++--- udata_hydra/utils/casting.py | 120 +++++++++++++---------- udata_hydra/utils/minio.py | 2 +- 4 files changed, 98 insertions(+), 74 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 4ed38d11..de895f1f 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -14,7 +14,7 @@ from tests.conftest import RESOURCE_ID, RESOURCE_URL from udata_hydra.analysis.csv import analyse_csv, csv_to_db -from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, csv_to_geojson_from_db +from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, save_as_geojson_from_db from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -753,7 +753,7 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker): ), ), ) -async def test_csv_to_geojson_from_db(db, geo_columns, expected_geo_key, clean_db): +async def test_save_as_geojson_from_db(db, geo_columns, expected_geo_key, clean_db): output_path = Path(f"{RESOURCE_ID}.geojson") try: output_path.unlink() @@ -784,7 +784,7 @@ async def test_csv_to_geojson_from_db(db, geo_columns, expected_geo_key, clean_d with patch("udata_hydra.config.CSV_TO_DB", True): await csv_to_db(fp.name, inspection, table_name) - result = await csv_to_geojson_from_db( + result = await save_as_geojson_from_db( table_name, inspection, output_path, upload_to_minio=False ) assert result is not None @@ -812,7 +812,7 @@ async def test_csv_to_geojson_from_db(db, geo_columns, expected_geo_key, clean_d await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') -async def test_csv_to_geojson_from_db_with_reserved_column(db, clean_db): +async def test_save_as_geojson_from_db_with_reserved_column(db, clean_db): """A CSV with a reserved PG column name (xmin) should still produce valid GeoJSON from DB.""" output_path = Path(f"{RESOURCE_ID}.geojson") try: @@ -844,7 +844,7 @@ async def test_csv_to_geojson_from_db_with_reserved_column(db, clean_db): with patch("udata_hydra.config.CSV_TO_DB", True): await csv_to_db(fp.name, inspection, table_name) - result = await csv_to_geojson_from_db( + result = await save_as_geojson_from_db( table_name, inspection, output_path, upload_to_minio=False ) assert result is not None @@ -862,7 +862,7 @@ async def test_csv_to_geojson_from_db_with_reserved_column(db, clean_db): await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') -async def test_csv_to_geojson_from_db_with_quote_in_column_name(db, clean_db): +async def test_save_as_geojson_from_db_with_quote_in_column_name(db, clean_db): """A CSV with a single quote in a column name should not break the SQL query.""" output_path = Path(f"{RESOURCE_ID}.geojson") try: @@ -900,7 +900,7 @@ async def test_csv_to_geojson_from_db_with_quote_in_column_name(db, clean_db): with patch("udata_hydra.config.CSV_TO_DB", True): await csv_to_db(fp.name, inspection, table_name) - result = await csv_to_geojson_from_db( + result = await save_as_geojson_from_db( table_name, inspection, output_path, upload_to_minio=False ) assert result is not None diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index e7e20b48..3398a5a2 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -31,10 +31,6 @@ log = logging.getLogger("udata-hydra") -# latlon/lonlat columns can contain values like "[48.8566, 2.3522]" or "48.8566 , 2.3522" -# Both versions below strip spaces and brackets, then split on comma. - - def _cast_latlon(latlon: str) -> list[float]: """Python version — used when reading from CSV.""" lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") @@ -173,6 +169,7 @@ def get_features( elif "latlon" in geo or "lonlat" in geo: pair_key = "latlon" if "latlon" in geo else "lonlat" pair_col = geo[pair_key] + # skipping row if geo data is None (NaN in original CSV) if row[pair_col] is None: continue coords = _cast_latlon(row[pair_col]) @@ -280,10 +277,11 @@ def _build_feature_sql( property_cols = [c for c in columns if c not in geo.values()] params: list[str] = [] properties_fragments = [] - for col in property_cols: + for idx, col in enumerate(property_cols): params.append(col) - placeholder = f"${len(params)}::text" - properties_fragments.append(f"{placeholder}, {_quote_ident(_db_col_name(col))}") + # $N::text parameters are JSON *keys* (column names), not values. + # Values come from the quoted column identifiers and keep their native PG types. + properties_fragments.append(f"${idx + 1}::text, {_quote_ident(_db_col_name(col))}") # PostgreSQL's json_build_object accepts max 100 arguments (50 key-value pairs). # Split into chunks and merge with || when needed. @@ -330,20 +328,34 @@ def _build_feature_sql( 'type', 'Feature', 'geometry', {geometry_sql}, 'properties', {properties_sql} - )::text + )::text AS feature_json FROM {_quote_ident(table_name)} {where} """ return query, params -async def csv_to_geojson_from_db( +async def save_as_geojson_from_db( table_name: str, inspection: dict, output_file_path: Path, upload_to_minio: bool = True, ) -> tuple[int, str | None] | None: - """Generate a GeoJSON file by streaming features directly from PostgreSQL.""" + """Generate a GeoJSON file by streaming features directly from PostgreSQL. + + Uses a server-side cursor to avoid loading all features in memory. + Rows with NULL geographical columns are skipped. + + Args: + table_name: Name of the PostgreSQL table containing the CSV data. + inspection: CSV detective analysis results with column format detection. + output_file_path: Path where the GeoJSON file should be saved. + upload_to_minio: Whether to upload to MinIO (default: True). + + Returns: + geojson_size: Size of the GeoJSON file in bytes. + geojson_url: URL of the GeoJSON file on MinIO. None if not uploaded. + """ geo = _detect_geo_columns(inspection) if geo is None: log.debug("No geographical columns found, skipping") @@ -363,7 +375,7 @@ async def csv_to_geojson_from_db( async for row in cursor: if not first: f.write(",\n") - f.write(row[0]) + f.write(row["feature_json"]) first = False f.write("\n]}") @@ -452,7 +464,7 @@ async def csv_to_geojson_and_pmtiles( # Convert to GeoJSON — from DB if available and enabled, otherwise from CSV file if config.DB_TO_GEOJSON and table_name: - result = await csv_to_geojson_from_db( + result = await save_as_geojson_from_db( table_name, inspection, geojson_filepath, diff --git a/udata_hydra/utils/casting.py b/udata_hydra/utils/casting.py index a956f891..a537bc0d 100644 --- a/udata_hydra/utils/casting.py +++ b/udata_hydra/utils/casting.py @@ -1,54 +1,66 @@ -import logging -from typing import Any, Iterator - -from csv_detective.output.dataframe import cast - -from udata_hydra.utils.reader import Reader - -log = logging.getLogger("udata-hydra") - - -def smart_cast(_type: str, value, cast_json: bool = True, failsafe: bool = False) -> Any: - try: - if value is None or value == "": - return None - if _type == "json" and not cast_json: - # handing JSON as string to postgres, which casts it itself - return value - return cast(value, _type) - except ValueError as e: - if not failsafe: - raise e - log.warning(f'Could not convert "{value}" to {_type}, defaulting to null') - return None - - -def generate_records( - file_path: str, inspection: dict, cast_json: bool = True, as_dict: bool = False -) -> Iterator[list | dict]: - # because we need the iterator multiple times, not possible to - # handle db, parquet and geojson through the same iteration - columns = {col: v["python_type"] for col, v in inspection["columns"].items()} - with Reader(file_path, inspection) as reader: - for line in reader: - if line: - if not as_dict: - yield [ - smart_cast( - _type, - value if isinstance(value, str) or value is None else str(value), - cast_json=cast_json, - failsafe=False, - ) - for _type, value in zip(columns.values(), line) - ] - else: - yield { - col: smart_cast( - _type, - value if isinstance(value, str) or value is None else str(value), - cast_json=cast_json, - failsafe=False, - ) - for (col, _type), value in zip(columns.items(), line) - } +import inspect +import logging +from typing import Any, Iterator + +from csv_detective.output.dataframe import cast + +from udata_hydra.utils.reader import Reader + +log = logging.getLogger("udata-hydra") + +_cast_supports_date_format = "date_format" in inspect.signature(cast).parameters + + +def smart_cast( + _type: str, value, cast_json: bool = True, failsafe: bool = False, + date_format: list[str] | None = None, +) -> Any: + try: + if value is None or value == "": + return None + if _type == "json" and not cast_json: + return value + if _cast_supports_date_format: + return cast(value, _type, date_format=date_format) + return cast(value, _type) + except ValueError as e: + if not failsafe: + raise e + log.warning(f'Could not convert "{value}" to {_type}, defaulting to null') + return None + + +def generate_records( + file_path: str, inspection: dict, cast_json: bool = True, as_dict: bool = False +) -> Iterator[list | dict]: + # because we need the iterator multiple times, not possible to + # handle db, parquet and geojson through the same iteration + columns = { + col: (v["python_type"], v.get("date_format")) + for col, v in inspection["columns"].items() + } + with Reader(file_path, inspection) as reader: + for line in reader: + if line: + if not as_dict: + yield [ + smart_cast( + _type, + value if isinstance(value, str) or value is None else str(value), + cast_json=cast_json, + failsafe=False, + date_format=date_fmt, + ) + for (_type, date_fmt), value in zip(columns.values(), line) + ] + else: + yield { + col: smart_cast( + _type, + value if isinstance(value, str) or value is None else str(value), + cast_json=cast_json, + failsafe=False, + date_format=date_fmt, + ) + for (col, (_type, date_fmt)), value in zip(columns.items(), line) + } diff --git a/udata_hydra/utils/minio.py b/udata_hydra/utils/minio.py index 663918f3..6c2714d9 100644 --- a/udata_hydra/utils/minio.py +++ b/udata_hydra/utils/minio.py @@ -18,7 +18,7 @@ def __init__(self, bucket: str, folder: str): config.MINIO_URL or "test", access_key=self.user or "test", secret_key=self.password or "test", - secure=True, + secure=config.MINIO_SECURE if config.MINIO_SECURE is not None else True, ) if self.bucket: self.bucket_exists = self.client.bucket_exists(self.bucket) From 2b5038b9f1071391c8b7839dc14e1099b55e1a17 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 16 Apr 2026 17:08:08 +0200 Subject: [PATCH 10/12] add tests --- tests/test_analysis/test_analysis_csv.py | 159 +++++++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index de895f1f..f36c61c6 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -916,6 +916,165 @@ async def test_save_as_geojson_from_db_with_quote_in_column_name(db, clean_db): await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') +async def test_save_as_geojson_from_db_lonlat(db, clean_db): + """lonlat format ("[lon, lat]") should produce correct GeoJSON coordinates [lon, lat].""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + lons = [20.0 * k * (-1) ** k for k in range(1, 6)] + lats = [10.0 * k * (-1) ** k for k in range(1, 6)] + sep = ";" + columns = { + "nombre": range(1, 6), + "geopoint": [f"[{lon}, {lat}]" for lon, lat in zip(lons, lats)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + assert "lonlat" in inspection["columns"]["geopoint"]["format"] + + table_name = "test_geojson_lonlat" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for i, feat in enumerate(geojson["features"]): + coords = feat["geometry"]["coordinates"] + assert coords[0] == pytest.approx(lons[i]) + assert coords[1] == pytest.approx(lats[i]) + assert "geopoint" not in feat["properties"] + assert "nombre" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_geojson_column(db, clean_db): + """A column containing GeoJSON strings should produce valid geometry from DB.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + geometries = [ + {"type": "Point", "coordinates": [10 * k * (-1) ** k, 20 * k * (-1) ** k]} + for k in range(1, 6) + ] + sep = ";" + columns = { + "nombre": range(1, 6), + "polyg": [json.dumps(g) for g in geometries], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + assert "geojson" in inspection["columns"]["polyg"]["format"] + + table_name = "test_geojson_geojson_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for i, feat in enumerate(geojson["features"]): + assert feat["geometry"] == geometries[i] + assert "polyg" not in feat["properties"] + assert "nombre" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_many_columns(db, clean_db): + """More than 50 property columns should trigger json_build_object chunking.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = {f"col_{i:03d}": range(1, 6) for i in range(55)} + columns["lat"] = [10.0 * k * (-1) ** k for k in range(1, 6)] + columns["long"] = [20.0 * k * (-1) ** k for k in range(1, 6)] + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_many_cols" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + feat = geojson["features"][0] + for i in range(55): + assert f"col_{i:03d}" in feat["properties"] + assert "lat" not in feat["properties"] + assert "long" not in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + @pytest.mark.parametrize( "params", ( From 661c92769cbe4728ebdf6cffad7c4dd15056a594 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Thu, 16 Apr 2026 17:12:05 +0200 Subject: [PATCH 11/12] lint --- udata_hydra/utils/casting.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/udata_hydra/utils/casting.py b/udata_hydra/utils/casting.py index a537bc0d..990e32f0 100644 --- a/udata_hydra/utils/casting.py +++ b/udata_hydra/utils/casting.py @@ -12,7 +12,10 @@ def smart_cast( - _type: str, value, cast_json: bool = True, failsafe: bool = False, + _type: str, + value, + cast_json: bool = True, + failsafe: bool = False, date_format: list[str] | None = None, ) -> Any: try: @@ -36,8 +39,7 @@ def generate_records( # because we need the iterator multiple times, not possible to # handle db, parquet and geojson through the same iteration columns = { - col: (v["python_type"], v.get("date_format")) - for col, v in inspection["columns"].items() + col: (v["python_type"], v.get("date_format")) for col, v in inspection["columns"].items() } with Reader(file_path, inspection) as reader: for line in reader: From 8bf53b30b2a35a56afffbbe67b48804dff7bae82 Mon Sep 17 00:00:00 2001 From: Thibaud Dauce Date: Mon, 20 Apr 2026 10:57:28 +0200 Subject: [PATCH 12/12] revert unrelated date_format changes --- udata_hydra/utils/casting.py | 122 ++++++++++++++++------------------- 1 file changed, 54 insertions(+), 68 deletions(-) diff --git a/udata_hydra/utils/casting.py b/udata_hydra/utils/casting.py index 990e32f0..a956f891 100644 --- a/udata_hydra/utils/casting.py +++ b/udata_hydra/utils/casting.py @@ -1,68 +1,54 @@ -import inspect -import logging -from typing import Any, Iterator - -from csv_detective.output.dataframe import cast - -from udata_hydra.utils.reader import Reader - -log = logging.getLogger("udata-hydra") - -_cast_supports_date_format = "date_format" in inspect.signature(cast).parameters - - -def smart_cast( - _type: str, - value, - cast_json: bool = True, - failsafe: bool = False, - date_format: list[str] | None = None, -) -> Any: - try: - if value is None or value == "": - return None - if _type == "json" and not cast_json: - return value - if _cast_supports_date_format: - return cast(value, _type, date_format=date_format) - return cast(value, _type) - except ValueError as e: - if not failsafe: - raise e - log.warning(f'Could not convert "{value}" to {_type}, defaulting to null') - return None - - -def generate_records( - file_path: str, inspection: dict, cast_json: bool = True, as_dict: bool = False -) -> Iterator[list | dict]: - # because we need the iterator multiple times, not possible to - # handle db, parquet and geojson through the same iteration - columns = { - col: (v["python_type"], v.get("date_format")) for col, v in inspection["columns"].items() - } - with Reader(file_path, inspection) as reader: - for line in reader: - if line: - if not as_dict: - yield [ - smart_cast( - _type, - value if isinstance(value, str) or value is None else str(value), - cast_json=cast_json, - failsafe=False, - date_format=date_fmt, - ) - for (_type, date_fmt), value in zip(columns.values(), line) - ] - else: - yield { - col: smart_cast( - _type, - value if isinstance(value, str) or value is None else str(value), - cast_json=cast_json, - failsafe=False, - date_format=date_fmt, - ) - for (col, (_type, date_fmt)), value in zip(columns.items(), line) - } +import logging +from typing import Any, Iterator + +from csv_detective.output.dataframe import cast + +from udata_hydra.utils.reader import Reader + +log = logging.getLogger("udata-hydra") + + +def smart_cast(_type: str, value, cast_json: bool = True, failsafe: bool = False) -> Any: + try: + if value is None or value == "": + return None + if _type == "json" and not cast_json: + # handing JSON as string to postgres, which casts it itself + return value + return cast(value, _type) + except ValueError as e: + if not failsafe: + raise e + log.warning(f'Could not convert "{value}" to {_type}, defaulting to null') + return None + + +def generate_records( + file_path: str, inspection: dict, cast_json: bool = True, as_dict: bool = False +) -> Iterator[list | dict]: + # because we need the iterator multiple times, not possible to + # handle db, parquet and geojson through the same iteration + columns = {col: v["python_type"] for col, v in inspection["columns"].items()} + with Reader(file_path, inspection) as reader: + for line in reader: + if line: + if not as_dict: + yield [ + smart_cast( + _type, + value if isinstance(value, str) or value is None else str(value), + cast_json=cast_json, + failsafe=False, + ) + for _type, value in zip(columns.values(), line) + ] + else: + yield { + col: smart_cast( + _type, + value if isinstance(value, str) or value is None else str(value), + cast_json=cast_json, + failsafe=False, + ) + for (col, _type), value in zip(columns.items(), line) + }