diff --git a/bindu/server/scheduler/base.py b/bindu/server/scheduler/base.py
index fd06e878..8e98b1f5 100644
--- a/bindu/server/scheduler/base.py
+++ b/bindu/server/scheduler/base.py
@@ -8,6 +8,7 @@
 from pydantic import Discriminator
 from typing_extensions import Self, TypedDict
 
+from opentelemetry.trace import Span
 from bindu.common.protocol.types import TaskIdParams, TaskSendParams
 from bindu.utils.logging import get_logger
 
@@ -64,14 +65,14 @@
 class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]):
     """A task operation.
 
-    Refactored to use primitive trace_id/span_id instead of a live OpenTelemetry
-    Span object to support distributed JSON serialization safely.
+    Carries a live OpenTelemetry Span so the worker can restore trace context.
+    The Redis scheduler uses a separate trace_id/span_id serialization approach
+    (see redis_scheduler.py) since live Span objects cannot cross process boundaries.
     """
 
     operation: OperationT
     params: ParamsT
-    trace_id: str | None
-    span_id: str | None
+    _current_span: Span
 
 
 _RunTask = _TaskOperation[Literal["run"], TaskSendParams]
diff --git a/bindu/server/scheduler/memory_scheduler.py b/bindu/server/scheduler/memory_scheduler.py
index 39070332..609da0b4 100644
--- a/bindu/server/scheduler/memory_scheduler.py
+++ b/bindu/server/scheduler/memory_scheduler.py
@@ -2,12 +2,12 @@
 
 from __future__ import annotations as _annotations
 
-import math
 from collections.abc import AsyncIterator
 from contextlib import AsyncExitStack
 from typing import Any
 
 import anyio
+from opentelemetry.trace import get_current_span
 
 from bindu.common.protocol.types import TaskIdParams, TaskSendParams
 from bindu.server.scheduler.base import (
@@ -20,7 +20,6 @@
 )
 from bindu.utils.logging import get_logger
 from bindu.utils.retry import retry_scheduler_operation
-from bindu.utils.tracing import get_trace_context
 
 logger = get_logger("bindu.server.scheduler.memory_scheduler")
 
@@ -38,12 +37,12 @@
     async def __aenter__(self):
         self.aexit_stack = AsyncExitStack()
         await self.aexit_stack.__aenter__()
-        # FIX: Added math.inf to create a buffered stream.
-        # Without this, the stream defaults to 0 (unbuffered), which causes
-        # the API server to deadlock/hang waiting for a worker to receive the task.
+        # Buffer of 100 prevents deadlock: without buffering the sender blocks
+        # until a worker is ready to receive, which stalls the API server.
+        # math.inf was previously used here but removed to restore backpressure.
         self._write_stream, self._read_stream = anyio.create_memory_object_stream[
             TaskOperation
-        ](math.inf)
+        ](100)
         await self.aexit_stack.enter_async_context(self._read_stream)
         await self.aexit_stack.enter_async_context(self._write_stream)
 
@@ -59,16 +58,15 @@ async def _send_operation(
         operation: str,
         params: TaskSendParams | TaskIdParams,
     ) -> None:
-        """Send task operation with trace context.
+        """Send task operation with live span for trace context.
 
         Args:
             operation_class: The operation class to instantiate
             operation: Operation type string
             params: Task parameters
         """
-        trace_id, span_id = get_trace_context()
         task_op = operation_class(
-            operation=operation, params=params, trace_id=trace_id, span_id=span_id
+            operation=operation, params=params, _current_span=get_current_span()
        )
         await self._write_stream.send(task_op)
 