From cba039f0482981674f860061c8d1b1439bc5ea92 Mon Sep 17 00:00:00 2001 From: maudetes Date: Thu, 16 Apr 2026 14:35:33 +0200 Subject: [PATCH] fix: use basename as param in jobs because TEMPORARY_DOWNLOAD_FOLDER may differ between workers --- tests/test_analysis/test_helpers.py | 57 +++++++++++++++++++++++++++++ udata_hydra/analysis/csv.py | 6 +-- udata_hydra/analysis/geojson.py | 6 +-- udata_hydra/analysis/helpers.py | 15 +++++--- udata_hydra/analysis/parquet.py | 4 +- udata_hydra/analysis/resource.py | 6 +-- 6 files changed, 77 insertions(+), 17 deletions(-) create mode 100644 tests/test_analysis/test_helpers.py diff --git a/tests/test_analysis/test_helpers.py b/tests/test_analysis/test_helpers.py new file mode 100644 index 00000000..837f31c0 --- /dev/null +++ b/tests/test_analysis/test_helpers.py @@ -0,0 +1,57 @@ +import os +import tempfile +from unittest.mock import MagicMock + +import pytest + +from udata_hydra.analysis import helpers +from udata_hydra.utils import IOException + + +@pytest.mark.asyncio +async def test_read_or_download_filename_resolution(mocker): + mocker.patch("udata_hydra.config.TEMPORARY_DOWNLOAD_FOLDER", "/tmp") + temp_dir = "/tmp" + os.makedirs(temp_dir, exist_ok=True) + + # Create a temporary file in the configured temp directory + with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as tmp_file: + tmp_file.write("test content") + tmp_file_path = tmp_file.name + + try: + mock_check = MagicMock() + mock_check["resource_id"] = "test_resource" + mock_check["url"] = "http://example.com/test.csv" + + basename = os.path.basename(tmp_file_path) + result = await helpers.read_or_download_file( + check=mock_check, filename=basename, file_format="csv", exception=None + ) + + assert result.read() == b"test content" + result.close() + + finally: + # Clean up + os.unlink(tmp_file_path) + + +@pytest.mark.asyncio +async def test_read_or_download_file_missing_file(): + """Test that read_or_download_file raises IOException for missing files""" + + # Create a mock check record + mock_check = MagicMock() + mock_check["resource_id"] = "test_resource" + mock_check["url"] = "http://example.com/test.csv" + + with pytest.raises(IOException) as exc_info: + await helpers.read_or_download_file( + check=mock_check, + filename="non_existent_file.csv", + file_format="csv", + exception=None, + ) + + assert "Temporary file not found" in str(exc_info.value) diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py index acdfbed6..d04087d2 100644 --- a/udata_hydra/analysis/csv.py +++ b/udata_hydra/analysis/csv.py @@ -80,10 +80,10 @@ async def analyse_csv( check: Record | dict, - file_path: str | None = None, + filename: str | None = None, debug_insert: bool = False, ) -> None: - """Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any""" + """Launch csv analysis from a check or an URL (debug), using previously downloaded file if any""" if not config.CSV_ANALYSIS: log.debug("CSV_ANALYSIS turned off, skipping.") return @@ -113,7 +113,7 @@ async def analyse_csv( _, file_format = detect_tabular_from_headers(check) tmp_file = await helpers.read_or_download_file( check=check, - file_path=file_path, + filename=filename, file_format=file_format, exception=exception, ) diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py index 72b0cdbc..63804fa4 100644 --- a/udata_hydra/analysis/geojson.py +++ b/udata_hydra/analysis/geojson.py @@ -39,9 +39,9 @@ async def analyse_geojson( check: Record | dict, - file_path: str | None = None, + filename: str | None = None, ) -> None: - """Launch GeoJSON analysis from a check or an URL (debug), using previously downloaded file at file_path if any""" + """Launch GeoJSON analysis from a check or an URL (debug), using previously downloaded file if any""" if not config.GEOJSON_TO_PMTILES: log.debug("GEOJSON_TO_PMTILES turned off, skipping.") return @@ -62,7 +62,7 @@ async def analyse_geojson( try: tmp_file = await helpers.read_or_download_file( check=check, - file_path=file_path, + filename=filename, file_format="geojson", exception=exception, ) diff --git a/udata_hydra/analysis/helpers.py b/udata_hydra/analysis/helpers.py index 597ce76a..0e44cd8c 100644 --- a/udata_hydra/analysis/helpers.py +++ b/udata_hydra/analysis/helpers.py @@ -1,4 +1,5 @@ import json +import os from typing import IO from asyncpg import Record @@ -18,19 +19,21 @@ def get_python_type(column: dict) -> str: async def read_or_download_file( check: Record | dict, - file_path: str | None, + filename: str | None, file_format: str, exception: Record | None, ) -> IO[bytes]: - if file_path: + if filename: + temp_dir = config.TEMPORARY_DOWNLOAD_FOLDER or "/tmp" + full_path = os.path.join(temp_dir, filename) try: - return open(file_path, "rb") - except FileNotFoundError as e: + return open(full_path, "rb") + except FileNotFoundError: raise IOException( - f"Temporary file not found: {file_path}", + f"Temporary file not found: {full_path}", resource_id=check["resource_id"], url=check["url"], - ) from e + ) else: tmp_file, _ = await download_resource( url=check["url"], diff --git a/udata_hydra/analysis/parquet.py b/udata_hydra/analysis/parquet.py index 6beec008..435ff480 100755 --- a/udata_hydra/analysis/parquet.py +++ b/udata_hydra/analysis/parquet.py @@ -50,7 +50,7 @@ async def analyse_parquet( check: Record | dict, - file_path: str | None = None, + filename: str | None = None, debug_insert: bool = False, ) -> None: """Insert parquet file and metadata in db""" @@ -79,7 +79,7 @@ async def analyse_parquet( try: tmp_file = await helpers.read_or_download_file( check=check, - file_path=file_path, + filename=filename, file_format="parquet", exception=exception, ) diff --git a/udata_hydra/analysis/resource.py b/udata_hydra/analysis/resource.py index bf574b69..f00e9c7d 100644 --- a/udata_hydra/analysis/resource.py +++ b/udata_hydra/analysis/resource.py @@ -172,7 +172,7 @@ async def analyse_resource( queue.enqueue( analyse_csv, check=check, - file_path=tmp_file.name, + filename=os.path.basename(tmp_file.name), _priority="high" if worker_priority == "high" else "default", _exception=bool(exception), ) @@ -181,7 +181,7 @@ async def analyse_resource( queue.enqueue( analyse_geojson, check=check, - file_path=tmp_file.name, + filename=os.path.basename(tmp_file.name), _priority="high" if worker_priority == "high" else "default", _exception=bool(exception), ) @@ -190,7 +190,7 @@ async def analyse_resource( queue.enqueue( analyse_parquet, check=check, - file_path=tmp_file.name, + filename=os.path.basename(tmp_file.name), _priority="high" if worker_priority == "high" else "default", _exception=bool(exception), )