Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions bindu/server/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from pydantic import Discriminator
from typing_extensions import Self, TypedDict
from opentelemetry.trace import Span

from bindu.common.protocol.types import TaskIdParams, TaskSendParams
from bindu.utils.logging import get_logger
Expand Down Expand Up @@ -64,14 +65,14 @@ def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]):
    """A task operation.

    Carries a live OpenTelemetry Span so the worker can restore trace context.
    The Redis scheduler uses a separate trace_id/span_id serialization approach
    (see redis_scheduler.py) since live Span objects cannot cross process boundaries.
    """

    # Operation discriminator, e.g. "run" / "cancel" (see the Literal aliases below).
    operation: OperationT
    # Parameters for the operation (TaskSendParams or TaskIdParams).
    params: ParamsT
    # Live span captured at send time; valid only within the same process.
    _current_span: Span


_RunTask = _TaskOperation[Literal["run"], TaskSendParams]
Expand Down
16 changes: 7 additions & 9 deletions bindu/server/scheduler/memory_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

from __future__ import annotations as _annotations

import math
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from typing import Any

import anyio
from opentelemetry.trace import get_current_span

from bindu.common.protocol.types import TaskIdParams, TaskSendParams
from bindu.server.scheduler.base import (
Expand All @@ -20,7 +20,6 @@
)
from bindu.utils.logging import get_logger
from bindu.utils.retry import retry_scheduler_operation
from bindu.utils.tracing import get_trace_context

logger = get_logger("bindu.server.scheduler.memory_scheduler")

Expand All @@ -38,12 +37,12 @@ async def __aenter__(self):
self.aexit_stack = AsyncExitStack()
await self.aexit_stack.__aenter__()

# FIX: Added math.inf to create a buffered stream.
# Without this, the stream defaults to 0 (unbuffered), which causes
# the API server to deadlock/hang waiting for a worker to receive the task.
# Buffer of 100 prevents deadlock: without buffering the sender blocks
# until a worker is ready to receive, which stalls the API server.
# math.inf was previously used here but removed to restore backpressure.
self._write_stream, self._read_stream = anyio.create_memory_object_stream[
TaskOperation
](math.inf)
](100)
await self.aexit_stack.enter_async_context(self._read_stream)
await self.aexit_stack.enter_async_context(self._write_stream)

Expand All @@ -59,16 +58,15 @@ async def _send_operation(
operation: str,
params: TaskSendParams | TaskIdParams,
) -> None:
"""Send task operation with trace context.
"""Send task operation with live span for trace context.

Args:
operation_class: The operation class to instantiate
operation: Operation type string
params: Task parameters
"""
trace_id, span_id = get_trace_context()
task_op = operation_class(
operation=operation, params=params, trace_id=trace_id, span_id=span_id
operation=operation, params=params, _current_span=get_current_span()
)
await self._write_stream.send(task_op)

Expand Down