diff --git a/tests/test_api/test_api_resources.py b/tests/test_api/test_api_resources.py index a3d5a789..df226222 100644 --- a/tests/test_api/test_api_resources.py +++ b/tests/test_api/test_api_resources.py @@ -8,6 +8,7 @@ import pytest from tests.conftest import DATASET_ID, NOT_EXISTING_RESOURCE_ID, RESOURCE_ID, RESOURCE_URL +from udata_hydra.routes import resources as resources_routes pytestmark = pytest.mark.asyncio @@ -29,6 +30,7 @@ async def test_get_resource(setup_catalog, client): assert data["resource_id"] == RESOURCE_ID assert data["status"] is None assert data["status_since"] is None + assert data["instant_analysis"] is False async def test_create_resource( @@ -67,6 +69,43 @@ async def test_create_resource( assert text == "Missing document body" +async def test_create_resource_instant_analysis( + client, api_headers, udata_resource_payload, db, mocker +): + """Optional webhook flag stores instant_analysis and schedules a background check (#386).""" + mock_create_task = mocker.patch("udata_hydra.routes.resources.asyncio.create_task") + payload = {**udata_resource_payload, "instant_analysis": True} + resp = await client.post(path="/api/resources/", headers=api_headers, json=payload) + assert resp.status == 201 + rid = payload["resource_id"] + row = await db.fetchrow("SELECT instant_analysis FROM catalog WHERE resource_id = $1", rid) + assert row is not None + assert row["instant_analysis"] is True + mock_create_task.assert_called_once() + + +async def test_create_resource_without_instant_analysis_no_background_task( + client, api_headers, udata_resource_payload, mocker +): + mock_create_task = mocker.patch("udata_hydra.routes.resources.asyncio.create_task") + payload = {**udata_resource_payload} + resp = await client.post(path="/api/resources/", headers=api_headers, json=payload) + assert resp.status == 201 + mock_create_task.assert_not_called() + + +async def test_immediate_check_clears_crawling_url_when_check_raises(setup_catalog, db, mocker): + mocker.patch.object( + resources_routes, + "check_resource", + side_effect=RuntimeError("simulated failure"), + ) + await resources_routes._immediate_check_resource(RESOURCE_ID) + row = await db.fetchrow("SELECT status FROM catalog WHERE resource_id = $1", RESOURCE_ID) + assert row is not None + assert row["status"] is None + + async def test_update_resource(client, api_headers, api_headers_wrong_token): # Test invalid PUT data stupid_post_data: dict = {"stupid": "stupid"} diff --git a/tests/test_crawl/test_crawl.py b/tests/test_crawl/test_crawl.py index 0405184f..ea28680e 100644 --- a/tests/test_crawl/test_crawl.py +++ b/tests/test_crawl/test_crawl.py @@ -797,12 +797,15 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration): assert row["status"] == status +@pytest.mark.parametrize( + "instant_analysis,expected_worker_priority", [(False, "default"), (True, "high")] +) @pytest.mark.parametrize( "mock_function", [ ( "udata_hydra.crawl.check_resources.check_resource", - {"url": ANY, "resource": ANY, "session": ANY, "worker_priority": "default"}, + {"url": ANY, "resource": ANY, "session": ANY}, "ok", ), ( @@ -811,7 +814,6 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration): "check": ANY, "last_check": ANY, "force_analysis": False, - "worker_priority": "default", }, None, ), @@ -826,16 +828,21 @@ async def test_new_resource_priority( produce_mock, api_headers, mock_function, + instant_analysis, + expected_worker_priority, ): func_path, kwargs, result = mock_function + kwargs = {**kwargs, "worker_priority": expected_worker_priority} # delete the catalog content, we only want to test the new resource await db.execute("DELETE FROM catalog") rurl = udata_resource_payload["document"]["url"] rmock.head(rurl, headers={"content-length": "1"}) - res = await client.post(path="/api/resources", headers=api_headers, json=udata_resource_payload) + payload = {**udata_resource_payload, "instant_analysis": instant_analysis} + res = await client.post(path="/api/resources", headers=api_headers, json=payload) assert res.status == 201 res = await db.fetch("SELECT * FROM catalog") assert len(res) == 1 and res[0]["priority"] is True + assert res[0]["instant_analysis"] is instant_analysis # we have to mock the functions separately, because they are intricated with patch(func_path) as mock_func: mock_func.return_value = result diff --git a/tests/test_db/test_db_resources.py b/tests/test_db/test_db_resources.py index c881208a..8e37322b 100644 --- a/tests/test_db/test_db_resources.py +++ b/tests/test_db/test_db_resources.py @@ -2,15 +2,48 @@ import nest_asyncio2 as nest_asyncio import pytest -from tests.conftest import DATABASE_URL, DATASET_ID, RESOURCE_ID, RESOURCE_URL +from tests.conftest import ( + DATABASE_URL, + DATASET_ID, + NOT_EXISTING_RESOURCE_ID, + RESOURCE_ID, + RESOURCE_URL, +) from udata_hydra.cli import load_catalog from udata_hydra.crawl import start_checks +from udata_hydra.db.resource import Resource pytestmark = pytest.mark.asyncio # allows nested async to test async with async :mindblown: nest_asyncio.apply() +async def test_claim_for_crawl_sets_crawling_url(setup_catalog): + row = await Resource.claim_for_crawl(RESOURCE_ID) + assert row is not None + assert row["status"] == "CRAWLING_URL" + assert row["status_since"] is not None + + +async def test_claim_for_crawl_returns_none_when_already_crawling(setup_catalog): + assert await Resource.claim_for_crawl(RESOURCE_ID) is not None + assert await Resource.claim_for_crawl(RESOURCE_ID) is None + + +async def test_claim_for_crawl_from_backoff(setup_catalog, db): + await db.execute( + "UPDATE catalog SET status = 'BACKOFF' WHERE resource_id = $1", + RESOURCE_ID, + ) + row = await Resource.claim_for_crawl(RESOURCE_ID) + assert row is not None + assert row["status"] == "CRAWLING_URL" + + +async def test_claim_for_crawl_missing_resource(setup_catalog): + assert await Resource.claim_for_crawl(NOT_EXISTING_RESOURCE_ID) is None + + async def test_catalog(setup_catalog, db): res = await db.fetch( "SELECT * FROM catalog WHERE resource_id = $1", diff --git a/udata_hydra/app.py b/udata_hydra/app.py index 1909aea8..17f7b4a6 100644 --- a/udata_hydra/app.py +++ b/udata_hydra/app.py @@ -12,6 +12,7 @@ async def app_factory() -> web.Application: async def app_startup(app): app["pool"] = await context.pool() app["started_at"] = datetime.now(timezone.utc) + app["background_tasks"] = set() async def app_cleanup(app): if "pool" in app: diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index d1ddb9db..41c064ca 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -32,6 +32,12 @@ results: defaultdict = defaultdict(int) +def _batch_worker_priority(row: Record) -> str: + if row.get("instant_analysis"): + return "high" + return "default" if row["priority"] else "low" + + async def check_batch_resources(to_parse: list[Record]) -> None: """Check a batch of resources""" context.monitor().set_status("Checking resources...") @@ -46,7 +52,7 @@ async def check_batch_resources(to_parse: list[Record]) -> None: url=row["url"], resource=row, session=session, - worker_priority="default" if row["priority"] else "low", + worker_priority=_batch_worker_priority(row), ) ) for task in asyncio.as_completed(tasks): diff --git a/udata_hydra/crawl/preprocess_check_data.py b/udata_hydra/crawl/preprocess_check_data.py index ed47c103..d68d7c39 100644 --- a/udata_hydra/crawl/preprocess_check_data.py +++ b/udata_hydra/crawl/preprocess_check_data.py @@ -72,7 +72,10 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict # Update resource following check: # Reset resource status so that it's not forbidden to be checked again. # Reset priority so that it's not prioritised anymore. - await Resource.update(check_data["resource_id"], data={"status": None, "priority": False}) + await Resource.update( + check_data["resource_id"], + data={"status": None, "priority": False, "instant_analysis": False}, + ) return new_check, last_check diff --git a/udata_hydra/crawl/select_batch.py b/udata_hydra/crawl/select_batch.py index ac35c383..f1ad69ee 100644 --- a/udata_hydra/crawl/select_batch.py +++ b/udata_hydra/crawl/select_batch.py @@ -46,27 +46,29 @@ async def select_batch_resources_to_check() -> list[Record]: # first resources that are prioritised q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, resource_id, priority + SELECT catalog.url, dataset_id, resource_id, priority, + catalog.instant_analysis, catalog.status_since FROM catalog WHERE {excluded} AND priority = True ) s - ORDER BY random() LIMIT {config.BATCH_SIZE} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE} """ to_check: list[Record] = await select_rows_based_on_query(connection, q) # then resources with no last check - # (either because they have never been checked before, or because the last check has been deleted) + # (either because they have never been checked before, or because the last check was deleted) if len(to_check) < config.BATCH_SIZE: q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, resource_id, priority + SELECT catalog.url, dataset_id, resource_id, priority, + catalog.instant_analysis, catalog.status_since FROM catalog WHERE catalog.last_check IS NULL AND {excluded} AND priority = False ) s - ORDER BY random() LIMIT {config.BATCH_SIZE} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE} """ to_check += await select_rows_based_on_query(connection, q) @@ -76,7 +78,8 @@ async def select_batch_resources_to_check() -> list[Record]: limit = config.BATCH_SIZE - len(to_check) q = f""" SELECT * FROM ( - SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority + SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority, + catalog.instant_analysis, catalog.status_since FROM catalog, checks WHERE catalog.last_check IS NOT NULL AND catalog.last_check = checks.id @@ -84,7 +87,7 @@ async def select_batch_resources_to_check() -> list[Record]: AND {excluded} AND catalog.priority = False ) s - ORDER BY random() LIMIT {limit} + ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {limit} """ to_check += await select_rows_based_on_query(connection, q, now) diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py index 42c99a82..b0db3ec7 100644 --- a/udata_hydra/db/resource.py +++ b/udata_hydra/db/resource.py @@ -38,6 +38,26 @@ async def get(cls, resource_id: str, column_name: str = "*") -> Record | None: q = f"""SELECT {column_name} FROM catalog WHERE resource_id = '{resource_id}';""" return await connection.fetchrow(q) + @classmethod + async def claim_for_crawl(cls, resource_id: str) -> Record | None: + """Mark the resource as CRAWLING_URL if it is eligible for batch selection. + + Eligible means: not deleted and ``status`` is NULL or BACKOFF (same idea as + ``get_excluded_clause``). Returns the updated row, or None if missing / not eligible. + """ + pool = await context.pool() + now = datetime.now(timezone.utc) + async with pool.acquire() as connection: + q = """ + UPDATE catalog + SET status = 'CRAWLING_URL', status_since = $2 + WHERE resource_id = $1 + AND deleted = FALSE + AND (status IS NULL OR status = 'BACKOFF') + RETURNING *; + """ + return await connection.fetchrow(q, resource_id, now) + @classmethod async def insert( cls, @@ -49,6 +69,7 @@ async def insert( title: str, status: str | None = None, priority: bool = True, + instant_analysis: bool = False, ) -> Record | None: if status and status not in cls.STATUSES.keys(): raise ValueError(f"Invalid status: {status}") @@ -57,8 +78,8 @@ async def insert( async with pool.acquire() as connection: # Insert new resource in catalog table and mark as high priority for crawling q = """ - INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title) - VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8) + INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis) + VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9) ON CONFLICT (resource_id) DO UPDATE SET dataset_id = $1, url = $3, @@ -67,10 +88,20 @@ async def insert( deleted = FALSE, status = $6, priority = $7, - title = $8 + title = $8, + instant_analysis = $9 RETURNING *;""" return await connection.fetchrow( - q, dataset_id, resource_id, url, type, format, status, priority, title + q, + dataset_id, + resource_id, + url, + type, + format, + status, + priority, + title, + instant_analysis, ) @classmethod @@ -102,6 +133,7 @@ async def update_or_insert( title: str, status: str | None = None, priority: bool = True, # Make resource high priority by default for crawling + instant_analysis: bool | None = None, ) -> Record | None: if status and status not in cls.STATUSES.keys(): raise ValueError(f"Invalid status: {status}") @@ -110,27 +142,49 @@ async def update_or_insert( async with pool.acquire() as connection: # Check if resource is in catalog then insert or update into table if await Resource.get(resource_id): + if instant_analysis is not None: + q = """ + UPDATE catalog + SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8, instant_analysis = $9 + WHERE resource_id = $2 + RETURNING *;""" + return await connection.fetchrow( + q, + dataset_id, + resource_id, + url, + type, + format, + status, + priority, + title, + instant_analysis, + ) q = """ UPDATE catalog SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8 WHERE resource_id = $2 RETURNING *;""" - else: - q = """ - INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title) - VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8) - ON CONFLICT (resource_id) DO UPDATE SET - dataset_id = $1, - url = $3, - type = $4, - format = $5, - deleted = FALSE, - status = $6, - priority = $7, - title = $8 - RETURNING *;""" + return await connection.fetchrow( + q, dataset_id, resource_id, url, type, format, status, priority, title + ) + ia = instant_analysis if instant_analysis is not None else False + q = """ + INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis) + VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9) + ON CONFLICT (resource_id) DO UPDATE SET + dataset_id = $1, + url = $3, + type = $4, + format = $5, + deleted = FALSE, + status = $6, + priority = $7, + title = $8, + instant_analysis = $9 + RETURNING *;""" return await connection.fetchrow( - q, dataset_id, resource_id, url, type, format, status, priority, title + q, dataset_id, resource_id, url, type, format, status, priority, title, ia ) @classmethod diff --git a/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql b/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql new file mode 100644 index 00000000..ed1ef9d9 --- /dev/null +++ b/udata_hydra/migrations/main/20260403_add_instant_analysis_catalog.sql @@ -0,0 +1,2 @@ +-- Optional flag from udata webhook: request high-priority RQ path for first analysis (#386) +ALTER TABLE catalog ADD COLUMN IF NOT EXISTS instant_analysis BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/udata_hydra/routes/resources.py b/udata_hydra/routes/resources.py index c99f7d7d..064378d8 100644 --- a/udata_hydra/routes/resources.py +++ b/udata_hydra/routes/resources.py @@ -1,12 +1,44 @@ +import asyncio import json +import logging import uuid +import aiohttp from aiohttp import web from asyncpg import Record +from udata_hydra import config +from udata_hydra.crawl.check_resources import check_resource from udata_hydra.db.resource import Resource from udata_hydra.schemas import ResourceDocumentSchema, ResourceSchema +log = logging.getLogger(__name__) + + +async def _immediate_check_resource(resource_id: str) -> None: + """Run check_resource in the background (same RQ tier as POST /api/checks).""" + resource: Record | None = None + try: + resource = await Resource.claim_for_crawl(resource_id) + if resource is None: + return + async with aiohttp.ClientSession( + timeout=None, + headers={"user-agent": config.USER_AGENT_FULL}, + ) as session: + await check_resource( + url=resource["url"], + resource=resource, + session=session, + worker_priority="high", + ) + except Exception: + log.exception(f"Immediate resource check failed for {resource_id}") + if resource is not None: + row = await Resource.get(resource_id, "status") + if row is not None and row["status"] == "CRAWLING_URL": + await Resource.update(resource_id, {"status": None}) + async def get_resource(request: web.Request) -> web.Response: """Endpoint to get a resource from the DB @@ -44,6 +76,7 @@ async def create_resource(request: web.Request) -> web.Response: dataset_id = valid_payload["dataset_id"] resource_id = valid_payload["resource_id"] + instant_analysis: bool = bool(valid_payload.get("instant_analysis", False)) await Resource.insert( dataset_id=dataset_id, @@ -53,8 +86,15 @@ async def create_resource(request: web.Request) -> web.Response: format=document["format"], priority=True, title=document["title"], + instant_analysis=instant_analysis, ) + if instant_analysis: + background_tasks = request.app["background_tasks"] + task = asyncio.create_task(_immediate_check_resource(resource_id)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) + return web.json_response(ResourceDocumentSchema().dump(dict(document)), status=201) @@ -77,6 +117,9 @@ async def update_resource(request: web.Request) -> web.Response: dataset_id: str = valid_payload["dataset_id"] resource_id: str = valid_payload["resource_id"] + instant_analysis: bool | None = ( + valid_payload["instant_analysis"] if "instant_analysis" in valid_payload else None + ) await Resource.update_or_insert( dataset_id=dataset_id, @@ -85,6 +128,7 @@ async def update_resource(request: web.Request) -> web.Response: type=document["type"], format=document["format"], title=document["title"], + instant_analysis=instant_analysis, ) return web.json_response(ResourceDocumentSchema().dump(document), status=200) diff --git a/udata_hydra/schemas/resource.py b/udata_hydra/schemas/resource.py index bc5a64e9..0d91484b 100644 --- a/udata_hydra/schemas/resource.py +++ b/udata_hydra/schemas/resource.py @@ -1,4 +1,4 @@ -from marshmallow import Schema, fields +from marshmallow import EXCLUDE, Schema, fields class ResourceDocumentSchema(Schema): @@ -21,8 +21,12 @@ class ResourceDocumentSchema(Schema): class ResourceSchema(Schema): + class Meta: + unknown = EXCLUDE + dataset_id = fields.Str(required=True) resource_id = fields.Str(required=True) status = fields.Str(required=False) status_since = fields.DateTime(required=False) document = fields.Nested(ResourceDocumentSchema(), allow_none=True) + instant_analysis = fields.Bool(required=False)