-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path25_async_runtime_tutorial.py
More file actions
83 lines (70 loc) · 2.13 KB
/
25_async_runtime_tutorial.py
File metadata and controls
83 lines (70 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# %% [markdown]
# # 25 Async Runtime Basics
# Notebook-style companion for the async runtime tutorial.
#
# This file shows the async runtime contract in three small steps:
# 1. an async resolver handler
# 2. a `StepContext` call
# 3. `AsyncWorkflowRuntime` construction
# %%
import asyncio
from _helpers import banner, show
from kogwistar.runtime import AsyncMappingStepResolver, AsyncWorkflowRuntime, StepContext
from kogwistar.runtime.models import RunSuccess
# %%
banner("Define async resolver.")

# Registry that the handler below is attached to under its op name.
resolver = AsyncMappingStepResolver()


@resolver.register("hello_async")
async def hello_async(ctx: StepContext):
    """Return a successful step result echoing the run id and step sequence.

    Args:
        ctx: Step context for the current invocation; only ``run_id`` and
            ``step_seq`` are read from it.

    Returns:
        A ``RunSuccess`` with no conversation node and two state-update
        entries.
    """
    # NOTE(review): the "u" / "a" tags are passed through verbatim; their
    # meaning is defined by kogwistar's state-update handling — confirm there.
    greeting_update = ("u", {"hello": "async"})
    run_info_update = ("a", {"run_id": ctx.run_id, "step_seq": ctx.step_seq})
    return RunSuccess(
        conversation_node_id=None,
        state_update=[greeting_update, run_info_update],
    )
# Hand-built context for one step; every identifier is a fixed tutorial value.
ctx = StepContext(
    run_id="run-async-tutorial",
    workflow_id="wf-async-tutorial",
    workflow_node_id="node-async-tutorial",
    op="hello_async",
    token_id="tok-async-tutorial",
    attempt=1,
    step_seq=1,
    cache_dir=None,
    state={},
)

# Look the handler up by op name, then drive its coroutine to completion.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread (e.g. inside a Jupyter kernel); this cell assumes
# plain-script execution — confirm how the tutorial is meant to be run.
resolved_handler = resolver.resolve_async("hello_async")
result = asyncio.run(resolved_handler(ctx))

show(
    "resolver call",
    {
        "ops": sorted(resolver.ops),
        "status": result.status,
        "state_update": result.state_update,
    },
)
# %%
banner("Construct async runtime contract.")

# Plain dicts are passed in place of the two engines; this cell only constructs
# the runtime and reads attributes back from it.
runtime = AsyncWorkflowRuntime(
    workflow_engine={"name": "demo-workflow-engine"},
    conversation_engine={"name": "demo-conversation-engine"},
    step_resolver=resolver,
    predicate_registry={},
    trace=False,
)

# Summary of what the async runtime exposes, including the sync runtime it holds.
runtime_contract = {
    "runtime_class": type(runtime).__name__,
    "sync_runtime_class": type(runtime.sync_runtime).__name__,
    "workflow_engine": runtime.sync_runtime.workflow_engine,
    "conversation_engine": runtime.sync_runtime.conversation_engine,
    "step_context_type": runtime.step_context_type.__name__,
    "step_result_type": str(runtime.step_result_type),
}
show("runtime contract", runtime_contract)
# %%
# Invariants:
# - The async resolver returns the same StepRunResult shape.
# - The async runtime wraps the sync runtime contract.
# - Execution style changes; workflow meaning does not.