From ed5fb49660f190e28c657e25651d6792380285d1 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 25 Aug 2025 12:02:53 +0300 Subject: [PATCH] Fix stream next error handling --- ydb/_utilities.py | 12 ++++++++++-- ydb/query/session.py | 6 ++++-- ydb/query/transaction.py | 6 ++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/ydb/_utilities.py b/ydb/_utilities.py index 8496dbd9..8d0edfc1 100644 --- a/ydb/_utilities.py +++ b/ydb/_utilities.py @@ -149,9 +149,10 @@ def __next__(self): class SyncResponseIterator(object): - def __init__(self, it, wrapper): + def __init__(self, it, wrapper, error_converter=None): self.it = it self.wrapper = wrapper + self.error_converter = error_converter def cancel(self): self.it.cancel() @@ -161,9 +162,16 @@ def __iter__(self): return self def _next(self): - res = self.wrapper(next(self.it)) + try: + res = self.wrapper(next(self.it)) + except BaseException as e: + if self.error_converter: + raise self.error_converter(e) from e + raise e + if res is not None: return res + return self._next() def next(self): diff --git a/ydb/query/session.py b/ydb/query/session.py index 5cfbdc6c..2e081b83 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -19,6 +19,7 @@ from .transaction import QueryTxContext from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT +from .._errors import stream_error_converter logger = logging.getLogger(__name__) @@ -362,12 +363,13 @@ def execute( ) return base.SyncResponseContextIterator( - stream_it, - lambda resp: base.wrap_execute_query_response( + it=stream_it, + wrapper=lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, session_state=self._state, session=self, settings=self._settings, ), + error_converter=stream_error_converter, ) diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 008ac7c4..34661f8a 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -16,6 +16,7 @@ from ..connection import _RpcState as RpcState from . import base +from .._errors import stream_error_converter from ..settings import BaseRequestSettings logger = logging.getLogger(__name__) @@ -500,8 +501,8 @@ def execute( ) self._prev_stream = base.SyncResponseContextIterator( - stream_it, - lambda resp: base.wrap_execute_query_response( + it=stream_it, + wrapper=lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, session_state=self._session_state, @@ -509,5 +510,6 @@ def execute( commit_tx=commit_tx, settings=self.session._settings, ), + error_converter=stream_error_converter, ) return self._prev_stream