diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index ef2c34e3..f36c61c6 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -14,7 +14,7 @@ from tests.conftest import RESOURCE_ID, RESOURCE_URL from udata_hydra.analysis.csv import analyse_csv, csv_to_db -from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles +from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, save_as_geojson_from_db from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource @@ -735,6 +735,346 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker): pmtiles_filepath.unlink() +@pytest.mark.parametrize( + "geo_columns,expected_geo_key", + ( + # latlon format: "lat,lon" → GeoJSON coordinates should be [lon, lat] + ( + {"coords": [f"{10.0 * k * (-1) ** k},{20.0 * k * (-1) ** k}" for k in range(1, 6)]}, + "latlon", + ), + # separate latitude + longitude columns + ( + { + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + }, + "latitude", + ), + ), +) +async def test_save_as_geojson_from_db(db, geo_columns, expected_geo_key, clean_db): + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + other_columns = { + "nombre": range(1, 6), + "score": [0.01, 1.2, 34.5, 678.9, 10], + } + sep = ";" + columns = other_columns | geo_columns + file = sep.join(columns) + "\n" + for i in range(len(other_columns["nombre"])): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_from_db" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + geojson_size, geojson_url = result + assert geojson_url is None + assert geojson_size > 0 + + with open(output_path) as f: + geojson = json.load(f) + + assert geojson["type"] == "FeatureCollection" + assert len(geojson["features"]) == 5 + for feat in geojson["features"]: + assert feat["type"] == "Feature" + assert feat["geometry"]["type"] == "Point" + coords = feat["geometry"]["coordinates"] + assert isinstance(coords[0], (int, float)) + assert isinstance(coords[1], (int, float)) + assert "nombre" in feat["properties"] + assert "score" in feat["properties"] + for geo_col in geo_columns: + assert geo_col not in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_with_reserved_column(db, clean_db): + """A CSV with a reserved PG column name (xmin) should still produce valid GeoJSON from DB.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = { + "xmin": range(1, 6), + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_reserved_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + geojson_size, _ = result + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + expected_xmin_values = list(range(1, 6)) + actual_xmin_values = [feat["properties"]["xmin"] for feat in geojson["features"]] + assert actual_xmin_values == expected_xmin_values + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_with_quote_in_column_name(db, clean_db): + """A CSV with a single quote in a column name should not break the SQL query.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = { + "l'adresse": [ + "10 rue de la Paix", + "5 avenue Foch", + "3 bd Raspail", + "1 place Vendôme", + "8 rue Rivoli", + ], + "lat": [10.0 * k * (-1) ** k for k in range(1, 6)], + "long": [20.0 * k * (-1) ** k for k in range(1, 6)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_quote_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for feat in geojson["features"]: + assert "l'adresse" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_lonlat(db, clean_db): + """lonlat format ("[lon, lat]") should produce correct GeoJSON coordinates [lon, lat].""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + lons = [20.0 * k * (-1) ** k for k in range(1, 6)] + lats = [10.0 * k * (-1) ** k for k in range(1, 6)] + sep = ";" + columns = { + "nombre": range(1, 6), + "geopoint": [f"[{lon}, {lat}]" for lon, lat in zip(lons, lats)], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + assert "lonlat" in inspection["columns"]["geopoint"]["format"] + + table_name = "test_geojson_lonlat" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for i, feat in enumerate(geojson["features"]): + coords = feat["geometry"]["coordinates"] + assert coords[0] == pytest.approx(lons[i]) + assert coords[1] == pytest.approx(lats[i]) + assert "geopoint" not in feat["properties"] + assert "nombre" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_geojson_column(db, clean_db): + """A column containing GeoJSON strings should produce valid geometry from DB.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + geometries = [ + {"type": "Point", "coordinates": [10 * k * (-1) ** k, 20 * k * (-1) ** k]} + for k in range(1, 6) + ] + sep = ";" + columns = { + "nombre": range(1, 6), + "polyg": [json.dumps(g) for g in geometries], + } + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + assert "geojson" in inspection["columns"]["polyg"]["format"] + + table_name = "test_geojson_geojson_col" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + for i, feat in enumerate(geojson["features"]): + assert feat["geometry"] == geometries[i] + assert "polyg" not in feat["properties"] + assert "nombre" in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + +async def test_save_as_geojson_from_db_many_columns(db, clean_db): + """More than 50 property columns should trigger json_build_object chunking.""" + output_path = Path(f"{RESOURCE_ID}.geojson") + try: + output_path.unlink() + except FileNotFoundError: + pass + + sep = ";" + columns = {f"col_{i:03d}": range(1, 6) for i in range(55)} + columns["lat"] = [10.0 * k * (-1) ** k for k in range(1, 6)] + columns["long"] = [20.0 * k * (-1) ** k for k in range(1, 6)] + file = sep.join(columns) + "\n" + for i in range(5): + file += sep.join(str(val) for val in [data[i] for data in columns.values()]) + "\n" + + with NamedTemporaryFile(delete=False) as fp: + fp.write(file.encode("utf-8")) + fp.seek(0) + inspection = csv_detective_routine( + file_path=fp.name, + output_profile=True, + num_rows=-1, + save_results=False, + ) + + table_name = "test_geojson_many_cols" + with patch("udata_hydra.config.CSV_TO_DB", True): + await csv_to_db(fp.name, inspection, table_name) + + result = await save_as_geojson_from_db( + table_name, inspection, output_path, upload_to_minio=False + ) + assert result is not None + + with open(output_path) as f: + geojson = json.load(f) + + assert len(geojson["features"]) == 5 + feat = geojson["features"][0] + for i in range(55): + assert f"col_{i:03d}" in feat["properties"] + assert "lat" not in feat["properties"] + assert "long" not in feat["properties"] + + output_path.unlink() + await db.execute(f'DROP TABLE IF EXISTS "{table_name}"') + + @pytest.mark.parametrize( "params", ( diff --git a/tests/test_analysis/test_parquet_export.py b/tests/test_analysis/test_parquet_export.py index 29f69e1a..72bf49ac 100755 --- a/tests/test_analysis/test_parquet_export.py +++ b/tests/test_analysis/test_parquet_export.py @@ -7,11 +7,11 @@ from tests.conftest import RESOURCE_ID from udata_hydra.analysis.csv import ( - RESERVED_COLS, csv_to_db, csv_to_parquet, generate_records, ) +from udata_hydra.db import RESERVED_COLS from udata_hydra.utils.minio import MinIOClient from udata_hydra.utils.parquet import save_as_parquet, save_as_parquet_from_db diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index acdfbed6..511bef3a 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -34,7 +34,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles -from udata_hydra.db import compute_insert_query +from udata_hydra.db import RESERVED_COLS, compute_insert_query from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -74,7 +74,6 @@ "binary": LargeBinary, } -RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") minio_client = MinIOClient(bucket=config.MINIO_PARQUET_BUCKET, folder=config.MINIO_PARQUET_FOLDER) @@ -189,6 +188,7 @@ async def analyse_csv( resource_id=resource_id, check_id=check["id"], timer=timer, + table_name=table_name if config.CSV_TO_DB else None, ) except Exception as e: remove_remainders(resource_id, ["geojson", "pmtiles", "pmtiles-journal"]) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 72b0cdbc..3398a5a2 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -9,8 +9,9 @@ from asyncpg import Record from json_stream import streamable_list -from udata_hydra import config +from udata_hydra import config, context from udata_hydra.analysis import helpers +from udata_hydra.db import RESERVED_COLS from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -29,6 +30,24 @@ log = logging.getLogger("udata-hydra") + +def _cast_latlon(latlon: str) -> list[float]: + """Python version — used when reading from CSV.""" + lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") + # GeoJSON standard: longitude before latitude + return [float(lon), float(lat)] + + +def _quote_ident(name: str) -> str: + """Escape a PostgreSQL identifier to prevent SQL injection.""" + return '"' + name.replace('"', '""') + '"' + + +def _clean_pair_sql(col: str) -> str: + """SQL version — used when reading from PostgreSQL.""" + return f"replace(replace(replace({_quote_ident(col)}, ' ', ''), '[', ''), ']', '')" + + minio_client_pmtiles = MinIOClient( bucket=config.MINIO_PMTILES_BUCKET, folder=config.MINIO_PMTILES_FOLDER ) @@ -133,13 +152,6 @@ async def csv_to_geojson( geojson_url: URL of the GeoJSON file on MinIO. None if it was not uploaded to MinIO. """ - def cast_latlon(latlon: str) -> list[float]: - # we can safely do this as the detection was successful - # removing potential blank and brackets - lat, lon = latlon.replace(" ", "").replace("[", "").replace("]", "").split(",") - # using the geojson standard: longitude before latitude - return [float(lon), float(lat)] - def get_features( file_path: str, inspection: dict, geo: dict[str, Any] ) -> Iterator[dict[str, Any]]: @@ -154,33 +166,23 @@ def get_features( "properties": {col: row[col] for col in row.keys() if col != geo["geojson"]}, # type: ignore[union-attr] } - elif "latlon" in geo: - # ending up here means we either have the exact lat,lon format, or NaN - # skipping row if NaN - if row[geo["latlon"]] is None: + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + pair_col = geo[pair_key] + # skipping row if geo data is None (NaN in original CSV) + if row[pair_col] is None: continue + coords = _cast_latlon(row[pair_col]) + # latlon already returns [lon, lat]; lonlat needs inversion + if pair_key == "lonlat": + coords = coords[::-1] yield { "type": "Feature", "geometry": { "type": "Point", - "coordinates": cast_latlon(row[geo["latlon"]]), + "coordinates": coords, }, - "properties": {col: row[col] for col in row.keys() if col != geo["latlon"]}, # type: ignore[union-attr] - } - - elif "lonlat" in geo: - # ending up here means we either have the exact lon,lat format, or NaN - # skipping row if NaN - if row[geo["lonlat"]] is None: - continue - yield { - "type": "Feature", - "geometry": { - "type": "Point", - # inverting lon and lat to match the standard - "coordinates": cast_latlon(row[geo["lonlat"]])[::-1], - }, - "properties": {col: row[col] for col in row.keys() if col != geo["lonlat"]}, # type: ignore[union-attr] + "properties": {col: row[col] for col in row.keys() if col != pair_col}, # type: ignore[union-attr] } else: @@ -201,6 +203,35 @@ def get_features( }, } + geo = _detect_geo_columns(inspection) + if geo is None: + log.debug("No geographical columns found, skipping") + return None + + template = {"type": "FeatureCollection"} + + template["features"] = streamable_list(get_features(file_path, inspection, geo)) + + with output_file_path.open("w") as f: + json.dump(template, f, indent=4, ensure_ascii=False, default=str) + + geojson_size: int = os.path.getsize(output_file_path) + + if upload_to_minio: + log.debug(f"Sending GeoJSON file {output_file_path} to MinIO") + geojson_url = minio_client_geojson.send_file(str(output_file_path), delete_source=False) + else: + geojson_url = None + + return geojson_size, geojson_url + + +def _detect_geo_columns(inspection: dict) -> dict[str, str] | None: + """Detect geographical columns from csv-detective inspection results. + + Returns a dict like {"latitude": "col_name", "longitude": "col_name"} + or {"geojson": "col_name"} etc, or None if no geo columns found. + """ geo = {} for column, detection in inspection["columns"].items(): # see csv-detective's geo formats: @@ -215,26 +246,138 @@ def get_features( geo[fmt] = (column, detection["score"]) # priority is given to geojson, then latlon, then lonlat, then latitude + longitude if "geojson" in geo: - geo = {"geojson": geo["geojson"][0]} + return {"geojson": geo["geojson"][0]} elif "latlon" in geo: - geo = {"latlon": geo["latlon"][0]} + return {"latlon": geo["latlon"][0]} elif "lonlat" in geo: - geo = {"lonlat": geo["lonlat"][0]} + return {"lonlat": geo["lonlat"][0]} elif "latitude" in geo and "longitude" in geo: - geo = { - "latitude": geo["latitude"][0], - "longitude": geo["longitude"][0], - } + return {"latitude": geo["latitude"][0], "longitude": geo["longitude"][0]} + return None + + +def _db_col_name(col: str) -> str: + """Map a CSV column name to its actual PostgreSQL column name.""" + return f"{col}__hydra_renamed" if col.lower() in RESERVED_COLS else col + + +def _build_feature_sql( + table_name: str, geo: dict[str, str], columns: list[str] +) -> tuple[str, list[str]]: + """Build a SQL query that generates GeoJSON features directly in PostgreSQL. + + Column names in `columns` and `geo` are the original CSV names. They are + mapped to their actual DB names (handling RESERVED_COLS renaming) for the + SQL identifiers, while the original names are passed as query parameters + for the JSON keys so the GeoJSON output matches what the CSV path produces. + + Returns (query, params) where params are the original column names used as + JSON keys via $1, $2… placeholders. + """ + property_cols = [c for c in columns if c not in geo.values()] + params: list[str] = [] + properties_fragments = [] + for idx, col in enumerate(property_cols): + params.append(col) + # $N::text parameters are JSON *keys* (column names), not values. + # Values come from the quoted column identifiers and keep their native PG types. + properties_fragments.append(f"${idx + 1}::text, {_quote_ident(_db_col_name(col))}") + + # PostgreSQL's json_build_object accepts max 100 arguments (50 key-value pairs). + # Split into chunks and merge with || when needed. + max_pairs = 50 + chunks = [ + properties_fragments[i : i + max_pairs] + for i in range(0, len(properties_fragments), max_pairs) + ] + if len(chunks) <= 1: + properties_sql = f"json_build_object({', '.join(properties_fragments)})" else: + parts = [f"jsonb_build_object({', '.join(chunk)})" for chunk in chunks] + properties_sql = f"({' || '.join(parts)})::json" + + if "geojson" in geo: + col = _db_col_name(geo["geojson"]) + geometry_sql = f"({_quote_ident(col)})::json" + where = "" + elif "latlon" in geo or "lonlat" in geo: + pair_key = "latlon" if "latlon" in geo else "lonlat" + col = _db_col_name(geo[pair_key]) + # latlon = "lat,lon" → GeoJSON needs [lon, lat] so swap indices + # lonlat = "lon,lat" → already in GeoJSON order + lon_idx, lat_idx = ("2", "1") if pair_key == "latlon" else ("1", "2") + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array( + (split_part({_clean_pair_sql(col)}, ',', {lon_idx}))::float, + (split_part({_clean_pair_sql(col)}, ',', {lat_idx}))::float + ) + )""" + where = f"WHERE {_quote_ident(col)} IS NOT NULL" + else: + lon_col = _db_col_name(geo["longitude"]) + lat_col = _db_col_name(geo["latitude"]) + geometry_sql = f"""json_build_object( + 'type', 'Point', + 'coordinates', json_build_array({_quote_ident(lon_col)}, {_quote_ident(lat_col)}) + )""" + where = f"WHERE {_quote_ident(lat_col)} IS NOT NULL AND {_quote_ident(lon_col)} IS NOT NULL" + + query = f""" + SELECT json_build_object( + 'type', 'Feature', + 'geometry', {geometry_sql}, + 'properties', {properties_sql} + )::text AS feature_json + FROM {_quote_ident(table_name)} + {where} + """ + return query, params + + +async def save_as_geojson_from_db( + table_name: str, + inspection: dict, + output_file_path: Path, + upload_to_minio: bool = True, +) -> tuple[int, str | None] | None: + """Generate a GeoJSON file by streaming features directly from PostgreSQL. + + Uses a server-side cursor to avoid loading all features in memory. + Rows with NULL geographical columns are skipped. + + Args: + table_name: Name of the PostgreSQL table containing the CSV data. + inspection: CSV detective analysis results with column format detection. + output_file_path: Path where the GeoJSON file should be saved. + upload_to_minio: Whether to upload to MinIO (default: True). + + Returns: + geojson_size: Size of the GeoJSON file in bytes. + geojson_url: URL of the GeoJSON file on MinIO. None if not uploaded. + """ + geo = _detect_geo_columns(inspection) + if geo is None: log.debug("No geographical columns found, skipping") return None - template = {"type": "FeatureCollection"} + columns = list(inspection["columns"].keys()) + query, params = _build_feature_sql(table_name, geo, columns) - template["features"] = streamable_list(get_features(file_path, inspection, geo)) + db = await context.pool("csv") + async with db.acquire() as conn: + async with conn.transaction(): + cursor = conn.cursor(query, *params) - with output_file_path.open("w") as f: - json.dump(template, f, indent=4, ensure_ascii=False, default=str) + with output_file_path.open("w") as f: + f.write('{"type": "FeatureCollection", "features": [\n') + first = True + async for row in cursor: + if not first: + f.write(",\n") + f.write(row["feature_json"]) + first = False + f.write("\n]}") geojson_size: int = os.path.getsize(output_file_path) @@ -300,9 +443,10 @@ async def csv_to_geojson_and_pmtiles( resource_id: str | None = None, check_id: int | None = None, timer: Timer | None = None, + table_name: str | None = None, ) -> tuple[Path, int, str | None, Path, int, str | None] | None: - if not config.CSV_TO_GEOJSON: - log.debug("CSV_TO_GEOJSON turned off, skipping geojson/PMtiles export.") + if not config.CSV_TO_GEOJSON and not config.DB_TO_GEOJSON: + log.debug("CSV_TO_GEOJSON and DB_TO_GEOJSON turned off, skipping geojson/PMtiles export.") return None log.debug( @@ -318,8 +462,16 @@ async def csv_to_geojson_and_pmtiles( geojson_filepath = DEFAULT_GEOJSON_FILEPATH pmtiles_filepath = DEFAULT_PMTILES_FILEPATH - # Convert CSV to GeoJSON - result = await csv_to_geojson(file_path, inspection, geojson_filepath, upload_to_minio=True) + # Convert to GeoJSON — from DB if available and enabled, otherwise from CSV file + if config.DB_TO_GEOJSON and table_name: + result = await save_as_geojson_from_db( + table_name, + inspection, + geojson_filepath, + upload_to_minio=True, + ) + else: + result = await csv_to_geojson(file_path, inspection, geojson_filepath, upload_to_minio=True) if result is None: return None geojson_size, geojson_url = result diff --git a/udata_hydra/analysis/parquet.py b/udata_hydra/analysis/parquet.py index 6beec008..0ade8653 100755 --- a/udata_hydra/analysis/parquet.py +++ b/udata_hydra/analysis/parquet.py @@ -15,7 +15,7 @@ from udata_hydra import config, context from udata_hydra.analysis import helpers from udata_hydra.analysis.csv import compute_create_table_query, csv_to_db_index -from udata_hydra.db import compute_insert_query +from udata_hydra.db import RESERVED_COLS, compute_insert_query from udata_hydra.db.check import Check from udata_hydra.db.resource import Resource from udata_hydra.db.resource_exception import ResourceException @@ -45,8 +45,6 @@ r"^timestamp\[\ws,": "datetime_aware", # the rest of the field depends on the timezone } -RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") - async def analyse_parquet( check: Record | dict, diff --git a/udata_hydra/config_default.toml b/udata_hydra/config_default.toml index 865088d9..f2c83987 100644 --- a/udata_hydra/config_default.toml +++ b/udata_hydra/config_default.toml @@ -104,6 +104,7 @@ MINIO_PMTILES_FOLDER = "" # no trailing slash # -- Geojson conversion settings -- # CSV_TO_GEOJSON = false +DB_TO_GEOJSON = false MINIO_GEOJSON_BUCKET = "" MINIO_GEOJSON_FOLDER = "" # no trailing slash diff --git a/udata_hydra/db/__init__.py b/udata_hydra/db/__init__.py index 78db2ee8..abb72a10 100644 --- a/udata_hydra/db/__init__.py +++ b/udata_hydra/db/__init__.py @@ -4,6 +4,11 @@ from udata_hydra import context +# PostgreSQL system columns and hydra's own __id that must be renamed when +# a user CSV happens to use them as headers. Shared across csv, parquet and +# geojson modules. +RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax") + def convert_dict_values_to_json(data: dict) -> dict: """ diff --git a/udata_hydra/routes/status.py b/udata_hydra/routes/status.py index 9657b2b8..0d8f7991 100644 --- a/udata_hydra/routes/status.py +++ b/udata_hydra/routes/status.py @@ -169,6 +169,7 @@ async def get_health(request: web.Request) -> web.Response: "db_to_parquet": config.DB_TO_PARQUET, "geojson_to_pmtiles": config.GEOJSON_TO_PMTILES, "csv_to_geojson": config.CSV_TO_GEOJSON, + "db_to_geojson": config.DB_TO_GEOJSON, "parquet_to_db": config.PARQUET_TO_DB, } ) diff --git a/udata_hydra/utils/minio.py b/udata_hydra/utils/minio.py index 663918f3..6c2714d9 100644 --- a/udata_hydra/utils/minio.py +++ b/udata_hydra/utils/minio.py @@ -18,7 +18,7 @@ def __init__(self, bucket: str, folder: str): config.MINIO_URL or "test", access_key=self.user or "test", secret_key=self.password or "test", - secure=True, + secure=config.MINIO_SECURE if config.MINIO_SECURE is not None else True, ) if self.bucket: self.bucket_exists = self.client.bucket_exists(self.bucket)