Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changes/3452.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add optional uvloop support for improved async performance.

uvloop support is opt-in: when the ``async.use_uvloop`` configuration setting (or the
``ZARR_ASYNC__USE_UVLOOP`` environment variable) is enabled and uvloop is installed, Zarr
uses it as the event loop implementation for better I/O performance. uvloop can be installed
with ``pip install 'zarr[optional]'`` (Unix/Linux/macOS only).
2 changes: 1 addition & 1 deletion docs/user-guide/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ This is the current default configuration::

>>> zarr.config.pprint()
{'array': {'order': 'C', 'write_empty_chunks': False},
'async': {'concurrency': 10, 'timeout': None},
'async': {'concurrency': 10, 'timeout': None, 'use_uvloop': False},
'buffer': 'zarr.buffer.cpu.Buffer',
'codec_pipeline': {'batch_size': 1,
'path': 'zarr.core.codec_pipeline.BatchedCodecPipeline'},
Expand Down
75 changes: 75 additions & 0 deletions docs/user-guide/performance.rst
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,81 @@ E.g., pickle/unpickle an local store array::
>>> np.all(z1[:] == z2[:])
np.True_

.. _user-guide-uvloop:

Event loop optimization with uvloop
-----------------------------------

Zarr can optionally use `uvloop <https://github.com/MagicStack/uvloop>`_, a fast,
drop-in replacement for the default Python asyncio event loop implementation.
uvloop is written in Cython and built on top of libuv, providing significantly
better performance for I/O-intensive operations.

uvloop is opt-in: Zarr uses it only when it is installed and the ``async.use_uvloop``
configuration setting is enabled; otherwise the standard asyncio event loop is used.
uvloop is particularly beneficial when working with remote storage backends or
performing many concurrent operations.

Installation
~~~~~~~~~~~~

To enable uvloop support, install it as an optional dependency::

pip install 'zarr[optional]'

Or install uvloop directly (Unix/Linux/macOS only)::

pip install uvloop

.. note::
uvloop is automatically included in the ``optional`` dependency group, but only
installed on supported platforms (Unix/Linux/macOS). On Windows, the installation
will succeed but uvloop will be skipped.

Configuration
~~~~~~~~~~~~~

uvloop usage can be controlled via Zarr's configuration system:

.. code-block:: python

import zarr

# Enable uvloop (takes effect only when uvloop is installed)
zarr.config.set({"async.use_uvloop": True})

# Disable uvloop (use standard asyncio; this is the default)
zarr.config.set({"async.use_uvloop": False})

You can also control this via environment variables::

# Enable uvloop
export ZARR_ASYNC__USE_UVLOOP=true

# Disable uvloop (the default)
export ZARR_ASYNC__USE_UVLOOP=false

Platform Support
~~~~~~~~~~~~~~~~~

uvloop is supported on:

- Linux
- macOS
- Other Unix-like systems

uvloop is **not** supported on Windows. On Windows, Zarr will automatically
fall back to the standard asyncio event loop regardless of the configuration setting.

Performance Benefits
~~~~~~~~~~~~~~~~~~~~

uvloop can provide performance improvements for:

- Remote storage operations (S3, GCS, etc.)
- Concurrent array operations
- Large numbers of small I/O operations
- Network-bound workloads

The performance improvement varies depending on the workload, but can be
substantial for I/O-intensive operations.

.. _user-guide-tips-blosc:

Configuring Blosc
Expand Down
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ remote_tests = [
"moto[s3,server]",
"requests",
]
optional = ["rich", "universal-pathlib"]
optional = [
"rich",
"universal-pathlib",
"uvloop; sys_platform != 'win32'"
]
docs = [
# Doc building
'sphinx==8.1.3',
Expand Down Expand Up @@ -234,6 +238,7 @@ dependencies = [
'typing_extensions==4.9.*',
'donfig==0.8.*',
'obstore==0.5.*',
'uvloop==0.20.0; sys_platform != "win32"',
# test deps
'zarr[test]',
'zarr[remote_tests]',
Expand Down
2 changes: 2 additions & 0 deletions src/zarr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from zarr.core.array import Array, AsyncArray
from zarr.core.config import config
from zarr.core.group import AsyncGroup, Group
from zarr.core.sync import set_event_loop

# in case setuptools scm screw up and find version to be 0.0.0
assert not __version__.startswith("0.0.0")
Expand Down Expand Up @@ -177,6 +178,7 @@ def set_format(log_format: str) -> None:
"save",
"save_array",
"save_group",
"set_event_loop",
"tree",
"zeros",
"zeros_like",
Expand Down
2 changes: 1 addition & 1 deletion src/zarr/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def enable_gpu(self) -> ConfigSet:
"order": "C",
"write_empty_chunks": False,
},
"async": {"concurrency": 10, "timeout": None},
"async": {"concurrency": 10, "timeout": None, "use_uvloop": False},
"threading": {"max_workers": None},
"json_indent": 2,
"codec_pipeline": {
Expand Down
127 changes: 125 additions & 2 deletions src/zarr/core/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import atexit
import logging
import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import TYPE_CHECKING, TypeVar
Expand Down Expand Up @@ -165,6 +166,36 @@ def sync(
return return_result


def _create_event_loop() -> asyncio.AbstractEventLoop:
    """Build the event loop that Zarr will run its async work on.

    The ``async.use_uvloop`` config option (treated as ``False`` when unset)
    decides which implementation is tried:

    * option falsy -> a standard asyncio loop;
    * option truthy on ``win32`` -> a standard asyncio loop;
    * option truthy elsewhere -> a uvloop loop, falling back to a standard
      asyncio loop if an ``ImportError`` is raised while obtaining it.

    Each decision is reported through ``logger.debug``.

    Returns
    -------
    asyncio.AbstractEventLoop
        A newly created event loop; this function does not start it.
    """
    wants_uvloop = config.get("async.use_uvloop", False)

    if not wants_uvloop:
        logger.debug("uvloop disabled via config, using asyncio")
    elif sys.platform == "win32":
        logger.debug("uvloop not supported on Windows, using asyncio")
    else:
        try:
            import uvloop

            logger.debug("Creating Zarr event loop with uvloop")
            # Annotated so the return type stays AbstractEventLoop for type checkers.
            uv_loop: asyncio.AbstractEventLoop = uvloop.new_event_loop()
        except ImportError:
            logger.debug("uvloop not available, falling back to asyncio")
        else:
            return uv_loop

    # Every path that did not hand back a uvloop loop ends up here.
    logger.debug("Creating Zarr event loop with asyncio")
    return asyncio.new_event_loop()


def _get_loop() -> asyncio.AbstractEventLoop:
"""Create or return the default fsspec IO loop

Expand All @@ -175,8 +206,7 @@ def _get_loop() -> asyncio.AbstractEventLoop:
# repeat the check just in case the loop got filled between the
# previous two calls from another thread
if loop[0] is None:
logger.debug("Creating Zarr event loop")
new_loop = asyncio.new_event_loop()
new_loop = _create_event_loop()
loop[0] = new_loop
iothread[0] = threading.Thread(target=new_loop.run_forever, name="zarr_io")
assert iothread[0] is not None
Expand Down Expand Up @@ -229,3 +259,96 @@ async def _with_semaphore(
return await func()
async with semaphore:
return await func()


def set_event_loop(new_loop: asyncio.AbstractEventLoop) -> None:
    """Set a custom event loop for Zarr operations.

    This function allows third-party developers to provide their own event loop
    implementation (e.g., uvloop) for Zarr to use for async operations. The provided
    event loop should already be running in a background thread, or be managed by
    the caller.

    .. warning::
        This is an advanced API. Setting a custom event loop after Zarr has already
        created its own internal loop may lead to unexpected behavior. It's recommended
        to call this function before any Zarr operations are performed.

    .. note::
        The provided event loop must be running and will be used for all subsequent
        Zarr async operations. The caller is responsible for managing the loop's
        lifecycle (starting, stopping, cleanup). This function does not start the
        loop and does not verify that it is running; it only rejects loops that
        are already closed.

    Parameters
    ----------
    new_loop : asyncio.AbstractEventLoop
        The event loop instance to use for Zarr operations. Must be an instance of
        asyncio.AbstractEventLoop (or a compatible implementation like uvloop.Loop).

    Raises
    ------
    TypeError
        If new_loop is not an instance of asyncio.AbstractEventLoop
    ValueError
        If new_loop has already been closed

    Examples
    --------
    Using uvloop with Zarr:

    >>> import asyncio
    >>> import threading
    >>> import uvloop
    >>> from zarr.core.sync import set_event_loop
    >>>
    >>> # Create and start uvloop in a background thread
    >>> uvloop_instance = uvloop.new_event_loop()
    >>> thread = threading.Thread(target=uvloop_instance.run_forever, daemon=True)
    >>> thread.start()
    >>>
    >>> # Set it as Zarr's event loop
    >>> set_event_loop(uvloop_instance)
    >>>
    >>> # Now all Zarr operations will use uvloop
    >>> import zarr
    >>> store = zarr.storage.MemoryStore()
    >>> group = zarr.open_group(store=store, mode="w")

    Using a custom event loop with specific configuration:

    >>> import asyncio
    >>> import threading
    >>> from zarr.core.sync import set_event_loop
    >>>
    >>> # Create event loop with custom settings
    >>> custom_loop = asyncio.new_event_loop()
    >>> custom_loop.set_debug(True)  # Enable debug mode
    >>>
    >>> # Start the loop in a background thread
    >>> thread = threading.Thread(target=custom_loop.run_forever, daemon=True)
    >>> thread.start()
    >>>
    >>> # Set as Zarr's loop
    >>> set_event_loop(custom_loop)

    See Also
    --------
    zarr.core.sync.cleanup_resources : Clean up Zarr's event loop and thread pool
    """
    if not isinstance(new_loop, asyncio.AbstractEventLoop):
        raise TypeError(
            f"new_loop must be an instance of asyncio.AbstractEventLoop, "
            f"got {type(new_loop).__name__}"
        )
    # Fail fast: a closed loop can never run work, so accepting it would only
    # defer the error to the first Zarr operation that tries to use it.
    if new_loop.is_closed():
        raise ValueError("new_loop is closed; provide an open event loop")

    # No ``global`` statement is needed: ``loop`` and ``iothread`` are mutated
    # in place (item assignment), never rebound.
    with _get_lock():
        if loop[0] is not None:
            logger.warning(
                "Replacing existing Zarr event loop. This may cause issues if the "
                "previous loop had pending operations. Consider calling "
                "cleanup_resources() first."
            )

        loop[0] = new_loop
        # Note: iothread is managed by the caller when using set_event_loop
        iothread[0] = None
2 changes: 1 addition & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_config_defaults_set() -> None:
"order": "C",
"write_empty_chunks": False,
},
"async": {"concurrency": 10, "timeout": None},
"async": {"concurrency": 10, "timeout": None, "use_uvloop": False},
"threading": {"max_workers": None},
"json_indent": 2,
"codec_pipeline": {
Expand Down
Loading