From d49c1884bfe175b23ae1bb20343f1b34204f4182 Mon Sep 17 00:00:00 2001 From: Ihor Indyk Date: Thu, 22 Jan 2026 09:50:52 -0800 Subject: [PATCH] Cancel buffer join thread on worker shutdown. PiperOrigin-RevId: 859654450 --- .../_src/python/dataset/transformations/process_prefetch.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/grain/_src/python/dataset/transformations/process_prefetch.py b/grain/_src/python/dataset/transformations/process_prefetch.py index 4b271ff1e..523b6a167 100644 --- a/grain/_src/python/dataset/transformations/process_prefetch.py +++ b/grain/_src/python/dataset/transformations/process_prefetch.py @@ -126,7 +126,7 @@ def _clear_queue_and_maybe_unlink_shm(q: queues.Queue[Any]) -> int: try: shared_memory_array.unlink_shm(q.get_nowait()) count += 1 - except queue.Empty: + except Exception: # pylint: disable=broad-except return count @@ -261,7 +261,11 @@ def _put_dataset_elements_in_buffer( ) return _clear_queue_and_maybe_unlink_shm(buffer) + buffer.cancel_join_thread() + buffer.close() _clear_queue_and_maybe_unlink_shm(set_state_queue) + set_state_queue.cancel_join_thread() + set_state_queue.close() class _SetStateIsDone: