diff --git a/juniper_data/api/routes/datasets.py b/juniper_data/api/routes/datasets.py
index 56eb95a..86e05f5 100644
--- a/juniper_data/api/routes/datasets.py
+++ b/juniper_data/api/routes/datasets.py
@@ -426,8 +426,13 @@ async def batch_update_tags(
     updated: list[str] = []
     not_found: list[str] = []
 
+    # BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and
+    # ``store.update_meta`` are synchronous filesystem I/O. Offload
+    # each call to a thread so the FastAPI event loop stays
+    # responsive during large batches — same pattern as
+    # ``generator_class.generate`` above.
     for dataset_id in request.dataset_ids:
-        meta = store.get_meta(dataset_id)
+        meta = await asyncio.to_thread(store.get_meta, dataset_id)
         if meta is None:
             not_found.append(dataset_id)
             continue
@@ -436,7 +441,7 @@ async def batch_update_tags(
         current_tags.update(request.add_tags)
         current_tags -= set(request.remove_tags)
         meta.tags = sorted(current_tags)
-        store.update_meta(dataset_id, meta)
+        await asyncio.to_thread(store.update_meta, dataset_id, meta)
         updated.append(dataset_id)
 
     return BatchUpdateTagsResponse(
diff --git a/juniper_data/tests/api/test_batch_operations.py b/juniper_data/tests/api/test_batch_operations.py
index 6725ef1..ed489fe 100644
--- a/juniper_data/tests/api/test_batch_operations.py
+++ b/juniper_data/tests/api/test_batch_operations.py
@@ -350,6 +350,65 @@ def test_empty_dataset_ids_returns_422(self, client: TestClient) -> None:
         assert response.status_code == 422
 
+    def test_store_calls_run_off_event_loop(self, client: TestClient, memory_store: InMemoryDatasetStore) -> None:
+        """BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and
+        ``store.update_meta`` must run off the FastAPI event loop so
+        large batches don't block. Pre-fix they were called
+        synchronously inside the ``async def`` route handler.
+
+        Wraps both methods to record which thread they executed on.
+        ``await asyncio.to_thread(...)`` dispatches to a worker
+        thread — the call should never run on ``asyncio``'s event
+        loop thread.
+        """
+        import threading
+
+        # Seed two real datasets so the store actually has metadata
+        # to read; avoids ``not_found`` short-circuiting the call path.
+        id1 = _create_spiral(client, seed=200, tags=["original"])
+        id2 = _create_spiral(client, seed=201, tags=["original"])
+
+        called_threads: list[str] = []
+        original_get_meta = memory_store.get_meta
+        original_update_meta = memory_store.update_meta
+
+        def recording_get_meta(dataset_id):
+            called_threads.append(f"get:{threading.current_thread().name}")
+            return original_get_meta(dataset_id)
+
+        def recording_update_meta(dataset_id, meta):
+            called_threads.append(f"update:{threading.current_thread().name}")
+            return original_update_meta(dataset_id, meta)
+
+        memory_store.get_meta = recording_get_meta  # type: ignore[method-assign]
+        memory_store.update_meta = recording_update_meta  # type: ignore[method-assign]
+
+        try:
+            response = client.patch(
+                "/v1/datasets/batch-tags",
+                json={"dataset_ids": [id1, id2], "add_tags": ["wrapped-call"]},
+            )
+            assert response.status_code == 200, response.text
+        finally:
+            memory_store.get_meta = original_get_meta  # type: ignore[method-assign]
+            memory_store.update_meta = original_update_meta  # type: ignore[method-assign]
+
+        # Sanity: both methods got called twice (once per dataset id).
+        get_calls = [t for t in called_threads if t.startswith("get:")]
+        update_calls = [t for t in called_threads if t.startswith("update:")]
+        assert len(get_calls) == 2, f"expected 2 get_meta calls, got {get_calls}"
+        assert len(update_calls) == 2, f"expected 2 update_meta calls, got {update_calls}"
+
+        # Each call must have run on a worker thread, not the main
+        # thread. ``asyncio.to_thread`` uses a default executor whose
+        # threads are named ``asyncio_*`` or ``ThreadPoolExecutor-*``;
+        # if any call shows up on the main thread, the wrapping was
+        # lost (the regression that BUG-JD-10 caught).
+        main_thread_name = threading.main_thread().name
+        for tag in called_threads:
+            thread = tag.split(":", 1)[1]
+            assert thread != main_thread_name, f"BUG-JD-10 regression: store call ran on the main thread ({thread!r}), expected an asyncio worker thread. Was the ``await asyncio.to_thread(...)`` wrapping removed from ``batch_update_tags``?"
+
 
 # ---------------------------------------------------------------------------
 # POST /v1/datasets/batch-export
 