Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 99 additions & 4 deletions tests/test_analysis/test_analysis_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

from tests.conftest import RESOURCE_ID, RESOURCE_URL
from udata_hydra.analysis.csv import analyse_csv, csv_to_db
from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles, csv_to_geojson_from_db
from udata_hydra.analysis.geojson import (
csv_to_geojson_and_pmtiles,
csv_to_geojson_from_db,
task_csv_to_geojson,
)
from udata_hydra.crawl.check_resources import check_resource
from udata_hydra.db.check import Check
from udata_hydra.db.resource import Resource
Expand Down Expand Up @@ -642,7 +646,7 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker):
assert expected_formats[col] in inspection["columns"][col]["format"]

with (
patch("udata_hydra.config.CSV_TO_GEOJSON", patched_config),
patch("udata_hydra.config.DB_TO_GEOJSON", patched_config),
patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
):
if not patched_config or expected_formats is None:
Expand Down Expand Up @@ -670,11 +674,11 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker):
)
with (
patch(
"udata_hydra.analysis.geojson.minio_client_geojson",
"udata_hydra.analysis.csv_geojson.minio_client_geojson",
new=mocked_minio_client_geojson,
),
patch(
"udata_hydra.analysis.geojson.minio_client_pmtiles",
"udata_hydra.analysis.pmtiles.minio_client_pmtiles",
new=mocked_minio_client_pmtiles,
),
):
Expand Down Expand Up @@ -1043,3 +1047,94 @@ async def test_file_with_nan(
# inf does for max, mean and std
assert all(profile["c"][method] is None for method in ["max", "mean", "std"])
assert profile["c"]["min"] is not None


async def test_analyse_csv_split_geo_rq_chain(
    setup_catalog,
    rmock,
    db,
    fake_check,
    produce_mock,
    mocker,
):
    """Chained geo tasks (from the parsing table) set geojson and pmtiles URLs on the check."""
    check = await fake_check()
    rmock.get(
        check["url"],
        status=200,
        body=b"lat,long,name\n48.8,2.3,a\n48.9,2.4,b\n",
    )
    mocker.patch("udata_hydra.config.MINIO_URL", "my.minio.fr")

    # Stub the MinIO SDK client so no real object storage is touched.
    minio_stub = MagicMock()
    minio_stub.fput_object.return_value = None
    minio_stub.bucket_exists.return_value = True

    async def stubbed_download(dl_url: str, suffix: str = "") -> Path:
        # The geojson file must already have been produced locally by the chain.
        generated = Path(f"{RESOURCE_ID}.geojson")
        assert generated.is_file()
        return generated.resolve()

    with patch("udata_hydra.utils.minio.Minio", return_value=minio_stub):
        geojson_minio = MinIOClient(bucket="geojson_bucket", folder="geojson_folder")
        pmtiles_minio = MinIOClient(bucket="pmtiles_bucket", folder="pmtiles_folder")

        with (
            patch("udata_hydra.config.DB_TO_GEOJSON", True),
            patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
            patch("udata_hydra.analysis.csv_geojson.minio_client_geojson", new=geojson_minio),
            patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=pmtiles_minio),
            patch("udata_hydra.utils.file.download_url_to_tempfile", new=stubbed_download),
        ):
            await analyse_csv(check=check)

    updated = await Check.get_by_id(check["id"])
    assert updated is not None
    assert updated["parsing_error"] is None
    assert updated["geojson_url"].endswith(f"{RESOURCE_ID}.geojson")
    assert updated["pmtiles_url"].endswith(f"{RESOURCE_ID}.pmtiles")

    # Clean up the files generated on disk by the chained jobs.
    for leftover in (Path(f"{RESOURCE_ID}.geojson"), Path(f"{RESOURCE_ID}.pmtiles")):
        if leftover.is_file():
            leftover.unlink()


async def test_analyse_csv_skips_geo_rq_when_csv_to_db_disabled(
    setup_catalog,
    rmock,
    db,
    fake_check,
    produce_mock,
    mocker,
):
    """No GeoJSON job is enqueued when CSV_TO_DB is off, even with DB_TO_GEOJSON on."""
    check = await fake_check()
    rmock.get(check["url"], status=200, body="a,b\n1,2\n")
    enqueue_mock = mocker.patch("udata_hydra.analysis.csv.queue.enqueue")

    with (
        patch("udata_hydra.config.DB_TO_GEOJSON", True),
        patch("udata_hydra.config.CSV_TO_DB", False),
    ):
        await analyse_csv(check=check)

    # None of the recorded enqueue calls may target the geojson task.
    assert not any(
        call.args and call.args[0] is task_csv_to_geojson
        for call in enqueue_mock.call_args_list
    )


