
feat(csv): split GeoJSON and PMTiles into low-priority RQ jobs #408

Open
bolinocroustibat wants to merge 7 commits into geojson-from-db from feat/separate-geo-tasks

Conversation

Contributor

@bolinocroustibat bolinocroustibat commented Apr 3, 2026

Closes #412 (and duplicate datagouv/data.gouv.fr#1991).

This is built on #404 where GeoJSON is generated from the PostgreSQL parsing table (streaming from the DB) instead of re-reading the CSV file.

Main Changes:

  • GeoJSON and PMTiles now run as separate RQ jobs on the low queue so scheduling can treat them differently from CSV-to-DB ingest. When CSV_TO_DB is enabled, geo work does not depend on the download temp file still being on disk.

  • The previous single geojson.py module is split into three smaller modules so CSV export, PMTiles conversion, and the remaining orchestration are easier to follow and test.

  • Removed the CSV_TO_GEOJSON config flag: since GeoJSON for CSVs now comes from the Postgres parsing table rather than from re-reading the file, the flag was redundant. A single switch (DB_TO_GEOJSON) is enough to turn that export on or off.

  • Refactored utils/files.py to use streaming HTTP for download_url_to_tempfile, and relocated and unified it with download_resource: 357dfb0
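
The job split in the first bullet can be sketched as follows. This is an illustrative sketch, not the PR's actual code: the task names are placeholders, and the `enqueue` callable with its `queue="low"` keyword stands in for the real RQ wiring.

```python
from typing import Any, Callable


def task_csv_to_geojson(check_id: int) -> None:
    """Placeholder for the GeoJSON export job (streams from the parsing table)."""


def task_geojson_to_pmtiles(check_id: int) -> None:
    """Placeholder for the PMTiles conversion job."""


def enqueue_geo_jobs(check_id: int, enqueue: Callable[..., Any]) -> None:
    # Each geo step is an independent job on the "low" queue, so the
    # scheduler can deprioritise it relative to CSV-to-DB ingest, and
    # neither job depends on the download temp file still being on disk.
    enqueue(task_csv_to_geojson, check_id, queue="low")
    enqueue(task_geojson_to_pmtiles, check_id, queue="low")
```

With real RQ, `enqueue` would be a bound method of a `Queue("low", connection=...)`; passing it in keeps the orchestration testable without Redis.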

Suggested for later, separate PRs:

  • Harmonize task_* RQ entrypoint names across the codebase
  • Pick a clearer module name than geojson.py for what that file still owns now that CSV GeoJSON and PMTiles live in their own modules (what about geo_analysis.py?)

@bolinocroustibat bolinocroustibat self-assigned this Apr 3, 2026
@bolinocroustibat bolinocroustibat marked this pull request as draft April 3, 2026 14:40
@bolinocroustibat bolinocroustibat added the enhancement New feature or request label Apr 3, 2026
@bolinocroustibat bolinocroustibat force-pushed the feat/separate-geo-tasks branch 7 times, most recently from a76bc0e to af597d2 Compare April 3, 2026 15:47
@bolinocroustibat bolinocroustibat marked this pull request as ready for review April 3, 2026 15:48
@bolinocroustibat bolinocroustibat changed the base branch from main to geojson-from-db April 3, 2026 15:54
@bolinocroustibat bolinocroustibat force-pushed the feat/separate-geo-tasks branch 4 times, most recently from 98ff286 to 90df48e Compare April 3, 2026 16:23
Comment thread udata_hydra/analysis/geojson.py
Comment thread udata_hydra/analysis/helpers.py Outdated
Comment on lines +45 to +58
async def download_url_to_tempfile(url: str, suffix: str = "") -> Path:
"""Download a URL to a named temporary file and return its path."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
resp.raise_for_status()
body = await resp.read()
fd, raw_path = tempfile.mkstemp(suffix=suffix)
path = Path(raw_path)
try:
os.write(fd, body)
finally:
os.close(fd)
log.debug(f"Downloaded {url} to {path} ({len(body)} bytes)")
return path
Contributor
Is it possible to stream to the file instead of loading the whole file into RAM before writing it to disk?

Contributor Author

bolinocroustibat commented Apr 7, 2026
It's possible; we are already doing it in utils/files.py -> download_resource. We could use a shared function between the two.

Contributor Author

bolinocroustibat commented Apr 7, 2026
Refactored utils/files.py to use streaming HTTP for download_url_to_tempfile, and relocated and unified it with download_resource: 357dfb0
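
A minimal stdlib sketch of the streaming approach described here (the real code uses aiohttp; `urllib.request` and the chunk size below are illustrative stand-ins, not the project's actual implementation):

```python
import shutil
import tempfile
import urllib.request
from pathlib import Path

CHUNK_SIZE = 1024 * 1024  # illustrative 1 MiB chunks, not the project's setting


def download_url_to_tempfile(url: str, suffix: str = "") -> Path:
    """Download a URL to a named temporary file and return its path."""
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    # Copy the response body to disk in fixed-size chunks so the whole
    # payload never has to sit in RAM, unlike the read()-then-write version.
    with urllib.request.urlopen(url) as resp, open(fd, "wb") as out:
        shutil.copyfileobj(resp, out, length=CHUNK_SIZE)
    return Path(raw_path)
```

Note that `open(fd, "wb")` takes ownership of the descriptor from `mkstemp` and closes it when the `with` block exits.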

Comment on lines +280 to +283
record = await Check.get_by_id(check_id, with_deleted=True)
if not record:
log.error(f"task_csv_to_geojson: check {check_id} not found")
return
Contributor
Check.get_by_id returns the check only if it is the latest one for this resource. If a new check arrives before this job starts, it will not run. Not sure whether that's a problem in practice, but it's worth noting that if the low queue has trouble draining, we may never run this task (if new checks for this resource keep arriving before the queue empties).

Geo export runs as chained tasks on the low queue; GeoJSON is generated from the PostgreSQL parsing table when CSV_TO_DB is enabled. RQ entrypoint uses context for exception-queue routing.

Made-with: Cursor
bolinocroustibat and others added 3 commits April 8, 2026 09:11
# Conflicts:
#	udata_hydra/analysis/geojson.py
Co-authored-by: Thibaud Ollagnier <ThibaudDauce@users.noreply.github.com>
…url_to_tempfile

- Add _http_get_to_temp_path (Path, total_bytes cap, shared session options)
- Factor download_resource and download_url_to_tempfile through it
- GeoJSON→PMTiles: headers, max_size_allowed, IOException handling
- download_file: BinaryIO + HTTP_DOWNLOAD_CHUNK_SIZE; remove_remainders uses Path.unlink

Made-with: Cursor

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[hydra] Split the CSV analysis tasks into sub-tasks

2 participants