From 6f97f0f39a3a2b55ea0d837a28b2f3d4bc536236 Mon Sep 17 00:00:00 2001 From: Paul Calnon Date: Tue, 5 May 2026 23:00:20 -0500 Subject: [PATCH] fix(api): wrap batch_update_tags store calls in asyncio.to_thread (BUG-JD-10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``PATCH /v1/datasets/batch-tags`` is an ``async def`` route handler that called ``store.get_meta(...)`` and ``store.update_meta(...)`` synchronously inside a per-dataset loop. Those are filesystem I/O calls; running them on the FastAPI event loop blocks every other in-flight request for the duration of the batch. The async-wrap pattern (``await asyncio.to_thread(...)``) is applied elsewhere in the same file (line 142, ``generator_class.generate``) but was missed on this one route. Surfaced by the 2026-05-05 roadmap audit (``notes/ROADMAP_AUDIT_2026-05-05.md`` §4.1, BUG-JD-10). Fix --- Wrap each ``store.get_meta`` and ``store.update_meta`` call in ``await asyncio.to_thread(...)`` so the synchronous I/O runs on the default executor while the event loop stays free to service other requests. The user-visible behaviour is unchanged; only the event-loop fairness improves. Tests ----- - New regression case ``test_store_calls_run_off_event_loop`` — patches both store methods to record their executing thread, drives a 2-dataset batch through the route, and asserts every call ran on a worker thread (not the main thread). Future removal of the ``asyncio.to_thread`` wrapping fails the test with a clear "BUG-JD-10 regression" message naming the expected wrapping. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- juniper_data/api/routes/datasets.py | 9 ++- .../tests/api/test_batch_operations.py | 59 +++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/juniper_data/api/routes/datasets.py b/juniper_data/api/routes/datasets.py index 56eb95a..86e05f5 100644 --- a/juniper_data/api/routes/datasets.py +++ b/juniper_data/api/routes/datasets.py @@ -426,8 +426,13 @@ async def batch_update_tags( updated: list[str] = [] not_found: list[str] = [] + # BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and + # ``store.update_meta`` are synchronous filesystem I/O. Offload + # each call to a thread so the FastAPI event loop stays + # responsive during large batches — same pattern as + # ``generator_class.generate`` above. for dataset_id in request.dataset_ids: - meta = store.get_meta(dataset_id) + meta = await asyncio.to_thread(store.get_meta, dataset_id) if meta is None: not_found.append(dataset_id) continue @@ -436,7 +441,7 @@ async def batch_update_tags( current_tags.update(request.add_tags) current_tags -= set(request.remove_tags) meta.tags = sorted(current_tags) - store.update_meta(dataset_id, meta) + await asyncio.to_thread(store.update_meta, dataset_id, meta) updated.append(dataset_id) return BatchUpdateTagsResponse( diff --git a/juniper_data/tests/api/test_batch_operations.py b/juniper_data/tests/api/test_batch_operations.py index 6725ef1..ed489fe 100644 --- a/juniper_data/tests/api/test_batch_operations.py +++ b/juniper_data/tests/api/test_batch_operations.py @@ -350,6 +350,65 @@ def test_empty_dataset_ids_returns_422(self, client: TestClient) -> None: assert response.status_code == 422 + def test_store_calls_run_off_event_loop(self, client: TestClient, memory_store: InMemoryDatasetStore) -> None: + """BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and + ``store.update_meta`` must run off the FastAPI event loop so + large batches don't block. 
Pre-fix they were called + synchronously inside the ``async def`` route handler. + + Wraps both methods to record which thread they executed on. + ``await asyncio.to_thread(...)`` dispatches to a worker + thread — the call should never run on ``asyncio``'s event + loop thread. + """ + import threading + + # Seed two real datasets so the store actually has metadata + # to read; avoids ``not_found`` short-circuiting the call path. + id1 = _create_spiral(client, seed=200, tags=["original"]) + id2 = _create_spiral(client, seed=201, tags=["original"]) + + called_threads: list[str] = [] + original_get_meta = memory_store.get_meta + original_update_meta = memory_store.update_meta + + def recording_get_meta(dataset_id): + called_threads.append(f"get:{threading.current_thread().name}") + return original_get_meta(dataset_id) + + def recording_update_meta(dataset_id, meta): + called_threads.append(f"update:{threading.current_thread().name}") + return original_update_meta(dataset_id, meta) + + memory_store.get_meta = recording_get_meta # type: ignore[method-assign] + memory_store.update_meta = recording_update_meta # type: ignore[method-assign] + + try: + response = client.patch( + "/v1/datasets/batch-tags", + json={"dataset_ids": [id1, id2], "add_tags": ["wrapped-call"]}, + ) + assert response.status_code == 200, response.text + finally: + memory_store.get_meta = original_get_meta # type: ignore[method-assign] + memory_store.update_meta = original_update_meta # type: ignore[method-assign] + + # Sanity: both methods got called twice (once per dataset id). + get_calls = [t for t in called_threads if t.startswith("get:")] + update_calls = [t for t in called_threads if t.startswith("update:")] + assert len(get_calls) == 2, f"expected 2 get_meta calls, got {get_calls}" + assert len(update_calls) == 2, f"expected 2 update_meta calls, got {update_calls}" + + # Each call must have run on a worker thread, not the main + # thread. 
``asyncio.to_thread`` uses a default executor whose + # threads are named ``asyncio_*`` or ``ThreadPoolExecutor-*``; + # if any call shows up on the main thread, the wrapping was + # lost (the regression that BUG-JD-10 caught). + main_thread_name = threading.main_thread().name + for tag in called_threads: + thread = tag.split(":", 1)[1] + assert thread != main_thread_name, f"BUG-JD-10 regression: store call ran on the main thread ({thread!r}), expected an asyncio worker thread. Was the ``await asyncio.to_thread(...)`` wrapping removed from ``batch_update_tags``?" + # --------------------------------------------------------------------------- # POST /v1/datasets/batch-export