Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dynamic = ["version"]
description = "Async crawler and parsing service for data.gouv.fr"
authors = [{ name = "Opendata Team", email = "opendatateam@data.gouv.fr" }]
dependencies = [
"boto3>=1.35.0",
"aiohttp>=3.10.3",
"asyncpg>=0.29.0",
"coloredlogs>=15.0.1",
Expand All @@ -16,7 +17,6 @@ dependencies = [
"humanfriendly>=10.0",
"json-stream>=2.3.3",
"marshmallow>=3.14.1",
"minio>=7.2.8",
"owslib>=0.35.0",
"progressist>=0.1.0",
"pyarrow>=16.1.0",
Expand Down
42 changes: 15 additions & 27 deletions tests/test_analysis/test_analysis_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from udata_hydra.crawl.check_resources import check_resource
from udata_hydra.db.check import Check
from udata_hydra.db.resource import Resource
from udata_hydra.utils.minio import MinIOClient
from udata_hydra.utils.s3 import S3Client

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -652,30 +652,24 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker):
assert res is None
mock_func.assert_not_called()
else:
minio_url = "my.minio.fr"
s3_endpoint = "s3.example.com"
geojson_bucket = "geojson_bucket"
geojson_folder = "geojson_folder"
pmtiles_bucket = "pmtiles_bucket"
pmtiles_folder = "pmtiles_folder"
mocker.patch("udata_hydra.config.MINIO_URL", minio_url)
mocked_minio = MagicMock()
mocked_minio.fput_object.return_value = None
mocked_minio.bucket_exists.return_value = True
with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client_geojson = MinIOClient(
bucket=geojson_bucket, folder=geojson_folder
)
mocked_minio_client_pmtiles = MinIOClient(
bucket=pmtiles_bucket, folder=pmtiles_folder
)
mocker.patch("udata_hydra.config.S3_ENDPOINT", s3_endpoint)
mocked_resource = MagicMock()
mocked_resource.meta.client.head_bucket.return_value = {}
mocked_resource.Bucket.return_value = MagicMock()
with patch("udata_hydra.utils.s3.boto3.resource", return_value=mocked_resource):
mocked_s3_client_geojson = S3Client(bucket=geojson_bucket)
mocked_s3_client_pmtiles = S3Client(bucket=pmtiles_bucket)
with (
patch(
"udata_hydra.analysis.geojson.minio_client_geojson",
new=mocked_minio_client_geojson,
"udata_hydra.analysis.geojson.s3_client_geojson",
new=mocked_s3_client_geojson,
),
patch(
"udata_hydra.analysis.geojson.minio_client_pmtiles",
new=mocked_minio_client_pmtiles,
"udata_hydra.analysis.geojson.s3_client_pmtiles",
new=mocked_s3_client_pmtiles,
),
):
result = await csv_to_geojson_and_pmtiles(fp.name, inspection, RESOURCE_ID)
Expand Down Expand Up @@ -714,20 +708,14 @@ async def test_csv_to_geojson_pmtiles(db, params, clean_db, mocker):
]
)
assert not any(col in feat["properties"] for col in expected_formats)
assert (
geojson_url
== f"https://{minio_url}/{geojson_bucket}/{geojson_folder}/{RESOURCE_ID}.geojson"
)
assert geojson_url == f"https://{s3_endpoint}/{geojson_bucket}/{RESOURCE_ID}.geojson"
assert isinstance(geojson_size, int)

# checking PMTiles
with open(f"{RESOURCE_ID}.pmtiles", "rb") as f:
header = f.read(7)
assert header == b"PMTiles"
assert (
pmtiles_url
== f"https://{minio_url}/{pmtiles_bucket}/{pmtiles_folder}/{RESOURCE_ID}.pmtiles"
)
assert pmtiles_url == f"https://{s3_endpoint}/{pmtiles_bucket}/{RESOURCE_ID}.pmtiles"
assert isinstance(pmtiles_size, int)

