Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions tests/test_api/test_api_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

from tests.conftest import DATASET_ID, NOT_EXISTING_RESOURCE_ID, RESOURCE_ID, RESOURCE_URL
from udata_hydra.routes import resources as resources_routes

pytestmark = pytest.mark.asyncio

Expand All @@ -29,6 +30,7 @@ async def test_get_resource(setup_catalog, client):
assert data["resource_id"] == RESOURCE_ID
assert data["status"] is None
assert data["status_since"] is None
assert data["instant_analysis"] is False


async def test_create_resource(
Expand Down Expand Up @@ -67,6 +69,43 @@ async def test_create_resource(
assert text == "Missing document body"


async def test_create_resource_instant_analysis(
    client, api_headers, udata_resource_payload, db, mocker
):
    """The optional webhook flag is persisted as ``instant_analysis`` and an
    immediate background check is scheduled for the new resource (#386)."""
    create_task_mock = mocker.patch("udata_hydra.routes.resources.asyncio.create_task")
    body = {**udata_resource_payload, "instant_analysis": True}
    response = await client.post(path="/api/resources/", headers=api_headers, json=body)
    assert response.status == 201
    resource_id = body["resource_id"]
    record = await db.fetchrow(
        "SELECT instant_analysis FROM catalog WHERE resource_id = $1", resource_id
    )
    assert record is not None
    assert record["instant_analysis"] is True
    create_task_mock.assert_called_once()


async def test_create_resource_without_instant_analysis_no_background_task(
    client, api_headers, udata_resource_payload, mocker
):
    """Without the opt-in flag, creating a resource must not spawn a background task."""
    create_task_mock = mocker.patch("udata_hydra.routes.resources.asyncio.create_task")
    body = udata_resource_payload.copy()
    response = await client.post(path="/api/resources/", headers=api_headers, json=body)
    assert response.status == 201
    create_task_mock.assert_not_called()


async def test_immediate_check_clears_crawling_url_when_check_raises(setup_catalog, db, mocker):
    """If the check itself blows up, the immediate path must reset the
    resource's status instead of leaving it stuck in CRAWLING_URL."""
    mocker.patch.object(
        resources_routes, "check_resource", side_effect=RuntimeError("simulated failure")
    )
    await resources_routes._immediate_check_resource(RESOURCE_ID)
    record = await db.fetchrow(
        "SELECT status FROM catalog WHERE resource_id = $1", RESOURCE_ID
    )
    assert record is not None
    assert record["status"] is None


async def test_update_resource(client, api_headers, api_headers_wrong_token):
# Test invalid PUT data
stupid_post_data: dict = {"stupid": "stupid"}
Expand Down
13 changes: 10 additions & 3 deletions tests/test_crawl/test_crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,12 +797,15 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration):
assert row["status"] == status


@pytest.mark.parametrize(
"instant_analysis,expected_worker_priority", [(False, "default"), (True, "high")]
)
@pytest.mark.parametrize(
"mock_function",
[
(
"udata_hydra.crawl.check_resources.check_resource",
{"url": ANY, "resource": ANY, "session": ANY, "worker_priority": "default"},
{"url": ANY, "resource": ANY, "session": ANY},
"ok",
),
(
Expand All @@ -811,7 +814,6 @@ async def test_reset_statuses(fake_check, db, setup_catalog, check_duration):
"check": ANY,
"last_check": ANY,
"force_analysis": False,
"worker_priority": "default",
},
None,
),
Expand All @@ -826,16 +828,21 @@ async def test_new_resource_priority(
produce_mock,
api_headers,
mock_function,
instant_analysis,
expected_worker_priority,
):
func_path, kwargs, result = mock_function
kwargs = {**kwargs, "worker_priority": expected_worker_priority}
# delete the catalog content, we only want to test the new resource
await db.execute("DELETE FROM catalog")
rurl = udata_resource_payload["document"]["url"]
rmock.head(rurl, headers={"content-length": "1"})
res = await client.post(path="/api/resources", headers=api_headers, json=udata_resource_payload)
payload = {**udata_resource_payload, "instant_analysis": instant_analysis}
res = await client.post(path="/api/resources", headers=api_headers, json=payload)
assert res.status == 201
res = await db.fetch("SELECT * FROM catalog")
assert len(res) == 1 and res[0]["priority"] is True
assert res[0]["instant_analysis"] is instant_analysis
# we have to mock the functions separately, because they are intricated
with patch(func_path) as mock_func:
mock_func.return_value = result
Expand Down
35 changes: 34 additions & 1 deletion tests/test_db/test_db_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,48 @@
import nest_asyncio2 as nest_asyncio
import pytest

from tests.conftest import DATABASE_URL, DATASET_ID, RESOURCE_ID, RESOURCE_URL
from tests.conftest import (
DATABASE_URL,
DATASET_ID,
NOT_EXISTING_RESOURCE_ID,
RESOURCE_ID,
RESOURCE_URL,
)
from udata_hydra.cli import load_catalog
from udata_hydra.crawl import start_checks
from udata_hydra.db.resource import Resource

pytestmark = pytest.mark.asyncio
# allows nested async to test async with async :mindblown:
nest_asyncio.apply()


async def test_claim_for_crawl_sets_crawling_url(setup_catalog):
    """Claiming an eligible resource flips it to CRAWLING_URL and stamps status_since."""
    claimed = await Resource.claim_for_crawl(RESOURCE_ID)
    assert claimed is not None
    assert claimed["status"] == "CRAWLING_URL"
    assert claimed["status_since"] is not None


async def test_claim_for_crawl_returns_none_when_already_crawling(setup_catalog):
    """A second claim must be rejected while the first one is still in flight."""
    first = await Resource.claim_for_crawl(RESOURCE_ID)
    assert first is not None
    second = await Resource.claim_for_crawl(RESOURCE_ID)
    assert second is None


async def test_claim_for_crawl_from_backoff(setup_catalog, db):
    """Resources parked in BACKOFF remain claimable for a fresh crawl."""
    await db.execute(
        "UPDATE catalog SET status = 'BACKOFF' WHERE resource_id = $1",
        RESOURCE_ID,
    )
    claimed = await Resource.claim_for_crawl(RESOURCE_ID)
    assert claimed is not None
    assert claimed["status"] == "CRAWLING_URL"


async def test_claim_for_crawl_missing_resource(setup_catalog):
    """Claiming an unknown resource id yields None rather than raising."""
    claimed = await Resource.claim_for_crawl(NOT_EXISTING_RESOURCE_ID)
    assert claimed is None


async def test_catalog(setup_catalog, db):
res = await db.fetch(
"SELECT * FROM catalog WHERE resource_id = $1",
Expand Down
1 change: 1 addition & 0 deletions udata_hydra/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async def app_factory() -> web.Application:
async def app_startup(app):
app["pool"] = await context.pool()
app["started_at"] = datetime.now(timezone.utc)
app["background_tasks"] = set()

async def app_cleanup(app):
if "pool" in app:
Expand Down
8 changes: 7 additions & 1 deletion udata_hydra/crawl/check_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
results: defaultdict = defaultdict(int)


def _batch_worker_priority(row: Record) -> str:
if row.get("instant_analysis"):
return "high"
return "default" if row["priority"] else "low"


async def check_batch_resources(to_parse: list[Record]) -> None:
"""Check a batch of resources"""
context.monitor().set_status("Checking resources...")
Expand All @@ -46,7 +52,7 @@ async def check_batch_resources(to_parse: list[Record]) -> None:
url=row["url"],
resource=row,
session=session,
worker_priority="default" if row["priority"] else "low",
worker_priority=_batch_worker_priority(row),
)
)
for task in asyncio.as_completed(tasks):
Expand Down
5 changes: 4 additions & 1 deletion udata_hydra/crawl/preprocess_check_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict
# Update resource following check:
# Reset resource status so that it's not forbidden to be checked again.
# Reset priority so that it's not prioritised anymore.
await Resource.update(check_data["resource_id"], data={"status": None, "priority": False})
await Resource.update(
check_data["resource_id"],
data={"status": None, "priority": False, "instant_analysis": False},
)

return new_check, last_check

Expand Down
17 changes: 10 additions & 7 deletions udata_hydra/crawl/select_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,29 @@ async def select_batch_resources_to_check() -> list[Record]:
# first resources that are prioritised
q = f"""
SELECT * FROM (
SELECT catalog.url, dataset_id, resource_id, priority
SELECT catalog.url, dataset_id, resource_id, priority,
catalog.instant_analysis, catalog.status_since
FROM catalog
WHERE {excluded}
AND priority = True
) s
ORDER BY random() LIMIT {config.BATCH_SIZE}
ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE}
"""
to_check: list[Record] = await select_rows_based_on_query(connection, q)

# then resources with no last check
# (either because they have never been checked before, or because the last check has been deleted)
# (either because they have never been checked before, or because the last check was deleted)
if len(to_check) < config.BATCH_SIZE:
q = f"""
SELECT * FROM (
SELECT catalog.url, dataset_id, resource_id, priority
SELECT catalog.url, dataset_id, resource_id, priority,
catalog.instant_analysis, catalog.status_since
FROM catalog
WHERE catalog.last_check IS NULL
AND {excluded}
AND priority = False
) s
ORDER BY random() LIMIT {config.BATCH_SIZE}
ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {config.BATCH_SIZE}
"""
to_check += await select_rows_based_on_query(connection, q)

Expand All @@ -76,15 +78,16 @@ async def select_batch_resources_to_check() -> list[Record]:
limit = config.BATCH_SIZE - len(to_check)
q = f"""
SELECT * FROM (
SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority
SELECT catalog.url, dataset_id, catalog.resource_id, catalog.priority,
catalog.instant_analysis, catalog.status_since
FROM catalog, checks
WHERE catalog.last_check IS NOT NULL
AND catalog.last_check = checks.id
AND (checks.next_check_at <= $1 OR checks.next_check_at IS NULL)
AND {excluded}
AND catalog.priority = False
) s
ORDER BY random() LIMIT {limit}
ORDER BY instant_analysis DESC, status_since ASC NULLS LAST, random() LIMIT {limit}
"""
to_check += await select_rows_based_on_query(connection, q, now)

Expand Down
92 changes: 73 additions & 19 deletions udata_hydra/db/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ async def get(cls, resource_id: str, column_name: str = "*") -> Record | None:
q = f"""SELECT {column_name} FROM catalog WHERE resource_id = '{resource_id}';"""
return await connection.fetchrow(q)

@classmethod
async def claim_for_crawl(cls, resource_id: str) -> Record | None:
    """Atomically flag an eligible resource as being crawled.

    A resource is eligible when it is not deleted and its ``status`` is
    NULL or BACKOFF (mirroring ``get_excluded_clause``). Returns the row
    after the update, or None when the resource is absent or ineligible.
    """
    query = """
        UPDATE catalog
        SET status = 'CRAWLING_URL', status_since = $2
        WHERE resource_id = $1
        AND deleted = FALSE
        AND (status IS NULL OR status = 'BACKOFF')
        RETURNING *;
    """
    claimed_at = datetime.now(timezone.utc)
    pool = await context.pool()
    async with pool.acquire() as connection:
        return await connection.fetchrow(query, resource_id, claimed_at)

@classmethod
async def insert(
cls,
Expand All @@ -49,6 +69,7 @@ async def insert(
title: str,
status: str | None = None,
priority: bool = True,
instant_analysis: bool = False,
) -> Record | None:
if status and status not in cls.STATUSES.keys():
raise ValueError(f"Invalid status: {status}")
Expand All @@ -57,8 +78,8 @@ async def insert(
async with pool.acquire() as connection:
# Insert new resource in catalog table and mark as high priority for crawling
q = """
INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title)
VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8)
INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis)
VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9)
ON CONFLICT (resource_id) DO UPDATE SET
dataset_id = $1,
url = $3,
Expand All @@ -67,10 +88,20 @@ async def insert(
deleted = FALSE,
status = $6,
priority = $7,
title = $8
title = $8,
instant_analysis = $9
RETURNING *;"""
return await connection.fetchrow(
q, dataset_id, resource_id, url, type, format, status, priority, title
q,
dataset_id,
resource_id,
url,
type,
format,
status,
priority,
title,
instant_analysis,
)

@classmethod
Expand Down Expand Up @@ -102,6 +133,7 @@ async def update_or_insert(
title: str,
status: str | None = None,
priority: bool = True, # Make resource high priority by default for crawling
instant_analysis: bool | None = None,
) -> Record | None:
if status and status not in cls.STATUSES.keys():
raise ValueError(f"Invalid status: {status}")
Expand All @@ -110,27 +142,49 @@ async def update_or_insert(
async with pool.acquire() as connection:
# Check if resource is in catalog then insert or update into table
if await Resource.get(resource_id):
if instant_analysis is not None:
q = """
UPDATE catalog
SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8, instant_analysis = $9
WHERE resource_id = $2
RETURNING *;"""
return await connection.fetchrow(
q,
dataset_id,
resource_id,
url,
type,
format,
status,
priority,
title,
instant_analysis,
)
q = """
UPDATE catalog
SET dataset_id = $1, url = $3, type = $4, format=$5, status = $6, priority = $7, title = $8
WHERE resource_id = $2
RETURNING *;"""
else:
q = """
INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title)
VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8)
ON CONFLICT (resource_id) DO UPDATE SET
dataset_id = $1,
url = $3,
type = $4,
format = $5,
deleted = FALSE,
status = $6,
priority = $7,
title = $8
RETURNING *;"""
return await connection.fetchrow(
q, dataset_id, resource_id, url, type, format, status, priority, title
)
ia = instant_analysis if instant_analysis is not None else False
q = """
INSERT INTO catalog (dataset_id, resource_id, url, type, format, deleted, status, priority, title, instant_analysis)
VALUES ($1, $2, $3, $4, $5, FALSE, $6, $7, $8, $9)
ON CONFLICT (resource_id) DO UPDATE SET
dataset_id = $1,
url = $3,
type = $4,
format = $5,
deleted = FALSE,
status = $6,
priority = $7,
title = $8,
instant_analysis = $9
RETURNING *;"""
return await connection.fetchrow(
q, dataset_id, resource_id, url, type, format, status, priority, title
q, dataset_id, resource_id, url, type, format, status, priority, title, ia
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Migration for #386: udata's webhook may send an optional flag asking that a
-- resource's first analysis be routed to the high-priority RQ queue instead of
-- waiting for the regular batch crawl. The flag is stored on the catalog row.
-- NOT NULL DEFAULT FALSE keeps existing rows and ordinary webhook calls unchanged;
-- IF NOT EXISTS makes the migration safe to re-run.
ALTER TABLE catalog ADD COLUMN IF NOT EXISTS instant_analysis BOOLEAN NOT NULL DEFAULT FALSE;
Loading