async def test_split_geo_chain_skips_pmtiles_without_geo_columns(
    setup_catalog,
    rmock,
    db,
    fake_check,
    produce_mock,
    mocker,
):
    """PMTiles job must not run when the parsing table has no geographical columns."""
    check = await fake_check()
    # A CSV with no lat/long-like columns must not trigger the pmtiles step.
    rmock.get(check["url"], status=200, body="a,b,c\n1,2,3\n")
    pmtiles_task = mocker.patch("udata_hydra.analysis.geojson.task_geojson_to_pmtiles")

    with patch("udata_hydra.config.DB_TO_GEOJSON", True):
        await analyse_csv(check=check)

    pmtiles_task.assert_not_called()
6 changes: 3 additions & 3 deletions tests/test_analysis/test_geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def test_geojson_to_pmtiles_valid_geometry(mocker):
with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)
with (
patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
Expand Down Expand Up @@ -124,7 +124,7 @@ async def test_csv_to_geojson_big_file(
with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)

with patch("udata_hydra.analysis.geojson.minio_client_geojson", new=mocked_minio_client):
with patch("udata_hydra.analysis.csv_geojson.minio_client_geojson", new=mocked_minio_client):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
mock_os.path = os.path
mock_os.remove.return_value = None
Expand Down Expand Up @@ -211,7 +211,7 @@ async def test_geojson_to_pmtiles_big_file(mocker, input_file: str | None):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)

with (
patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.analysis.pmtiles.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
Expand Down
57 changes: 37 additions & 20 deletions udata_hydra/analysis/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from udata_hydra import config, context
from udata_hydra.analysis import helpers
from udata_hydra.analysis.geojson import csv_to_geojson_and_pmtiles
from udata_hydra.analysis.geojson import task_csv_to_geojson
from udata_hydra.db import RESERVED_COLS, compute_insert_query
from udata_hydra.db.check import Check
from udata_hydra.db.resource import Resource
Expand All @@ -44,6 +44,7 @@
Timer,
detect_tabular_from_headers,
handle_parse_exception,
queue,
remove_remainders,
)
from udata_hydra.utils.casting import generate_records
Expand Down Expand Up @@ -77,12 +78,32 @@
minio_client = MinIOClient(bucket=config.MINIO_PARQUET_BUCKET, folder=config.MINIO_PARQUET_FOLDER)


async def task_analyse_csv(
    check: Record | dict,
    file_path: str | None = None,
    debug_insert: bool = False,
    worker_exception: bool = False,
) -> None:
    """RQ task: CSV analysis; forwards worker_exception to chained task_csv_to_geojson / task_geojson_to_pmtiles."""
    # Thin RQ entry point: delegate everything to analyse_csv unchanged.
    await analyse_csv(
        check=check,
        file_path=file_path,
        debug_insert=debug_insert,
        worker_exception=worker_exception,
    )


async def analyse_csv(
check: Record | dict,
file_path: str | None = None,
debug_insert: bool = False,
worker_exception: bool = False,
) -> None:
"""Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any"""
"""Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any.

worker_exception: when True (RQ crawl path for resources_exceptions), chained geo jobs use the same RQ
exception timeout as this run.
"""
if not config.CSV_ANALYSIS:
log.debug("CSV_ANALYSIS turned off, skipping.")
return
Expand Down Expand Up @@ -181,24 +202,20 @@ async def analyse_csv(
check_id=check["id"],
) from e

try:
await csv_to_geojson_and_pmtiles(
file_path=tmp_file.name,
inspection=csv_inspection,
resource_id=resource_id,
check_id=check["id"],
timer=timer,
table_name=table_name if config.CSV_TO_DB else None,
)
except Exception as e:
remove_remainders(resource_id, ["geojson", "pmtiles", "pmtiles-journal"])
raise ParseException(
message=str(e),
step="geojson_export",
resource_id=resource_id,
url=url,
check_id=check["id"],
) from e
if config.DB_TO_GEOJSON:
if not config.CSV_TO_DB:
log.debug(
"Skipping GeoJSON/PMTiles RQ jobs: CSV_TO_DB is false "
"(geo export requires the parsing table in PostgreSQL)."
)
else:
queue.enqueue(
task_csv_to_geojson,
check["id"],
worker_exception=worker_exception,
_priority="low",
_exception=worker_exception,
)

check = await Check.update( # type: ignore[assignment]
check["id"],
Expand Down
Loading