# Clean up files after tests
Expand Down
75 changes: 33 additions & 42 deletions tests/test_analysis/test_geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
csv_to_geojson,
geojson_to_pmtiles,
)
from udata_hydra.utils.minio import MinIOClient
from udata_hydra.utils.s3 import S3Client
from udata_hydra.utils.timer import Timer

log = logging.getLogger("udata-hydra")
Expand Down Expand Up @@ -44,32 +44,29 @@ async def test_geojson_to_pmtiles_invalid_geometry():
@pytest.mark.asyncio
async def test_geojson_to_pmtiles_valid_geometry(mocker):
"""Test handling of valid geometry"""
minio_url = "my.minio.fr"
s3_endpoint = "s3.example.com"
bucket = "bucket"
folder = "folder"
mocker.patch("udata_hydra.config.MINIO_URL", minio_url)
mocked_minio = MagicMock()
mocked_minio.fput_object.return_value = None
mocked_minio.bucket_exists.return_value = True
mocker.patch("udata_hydra.config.S3_ENDPOINT", s3_endpoint)
mocked_resource = MagicMock()
mocked_resource.meta.client.head_bucket.return_value = {}
mocked_resource.Bucket.return_value = MagicMock()
# Make sure that we don't crash even if output pmtiles already exists
Path(f"{RESOURCE_ID}.pmtiles").touch()
with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)
with patch("udata_hydra.utils.s3.boto3.resource", return_value=mocked_resource):
mocked_s3_client = S3Client(bucket=bucket)
with (
patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.analysis.geojson.s3_client_pmtiles", new=mocked_s3_client),
patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
mock_os.path = os.path
mock_os.remove.return_value = None
mocker.patch("udata_hydra.utils.s3.Path.unlink", MagicMock())
size, url = await geojson_to_pmtiles(
Path("tests/data/valid.geojson"), Path(f"{RESOURCE_ID}.pmtiles")
)
# very (too?) simple test, we could install a specific library to read the file
with open(f"{RESOURCE_ID}.pmtiles", "rb") as f:
header = f.read(7)
assert header == b"PMTiles"
assert url == f"https://{minio_url}/{bucket}/{folder}/{RESOURCE_ID}.pmtiles"
assert url == f"https://{s3_endpoint}/{bucket}/{RESOURCE_ID}.pmtiles"
# size slightly differs depending on the env
assert 850 <= size <= 900
os.remove(f"{RESOURCE_ID}.pmtiles")
Expand Down Expand Up @@ -112,22 +109,19 @@ async def test_csv_to_geojson_big_file(
# Create timer for performance measurement
timer = Timer("csv-to-geojson-performance-test")

# Mock MinIO for the test
minio_url = "my.minio.fr"
# Mock S3 for the test
s3_endpoint = "s3.example.com"
bucket = "bucket"
folder = "folder"
mocker.patch("udata_hydra.config.MINIO_URL", minio_url)
mocked_minio = MagicMock()
mocked_minio.fput_object.return_value = None
mocked_minio.bucket_exists.return_value = True
mocker.patch("udata_hydra.config.S3_ENDPOINT", s3_endpoint)
mocked_resource = MagicMock()
mocked_resource.meta.client.head_bucket.return_value = {}
mocked_resource.Bucket.return_value = MagicMock()

with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)
with patch("udata_hydra.utils.s3.boto3.resource", return_value=mocked_resource):
mocked_s3_client = S3Client(bucket=bucket)

with patch("udata_hydra.analysis.geojson.minio_client_geojson", new=mocked_minio_client):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
mock_os.path = os.path
mock_os.remove.return_value = None
with patch("udata_hydra.analysis.geojson.s3_client_geojson", new=mocked_s3_client):
mocker.patch("udata_hydra.utils.s3.Path.unlink", MagicMock())

# Analyze the CSV with csv_detective first
inspection, df = csv_detective_routine(
Expand All @@ -145,7 +139,7 @@ async def test_csv_to_geojson_big_file(
file_path=str(csv_path),
inspection=inspection,
output_file_path=test_geojson_path,
upload_to_minio=False,
upload_to_s3=False,
)
timer.mark("geojson-conversion")

Expand Down Expand Up @@ -198,25 +192,22 @@ async def test_geojson_to_pmtiles_big_file(mocker, input_file: str | None):
# Create timer for performance measurement
timer = Timer("geojson-to-pmtiles-performance-test")

# Mock MinIO for the test
minio_url = "my.minio.fr"
# Mock S3 for the test
s3_endpoint = "s3.example.com"
bucket = "bucket"
folder = "folder"
mocker.patch("udata_hydra.config.MINIO_URL", minio_url)
mocked_minio = MagicMock()
mocked_minio.fput_object.return_value = None
mocked_minio.bucket_exists.return_value = True
mocker.patch("udata_hydra.config.S3_ENDPOINT", s3_endpoint)
mocked_resource = MagicMock()
mocked_resource.meta.client.head_bucket.return_value = {}
mocked_resource.Bucket.return_value = MagicMock()

with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)
with patch("udata_hydra.utils.s3.boto3.resource", return_value=mocked_resource):
mocked_s3_client = S3Client(bucket=bucket)

with (
patch("udata_hydra.analysis.geojson.minio_client_pmtiles", new=mocked_minio_client),
patch("udata_hydra.analysis.geojson.s3_client_pmtiles", new=mocked_s3_client),
patch("udata_hydra.config.REMOVE_GENERATED_FILES", False),
):
mock_os = mocker.patch("udata_hydra.utils.minio.os")
mock_os.path = os.path
mock_os.remove.return_value = None
mocker.patch("udata_hydra.utils.s3.Path.unlink", MagicMock())

# Test the performance of geojson_to_pmtiles with the real file
result = await geojson_to_pmtiles(geojson_path, test_pmtiles_path)
Expand All @@ -227,7 +218,7 @@ async def test_geojson_to_pmtiles_big_file(mocker, input_file: str | None):
with test_pmtiles_path.open("rb") as f:
header = f.read(7)
assert header == b"PMTiles"
assert pmtiles_url == f"https://{minio_url}/{bucket}/{folder}/{geojson_path.stem}.pmtiles"
assert pmtiles_url == f"https://{s3_endpoint}/{bucket}/{geojson_path.stem}.pmtiles"

# The size should be significantly larger than the small test file
assert pmtiles_size > 5000 # Should be much larger than the 850-900 range of small file
Expand Down
21 changes: 10 additions & 11 deletions tests/test_analysis/test_parquet_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
csv_to_parquet,
generate_records,
)
from udata_hydra.utils.minio import MinIOClient
from udata_hydra.utils.parquet import save_as_parquet, save_as_parquet_from_db
from udata_hydra.utils.s3 import S3Client

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -109,18 +109,17 @@ async def execute_csv_to_parquet() -> tuple[str, int] | None:
assert not await execute_csv_to_parquet()

