
Result Persistence Doesn't Work With run_deployment Submitted Async #17557

Open
kr-hansen opened this issue Mar 21, 2025 · 2 comments
Labels
bug Something isn't working

Comments

@kr-hansen

Bug summary

When I use result persistence for flows submitted in parallel with run_deployment, I cannot get the results back from those deployments, even with the persist_result and result_storage parameters set. I only get back coroutines and cannot retrieve the actual results.

If I run/submit the same flows with run_deployment in a non-async fashion, I get the results back just fine. The problem appears to be purely a function of how the async functionality works in the flow.

Minimum Reproducible Example

import asyncio
from typing import List

from prefect import flow, task
from prefect.client.schemas import FlowRun
from prefect.deployments import run_deployment
from prefect_gcp import GcsBucket
from prefect_ray import RayTaskRunner

result_storage = GcsBucket(
    bucket="my-bucket",
    bucket_folder="Prefect3Test",
)
result_storage.save(name="prefect3test-results", overwrite=True, _sync=True)

@flow(
    log_prints=True,
    result_storage=result_storage,
    persist_result=True,
    retries=1,
    name="persist_test",
    task_runner=RayTaskRunner,
)
def persist_test(some_param: int):
    success_task = passing_task(some_param=some_param)
    return success_task

@task
def passing_task(some_param: int = 42, parent_run_id=None):
    print("This task should be skipped on retry")
    return some_param

@flow(log_prints=True, task_runner=RayTaskRunner)
async def parent_flow():
    # submit runs of the persist_test deployment concurrently and wait for all of them to finish

    runs: List[FlowRun] = await asyncio.gather(
        *[
            run_deployment(
                name="persist_test/persist_test",
                parameters={"some_param": val},
            ) 
            for val in range(2)
        ]
    )

    print([run.state.result(fetch=True) for run in runs])

if __name__ == "__main__":
    asyncio.run(parent_flow(), debug=True)

What I get in the final print statement is [<coroutine object sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call at 0x11f2cbb40>, <coroutine object sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call at 0x128325b40>].

If I drop the await asyncio.gather and just get a list of flow runs, then I get back [0, 1] as I'd expect.
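
For reference, the non-async variant I'm describing looks roughly like this (a sketch, assuming parent_flow is rewritten as a regular, non-async flow so that run_deployment blocks on each run in series; parent_flow_sync is just an illustrative name):

@flow(log_prints=True, task_runner=RayTaskRunner)
def parent_flow_sync():
    # each run_deployment call blocks until that flow run reaches a final state
    runs: List[FlowRun] = [
        run_deployment(
            name="persist_test/persist_test",
            parameters={"some_param": val},
        )
        for val in range(2)
    ]

    # in a sync context, state.result(fetch=True) returns the persisted value directly
    print([run.state.result(fetch=True) for run in runs])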

Is there a better way than asyncio.gather to submit these deployments in parallel rather than in series? Or is there a way to get the results back from these deployments in an async manner?

Version info

Version:             3.2.13
API version:         0.8.4
Python version:      3.11.6
Git commit:          12800297
Built:               Fri, Mar 14, 2025 8:37 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.10.6
Integrations:
  prefect-gcp:       0.6.4
  prefect-ray:       0.4.3

Additional context

I wanted to confirm that prefect-ray isn't the main cause here, and I'm fairly sure it isn't. I kept it in the example, but it could most likely be dropped when reproducing the problem.

@kr-hansen kr-hansen added the bug Something isn't working label Mar 21, 2025
@desertaxle
Member

Hey @kr-hansen! Are you able to get the results back if you use asyncio.gather when retrieving the results? The .result method is a little weird. It is async when called in an async context with fetch=True, so I'd expect that you need to await those calls.
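
Something along these lines (a sketch, assuming the coroutines returned by .result(fetch=True) in an async context only need to be awaited):

    # inside parent_flow, after the flow runs have been gathered
    results = await asyncio.gather(
        *[run.state.result(fetch=True) for run in runs]
    )
    print(results)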

@zzstoatzz
Collaborator

related to #15008
