RedisLockManager breaks Ray and Dask flows #18017

Open
bogdibota opened this issue May 10, 2025 · 1 comment · May be fixed by #18018
Labels: bug Something isn't working

Bug summary

Hello! I tried to use cached tasks with IsolationLevel.SERIALIZABLE in a flow that uses RayTaskRunner (or DaskTaskRunner), but the flow errors before any task starts. From what I can tell, something related to RedisLockManager fails when it is being serialized for the task runner.

Here is a minimal example:

# /// script
# dependencies = ["prefect", "prefect[redis]", "prefect[dask]", "prefect[ray]"]
# ///

import prefect
from prefect.cache_policies import TASK_SOURCE, INPUTS
from prefect.transactions import IsolationLevel
from prefect_dask.task_runners import DaskTaskRunner
from prefect_redis import RedisLockManager
from prefect_ray import RayTaskRunner

@prefect.task(
    cache_policy=(TASK_SOURCE + INPUTS).configure(
        isolation_level=IsolationLevel.SERIALIZABLE,
        lock_manager=RedisLockManager(host="localhost", port=6379),
    )
)
def cached_task(input: int):
    logger = prefect.get_run_logger()
    logger.info(f"executed task for {input}")
    return input + 1

def simple_flow():
    logger = prefect.get_run_logger()
    results = cached_task.map([1, 2, 2, 3, 3, 3]).result()
    logger.info(f"results: {results}")

if __name__ == "__main__":
    ray_flow = prefect.flow(task_runner=RayTaskRunner)(simple_flow)
    dask_flow = prefect.flow(task_runner=DaskTaskRunner)(simple_flow)
    concurrent_flow = prefect.flow()(simple_flow)

    ray_flow() # fails
    dask_flow() # fails
    concurrent_flow() # works as expected

concurrent_flow works as expected, but both ray_flow and dask_flow fail. I commented/uncommented the calls to run only one at a time.

Disclaimer: I'm new to both Python and Prefect, so maybe I'm missing something here 😅
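
For what it's worth, the Ray traceback below pins the failure on a `_thread.lock` object, which is never picklable on its own. Here is a minimal sketch of that underlying limitation, assuming RedisLockManager keeps a live client or lock as an attribute (the class below is a made-up stand-in, not the actual implementation):

import pickle
import threading

class HoldsALock:
    """Stand-in for any object that keeps a live client or lock as an attribute."""
    def __init__(self):
        self._lock = threading.Lock()  # the same kind of object Ray complains about

try:
    pickle.dumps(HoldsALock())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object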

Version info

Version:             3.4.1
API version:         0.8.4
Python version:      3.13.2
Git commit:          b47ad8e1
Built:               Thu, May 08, 2025 08:42 PM
OS/Arch:             linux/x86_64
Profile:             prefect-cloud
Server type:         cloud
Pydantic version:    2.11.4

Additional context

The regular flow works as expected:

05:49:16.413 | INFO    | Task run 'cached_task-ed2' - executed task for 3
05:49:16.414 | INFO    | Task run 'cached_task-6cb' - executed task for 1
05:49:16.414 | INFO    | Task run 'cached_task-642' - executed task for 2
05:49:16.421 | INFO    | Task run 'cached_task-ed2' - Finished in state Completed()
05:49:16.423 | INFO    | Task run 'cached_task-642' - Finished in state Completed()
05:49:16.423 | INFO    | Task run 'cached_task-6cb' - Finished in state Completed()
05:49:16.517 | INFO    | Task run 'cached_task-bc0' - Finished in state Cached(type=COMPLETED)
05:49:16.519 | INFO    | Task run 'cached_task-a90' - Finished in state Cached(type=COMPLETED)
05:49:16.616 | INFO    | Task run 'cached_task-cec' - Finished in state Cached(type=COMPLETED)
05:49:16.618 | INFO    | Flow run 'mustard-cat' - results: [2, 3, 3, 4, 4, 4]
05:49:16.803 | INFO    | Flow run 'mustard-cat' - Finished in state Completed()

Ray errors with:

TypeError: Could not serialize the argument <prefect.tasks.Task object at 0x7f203c529950> for a task or actor prefect_ray.task_runners.RayTaskRunner._run_prefect_task:
=========================================================================
Checking Serializability of <prefect.tasks.Task object at 0x7f203c529950>
=========================================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
    Serializing '__wrapped__' <function cached_task at 0x7f2040f2b1a0>...
    Serializing 'fn' <function cached_task at 0x7f2040f2b1a0>...
    Serializing 'apply_async' <bound method Task.apply_async of <prefect.tasks.Task object at 0x7f203c529950>>...
    !!! FAIL serialization: cannot pickle '_thread.lock' object
        Serializing '__func__' <function Task.apply_async at 0x7f203d4620c0>...
    WARNING: Did not find non-serializable object in <bound method Task.apply_async of <prefect.tasks.Task object at 0x7f203c529950>>. This may be an oversight.
=========================================================================
Variable:

        FailTuple(apply_async [obj=<bound method Task.apply_async of <prefect.tasks.Task object at 0x7f203c529950>>, parent=<prefect.tasks.Task object at 0x7f203c529950>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
=========================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
=========================================================================

Dask errors with:

Traceback (most recent call last):
  File "/home/bogdi/projects/reactive/varyn/varyn-core/flows/redis_lock_bug.py", line 34, in <module>
    dask_flow()
    ~~~~~~~~~^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flows.py", line 1691, in __call__
    return run_flow(
        flow=self,
    ...<2 lines>...
        return_type=return_type,
    )
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 1527, in run_flow
    ret_val = run_flow_sync(**kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 1372, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ~~~~~~~~~~~~~^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 350, in result
    raise self._raised
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 763, in run_context
    yield self
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 1370, in run_flow_sync
    engine.call_flow_fn()
    ~~~~~~~~~~~~~~~~~~~^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/flow_engine.py", line 783, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/bogdi/projects/reactive/varyn/varyn-core/flows/redis_lock_bug.py", line 25, in simple_flow
    results = cached_task.map([1, 2, 2, 3, 3, 3]).result()
              ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/tasks.py", line 1438, in map
    futures = task_runner.map(self, parameters, wait_for)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect_dask/task_runners.py", line 481, in map
    return super().map(task, parameters, wait_for)
           ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect/task_runners.py", line 208, in map
    self.submit(
    ~~~~~~~~~~~^
        task=task,
        ^^^^^^^^^^
    ...<2 lines>...
        dependencies=task_inputs,
        ^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect_dask/task_runners.py", line 448, in submit
    future = self.client.submit(
        task,
    ...<3 lines>...
        return_type="state",
    )
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
        wrapper_func,
    ...<10 lines>...
        **run_task_kwargs,
    )
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/distributed/client.py", line 2141, in submit
    expr = LLGExpr(
        {
    ...<6 lines>...
        }
    )
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_expr.py", line 72, in __new__
    inst._name
  File "/usr/lib64/python3.13/functools.py", line 1042, in __get__
    val = self.func(instance)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_expr.py", line 522, in _name
    return self._funcname + "-" + self.deterministic_token
                                  ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_expr.py", line 517, in deterministic_token
    self._determ_token = self.__dask_tokenize__()
                         ~~~~~~~~~~~~~~~~~~~~~~^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_expr.py", line 147, in __dask_tokenize__
    self._determ_token = _tokenize_deterministic(type(self), *self.operands)
                         ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 457, in _tokenize_deterministic
    return tokenize(*args, ensure_deterministic=True, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 76, in tokenize
    return _tokenize(*args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 34, in _tokenize
    token: object = _normalize_seq_func(args)
                    ~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 154, in _normalize_seq_func
    return tuple(map(_inner_normalize_token, seq))
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 147, in _inner_normalize_token
    return normalize_token(item)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/utils.py", line 772, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 122, in normalize_dict
    return "dict", _normalize_seq_func(
                   ~~~~~~~~~~~~~~~~~~~^
        sorted(d.items(), key=lambda kv: str(kv[0]))
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 154, in _normalize_seq_func
    return tuple(map(_inner_normalize_token, seq))
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 147, in _inner_normalize_token
    return normalize_token(item)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/utils.py", line 772, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 161, in normalize_seq
    return type(seq).__name__, _normalize_seq_func(seq)
                               ~~~~~~~~~~~~~~~~~~~^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 154, in _normalize_seq_func
    return tuple(map(_inner_normalize_token, seq))
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 147, in _inner_normalize_token
    return normalize_token(item)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/utils.py", line 772, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 198, in normalize_object
    return method()
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_task_spec.py", line 707, in __dask_tokenize__
    return self._get_token()
           ~~~~~~~~~~~~~~~^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/_task_spec.py", line 696, in _get_token
    self._token = tokenize(
                  ~~~~~~~~^
        (
        ^
    ...<4 lines>...
        )
        ^
    )
    ^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 76, in tokenize
    return _tokenize(*args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 34, in _tokenize
    token: object = _normalize_seq_func(args)
                    ~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 154, in _normalize_seq_func
    return tuple(map(_inner_normalize_token, seq))
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 147, in _inner_normalize_token
    return normalize_token(item)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/utils.py", line 772, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 161, in normalize_seq
    return type(seq).__name__, _normalize_seq_func(seq)
                               ~~~~~~~~~~~~~~~~~~~^^^^^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 154, in _normalize_seq_func
    return tuple(map(_inner_normalize_token, seq))
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 147, in _inner_normalize_token
    return normalize_token(item)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/utils.py", line 772, in __call__
    return meth(arg, *args, **kwargs)
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 212, in normalize_object
    _maybe_raise_nondeterministic(
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        f"Object {o!r} cannot be deterministically hashed. This likely "
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        "indicates that the object cannot be serialized deterministically."
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/home/bogdi/.cache/uv/environments-v2/redis-lock-bug-63ce28e9b3b3ffe7/lib64/python3.13/site-packages/dask/tokenize.py", line 89, in _maybe_raise_nondeterministic
    raise TokenizationError(msg)
dask.tokenize.TokenizationError: Object <function cached_task at 0x7f8addd1f9c0> cannot be deterministically hashed. This likely indicates that the object cannot be serialized deterministically.
bogdibota added the bug label on May 10, 2025
zzstoatzz (Collaborator) commented May 10, 2025

hi @bogdibota - thanks for the issue! this should be fixed in the PR linked above

can you try your reproduction against this branch? for example, copy everything besides the inline script metadata to your clipboard and run:

pbpaste | uv run \
--with git+https://github.com/prefecthq/prefect.git@redis-locking-serialization#subdirectory=src/integrations/prefect-redis \
--with git+https://github.com/prefecthq/prefect.git@redis-locking-serialization#subdirectory=src/integrations/prefect-ray \
--with git+https://github.com/prefecthq/prefect.git@redis-locking-serialization#subdirectory=src/integrations/prefect-dask -
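
(For context on the general pattern: a common way to make an object that holds a live connection or a thread lock survive pickling is to drop those attributes from the pickled state and recreate them lazily after unpickling. The sketch below is a hypothetical illustration of that pattern only; the class and attribute names are made up, and it is not necessarily how #18018 implements the fix.)

import pickle
import threading
from typing import Any

class PicklableLockManager:
    """Hypothetical lock manager that excludes non-picklable members from its pickled state."""

    def __init__(self, host: str = "localhost", port: int = 6379):
        self.host = host
        self.port = port
        self._client: Any = None             # would hold e.g. a live Redis connection
        self._local_lock = threading.Lock()  # not picklable

    def __getstate__(self) -> dict:
        # Keep only plain configuration; drop the live client and the lock.
        state = self.__dict__.copy()
        state["_client"] = None
        state.pop("_local_lock", None)
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore configuration and rebuild the non-picklable members on the worker.
        self.__dict__.update(state)
        self._local_lock = threading.Lock()

if __name__ == "__main__":
    manager = PicklableLockManager()
    restored = pickle.loads(pickle.dumps(manager))  # round-trips cleanly
    print(restored.host, restored.port)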
