From 5d1bbd0a0c5cfe32325247e72f9a06452e3feb90 Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 18:54:59 +0200 Subject: [PATCH 1/5] feat(webhook): optional instant_analysis for high worker queue When udata sets instant_analysis on resource create/update, Hydra stores it on catalog, orders batch selection ahead of other priority rows, and uses the high RQ tier for crawl/analysis. Cleared after preprocess like priority. Backward compatible (default false, unknown JSON keys ignored). Closes https://github.com/datagouv/hydra/issues/386 Made-with: Cursor --- tests/test_api/test_api_resources.py | 12 ++++ tests/test_crawl/test_crawl.py | 13 +++- udata_hydra/crawl/check_resources.py | 8 ++- udata_hydra/crawl/preprocess_check_data.py | 5 +- udata_hydra/crawl/select_batch.py | 17 +++-- udata_hydra/db/resource.py | 72 ++++++++++++++----- .../20260403_add_instant_analysis_catalog.sql | 2 + udata_hydra/routes/resources.py | 6 ++ udata_hydra/schemas/resource.py | 6 +- 9 files changed, 109 insertions(+), 32 deletions(-) create mode 100644 udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql diff --git a/tests/test_api/test_api_resources.py b/tests/test_api/test_api_resources.py index a3d5a789..be848a9c 100644 --- a/tests/test_api/test_api_resources.py +++ b/tests/test_api/test_api_resources.py @@ -29,6 +29,7 @@ async def test_get_resource(setup_catalog, client): assert data["resource_id"] == RESOURCE_ID assert data["status"] is None assert data["status_since"] is None + assert data["instant_analysis"] is False async def test_create_resource( @@ -67,6 +68,17 @@ async def test_create_resource( assert text == "Missing document body" +async def test_create_resource_instant_analysis(client, api_headers, udata_resource_payload, db): + """Optional webhook flag stores instant_analysis for high RQ tier (#386).""" + payload = {**udata_resource_payload, "instant_analysis": True} + resp = await client.post(path="/api/resources/", headers=api_headers, 
json=payload) + assert resp.status == 201 + rid = payload["resource_id"] + row = await db.fetchrow("SELECT instant_analysis FROM catalog WHERE resource_id = $1", rid) + assert row is not None + assert row["instant_analysis"] is True + + async def test_update_resource(client, api_headers, api_headers_wrong_token): # Test invalid PUT data stupid_post_data: dict = {"stupid": "stupid"} diff --git a/tests/test_crawl/test_crawl.py b/tests/test_crawl/test_crawl.py index 0405184f..ea28680e 100644 --- a/tests/test_crawl/test_crawl.py +++ b/tests/test_crawl/test_crawl.py @@ -797,12 +797,15 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration): assert row["status"] == status +@pytest.mark.parametrize( + "instant_analysis,expected_worker_priority", [(False, "default"), (True, "high")] +) @pytest.mark.parametrize( "mock_function", [ ( "udata_hydra.crawl.check_resources.check_resource", - {"url": ANY, "resource": ANY, "session": ANY, "worker_priority": "default"}, + {"url": ANY, "resource": ANY, "session": ANY}, "ok", ), ( @@ -811,7 +814,6 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration): "check": ANY, "last_check": ANY, "force_analysis": False, - "worker_priority": "default", }, None, ), @@ -826,16 +828,21 @@ async def test_new_resource_priority( produce_mock, api_headers, mock_function, + instant_analysis, + expected_worker_priority, ): func_path, kwargs, result = mock_function + kwargs = {**kwargs, "worker_priority": expected_worker_priority} # delete the catalog content, we only want to test the new resource await db.execute("DELETE FROM catalog") rurl = udata_resource_payload["document"]["url"] rmock.head(rurl, headers={"content-length": "1"}) - res = await client.post(path="/api/resources", headers=api_headers, json=udata_resource_payload) + payload = {**udata_resource_payload, "instant_analysis": instant_analysis} + res = await client.post(path="/api/resources", headers=api_headers, json=payload) assert res.status 
== 201 res = await db.fetch("SELECT * FROM catalog") assert len(res) == 1 and res[0]["priority"] is True + assert res[0]["instant_analysis"] is instant_analysis # we have to mock the functions separately, because they are intricated with patch(func_path) as mock_func: mock_func.return_value = result diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index d1ddb9db..41c064ca 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -32,6 +32,12 @@ results: defaultdict = defaultdict(int) +def _batch_worker_priority(row: Record) -> str: + if row.get("instant_analysis"): + return "high" + return "default" if row["priority"] else "low" + + async def check_batch_resources(to_parse: list[Record]) -> None: """Check a batch of resources""" context.monitor().set_status("Checking resources...") @@ -46,7 +52,7 @@ async def check_batch_resources(to_parse: list[Record]) -> None: url=row["url"], resource=row, session=session, - worker_priority="default" if row["priority"] else "low", + worker_priority=_batch_worker_priority(row), ) ) for task in asyncio.as_completed(tasks): diff --git a/udata_hydra/crawl/preprocess_check_data.py b/udata_hydra/crawl/preprocess_check_data.py index ed47c103..d68d7c39 100644 --- a/udata_hydra/crawl/preprocess_check_data.py +++ b/udata_hydra/crawl/preprocess_check_data.py @@ -72,7 +72,10 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict # Update resource following check: # Reset resource status so that it's not forbidden to be checked again. # Reset priority so that it's not prioritised anymore. 
- await Resource.update(check_data["resource_id"], data={"status": None, "priority": False}) + await Resource.update( + check_data["resource_id"], + data={"status": None, "priority": False, "instant_analysis": False}, + ) return new_check, last_check diff --git a/udata_hydra/crawl/select_batch.py b/udata_hydra/crawl/select_batch.py index ac35c383..f1ad69ee 100644 --- a/udata_hydra/crawl/select_batch.py +++ b/udata_hydra/crawl/select_batch.py @@ -46,27 +46,29 @@ async def select_batch_resources_to_check() -> list[Record]: # first resources that are prioritised q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, resource_id, priority + SELECT catalog.url, dataset_id, resource_id, priority, + catalog.instant_analysis, catalog.status_since FROM catalog WHERE {excluded} AND priority = True ) s - ORDER BY random() LIMIT {config.BATCH_SIZE} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE} """ to_check: list[Record] = await select_rows_based_on_query(connection, q) # then resources with no last check - # (either because they have never been checked before, or because the last check has been deleted) + # (either because they have never been checked before, or because the last check was deleted) if len(to_check) < config.BATCH_SIZE: q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, resource_id, priority + SELECT catalog.url, dataset_id, resource_id, priority, + catalog.instant_analysis, catalog.status_since FROM catalog WHERE catalog.last_check IS NULL AND {excluded} AND priority = False ) s - ORDER BY random() LIMIT {config.BATCH_SIZE} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE} """ to_check += await select_rows_based_on_query(connection, q) @@ -76,7 +78,8 @@ async def select_batch_resources_to_check() -> list[Record]: limit = config.BATCH_SIZE - len(to_check) q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority + 
SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority, + catalog.instant_analysis, catalog.status_since FROM catalog, checks WHERE catalog.last_check IS NOT NULL AND catalog.last_check = checks.id @@ -84,7 +87,7 @@ async def select_batch_resources_to_check() -> list[Record]: AND {excluded} AND catalog.priority = False ) s - ORDER BY random() LIMIT {limit} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {limit} """ to_check += await select_rows_based_on_query(connection, q, now) diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py index 42c99a82..52a9a249 100644 --- a/udata_hydra/db/resource.py +++ b/udata_hydra/db/resource.py @@ -49,6 +49,7 @@ async def insert( title: str, status: str | None = None, priority: bool = True, + instant_analysis: bool = False, ) -> Record | None: if status and status not in cls.STATUSES.keys(): raise ValueError(f"Invalid status: {status}") @@ -57,8 +58,8 @@ async def insert( async with pool.acquire() as connection: # Insert new resource in catalog table and mark as high priority for crawling q = """ - INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title) - VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8) + INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis) + VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9) ON CONFLICT (resource_id) DO UPDATE SET dataset_id = $1, url = $3, @@ -67,10 +68,20 @@ async def insert( deleted = FALSE, status = $6, priority = $7, - title = $8 + title = $8, + instant_analysis = $9 RETURNING *;""" return await connection.fetchrow( - q, dataset_id, resource_id, url, type, format, status, priority, title + q, + dataset_id, + resource_id, + url, + type, + format, + status, + priority, + title, + instant_analysis, ) @classmethod @@ -102,6 +113,7 @@ async def update_or_insert( title: str, status: str | None = None, priority: bool = True, # Make 
resource high priority by default for crawling + instant_analysis: bool | None = None, ) -> Record | None: if status and status not in cls.STATUSES.keys(): raise ValueError(f"Invalid status: {status}") @@ -110,27 +122,49 @@ async def update_or_insert( async with pool.acquire() as connection: # Check if resource is in catalog then insert or update into table if await Resource.get(resource_id): + if instant_analysis is not None: + q = """ + UPDATE catalog + SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8, instant_analysis = $9 + WHERE resource_id = $2 + RETURNING *;""" + return await connection.fetchrow( + q, + dataset_id, + resource_id, + url, + type, + format, + status, + priority, + title, + instant_analysis, + ) q = """ UPDATE catalog SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8 WHERE resource_id = $2 RETURNING *;""" - else: - q = """ - INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title) - VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8) - ON CONFLICT (resource_id) DO UPDATE SET - dataset_id = $1, - url = $3, - type = $4, - format = $5, - deleted = FALSE, - status = $6, - priority = $7, - title = $8 - RETURNING *;""" + return await connection.fetchrow( + q, dataset_id, resource_id, url, type, format, status, priority, title + ) + ia = instant_analysis if instant_analysis is not None else False + q = """ + INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis) + VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9) + ON CONFLICT (resource_id) DO UPDATE SET + dataset_id = $1, + url = $3, + type = $4, + format = $5, + deleted = FALSE, + status = $6, + priority = $7, + title = $8, + instant_analysis = $9 + RETURNING *;""" return await connection.fetchrow( - q, dataset_id, resource_id, url, type, format, status, priority, title + q, dataset_id, resource_id, url, type, format, 
status, priority, title, ia ) @classmethod diff --git a/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql b/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql new file mode 100644 index 00000000..ed1ef9d9 --- /dev/null +++ b/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql @@ -0,0 +1,2 @@ +-- Optional flag from udata webhook: request high-priority RQ path for first analysis (#386) +ALTER TABLE catalog ADD COLUMN IF NOT EXISTS instant_analysis BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/udata_hydra/routes/resources.py b/udata_hydra/routes/resources.py index c99f7d7d..603ec0c3 100644 --- a/udata_hydra/routes/resources.py +++ b/udata_hydra/routes/resources.py @@ -44,6 +44,7 @@ async def create_resource(request: web.Request) -> web.Response: dataset_id = valid_payload["dataset_id"] resource_id = valid_payload["resource_id"] + instant_analysis: bool = bool(valid_payload.get("instant_analysis", False)) await Resource.insert( dataset_id=dataset_id, @@ -53,6 +54,7 @@ async def create_resource(request: web.Request) -> web.Response: format=document["format"], priority=True, title=document["title"], + instant_analysis=instant_analysis, ) return web.json_response(ResourceDocumentSchema().dump(dict(document)), status=201) @@ -77,6 +79,9 @@ async def update_resource(request: web.Request) -> web.Response: dataset_id: str = valid_payload["dataset_id"] resource_id: str = valid_payload["resource_id"] + instant_analysis: bool | None = ( + valid_payload["instant_analysis"] if "instant_analysis" in valid_payload else None + ) await Resource.update_or_insert( dataset_id=dataset_id, @@ -85,6 +90,7 @@ async def update_resource(request: web.Request) -> web.Response: type=document["type"], format=document["format"], title=document["title"], + instant_analysis=instant_analysis, ) return web.json_response(ResourceDocumentSchema().dump(document), status=200) diff --git a/udata_hydra/schemas/resource.py 
b/udata_hydra/schemas/resource.py index bc5a64e9..0d91484b 100644 --- a/udata_hydra/schemas/resource.py +++ b/udata_hydra/schemas/resource.py @@ -1,4 +1,4 @@ -from marshmallow import Schema, fields +from marshmallow import EXCLUDE, Schema, fields class ResourceDocumentSchema(Schema): @@ -21,8 +21,12 @@ class ResourceDocumentSchema(Schema): class ResourceSchema(Schema): + class Meta: + unknown = EXCLUDE + dataset_id = fields.Str(required=True) resource_id = fields.Str(required=True) status = fields.Str(required=False) status_since = fields.DateTime(required=False) document = fields.Nested(ResourceDocumentSchema(), allow_none=True) + instant_analysis = fields.Bool(required=False) From fc18df66ef0656fc3da771877b3bc143a45ccbcb Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Fri, 3 Apr 2026 18:56:55 +0200 Subject: [PATCH 2/5] feat(webhook): background check when instant_analysis on create Schedule check_resource after POST /api/resources when instant_analysis is true (#386). README documentation of the udata coordination and CLI is not part of this patch (only the route and its tests change). 
Made-with: Cursor --- tests/test_api/test_api_resources.py | 18 ++++++++++++++-- udata_hydra/routes/resources.py | 31 ++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/tests/test_api/test_api_resources.py b/tests/test_api/test_api_resources.py index be848a9c..b25ad79b 100644 --- a/tests/test_api/test_api_resources.py +++ b/tests/test_api/test_api_resources.py @@ -68,8 +68,11 @@ async def test_create_resource( assert text == "Missing document body" -async def test_create_resource_instant_analysis(client, api_headers, udata_resource_payload, db): - """Optional webhook flag stores instant_analysis for high RQ tier (#386).""" +async def test_create_resource_instant_analysis( + client, api_headers, udata_resource_payload, db, mocker +): + """Optional webhook flag stores instant_analysis and schedules a background check (#386).""" + mock_create_task = mocker.patch("udata_hydra.routes.resources.asyncio.create_task") payload = {**udata_resource_payload, "instant_analysis": True} resp = await client.post(path="/api/resources/", headers=api_headers, json=payload) assert resp.status == 201 @@ -77,6 +80,17 @@ async def test_create_resource_instant_analysis(client, api_headers, udata_resou row = await db.fetchrow("SELECT instant_analysis FROM catalog WHERE resource_id = $1", rid) assert row is not None assert row["instant_analysis"] is True + mock_create_task.assert_called_once() + + +async def test_create_resource_without_instant_analysis_no_background_task( + client, api_headers, udata_resource_payload, mocker +): + mock_create_task = mocker.patch("udata_hydra.routes.resources.asyncio.create_task") + payload = {**udata_resource_payload} + resp = await client.post(path="/api/resources/", headers=api_headers, json=payload) + assert resp.status == 201 + mock_create_task.assert_not_called() async def test_update_resource(client, api_headers, api_headers_wrong_token): diff --git a/udata_hydra/routes/resources.py 
b/udata_hydra/routes/resources.py index 603ec0c3..3873c06a 100644 --- a/udata_hydra/routes/resources.py +++ b/udata_hydra/routes/resources.py @@ -1,12 +1,40 @@ +import asyncio import json +import logging import uuid +import aiohttp from aiohttp import web from asyncpg import Record +from udata_hydra import config from udata_hydra.db.resource import Resource from udata_hydra.schemas import ResourceDocumentSchema, ResourceSchema +log = logging.getLogger(__name__) + + +async def _immediate_check_resource(resource_id: str) -> None: + """Run check_resource in the background (same RQ tier as POST /api/checks).""" + from udata_hydra.crawl.check_resources import check_resource + + try: + resource = await Resource.get(resource_id) + if resource is None or resource["status"] is not None: + return + async with aiohttp.ClientSession( + timeout=None, + headers={"user-agent": config.USER_AGENT_FULL}, + ) as session: + await check_resource( + url=resource["url"], + resource=resource, + session=session, + worker_priority="high", + ) + except Exception: + log.exception("Background check failed for resource %s", resource_id) + async def get_resource(request: web.Request) -> web.Response: """Endpoint to get a resource from the DB @@ -57,6 +85,9 @@ async def create_resource(request: web.Request) -> web.Response: instant_analysis=instant_analysis, ) + if instant_analysis: + asyncio.create_task(_immediate_check_resource(resource_id)) + return web.json_response(ResourceDocumentSchema().dump(dict(document)), status=201) From 357549ebf6613d559f8d3995a13de026b9fc97cd Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Wed, 8 Apr 2026 16:06:05 +0200 Subject: [PATCH 3/5] refactor(routes): hoist check_resource import in resources module Made-with: Cursor --- udata_hydra/routes/resources.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/udata_hydra/routes/resources.py b/udata_hydra/routes/resources.py index 3873c06a..aa7fa508 100644 --- a/udata_hydra/routes/resources.py 
+++ b/udata_hydra/routes/resources.py @@ -8,6 +8,7 @@ from asyncpg import Record from udata_hydra import config +from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.resource import Resource from udata_hydra.schemas import ResourceDocumentSchema, ResourceSchema @@ -16,8 +17,6 @@ async def _immediate_check_resource(resource_id: str) -> None: """Run check_resource in the background (same RQ tier as POST /api/checks).""" - from udata_hydra.crawl.check_resources import check_resource - try: resource = await Resource.get(resource_id) if resource is None or resource["status"] is not None: From cc4a9966e7a50ce4466d3980b16d89afa1a0cd54 Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Wed, 8 Apr 2026 16:19:40 +0200 Subject: [PATCH 4/5] fix(api): retain asyncio task refs for instant analysis checks Made-with: Cursor --- udata_hydra/app.py | 1 + udata_hydra/routes/resources.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/udata_hydra/app.py b/udata_hydra/app.py index 1909aea8..17f7b4a6 100644 --- a/udata_hydra/app.py +++ b/udata_hydra/app.py @@ -12,6 +12,7 @@ async def app_factory() -> web.Application: async def app_startup(app): app["pool"] = await context.pool() app["started_at"] = datetime.now(timezone.utc) + app["background_tasks"] = set() async def app_cleanup(app): if "pool" in app: diff --git a/udata_hydra/routes/resources.py b/udata_hydra/routes/resources.py index aa7fa508..5a48337e 100644 --- a/udata_hydra/routes/resources.py +++ b/udata_hydra/routes/resources.py @@ -85,7 +85,10 @@ async def create_resource(request: web.Request) -> web.Response: ) if instant_analysis: - asyncio.create_task(_immediate_check_resource(resource_id)) + background_tasks = request.app["background_tasks"] + task = asyncio.create_task(_immediate_check_resource(resource_id)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) return web.json_response(ResourceDocumentSchema().dump(dict(document)), status=201) 
From c321609472de196883a87b7ba043ed3fc09cdb7c Mon Sep 17 00:00:00 2001 From: Adrien Carpentier Date: Wed, 8 Apr 2026 16:29:24 +0200 Subject: [PATCH 5/5] fix(resources): claim CRAWLING_URL before immediate check Avoid batch selecting the same resource while instant_analysis runs. Clear stuck CRAWLING_URL if check_resource fails unexpectedly. Made-with: Cursor --- tests/test_api/test_api_resources.py | 13 +++++++++++ tests/test_db/test_db_resources.py | 35 +++++++++++++++++++++++++++- udata_hydra/db/resource.py | 20 ++++++++++++++++ udata_hydra/routes/resources.py | 11 ++++++--- 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/tests/test_api/test_api_resources.py b/tests/test_api/test_api_resources.py index b25ad79b..df226222 100644 --- a/tests/test_api/test_api_resources.py +++ b/tests/test_api/test_api_resources.py @@ -8,6 +8,7 @@ import pytest from tests.conftest import DATASET_ID, NOT_EXISTING_RESOURCE_ID, RESOURCE_ID, RESOURCE_URL +from udata_hydra.routes import resources as resources_routes pytestmark = pytest.mark.asyncio @@ -93,6 +94,18 @@ async def test_create_resource_without_instant_analysis_no_background_task( mock_create_task.assert_not_called() +async def test_immediate_check_clears_crawling_url_when_check_raises(setup_catalog, db, mocker): + mocker.patch.object( + resources_routes, + "check_resource", + side_effect=RuntimeError("simulated failure"), + ) + await resources_routes._immediate_check_resource(RESOURCE_ID) + row = await db.fetchrow("SELECT status FROM catalog WHERE resource_id = $1", RESOURCE_ID) + assert row is not None + assert row["status"] is None + + async def test_update_resource(client, api_headers, api_headers_wrong_token): # Test invalid PUT data stupid_post_data: dict = {"stupid": "stupid"} diff --git a/tests/test_db/test_db_resources.py b/tests/test_db/test_db_resources.py index c881208a..8e37322b 100644 --- a/tests/test_db/test_db_resources.py +++ b/tests/test_db/test_db_resources.py @@ -2,15 +2,48 @@ import 
nest_asyncio2 as nest_asyncio import pytest -from tests.conftest import DATABASE_URL, DATASET_ID, RESOURCE_ID, RESOURCE_URL +from tests.conftest import ( + DATABASE_URL, + DATASET_ID, + NOT_EXISTING_RESOURCE_ID, + RESOURCE_ID, + RESOURCE_URL, +) from udata_hydra.cli import load_catalog from udata_hydra.crawl import start_checks +from udata_hydra.db.resource import Resource pytestmark = pytest.mark.asyncio # allows nested async to test async with async :mindblown: nest_asyncio.apply() +async def test_claim_for_crawl_sets_crawling_url(setup_catalog): + row = await Resource.claim_for_crawl(RESOURCE_ID) + assert row is not None + assert row["status"] == "CRAWLING_URL" + assert row["status_since"] is not None + + +async def test_claim_for_crawl_returns_none_when_already_crawling(setup_catalog): + assert await Resource.claim_for_crawl(RESOURCE_ID) is not None + assert await Resource.claim_for_crawl(RESOURCE_ID) is None + + +async def test_claim_for_crawl_from_backoff(setup_catalog, db): + await db.execute( + "UPDATE catalog SET status = 'BACKOFF' WHERE resource_id = $1", + RESOURCE_ID, + ) + row = await Resource.claim_for_crawl(RESOURCE_ID) + assert row is not None + assert row["status"] == "CRAWLING_URL" + + +async def test_claim_for_crawl_missing_resource(setup_catalog): + assert await Resource.claim_for_crawl(NOT_EXISTING_RESOURCE_ID) is None + + async def test_catalog(setup_catalog, db): res = await db.fetch( "SELECT * FROM catalog WHERE resource_id = $1", diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py index 52a9a249..b0db3ec7 100644 --- a/udata_hydra/db/resource.py +++ b/udata_hydra/db/resource.py @@ -38,6 +38,26 @@ async def get(cls, resource_id: str, column_name: str = "*") -> Record | None: q = f"""SELECT {column_name} FROM catalog WHERE resource_id = '{resource_id}';""" return await connection.fetchrow(q) + @classmethod + async def claim_for_crawl(cls, resource_id: str) -> Record | None: + """Mark the resource as CRAWLING_URL if it is 
eligible for batch selection. + + Eligible means: not deleted and ``status`` is NULL or BACKOFF (same idea as + ``get_excluded_clause``). Returns the updated row, or None if missing / not eligible. + """ + pool = await context.pool() + now = datetime.now(timezone.utc) + async with pool.acquire() as connection: + q = """ + UPDATE catalog + SET status = 'CRAWLING_URL', status_since = $2 + WHERE resource_id = $1 + AND deleted = FALSE + AND (status IS NULL OR status = 'BACKOFF') + RETURNING *; + """ + return await connection.fetchrow(q, resource_id, now) + @classmethod async def insert( cls, diff --git a/udata_hydra/routes/resources.py b/udata_hydra/routes/resources.py index 5a48337e..064378d8 100644 --- a/udata_hydra/routes/resources.py +++ b/udata_hydra/routes/resources.py @@ -17,9 +17,10 @@ async def _immediate_check_resource(resource_id: str) -> None: """Run check_resource in the background (same RQ tier as POST /api/checks).""" + resource: Record | None = None try: - resource = await Resource.get(resource_id) - if resource is None or resource["status"] is not None: + resource = await Resource.claim_for_crawl(resource_id) + if resource is None: return async with aiohttp.ClientSession( timeout=None, @@ -32,7 +33,11 @@ async def _immediate_check_resource(resource_id: str) -> None: worker_priority="high", ) except Exception: - log.exception("Background check failed for resource %s", resource_id) + log.exception(f"Immediate resource check failed for {resource_id}") + if resource is not None: + row = await Resource.get(resource_id, "status") + if row is not None and row["status"] == "CRAWLING_URL": + await Resource.update(resource_id, {"status": None}) async def get_resource(request: web.Request) -> web.Response: