From 9e829c77c5d2f0006d99ebb53ff6529074f2436c Mon Sep 17 00:00:00 2001 From: chandan-1427 Date: Wed, 18 Mar 2026 21:24:04 +0530 Subject: [PATCH] fix(streaming): resolve architectural flaws in SSE streaming --- bindu/server/endpoints/a2a_protocol.py | 6 +++--- bindu/server/handlers/message_handlers.py | 7 +++++-- bindu/utils/task_telemetry.py | 4 ++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/bindu/server/endpoints/a2a_protocol.py b/bindu/server/endpoints/a2a_protocol.py index 2951eb4d..fb4298c6 100644 --- a/bindu/server/endpoints/a2a_protocol.py +++ b/bindu/server/endpoints/a2a_protocol.py @@ -6,7 +6,7 @@ from typing import Any from starlette.requests import Request -from starlette.responses import Response +from starlette.responses import Response, StreamingResponse from bindu.common.protocol.types import ( InternalError, @@ -172,8 +172,8 @@ async def agent_run_endpoint(app: BinduApplication, request: Request) -> Respons logger.debug(f"A2A response to {client_ip}: method={method}, id={request_id}") - # Streaming handlers return a Starlette Response directly - if isinstance(jsonrpc_response, Response): + # FIX: Strictly check for StreamingResponse to prevent bypassing JSON-RPC + if isinstance(jsonrpc_response, StreamingResponse): if x402_is_requested(request): jsonrpc_response = x402_add_header(jsonrpc_response) return jsonrpc_response diff --git a/bindu/server/handlers/message_handlers.py b/bindu/server/handlers/message_handlers.py index 184a864b..e4758fb6 100644 --- a/bindu/server/handlers/message_handlers.py +++ b/bindu/server/handlers/message_handlers.py @@ -228,7 +228,9 @@ async def stream_generator(): return return + # FIX: Exponential backoff to prevent DB hammering await anyio.sleep(poll_interval) + poll_interval = min(poll_interval * 1.5, 2.0) except cancelled_exc: logger.debug(f"Streaming client disconnected for task {task['id']}") return @@ -264,15 +266,16 @@ async def stream_generator(): exc_info=True, ) + # FIX: Force terminal "failed" state so the client UI doesn't hang forever error_event = { "kind": "status-update", "task_id": str(task["id"]), "context_id": str(context_id), "status": { - "state": current_state, + "state": "failed", "timestamp": timestamp, }, - "final": current_state in app_settings.agent.terminal_states, + "final": True, "error": str(e), } yield self._sse_event(error_event) diff --git a/bindu/utils/task_telemetry.py b/bindu/utils/task_telemetry.py index fd4cb411..b1d4c476 100644 --- a/bindu/utils/task_telemetry.py +++ b/bindu/utils/task_telemetry.py @@ -201,6 +201,10 @@ async def wrapper(self, request, *args, **kwargs): # Decrement active tasks for completion/cancellation operations if operation in ["cancel_task"]: active_tasks.add(-1, {"operation": "cancel"}) + + # FIX: Ensure creation operations decrement when the request cycle completes + elif operation in create_operations: + active_tasks.add(-1, {"operation": "completed"}) return result