From c11454042fc5c6fe847ce3c452437ad1eebd7da0 Mon Sep 17 00:00:00 2001 From: Co-vengers Date: Tue, 10 Mar 2026 22:07:46 +0530 Subject: [PATCH 1/3] fix(workers): reconstruct trace context from serialized span IDs (#353) Worker accessed task_operation["_current_span"] but scheduler now sends primitive trace_id/span_id strings. Add _reconstruct_span() helper to rebuild a NonRecordingSpan from hex-encoded IDs with graceful fallback. --- bindu/server/workers/base.py | 48 +++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/bindu/server/workers/base.py b/bindu/server/workers/base.py index 1ff58950..8162ba73 100644 --- a/bindu/server/workers/base.py +++ b/bindu/server/workers/base.py @@ -27,6 +27,12 @@ import anyio from opentelemetry.trace import get_tracer, use_span +from opentelemetry.trace.span import ( + INVALID_SPAN_CONTEXT, + NonRecordingSpan, + SpanContext, + TraceFlags, +) from bindu.common.protocol.types import Artifact, Message, TaskIdParams, TaskSendParams from bindu.server.scheduler.base import Scheduler @@ -37,6 +43,38 @@ logger = get_logger(__name__) +def _reconstruct_span( + trace_id: str | None, span_id: str | None +) -> NonRecordingSpan: + """Reconstruct a NonRecordingSpan from serialized trace_id/span_id strings. + + Used to restore OpenTelemetry trace context after the scheduler serializes + the span into primitive strings (required for Redis JSON serialization). + + Args: + trace_id: Hex-encoded trace ID (32 chars) or None + span_id: Hex-encoded span ID (16 chars) or None + + Returns: + A NonRecordingSpan that carries the trace context for correlation + """ + if trace_id and span_id: + try: + ctx = SpanContext( + trace_id=int(trace_id, 16), + span_id=int(span_id, 16), + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + return NonRecordingSpan(ctx) + except (ValueError, TypeError): + logger.warning( + f"Invalid trace context: trace_id={trace_id}, span_id={span_id}" + ) + # Return a no-op span with invalid context as fallback + return NonRecordingSpan(INVALID_SPAN_CONTEXT) + + @dataclass class Worker(ABC): """Abstract base worker for A2A protocol task execution. @@ -104,7 +142,7 @@ async def _handle_task_operation(self, task_operation: dict[str, Any]) -> None: """Dispatch task operation to appropriate handler. Args: - task_operation: Operation dict with 'operation', 'params', and '_current_span' + task_operation: Operation dict with 'operation', 'params', 'trace_id', 'span_id' Supported Operations: - run: Execute a task @@ -124,8 +162,12 @@ async def _handle_task_operation(self, task_operation: dict[str, Any]) -> None: } try: - # Preserve trace context from scheduler - with use_span(task_operation["_current_span"]): + # Reconstruct trace context from serialized trace_id/span_id + span = _reconstruct_span( + task_operation.get("trace_id"), + task_operation.get("span_id"), + ) + with use_span(span): with tracer.start_as_current_span( f"{task_operation['operation']} task", attributes={"logfire.tags": ["bindu"]}, From 526461692f54a034bada7708601d3aa125a3deaf Mon Sep 17 00:00:00 2001 From: Co-vengers Date: Tue, 10 Mar 2026 22:07:59 +0530 Subject: [PATCH 2/3] fix(scheduler): replace unbounded stream buffer with bounded limit Replace math.inf buffer size with a constant of 100 to prevent unbounded memory growth while still allowing task enqueue before the worker loop is ready. --- bindu/server/scheduler/memory_scheduler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/bindu/server/scheduler/memory_scheduler.py b/bindu/server/scheduler/memory_scheduler.py index 7aa64493..c5c8acd0 100644 --- a/bindu/server/scheduler/memory_scheduler.py +++ b/bindu/server/scheduler/memory_scheduler.py @@ -2,7 +2,6 @@ from __future__ import annotations as _annotations -import math from collections.abc import AsyncIterator from contextlib import AsyncExitStack from typing import Any @@ -24,6 +23,10 @@ logger = get_logger("bindu.server.scheduler.memory_scheduler") +# Bounded buffer prevents unbounded memory growth while allowing the API +# handler to enqueue a task before the worker loop is ready to receive. +_TASK_QUEUE_BUFFER_SIZE = 100 + def _get_trace_context() -> tuple[str | None, str | None]: """Extract primitive trace context from the live OpenTelemetry span.""" @@ -46,12 +49,11 @@ async def __aenter__(self): self.aexit_stack = AsyncExitStack() await self.aexit_stack.__aenter__() - # FIX: Added math.inf to create a buffered stream. - # Without this, the stream defaults to 0 (unbuffered), which causes - # the API server to deadlock/hang waiting for a worker to receive the task. + # Bounded buffer allows the API handler to enqueue tasks before the + # worker loop is ready while preventing unbounded memory growth. self._write_stream, self._read_stream = anyio.create_memory_object_stream[ TaskOperation - ](math.inf) + ](_TASK_QUEUE_BUFFER_SIZE) await self.aexit_stack.enter_async_context(self._read_stream) await self.aexit_stack.enter_async_context(self._write_stream) From 7f08f052ea551bc2eac995501568e4e3ccb9777c Mon Sep 17 00:00:00 2001 From: Co-vengers Date: Tue, 10 Mar 2026 22:08:14 +0530 Subject: [PATCH 3/3] test: add opentelemetry.trace.span stubs for NonRecordingSpan imports Add SpanContext, TraceFlags, NonRecordingSpan, and INVALID_SPAN_CONTEXT mocks. Register opentelemetry.trace.span submodule so worker imports resolve in the test environment. --- tests/conftest.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 5a8adb0c..f5264c2f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,27 @@ ot_trace = ModuleType("opentelemetry.trace") +class _SpanContext: + """Mock SpanContext for testing.""" + + def __init__(self, trace_id=0, span_id=0, is_remote=False, trace_flags=None): + self.trace_id = trace_id + self.span_id = span_id + self.is_remote = is_remote + self.trace_flags = trace_flags or _TraceFlags(0) + self.is_valid = trace_id != 0 and span_id != 0 + + +class _TraceFlags(int): + """Mock TraceFlags for testing.""" + + SAMPLED = 1 + + class _Span: + def __init__(self, context=None): + self._context = context or _SpanContext() + def is_recording(self): return True @@ -24,6 +44,16 @@ def set_attribute(self, *args, **kwargs): # noqa: D401 def set_status(self, *args, **kwargs): # noqa: D401 return None + def get_span_context(self): + return self._context + + +# NonRecordingSpan is the same as _Span for testing +_NonRecordingSpan = _Span + +# Invalid span context constant +_INVALID_SPAN_CONTEXT = _SpanContext(trace_id=0, span_id=0, is_remote=False) + def get_current_span(): # noqa: D401 """Return a mock span for testing without OpenTelemetry.""" @@ -63,6 +93,14 @@ def __init__(self, *args, **kwargs): # noqa: D401, ARG002 ot_trace.Span = _Span # type: ignore[attr-defined] ot_trace.use_span = lambda span: _SpanCtx() # type: ignore[attr-defined] +# --- OpenTelemetry trace.span submodule stub --- +ot_trace_span = ModuleType("opentelemetry.trace.span") +ot_trace_span.NonRecordingSpan = _NonRecordingSpan # type: ignore[attr-defined] +ot_trace_span.SpanContext = _SpanContext # type: ignore[attr-defined] +ot_trace_span.TraceFlags = _TraceFlags # type: ignore[attr-defined] +ot_trace_span.INVALID_SPAN_CONTEXT = _INVALID_SPAN_CONTEXT # type: ignore[attr-defined] +ot_trace.span = ot_trace_span # type: ignore[attr-defined] + # Build minimal opentelemetry root and metrics stub op_root = ModuleType("opentelemetry") @@ -107,6 +145,7 @@ def get_meter(name: str): # noqa: D401, ARG001 sys.modules["opentelemetry"] = op_root sys.modules["opentelemetry.trace"] = ot_trace +sys.modules["opentelemetry.trace.span"] = ot_trace_span sys.modules["opentelemetry.metrics"] = metrics_mod