else:
minio_url = "my.minio.fr"
s3_endpoint = "s3.example.com"
bucket = "bucket"
folder = "folder"
mocker.patch("udata_hydra.config.MINIO_URL", minio_url)
mocked_minio = MagicMock()
mocked_minio.fput_object.return_value = None
mocked_minio.bucket_exists.return_value = True
with patch("udata_hydra.utils.minio.Minio", return_value=mocked_minio):
mocked_minio_client = MinIOClient(bucket=bucket, folder=folder)
with patch("udata_hydra.analysis.csv.minio_client", new=mocked_minio_client):
mocker.patch("udata_hydra.config.S3_ENDPOINT", s3_endpoint)
mocked_resource = MagicMock()
mocked_resource.meta.client.head_bucket.return_value = {}
mocked_resource.Bucket.return_value = MagicMock()
with patch("udata_hydra.utils.s3.boto3.resource", return_value=mocked_resource):
mocked_s3_client = S3Client(bucket=bucket)
with patch("udata_hydra.analysis.csv.s3_client", new=mocked_s3_client):
result = await execute_csv_to_parquet()
assert result is not None
parquet_url, parquet_size = result
assert parquet_url == f"https://{minio_url}/{bucket}/{folder}/{RESOURCE_ID}.parquet"
assert parquet_url == f"https://{s3_endpoint}/{bucket}/{RESOURCE_ID}.parquet"
assert isinstance(parquet_size, int)
41 changes: 41 additions & 0 deletions tests/test_utils/test_s3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Unit tests for S3Client (object key prefix / public URL)."""

from collections.abc import Iterator
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest
from pytest_mock import MockerFixture

from udata_hydra.utils.s3 import S3Client


@pytest.fixture
def mock_s3(mocker: MockerFixture) -> Iterator[MagicMock]:
    """Yield a mocked S3 bucket object, with boto3 and the configured endpoint patched out.

    The fake resource answers ``head_bucket`` (bucket-existence check) and
    ``Bucket`` so that ``S3Client`` can be constructed without network access.
    """
    mocker.patch("udata_hydra.config.S3_ENDPOINT", "s3.example.com")
    fake_bucket = MagicMock()
    fake_resource = MagicMock()
    fake_resource.meta.client.head_bucket.return_value = {}
    fake_resource.Bucket.return_value = fake_bucket
    with patch("udata_hydra.utils.s3.boto3.resource", return_value=fake_resource):
        yield fake_bucket


def test_s3_client_upload_at_bucket_root(mock_s3: MagicMock, tmp_path: Path) -> None:
    """Without a prefix, the object key is the bare file name at the bucket root."""
    source = tmp_path / "file.parquet"
    source.write_bytes(b"x")

    result = S3Client(bucket="my-bucket").send_file(source, delete_source=False)

    assert result == "https://s3.example.com/my-bucket/file.parquet"
    mock_s3.upload_file.assert_called_once_with(str(source), "file.parquet")


def test_s3_client_upload_with_prefix(mock_s3: MagicMock, tmp_path: Path) -> None:
    """With a prefix, the object key is ``<prefix>/<file name>`` and the URL reflects it."""
    source = tmp_path / "file.parquet"
    source.write_bytes(b"x")

    result = S3Client(bucket="my-bucket", prefix="exports").send_file(source, delete_source=False)

    assert result == "https://s3.example.com/my-bucket/exports/file.parquet"
    mock_s3.upload_file.assert_called_once_with(str(source), "exports/file.parquet")
3 changes: 0 additions & 3 deletions udata_hydra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import importlib.metadata
import logging
import os
import re
import tomllib
from pathlib import Path

log = logging.getLogger("udata-hydra")


class Configurator:
"""Loads a dict of config from TOML file(s) and behaves like an object, ie config.VALUE"""
Expand Down
6 changes: 3 additions & 3 deletions udata_hydra/analysis/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
remove_remainders,
)
from udata_hydra.utils.casting import generate_records
from udata_hydra.utils.minio import MinIOClient
from udata_hydra.utils.parquet import save_as_parquet, save_as_parquet_from_db
from udata_hydra.utils.s3 import S3Client

log = logging.getLogger("udata-hydra")

Expand All @@ -75,7 +75,7 @@
}

RESERVED_COLS = ("__id", "cmin", "cmax", "collation", "ctid", "tableoid", "xmin", "xmax")
minio_client = MinIOClient(bucket=config.MINIO_PARQUET_BUCKET, folder=config.MINIO_PARQUET_FOLDER)
s3_client = S3Client(bucket=config.S3_PARQUET_BUCKET, prefix=config.S3_PARQUET_PREFIX)


async def analyse_csv(
Expand Down Expand Up @@ -353,7 +353,7 @@ async def csv_to_parquet(
output_filename=resource_id,
)
parquet_size: int = os.path.getsize(parquet_file)
parquet_url: str = minio_client.send_file(parquet_file)
parquet_url: str = s3_client.send_file(parquet_file)

await Check.update(
check_id,
Expand Down
Loading