66 changes: 38 additions & 28 deletions temporalio/contrib/openai_agents/_trace_interceptor.py
@@ -2,6 +2,7 @@

from __future__ import annotations

+import contextvars
import random
import uuid
from contextlib import contextmanager
@@ -400,48 +401,57 @@ async def signal_external_workflow(
    def start_activity(
        self, input: temporalio.worker.StartActivityInput
    ) -> temporalio.workflow.ActivityHandle:
-        trace = get_trace_provider().get_current_trace()
-        span: Optional[Span] = None
-        if trace:
-            span = custom_span(
-                name="temporal:startActivity", data={"activity": input.activity}
-            )
-            span.start(mark_as_current=True)
-
-        set_header_from_context(input, temporalio.workflow.payload_converter())
-        handle = self.next.start_activity(input)
+        ctx = contextvars.copy_context()
+        span = ctx.run(
+            self._create_span,
+            name="temporal:startActivity",
+            data={"activity": input.activity},
+            input=input,
+        )
+        handle = ctx.run(self.next.start_activity, input)
Comment on lines +405 to +411
@cretz (Member) commented on Oct 29, 2025:

I don't think you have to run the things in the copied context (and add stack trace layers and such) just to be able to have the done callback use the copied context. A quick glance at CPython shows that's what they're doing in add_done_callback when context is None (because copy_context is cheap and you only need the copy for it to work properly).

You can just copy the context up front and provide it to add_done_callback without changing how these things are run. Granted, all of that span stuff should be done before the copy_context. Also, in Python 3.12 there is actually a handle.get_context() you could use for add_done_callback, but understood that is newer than our oldest allowed Python version.
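
A minimal sketch of what this suggestion amounts to, using plain asyncio outside of Temporal (the current_span variable and its value are illustrative, not the SDK's):

```python
import asyncio
import contextvars

current_span = contextvars.ContextVar("current_span", default=None)


async def main() -> None:
    # Mutate context-local state first (analogous to starting the span)...
    current_span.set("temporal:startActivity")
    # ...then take one cheap snapshot of the current context.
    ctx = contextvars.copy_context()

    # Schedule the work normally; nothing needs to run via ctx.run().
    task = asyncio.create_task(asyncio.sleep(0))

    # Only the done callback executes under the snapshot, mirroring what
    # add_done_callback does by default when context is None.
    task.add_done_callback(lambda _: print(current_span.get()), context=ctx)
    await task


asyncio.run(main())
```

As the replies below point out, the snapshot carries the same variable values, but it is still a different Context object from the one the span was started in, which is what the OTel detach check ends up tripping on.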

Contributor Author:

That doesn't appear to fix the problem. If the context is copied up front and the subsequent operations are not run inside it, it still fails to detach, as the copy and the original are not the same context.

@cretz (Member) commented on Oct 29, 2025:

The context vars are the same, though (assuming you copy the context after you mutate things on it). Is there somewhere inside OpenAI that validates that a context is the exact instance? If you look at the CPython code for add_done_callback at https://github.com/python/cpython/blob/v3.14.0/Lib/asyncio/futures.py#L236-L237, the default also calls copy_context, because that's the normal thing to do to "get the current context" to execute under. How does that work today when context is None for add_done_callback? Or are you saying add_done_callback does not work today for OpenAI's span.finish when using the default parameter for context?

Contributor Author:

Failed to detach context
Traceback (most recent call last):
  File "/Users/tconley/samples-python/.venv/lib/python3.13/site-packages/opentelemetry/context/__init__.py", line 155, in detach
    _RUNTIME_CONTEXT.detach(token)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/Users/tconley/samples-python/.venv/lib/python3.13/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
    self._current_context.reset(token)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x105566d90> at 0x10bb537c0> was created in a different Context

This still occurs with your suggestion. I think that despite the contexts having the same values, they aren't the same. That's my best guess anyway.
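
For reference, the error above can be reproduced with ContextVar token mechanics alone, with no Temporal or OTel involved (variable names here are illustrative):

```python
import contextvars

current_context = contextvars.ContextVar("current_context", default={})

# set() creates a token bound to the Context that is current right now
# (analogous to the span being started in the workflow's context).
token = current_context.set({"span": "open"})

# A copy of the context holds identical values, but it is a different
# Context object, so resetting the token inside it raises the same error.
copied = contextvars.copy_context()
try:
    copied.run(current_context.reset, token)
except ValueError as err:
    print(err)  # "... was created in a different Context"
```

This matches the guess above: equal values are not enough, because the token records the exact Context instance in which it was created.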

@cretz (Member) commented on Oct 29, 2025:

Hrmm, how did add_done_callback work with span.finish before this PR, I wonder, since that calls span.finish on a copied context (that being the default)? Or did span.finish never work? I think this is one of those curiosities worth understanding. I can put together my own small reproductions to develop an understanding if necessary.

Contributor Author:

It never 'worked' in the sense that this issue was always here, but it only appears if you use a tracing provider other than the default. OpenAI's default handles it fine, but the instrumentor from the report uses OTel, which produces the context detach error above.
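
To make the dependence on the tracing processor concrete, here is a hypothetical sketch (not the actual instrumentor from the report) of the shape of an OTel-backed processor, and why it cares which Context a span finishes in:

```python
from typing import Any

from opentelemetry import context as otel_context


class OtelBridgeProcessor:
    """Illustrative only: mirrors agents spans onto OTel context."""

    def __init__(self) -> None:
        self._tokens: dict[str, object] = {}

    def on_span_start(self, span: Any) -> None:
        # attach() sets OTel's context variable and returns a token bound to
        # the Python Context that is current here.
        self._tokens[span.span_id] = otel_context.attach(otel_context.get_current())

    def on_span_end(self, span: Any) -> None:
        # detach() resets that token. If span.finish() ran under a copied
        # Context, OTel logs the "Failed to detach context" error shown above.
        otel_context.detach(self._tokens.pop(span.span_id))
```

A processor that only records span data never attaches or detaches anything, which lines up with the observation above that OpenAI's default handles it fine.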

@cretz (Member) commented on Oct 30, 2025:

Hrmm, OTel already won't work with add_done_callback for workflows since they are distributed. I think we need to either require being able to finish a span in a different context (and on a different machine) from where it was created, or take a different approach.

(I am curious how OTel is used here anyway, given the inherent OTel limitations concerning deterministic span/trace IDs and such. I understand that's a different topic, but we may need to suggest the start-and-stop-span-immediately approach for OTel-based OpenAI tracing, as we do for our other OTel-based tracing.)
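
For comparison, the start-and-stop-span-immediately idea would look roughly like the following hypothetical sketch (it reuses the tracing calls already present in this diff and assumes custom_span is importable from agents.tracing; it is not a proposed change): the span brackets only the scheduling step and finishes in the same context that started it, so nothing has to be detached later from a done callback.

```python
from agents import trace
from agents.tracing import custom_span


def record_schedule_step(activity_name: str) -> None:
    # Start and finish the span immediately, in the context that created it,
    # instead of keeping it open until the activity itself completes.
    span = custom_span(
        name="temporal:startActivity", data={"activity": activity_name}
    )
    span.start(mark_as_current=True)
    try:
        pass  # the actual scheduling call would go here
    finally:
        span.finish()  # a real implementation would also restore the previous current span


with trace("example trace"):
    record_schedule_step("my_activity")
```

The trade-off is that such a span no longer measures how long the activity ran, only that it was scheduled.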

        if span:
-            handle.add_done_callback(lambda _: span.finish())  # type: ignore
+            handle.add_done_callback(lambda _: span.finish(), context=ctx)

        return handle

    async def start_child_workflow(
        self, input: temporalio.worker.StartChildWorkflowInput
    ) -> temporalio.workflow.ChildWorkflowHandle:
-        trace = get_trace_provider().get_current_trace()
-        span: Optional[Span] = None
-        if trace:
-            span = custom_span(
-                name="temporal:startChildWorkflow", data={"workflow": input.workflow}
-            )
-            span.start(mark_as_current=True)
-        set_header_from_context(input, temporalio.workflow.payload_converter())
-        handle = await self.next.start_child_workflow(input)
+        ctx = contextvars.copy_context()
+        span = ctx.run(
+            self._create_span,
+            name="temporal:startChildWorkflow",
+            data={"workflow": input.workflow},
+            input=input,
+        )
+        handle = await ctx.run(self.next.start_child_workflow, input)
        if span:
-            handle.add_done_callback(lambda _: span.finish())  # type: ignore
+            handle.add_done_callback(lambda _: span.finish(), context=ctx)
        return handle

    def start_local_activity(
        self, input: temporalio.worker.StartLocalActivityInput
    ) -> temporalio.workflow.ActivityHandle:
+        ctx = contextvars.copy_context()
+        span = ctx.run(
+            self._create_span,
+            name="temporal:startLocalActivity",
+            data={"activity": input.activity},
+            input=input,
+        )
+        handle = ctx.run(self.next.start_local_activity, input)
+        if span:
+            handle.add_done_callback(lambda _: span.finish(), context=ctx)
+        return handle
+
+    @staticmethod
+    def _create_span(
+        name: str, data: dict[str, Any], input: _InputWithHeaders
+    ) -> Optional[Span]:
        trace = get_trace_provider().get_current_trace()
        span: Optional[Span] = None
        if trace:
-            span = custom_span(
-                name="temporal:startLocalActivity", data={"activity": input.activity}
-            )
+            span = custom_span(name=name, data=data)
            span.start(mark_as_current=True)
        set_header_from_context(input, temporalio.workflow.payload_converter())
-        handle = self.next.start_local_activity(input)
-        if span:
-            handle.add_done_callback(lambda _: span.finish())  # type: ignore
-        return handle
+        return span
83 changes: 77 additions & 6 deletions tests/contrib/openai_agents/test_openai_tracing.py
@@ -2,9 +2,10 @@
from datetime import timedelta
from typing import Any

-from agents import Span, Trace, TracingProcessor
+from agents import Span, Trace, TracingProcessor, trace
from agents.tracing import get_trace_provider

+from temporalio import workflow
from temporalio.client import Client
from temporalio.contrib.openai_agents.testing import (
    AgentEnvironment,
@@ -21,6 +22,10 @@ class MemoryTracingProcessor(TracingProcessor):
    trace_events: list[tuple[Trace, bool]] = []
    span_events: list[tuple[Span, bool]] = []

+    def __init__(self):
+        self.trace_events = []
+        self.span_events = []
+
    def on_trace_start(self, trace: Trace) -> None:
        self.trace_events.append((trace, True))

@@ -40,6 +45,12 @@ def force_flush(self) -> None:
        pass


+def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
+    assert a[0].trace_id == b[0].trace_id
+    assert a[1]
+    assert not b[1]
+
+
async def test_tracing(client: Client):
    async with AgentEnvironment(model=research_mock_model()) as env:
        client = env.applied_on_client(client)
@@ -71,11 +82,6 @@ async def test_tracing(client: Client):
        assert processor.trace_events[0][1]
        assert not processor.trace_events[1][1]

-        def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
-            assert a[0].trace_id == b[0].trace_id
-            assert a[1]
-            assert not b[1]
-
        # Initial planner spans - There are only 3 because we don't make an actual model call
        paired_span(processor.span_events[0], processor.span_events[5])
        assert (
@@ -142,3 +148,68 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
            processor.span_events[-4][0].span_data.export().get("name")
            == "temporal:executeActivity"
        )
+
+
+@workflow.defn
+class ChildWorkflow:
+    @workflow.run
+    async def run(self) -> str:
+        return "A"
+
+
+@workflow.defn
+class ParentWorkflow:
+    @workflow.run
+    async def run(self) -> str:
+        with trace("Parent trace"):
+            return await workflow.execute_child_workflow(ChildWorkflow.run)
+
+
+async def test_tracing_child_workflow(client: Client):
+    async with AgentEnvironment(model=research_mock_model()) as env:
+        client = env.applied_on_client(client)
+
+        provider = get_trace_provider()
+
+        processor = MemoryTracingProcessor()
+        provider.set_processors([processor])
+
+        async with new_worker(
+            client,
+            ParentWorkflow,
+            ChildWorkflow,
+        ) as worker:
+            workflow_handle = await client.start_workflow(
+                ParentWorkflow.run,
+                id=f"openai-tracing-child-workflow-{uuid.uuid4()}",
+                task_queue=worker.task_queue,
+                execution_timeout=timedelta(seconds=120),
+            )
+            result = await workflow_handle.result()
+
+        # There is one closed root trace
+        assert len(processor.trace_events) == 2
+        assert (
+            processor.trace_events[0][0].trace_id
+            == processor.trace_events[1][0].trace_id
+        )
+        assert processor.trace_events[0][1]
+        assert not processor.trace_events[1][1]
+
+        for span, _ in processor.span_events:
+            print(
+                f"Span: {span.span_id}, parent: {span.parent_id}, data: {span.span_data.export()}"
+            )
+
+        # Two spans - startChildWorkflow > executeWorkflow
+        paired_span(processor.span_events[0], processor.span_events[3])
+        assert (
+            processor.span_events[0][0].span_data.export().get("name")
+            == "temporal:startChildWorkflow"
+        )
+
+        paired_span(processor.span_events[1], processor.span_events[2])
+        assert (
+            processor.span_events[1][0].span_data.export().get("name")
+            == "temporal:executeWorkflow"
+        )