Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions tests/test_analysis/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os
import tempfile
from unittest.mock import MagicMock

import pytest

from udata_hydra.analysis import helpers
from udata_hydra.utils import IOException


@pytest.mark.asyncio
async def test_read_or_download_filename_resolution(mocker, tmp_path):
    """Test that read_or_download_file opens a bare filename from the configured temp folder"""
    # Use pytest's per-test directory instead of the shared /tmp: the test cannot
    # collide with unrelated files and pytest handles the cleanup.
    mocker.patch("udata_hydra.config.TEMPORARY_DOWNLOAD_FOLDER", str(tmp_path))

    # Create a uniquely named file in the configured temp directory
    with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=tmp_path) as tmp_file:
        tmp_file.write("test content")
        tmp_file_path = tmp_file.name

    # A plain dict is a valid check (Record | dict). Item assignment on a
    # MagicMock is recorded but not stored, so it would not carry these values.
    check = {
        "resource_id": "test_resource",
        "url": "http://example.com/test.csv",
    }

    # Only the basename is passed: the helper is expected to join it with the
    # configured temporary folder.
    result = await helpers.read_or_download_file(
        check=check,
        filename=os.path.basename(tmp_file_path),
        file_format="csv",
        exception=None,
    )

    # The context manager closes the handle even when the assertion fails
    with result:
        assert result.read() == b"test content"


@pytest.mark.asyncio
async def test_read_or_download_file_missing_file(mocker, tmp_path):
    """Test that read_or_download_file raises IOException for missing files"""
    # Point the temp folder at an empty per-test directory so the file is
    # guaranteed to be missing, whatever the environment's configuration is.
    mocker.patch("udata_hydra.config.TEMPORARY_DOWNLOAD_FOLDER", str(tmp_path))

    # A plain dict is a valid check (Record | dict). Item assignment on a
    # MagicMock is recorded but not stored, so it would not carry these values.
    check = {
        "resource_id": "test_resource",
        "url": "http://example.com/test.csv",
    }

    with pytest.raises(IOException) as exc_info:
        await helpers.read_or_download_file(
            check=check,
            filename="non_existent_file.csv",
            file_format="csv",
            exception=None,
        )

    message = str(exc_info.value)
    assert "Temporary file not found" in message
    # The error should name the file that could not be opened
    assert "non_existent_file.csv" in message
6 changes: 3 additions & 3 deletions udata_hydra/analysis/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@

async def analyse_csv(
check: Record | dict,
file_path: str | None = None,
filename: str | None = None,
debug_insert: bool = False,
) -> None:
"""Launch csv analysis from a check or an URL (debug), using previously downloaded file at file_path if any"""
"""Launch csv analysis from a check or an URL (debug), using previously downloaded file if any"""
if not config.CSV_ANALYSIS:
log.debug("CSV_ANALYSIS turned off, skipping.")
return
Expand Down Expand Up @@ -113,7 +113,7 @@ async def analyse_csv(
_, file_format = detect_tabular_from_headers(check)
tmp_file = await helpers.read_or_download_file(
check=check,
file_path=file_path,
filename=filename,
file_format=file_format,
exception=exception,
)
Expand Down
6 changes: 3 additions & 3 deletions udata_hydra/analysis/geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@

async def analyse_geojson(
check: Record | dict,
file_path: str | None = None,
filename: str | None = None,
) -> None:
"""Launch GeoJSON analysis from a check or an URL (debug), using previously downloaded file at file_path if any"""
"""Launch GeoJSON analysis from a check or an URL (debug), using previously downloaded file if any"""
if not config.GEOJSON_TO_PMTILES:
log.debug("GEOJSON_TO_PMTILES turned off, skipping.")
return
Expand All @@ -62,7 +62,7 @@ async def analyse_geojson(
try:
tmp_file = await helpers.read_or_download_file(
check=check,
file_path=file_path,
filename=filename,
file_format="geojson",
exception=exception,
)
Expand Down
15 changes: 9 additions & 6 deletions udata_hydra/analysis/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
from typing import IO

from asyncpg import Record
Expand All @@ -18,19 +19,21 @@ def get_python_type(column: dict) -> str:

async def read_or_download_file(
check: Record | dict,
file_path: str | None,
filename: str | None,
file_format: str,
exception: Record | None,
) -> IO[bytes]:
if file_path:
if filename:
temp_dir = config.TEMPORARY_DOWNLOAD_FOLDER or "/tmp"
full_path = os.path.join(temp_dir, filename)
try:
return open(file_path, "rb")
except FileNotFoundError as e:
return open(full_path, "rb")
except FileNotFoundError:
raise IOException(
f"Temporary file not found: {file_path}",
f"Temporary file not found: {full_path}",
resource_id=check["resource_id"],
url=check["url"],
) from e
)
else:
tmp_file, _ = await download_resource(
url=check["url"],
Expand Down
4 changes: 2 additions & 2 deletions udata_hydra/analysis/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

async def analyse_parquet(
check: Record | dict,
file_path: str | None = None,
filename: str | None = None,
debug_insert: bool = False,
) -> None:
"""Insert parquet file and metadata in db"""
Expand Down Expand Up @@ -79,7 +79,7 @@ async def analyse_parquet(
try:
tmp_file = await helpers.read_or_download_file(
check=check,
file_path=file_path,
filename=filename,
file_format="parquet",
exception=exception,
)
Expand Down
6 changes: 3 additions & 3 deletions udata_hydra/analysis/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def analyse_resource(
queue.enqueue(
analyse_csv,
check=check,
file_path=tmp_file.name,
filename=os.path.basename(tmp_file.name),
_priority="high" if worker_priority == "high" else "default",
_exception=bool(exception),
)
Expand All @@ -181,7 +181,7 @@ async def analyse_resource(
queue.enqueue(
analyse_geojson,
check=check,
file_path=tmp_file.name,
filename=os.path.basename(tmp_file.name),
_priority="high" if worker_priority == "high" else "default",
_exception=bool(exception),
)
Expand All @@ -190,7 +190,7 @@ async def analyse_resource(
queue.enqueue(
analyse_parquet,
check=check,
file_path=tmp_file.name,
filename=os.path.basename(tmp_file.name),
_priority="high" if worker_priority == "high" else "default",
_exception=bool(exception),
)
Expand Down