From 060da4ae2aae230b12d5238011798ac0fcb72858 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 12 Sep 2025 15:09:27 -0700 Subject: [PATCH 1/8] feature: optionally support uvloop --- pyproject.toml | 1 + src/zarr/core/config.py | 2 +- src/zarr/core/sync.py | 29 ++++++- tests/test_sync.py | 170 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 195 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bea8d77127..3d4b2f6dc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ remote_tests = [ "requests", ] optional = ["rich", "universal-pathlib"] +uvloop = ["uvloop"] docs = [ # Doc building 'sphinx==8.1.3', diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index cc3c33cd17..48ceb51cf0 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -107,7 +107,7 @@ def enable_gpu(self) -> ConfigSet: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index ffb04e764d..a093b5b644 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -4,6 +4,7 @@ import atexit import logging import os +import sys import threading from concurrent.futures import ThreadPoolExecutor, wait from typing import TYPE_CHECKING, TypeVar @@ -165,6 +166,31 @@ def sync( return return_result +def _create_event_loop() -> asyncio.AbstractEventLoop: + """Create a new event loop, optionally using uvloop if available and enabled.""" + use_uvloop = config.get("async.use_uvloop", True) + + if use_uvloop and sys.platform != "win32": + try: + import uvloop + + logger.debug("Creating Zarr event loop with uvloop") + # uvloop.new_event_loop() returns a loop compatible with AbstractEventLoop + loop: asyncio.AbstractEventLoop = uvloop.new_event_loop() + except ImportError: + 
logger.debug("uvloop not available, falling back to asyncio") + else: + return loop + else: + if not use_uvloop: + logger.debug("uvloop disabled via config, using asyncio") + else: + logger.debug("uvloop not supported on Windows, using asyncio") + + logger.debug("Creating Zarr event loop with asyncio") + return asyncio.new_event_loop() + + def _get_loop() -> asyncio.AbstractEventLoop: """Create or return the default fsspec IO loop @@ -175,8 +201,7 @@ def _get_loop() -> asyncio.AbstractEventLoop: # repeat the check just in case the loop got filled between the # previous two calls from another thread if loop[0] is None: - logger.debug("Creating Zarr event loop") - new_loop = asyncio.new_event_loop() + new_loop = _create_event_loop() loop[0] = new_loop iothread[0] = threading.Thread(target=new_loop.run_forever, name="zarr_io") assert iothread[0] is not None diff --git a/tests/test_sync.py b/tests/test_sync.py index c5eadb0f4f..3b388acf1e 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,6 +1,8 @@ import asyncio +import importlib.util +import sys from collections.abc import AsyncGenerator -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, call, patch import pytest @@ -8,6 +10,7 @@ from zarr.core.sync import ( SyncError, SyncMixin, + _create_event_loop, _get_executor, _get_lock, _get_loop, @@ -150,16 +153,175 @@ def test_threadpool_executor(clean_state, workers: int | None) -> None: if workers is None: # confirm no executor was created if no workers were specified # (this is the default behavior) - assert loop[0]._default_executor is None + # Note: uvloop doesn't expose _default_executor attribute, so we skip this check for uvloop + if hasattr(loop[0], "_default_executor"): + assert loop[0]._default_executor is None else: # confirm executor was created and attached to loop as the default executor # note: python doesn't have a direct way to get the default executor so we - # use the private attribute - assert _get_executor() is 
loop[0]._default_executor + # use the private attribute (when available) assert _get_executor()._max_workers == workers + if hasattr(loop[0], "_default_executor"): + assert _get_executor() is loop[0]._default_executor def test_cleanup_resources_idempotent() -> None: _get_executor() # trigger resource creation (iothread, loop, thread-pool) cleanup_resources() cleanup_resources() + + +def test_create_event_loop_default_config() -> None: + """Test that _create_event_loop respects the default config.""" + # Reset config to default + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + if sys.platform != "win32": + if importlib.util.find_spec("uvloop") is not None: + # uvloop is available, should use it + assert "uvloop" in str(type(loop)) + else: + # uvloop not available, should use asyncio + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + else: + # Windows doesn't support uvloop + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + + loop.close() + + +def test_create_event_loop_uvloop_disabled() -> None: + """Test that uvloop can be disabled via config.""" + with zarr.config.set({"async.use_uvloop": False}): + loop = _create_event_loop() + # Should always use asyncio when disabled + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") +@pytest.mark.skipif(importlib.util.find_spec("uvloop") is None, reason="uvloop is not installed") +def test_create_event_loop_uvloop_enabled_non_windows() -> None: + """Test uvloop usage on non-Windows platforms when uvloop is installed.""" + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + # uvloop is available and should be used + assert "uvloop" in str(type(loop)) + loop.close() + + +@pytest.mark.skipif(sys.platform != "win32", reason="This 
test is specific to Windows behavior") +def test_create_event_loop_windows_no_uvloop() -> None: + """Test that uvloop is never used on Windows.""" + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + # Should use asyncio even when uvloop is requested on Windows + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() + + +def test_uvloop_config_environment_variable() -> None: + """Test that uvloop can be controlled via environment variable.""" + # This test verifies the config system works with uvloop setting + # We test both True and False values + with zarr.config.set({"async.use_uvloop": False}): + assert zarr.config.get("async.use_uvloop") is False + + with zarr.config.set({"async.use_uvloop": True}): + assert zarr.config.get("async.use_uvloop") is True + + +def test_uvloop_integration_with_zarr_operations(clean_state) -> None: + """Test that uvloop integration doesn't break zarr operations.""" + # Test with uvloop enabled (default) + with zarr.config.set({"async.use_uvloop": True}): + arr = zarr.zeros((10, 10), chunks=(5, 5)) + arr[0, 0] = 42.0 + result = arr[0, 0] + assert result == 42.0 + + # Test with uvloop disabled + with zarr.config.set({"async.use_uvloop": False}): + arr2 = zarr.zeros((10, 10), chunks=(5, 5)) + arr2[0, 0] = 24.0 + result2 = arr2[0, 0] + assert result2 == 24.0 + + +@patch("zarr.core.sync.logger.debug") +def test_uvloop_logging_availability(mock_debug, clean_state) -> None: + """Test that appropriate debug messages are logged.""" + # Test with uvloop enabled + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + + if sys.platform != "win32": + if importlib.util.find_spec("uvloop") is not None: + # Should log that uvloop is being used + mock_debug.assert_called_with("Creating Zarr event loop with uvloop") + else: + # Should log fallback to asyncio + mock_debug.assert_called_with("uvloop not available, falling back to asyncio") + 
else: + # Should log that uvloop is not supported on Windows + mock_debug.assert_called_with("uvloop not supported on Windows, using asyncio") + + loop.close() + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") +@pytest.mark.skipif(importlib.util.find_spec("uvloop") is None, reason="uvloop is not installed") +@patch("zarr.core.sync.logger.debug") +def test_uvloop_logging_with_uvloop_installed(mock_debug, clean_state) -> None: + """Test that uvloop is logged when installed and enabled.""" + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + # Should log that uvloop is being used + mock_debug.assert_called_with("Creating Zarr event loop with uvloop") + loop.close() + + +@pytest.mark.skipif(importlib.util.find_spec("uvloop") is not None, reason="uvloop is installed") +@patch("zarr.core.sync.logger.debug") +def test_uvloop_logging_without_uvloop_installed(mock_debug, clean_state) -> None: + """Test that fallback to asyncio is logged when uvloop is not installed.""" + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + if sys.platform != "win32": + # Should log fallback to asyncio + mock_debug.assert_called_with("uvloop not available, falling back to asyncio") + else: + # Should log that uvloop is not supported on Windows + mock_debug.assert_called_with("uvloop not supported on Windows, using asyncio") + loop.close() + + +@patch("zarr.core.sync.logger.debug") +def test_uvloop_logging_disabled(mock_debug, clean_state) -> None: + """Test that appropriate debug message is logged when uvloop is disabled.""" + with zarr.config.set({"async.use_uvloop": False}): + loop = _create_event_loop() + # Should log both that uvloop is disabled and the final loop creation + expected_calls = [ + call("uvloop disabled via config, using asyncio"), + call("Creating Zarr event loop with asyncio"), + ] + mock_debug.assert_has_calls(expected_calls) + loop.close() + + +def 
test_uvloop_mock_import_error(clean_state) -> None: + """Test graceful handling when uvloop import fails.""" + with zarr.config.set({"async.use_uvloop": True}): + # Mock uvloop import failure + with patch.dict("sys.modules", {"uvloop": None}): + with patch("builtins.__import__", side_effect=ImportError("No module named 'uvloop'")): + loop = _create_event_loop() + # Should fall back to asyncio + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() From 870b4af411ddfb7fb77fcb8277bebfc59f625cac Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 12 Sep 2025 20:41:06 -0700 Subject: [PATCH 2/8] feature: optionally support uvloop --- changes/xxxx.feature.rst | 6 +++ docs/user-guide/performance.rst | 70 +++++++++++++++++++++++++++++++++ pyproject.toml | 4 +- tests/test_config.py | 2 +- 4 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 changes/xxxx.feature.rst diff --git a/changes/xxxx.feature.rst b/changes/xxxx.feature.rst new file mode 100644 index 0000000000..d268ed46ff --- /dev/null +++ b/changes/xxxx.feature.rst @@ -0,0 +1,6 @@ +Add optional uvloop support for improved async performance. + +When uvloop is available, Zarr will automatically use it as the event loop implementation +for better I/O performance. This can be controlled via the ``async.use_uvloop`` configuration +setting or the ``ZARR_ASYNC__USE_UVLOOP`` environment variable. uvloop can be installed +with ``pip install 'zarr[uvloop]'``. \ No newline at end of file diff --git a/docs/user-guide/performance.rst b/docs/user-guide/performance.rst index 0f31e5d7be..70a9219bab 100644 --- a/docs/user-guide/performance.rst +++ b/docs/user-guide/performance.rst @@ -270,6 +270,76 @@ E.g., pickle/unpickle an local store array:: >>> np.all(z1[:] == z2[:]) np.True_ +.. 
_user-guide-uvloop: + +Event loop optimization with uvloop +----------------------------------- + +Zarr can optionally use `uvloop <https://github.com/MagicStack/uvloop>`_, a fast, +drop-in replacement for the default Python asyncio event loop implementation. +uvloop is written in Cython and built on top of libuv, providing significantly +better performance for I/O-intensive operations. + +When uvloop is available, Zarr will use it by default for better performance. +This is particularly beneficial when working with remote storage backends or +performing many concurrent operations. + +Installation +~~~~~~~~~~~~ + +To enable uvloop support, install it as an optional dependency:: + + pip install 'zarr[uvloop]' + +Or install uvloop directly:: + + pip install uvloop + +Configuration +~~~~~~~~~~~~~ + +uvloop usage can be controlled via Zarr's configuration system: + +.. code-block:: python + + import zarr + + # Enable uvloop (default when available) + zarr.config.set({"async.use_uvloop": True}) + + # Disable uvloop (use standard asyncio) + zarr.config.set({"async.use_uvloop": False}) + +You can also control this via environment variables:: + + # Disable uvloop + export ZARR_ASYNC__USE_UVLOOP=false + +Platform Support +~~~~~~~~~~~~~~~~~ + +uvloop is supported on: + +- Linux +- macOS +- Other Unix-like systems + +uvloop is **not** supported on Windows. On Windows, Zarr will automatically +fall back to the standard asyncio event loop regardless of the configuration setting. + +Performance Benefits +~~~~~~~~~~~~~~~~~~~~ + +uvloop can provide performance improvements for: + +- Remote storage operations (S3, GCS, etc.) +- Concurrent array operations +- Large numbers of small I/O operations +- Network-bound workloads + +The performance improvement varies depending on the workload, but can be +substantial for I/O-intensive operations. + .. 
_user-guide-tips-blosc: Configuring Blosc diff --git a/pyproject.toml b/pyproject.toml index 3d4b2f6dc9..afd3b4f687 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,8 +91,7 @@ remote_tests = [ "moto[s3,server]", "requests", ] -optional = ["rich", "universal-pathlib"] -uvloop = ["uvloop"] +optional = ["rich", "universal-pathlib", "uvloop"] docs = [ # Doc building 'sphinx==8.1.3', @@ -230,6 +229,7 @@ dependencies = [ 'typing_extensions==4.9.*', 'donfig==0.8.*', 'obstore==0.5.*', + 'uvloop==0.20.0', # test deps 'zarr[test]', 'zarr[remote_tests]', diff --git a/tests/test_config.py b/tests/test_config.py index 0c029dda3a..e177b6becf 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,7 +55,7 @@ def test_config_defaults_set() -> None: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { From 2e59923e0183ddfa6d0e584fce4fd96b01fb8ac9 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 13 Sep 2025 20:19:14 -0700 Subject: [PATCH 3/8] fixup tests --- tests/test_sync.py | 108 ++++----------------------------------------- 1 file changed, 9 insertions(+), 99 deletions(-) diff --git a/tests/test_sync.py b/tests/test_sync.py index 3b388acf1e..4a68026fe9 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,8 +1,7 @@ import asyncio -import importlib.util import sys from collections.abc import AsyncGenerator -from unittest.mock import AsyncMock, call, patch +from unittest.mock import AsyncMock, patch import pytest @@ -177,10 +176,11 @@ def test_create_event_loop_default_config() -> None: with zarr.config.set({"async.use_uvloop": True}): loop = _create_event_loop() if sys.platform != "win32": - if importlib.util.find_spec("uvloop") is not None: - # uvloop is available, should use it - assert "uvloop" in str(type(loop)) - else: + try: + import uvloop + + assert 
isinstance(loop, uvloop.Loop) + except ImportError: # uvloop not available, should use asyncio assert isinstance(loop, asyncio.AbstractEventLoop) assert "uvloop" not in str(type(loop)) @@ -203,13 +203,13 @@ def test_create_event_loop_uvloop_disabled() -> None: @pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") -@pytest.mark.skipif(importlib.util.find_spec("uvloop") is None, reason="uvloop is not installed") def test_create_event_loop_uvloop_enabled_non_windows() -> None: """Test uvloop usage on non-Windows platforms when uvloop is installed.""" + uvloop = pytest.importorskip("uvloop") + with zarr.config.set({"async.use_uvloop": True}): loop = _create_event_loop() - # uvloop is available and should be used - assert "uvloop" in str(type(loop)) + assert isinstance(loop, uvloop.Loop) loop.close() @@ -224,96 +224,6 @@ def test_create_event_loop_windows_no_uvloop() -> None: loop.close() -def test_uvloop_config_environment_variable() -> None: - """Test that uvloop can be controlled via environment variable.""" - # This test verifies the config system works with uvloop setting - # We test both True and False values - with zarr.config.set({"async.use_uvloop": False}): - assert zarr.config.get("async.use_uvloop") is False - - with zarr.config.set({"async.use_uvloop": True}): - assert zarr.config.get("async.use_uvloop") is True - - -def test_uvloop_integration_with_zarr_operations(clean_state) -> None: - """Test that uvloop integration doesn't break zarr operations.""" - # Test with uvloop enabled (default) - with zarr.config.set({"async.use_uvloop": True}): - arr = zarr.zeros((10, 10), chunks=(5, 5)) - arr[0, 0] = 42.0 - result = arr[0, 0] - assert result == 42.0 - - # Test with uvloop disabled - with zarr.config.set({"async.use_uvloop": False}): - arr2 = zarr.zeros((10, 10), chunks=(5, 5)) - arr2[0, 0] = 24.0 - result2 = arr2[0, 0] - assert result2 == 24.0 - - -@patch("zarr.core.sync.logger.debug") -def 
test_uvloop_logging_availability(mock_debug, clean_state) -> None: - """Test that appropriate debug messages are logged.""" - # Test with uvloop enabled - with zarr.config.set({"async.use_uvloop": True}): - loop = _create_event_loop() - - if sys.platform != "win32": - if importlib.util.find_spec("uvloop") is not None: - # Should log that uvloop is being used - mock_debug.assert_called_with("Creating Zarr event loop with uvloop") - else: - # Should log fallback to asyncio - mock_debug.assert_called_with("uvloop not available, falling back to asyncio") - else: - # Should log that uvloop is not supported on Windows - mock_debug.assert_called_with("uvloop not supported on Windows, using asyncio") - - loop.close() - - -@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") -@pytest.mark.skipif(importlib.util.find_spec("uvloop") is None, reason="uvloop is not installed") -@patch("zarr.core.sync.logger.debug") -def test_uvloop_logging_with_uvloop_installed(mock_debug, clean_state) -> None: - """Test that uvloop is logged when installed and enabled.""" - with zarr.config.set({"async.use_uvloop": True}): - loop = _create_event_loop() - # Should log that uvloop is being used - mock_debug.assert_called_with("Creating Zarr event loop with uvloop") - loop.close() - - -@pytest.mark.skipif(importlib.util.find_spec("uvloop") is not None, reason="uvloop is installed") -@patch("zarr.core.sync.logger.debug") -def test_uvloop_logging_without_uvloop_installed(mock_debug, clean_state) -> None: - """Test that fallback to asyncio is logged when uvloop is not installed.""" - with zarr.config.set({"async.use_uvloop": True}): - loop = _create_event_loop() - if sys.platform != "win32": - # Should log fallback to asyncio - mock_debug.assert_called_with("uvloop not available, falling back to asyncio") - else: - # Should log that uvloop is not supported on Windows - mock_debug.assert_called_with("uvloop not supported on Windows, using asyncio") - loop.close() - 
- -@patch("zarr.core.sync.logger.debug") -def test_uvloop_logging_disabled(mock_debug, clean_state) -> None: - """Test that appropriate debug message is logged when uvloop is disabled.""" - with zarr.config.set({"async.use_uvloop": False}): - loop = _create_event_loop() - # Should log both that uvloop is disabled and the final loop creation - expected_calls = [ - call("uvloop disabled via config, using asyncio"), - call("Creating Zarr event loop with asyncio"), - ] - mock_debug.assert_has_calls(expected_calls) - loop.close() - - def test_uvloop_mock_import_error(clean_state) -> None: """Test graceful handling when uvloop import fails.""" with zarr.config.set({"async.use_uvloop": True}): From 64bc79f65a00a2857c4ef3181b46a749d1481f6f Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 13 Sep 2025 20:20:10 -0700 Subject: [PATCH 4/8] change note --- changes/{xxxx.feature.rst => 3452.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changes/{xxxx.feature.rst => 3452.feature.rst} (100%) diff --git a/changes/xxxx.feature.rst b/changes/3452.feature.rst similarity index 100% rename from changes/xxxx.feature.rst rename to changes/3452.feature.rst From 9a0ea9d9cc1cb4dedefe3a817f0462903c9f7892 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 13 Sep 2025 20:31:19 -0700 Subject: [PATCH 5/8] fixup ci --- changes/3452.feature.rst | 2 +- docs/user-guide/performance.rst | 9 +++++++-- pyproject.toml | 8 ++++++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/changes/3452.feature.rst b/changes/3452.feature.rst index d268ed46ff..ed2f416ada 100644 --- a/changes/3452.feature.rst +++ b/changes/3452.feature.rst @@ -3,4 +3,4 @@ Add optional uvloop support for improved async performance. When uvloop is available, Zarr will automatically use it as the event loop implementation for better I/O performance. This can be controlled via the ``async.use_uvloop`` configuration setting or the ``ZARR_ASYNC__USE_UVLOOP`` environment variable. 
uvloop can be installed -with ``pip install 'zarr[uvloop]'``. \ No newline at end of file +with ``pip install 'zarr[optional]'`` (Unix/Linux/macOS only). \ No newline at end of file diff --git a/docs/user-guide/performance.rst b/docs/user-guide/performance.rst index 70a9219bab..2f5e19eb30 100644 --- a/docs/user-guide/performance.rst +++ b/docs/user-guide/performance.rst @@ -289,12 +289,17 @@ Installation To enable uvloop support, install it as an optional dependency:: - pip install 'zarr[uvloop]' + pip install 'zarr[optional]' -Or install uvloop directly:: +Or install uvloop directly (Unix/Linux/macOS only):: pip install uvloop +.. note:: + uvloop is automatically included in the ``optional`` dependency group, but only + installed on supported platforms (Unix/Linux/macOS). On Windows, the installation + will succeed but uvloop will be skipped. + Configuration ~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index afd3b4f687..9f30117bc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,7 +91,11 @@ remote_tests = [ "moto[s3,server]", "requests", ] -optional = ["rich", "universal-pathlib", "uvloop"] +optional = [ + "rich", + "universal-pathlib", + "uvloop; sys_platform != 'win32'" +] docs = [ # Doc building 'sphinx==8.1.3', @@ -229,7 +233,7 @@ dependencies = [ 'typing_extensions==4.9.*', 'donfig==0.8.*', 'obstore==0.5.*', - 'uvloop==0.20.0', + 'uvloop==0.20.0; sys_platform != "win32"', # test deps 'zarr[test]', 'zarr[remote_tests]', From a68324485e8d5ee5a225d0283f1da65827090886 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 13 Sep 2025 20:41:12 -0700 Subject: [PATCH 6/8] fixup doctest --- docs/user-guide/config.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user-guide/config.rst b/docs/user-guide/config.rst index 0ae8017ca9..cad3c45375 100644 --- a/docs/user-guide/config.rst +++ b/docs/user-guide/config.rst @@ -44,7 +44,7 @@ This is the current default configuration:: >>> zarr.config.pprint() {'array': {'order': 
'C', 'write_empty_chunks': False}, - 'async': {'concurrency': 10, 'timeout': None}, + 'async': {'concurrency': 10, 'timeout': None, 'use_uvloop': True}, 'buffer': 'zarr.buffer.cpu.Buffer', 'codec_pipeline': {'batch_size': 1, 'path': 'zarr.core.codec_pipeline.BatchedCodecPipeline'}, From fb4d1c09c9dbaec4dd1fe75031e758427de92ae2 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 13 Sep 2025 20:46:39 -0700 Subject: [PATCH 7/8] fixup for windows --- tests/test_sync.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/test_sync.py b/tests/test_sync.py index 4a68026fe9..f471e25343 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -68,7 +68,7 @@ def test_sync_timeout() -> None: async def foo() -> None: await asyncio.sleep(duration) - with pytest.raises(asyncio.TimeoutError): + with pytest.raises(TimeoutError): sync(foo(), timeout=duration / 10) @@ -224,14 +224,16 @@ def test_create_event_loop_windows_no_uvloop() -> None: loop.close() +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") def test_uvloop_mock_import_error(clean_state) -> None: """Test graceful handling when uvloop import fails.""" with zarr.config.set({"async.use_uvloop": True}): - # Mock uvloop import failure + # Mock uvloop import failure by putting None in sys.modules + # This simulates the module being unavailable/corrupted with patch.dict("sys.modules", {"uvloop": None}): - with patch("builtins.__import__", side_effect=ImportError("No module named 'uvloop'")): - loop = _create_event_loop() - # Should fall back to asyncio - assert isinstance(loop, asyncio.AbstractEventLoop) - assert "uvloop" not in str(type(loop)) - loop.close() + # When Python tries to import uvloop, it will get None and treat it as ImportError + loop = _create_event_loop() + # Should fall back to asyncio + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() From 
531167e470c6cfff1487c9c2c03aeb5e02a8ec30 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 15 Oct 2025 09:28:46 +0200 Subject: [PATCH 8/8] set event loop api --- src/zarr/__init__.py | 2 + src/zarr/core/config.py | 2 +- src/zarr/core/sync.py | 102 ++++++++++++++++- tests/test_config.py | 2 +- tests/test_sync.py | 235 ++++++++++++++++++++++++++++++++++++---- 5 files changed, 320 insertions(+), 23 deletions(-) diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index 0d58ecf8e8..1b90f58c56 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -33,6 +33,7 @@ from zarr.core.array import Array, AsyncArray from zarr.core.config import config from zarr.core.group import AsyncGroup, Group +from zarr.core.sync import set_event_loop # in case setuptools scm screw up and find version to be 0.0.0 assert not __version__.startswith("0.0.0") @@ -119,6 +120,7 @@ def print_packages(packages: list[str]) -> None: "save", "save_array", "save_group", + "set_event_loop", "tree", "zeros", "zeros_like", diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 3e21a913d5..d88a41cfa8 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -107,7 +107,7 @@ def enable_gpu(self) -> ConfigSet: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": False}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index a093b5b644..970d4489fe 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -167,8 +167,13 @@ def sync( def _create_event_loop() -> asyncio.AbstractEventLoop: - """Create a new event loop, optionally using uvloop if available and enabled.""" - use_uvloop = config.get("async.use_uvloop", True) + """Create a new event loop, optionally using uvloop if available and enabled. + + By default, creates a standard asyncio event loop. 
If uvloop is desired for + improved performance in I/O-intensive workloads, set the config option + ``async.use_uvloop`` to ``True``. + """ + use_uvloop = config.get("async.use_uvloop", False) if use_uvloop and sys.platform != "win32": try: @@ -254,3 +259,96 @@ async def _with_semaphore( return await func() async with semaphore: return await func() + + +def set_event_loop(new_loop: asyncio.AbstractEventLoop) -> None: + """Set a custom event loop for Zarr operations. + + This function allows third-party developers to provide their own event loop + implementation (e.g., uvloop) for Zarr to use for async operations. The provided + event loop should already be running in a background thread, or be managed by + the caller. + + .. warning:: + This is an advanced API. Setting a custom event loop after Zarr has already + created its own internal loop may lead to unexpected behavior. It's recommended + to call this function before any Zarr operations are performed. + + .. note:: + The provided event loop must be running and will be used for all subsequent + Zarr async operations. The caller is responsible for managing the loop's + lifecycle (starting, stopping, cleanup). + + Parameters + ---------- + new_loop : asyncio.AbstractEventLoop + The event loop instance to use for Zarr operations. Must be an instance of + asyncio.AbstractEventLoop (or a compatible implementation like uvloop.Loop). 
+ + Raises + ------ + TypeError + If new_loop is not an instance of asyncio.AbstractEventLoop + + Examples + -------- + Using uvloop with Zarr: + + >>> import asyncio + >>> import threading + >>> import uvloop + >>> from zarr.core.sync import set_event_loop + >>> + >>> # Create and start uvloop in a background thread + >>> uvloop_instance = uvloop.new_event_loop() + >>> thread = threading.Thread(target=uvloop_instance.run_forever, daemon=True) + >>> thread.start() + >>> + >>> # Set it as Zarr's event loop + >>> set_event_loop(uvloop_instance) + >>> + >>> # Now all Zarr operations will use uvloop + >>> import zarr + >>> store = zarr.storage.MemoryStore() + >>> group = zarr.open_group(store=store, mode="w") + + Using a custom event loop with specific configuration: + + >>> import asyncio + >>> import threading + >>> from zarr.core.sync import set_event_loop + >>> + >>> # Create event loop with custom settings + >>> custom_loop = asyncio.new_event_loop() + >>> custom_loop.set_debug(True) # Enable debug mode + >>> + >>> # Start the loop in a background thread + >>> thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + >>> thread.start() + >>> + >>> # Set as Zarr's loop + >>> set_event_loop(custom_loop) + + See Also + -------- + zarr.core.sync.cleanup_resources : Clean up Zarr's event loop and thread pool + """ + if not isinstance(new_loop, asyncio.AbstractEventLoop): + raise TypeError( + f"new_loop must be an instance of asyncio.AbstractEventLoop, " + f"got {type(new_loop).__name__}" + ) + + global loop, iothread + + with _get_lock(): + if loop[0] is not None: + logger.warning( + "Replacing existing Zarr event loop. This may cause issues if the " + "previous loop had pending operations. Consider calling " + "cleanup_resources() first." 
+ ) + + loop[0] = new_loop + # Note: iothread is managed by the caller when using set_event_loop + iothread[0] = None diff --git a/tests/test_config.py b/tests/test_config.py index 1929e20f7a..a3fa26e70a 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,7 +55,7 @@ def test_config_defaults_set() -> None: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": False}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/tests/test_sync.py b/tests/test_sync.py index f471e25343..2476d0a102 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,5 +1,7 @@ import asyncio import sys +import threading +import time from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, patch @@ -15,8 +17,10 @@ _get_loop, cleanup_resources, loop, + set_event_loop, sync, ) +from zarr.storage import MemoryStore @pytest.fixture(params=[True, False]) @@ -171,25 +175,13 @@ def test_cleanup_resources_idempotent() -> None: def test_create_event_loop_default_config() -> None: - """Test that _create_event_loop respects the default config.""" - # Reset config to default - with zarr.config.set({"async.use_uvloop": True}): - loop = _create_event_loop() - if sys.platform != "win32": - try: - import uvloop - - assert isinstance(loop, uvloop.Loop) - except ImportError: - # uvloop not available, should use asyncio - assert isinstance(loop, asyncio.AbstractEventLoop) - assert "uvloop" not in str(type(loop)) - else: - # Windows doesn't support uvloop - assert isinstance(loop, asyncio.AbstractEventLoop) - assert "uvloop" not in str(type(loop)) - - loop.close() + """Test that _create_event_loop uses asyncio by default.""" + # Default config should use asyncio (not uvloop) + loop = _create_event_loop() + # Should always use asyncio by default + assert isinstance(loop, asyncio.AbstractEventLoop) + assert 
"uvloop" not in str(type(loop)) + loop.close() def test_create_event_loop_uvloop_disabled() -> None: @@ -237,3 +229,208 @@ def test_uvloop_mock_import_error(clean_state) -> None: assert isinstance(loop, asyncio.AbstractEventLoop) assert "uvloop" not in str(type(loop)) loop.close() + + +# Tests for set_event_loop + + +def test_set_event_loop_basic(clean_state) -> None: + """Test basic functionality of set_event_loop.""" + # Create a custom event loop + custom_loop = asyncio.new_event_loop() + + # Start it in a background thread + thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + # Set it as Zarr's loop + set_event_loop(custom_loop) + + # Verify that Zarr operations use this loop + store = MemoryStore() + group = zarr.open_group(store=store, mode="w") + array = group.create_array("test", shape=(10, 10), chunks=(5, 5), dtype="float32") + + # Write and read data + import numpy as np + + data = np.random.random((10, 10)).astype("float32") + array[:] = data + result = array[:] + + assert np.array_equal(data, result) + + # Clean up + custom_loop.call_soon_threadsafe(custom_loop.stop) + thread.join(timeout=1.0) + custom_loop.close() + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop not supported on Windows") +def test_set_event_loop_with_uvloop(clean_state) -> None: + """Test set_event_loop with uvloop.""" + uvloop = pytest.importorskip("uvloop") + + # Create uvloop instance + uvloop_instance = uvloop.new_event_loop() + + # Start it in a background thread + thread = threading.Thread(target=uvloop_instance.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + # Set as Zarr's loop + set_event_loop(uvloop_instance) + + # Perform Zarr operations + store = MemoryStore() + group = zarr.open_group(store=store, mode="w") + array = group.create_array("test", shape=(20, 20), chunks=(10, 10), dtype="int32") + + # Verify operations work + import numpy as np + + data = np.arange(400).reshape(20, 
20).astype("int32") + array[:] = data + result = array[:] + + assert np.array_equal(data, result) + + # Verify we're actually using uvloop + assert "uvloop" in str(type(uvloop_instance)) + + # Clean up + uvloop_instance.call_soon_threadsafe(uvloop_instance.stop) + thread.join(timeout=1.0) + uvloop_instance.close() + + +def test_set_event_loop_type_validation() -> None: + """Test that set_event_loop validates the input type.""" + # Should raise TypeError for non-loop objects + with pytest.raises(TypeError, match="must be an instance of asyncio.AbstractEventLoop"): + set_event_loop("not a loop") # type: ignore[arg-type] + + with pytest.raises(TypeError, match="must be an instance of asyncio.AbstractEventLoop"): + set_event_loop(123) # type: ignore[arg-type] + + with pytest.raises(TypeError, match="must be an instance of asyncio.AbstractEventLoop"): + set_event_loop(None) # type: ignore[arg-type] + + +def test_set_event_loop_warns_on_replacement(clean_state, caplog) -> None: + """Test that replacing an existing loop produces a warning.""" + # First, trigger creation of Zarr's default loop + store = MemoryStore() + _ = zarr.open_group(store=store, mode="w") + + # Now try to replace it + custom_loop = asyncio.new_event_loop() + thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + # Should produce a warning + with caplog.at_level("WARNING"): + set_event_loop(custom_loop) + + assert "Replacing existing Zarr event loop" in caplog.text + + # Clean up + custom_loop.call_soon_threadsafe(custom_loop.stop) + thread.join(timeout=1.0) + custom_loop.close() + + +def test_set_event_loop_concurrent_operations(clean_state) -> None: + """Test that custom loop handles concurrent Zarr operations.""" + custom_loop = asyncio.new_event_loop() + thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + set_event_loop(custom_loop) + + # Create multiple arrays and perform concurrent 
operations + store = MemoryStore() + group = zarr.open_group(store=store, mode="w") + + arrays = [] + for i in range(5): + arr = group.create_array(f"array_{i}", shape=(10, 10), chunks=(5, 5), dtype="float32") + arrays.append(arr) + + import numpy as np + + # Write to all arrays + data = np.random.random((10, 10)).astype("float32") + for arr in arrays: + arr[:] = data + + # Read from all arrays + for arr in arrays: + result = arr[:] + assert np.array_equal(data, result) + + # Clean up + custom_loop.call_soon_threadsafe(custom_loop.stop) + thread.join(timeout=1.0) + custom_loop.close() + + +def test_set_event_loop_before_first_use(clean_state) -> None: + """Test setting custom loop before any Zarr operations (recommended usage).""" + # Create and set custom loop BEFORE any Zarr operations + custom_loop = asyncio.new_event_loop() + thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + # Set the loop before doing anything with Zarr + set_event_loop(custom_loop) + + # Now perform Zarr operations + store = MemoryStore() + group = zarr.open_group(store=store, mode="w") + array = group.create_array("test", shape=(10, 10), chunks=(5, 5), dtype="float32") + + import numpy as np + + data = np.random.random((10, 10)).astype("float32") + array[:] = data + result = array[:] + + assert np.array_equal(data, result) + + # Clean up + custom_loop.call_soon_threadsafe(custom_loop.stop) + thread.join(timeout=1.0) + custom_loop.close() + + +def test_set_event_loop_thread_safety(clean_state) -> None: + """Test that set_event_loop is thread-safe.""" + import concurrent.futures + + custom_loop = asyncio.new_event_loop() + thread = threading.Thread(target=custom_loop.run_forever, daemon=True) + thread.start() + time.sleep(0.1) + + # Try setting the loop from multiple threads simultaneously + def set_loop(): + set_event_loop(custom_loop) + return True + + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures 
= [executor.submit(set_loop) for _ in range(10)] + results = [f.result() for f in concurrent.futures.as_completed(futures)] + + # All should succeed + assert all(results) + + # Clean up + custom_loop.call_soon_threadsafe(custom_loop.stop) + thread.join(timeout=1.0) + custom_loop.close()