Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions juniper_data/api/routes/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,13 @@ async def batch_update_tags(
updated: list[str] = []
not_found: list[str] = []

# BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and
# ``store.update_meta`` are synchronous filesystem I/O. Offload
# each call to a thread so the FastAPI event loop stays
# responsive during large batches — same pattern as
# ``generator_class.generate`` above.
for dataset_id in request.dataset_ids:
meta = store.get_meta(dataset_id)
meta = await asyncio.to_thread(store.get_meta, dataset_id)
if meta is None:
not_found.append(dataset_id)
continue
Expand All @@ -436,7 +441,7 @@ async def batch_update_tags(
current_tags.update(request.add_tags)
current_tags -= set(request.remove_tags)
meta.tags = sorted(current_tags)
store.update_meta(dataset_id, meta)
await asyncio.to_thread(store.update_meta, dataset_id, meta)
updated.append(dataset_id)

return BatchUpdateTagsResponse(
Expand Down
59 changes: 59 additions & 0 deletions juniper_data/tests/api/test_batch_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,65 @@ def test_empty_dataset_ids_returns_422(self, client: TestClient) -> None:

assert response.status_code == 422

def test_store_calls_run_off_event_loop(self, client: TestClient, memory_store: InMemoryDatasetStore) -> None:
    """BUG-JD-10 (2026-05-05 audit): ``store.get_meta`` and
    ``store.update_meta`` must run off the FastAPI event loop so
    large batches don't block. Pre-fix they were called
    synchronously inside the ``async def`` route handler.

    Wraps both methods to record whether an asyncio event loop is
    running on the thread that executes them. ``await
    asyncio.to_thread(...)`` dispatches to a worker thread, so a
    correctly offloaded call never observes a running loop.

    NOTE(review): an earlier version of this check compared the
    executing thread's name against ``threading.main_thread().name``.
    That is unreliable — Starlette's ``TestClient`` drives the app's
    event loop on a separate portal thread, so a synchronous
    (unwrapped) store call still runs off the *main* thread and the
    regression guard never fires. Probing for a running loop on the
    executing thread is accurate regardless of which thread hosts
    the loop.
    """
    import asyncio
    import threading

    # Seed two real datasets so the store actually has metadata
    # to read; avoids ``not_found`` short-circuiting the call path.
    id1 = _create_spiral(client, seed=200, tags=["original"])
    id2 = _create_spiral(client, seed=201, tags=["original"])

    # Each entry: (method label, executing thread name, loop running
    # on that thread at call time?).
    calls: list[tuple[str, str, bool]] = []
    original_get_meta = memory_store.get_meta
    original_update_meta = memory_store.update_meta

    def _loop_running_here() -> bool:
        # ``get_running_loop`` raises RuntimeError when called from a
        # thread with no active event loop — i.e. a to_thread worker.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    def recording_get_meta(dataset_id):
        calls.append(("get", threading.current_thread().name, _loop_running_here()))
        return original_get_meta(dataset_id)

    def recording_update_meta(dataset_id, meta):
        calls.append(("update", threading.current_thread().name, _loop_running_here()))
        return original_update_meta(dataset_id, meta)

    memory_store.get_meta = recording_get_meta  # type: ignore[method-assign]
    memory_store.update_meta = recording_update_meta  # type: ignore[method-assign]

    try:
        response = client.patch(
            "/v1/datasets/batch-tags",
            json={"dataset_ids": [id1, id2], "add_tags": ["wrapped-call"]},
        )
        assert response.status_code == 200, response.text
    finally:
        # Always restore the real methods so other tests sharing the
        # fixture see an unwrapped store.
        memory_store.get_meta = original_get_meta  # type: ignore[method-assign]
        memory_store.update_meta = original_update_meta  # type: ignore[method-assign]

    # Sanity: both methods got called twice (once per dataset id).
    get_calls = [c for c in calls if c[0] == "get"]
    update_calls = [c for c in calls if c[0] == "update"]
    assert len(get_calls) == 2, f"expected 2 get_meta calls, got {get_calls}"
    assert len(update_calls) == 2, f"expected 2 update_meta calls, got {update_calls}"

    # No store call may execute on a thread that is running an event
    # loop — that would mean the synchronous filesystem I/O blocked
    # the loop (the regression that BUG-JD-10 caught).
    for label, thread_name, loop_was_running in calls:
        assert not loop_was_running, (
            f"BUG-JD-10 regression: store.{label}_meta ran on the event-loop "
            f"thread ({thread_name!r}); was the ``await asyncio.to_thread(...)`` "
            f"wrapping removed from ``batch_update_tags``?"
        )


# ---------------------------------------------------------------------------
# POST /v1/datasets/batch-export
Expand Down
Loading