Skip to content

Commit 8ac947e

Browse files
committed
Make commit_offset call retryable
1 parent 25986a2 commit 8ac947e

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

tests/topics/test_topic_reader.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,32 @@ async def test_commit_offset_with_session_id_works(self, driver, topic_with_mess
9696
msg2 = await reader.receive_message()
9797
assert msg2.seqno == 2
9898

99+
async def test_commit_offset_retry_on_ydb_errors(self, driver, topic_with_messages, topic_consumer, monkeypatch):
100+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
101+
message = await reader.receive_message()
102+
103+
call_count = 0
104+
original_driver_call = driver.topic_client._driver
105+
106+
async def mock_driver_call(*args, **kwargs):
107+
nonlocal call_count
108+
call_count += 1
109+
110+
if call_count == 1:
111+
raise ydb.Unavailable("Service temporarily unavailable")
112+
elif call_count == 2:
113+
raise ydb.Cancelled("Operation was cancelled")
114+
else:
115+
return await original_driver_call(*args, **kwargs)
116+
117+
monkeypatch.setattr(driver.topic_client, "_driver", mock_driver_call)
118+
119+
await driver.topic_client.commit_offset(
120+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
121+
)
122+
123+
assert call_count == 3
124+
99125
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
100126
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
101127
for out in ["123", "456", "789", "0"]:
@@ -257,6 +283,33 @@ def test_commit_offset_with_session_id_works(self, driver_sync, topic_with_messa
257283
msg2 = reader.receive_message()
258284
assert msg2.seqno == 2
259285

286+
def test_commit_offset_retry_on_ydb_errors(self, driver_sync, topic_with_messages, topic_consumer, monkeypatch):
287+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
288+
message = reader.receive_message()
289+
290+
# Counter to track retry attempts
291+
call_count = 0
292+
original_driver_call = driver_sync.topic_client._driver
293+
294+
def mock_driver_call(*args, **kwargs):
295+
nonlocal call_count
296+
call_count += 1
297+
298+
if call_count == 1:
299+
raise ydb.Unavailable("Service temporarily unavailable")
300+
elif call_count == 2:
301+
raise ydb.Cancelled("Operation was cancelled")
302+
else:
303+
return original_driver_call(*args, **kwargs)
304+
305+
monkeypatch.setattr(driver_sync.topic_client, "_driver", mock_driver_call)
306+
307+
driver_sync.topic_client.commit_offset(
308+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
309+
)
310+
311+
assert call_count == 3
312+
260313
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
261314
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
262315
for out in ["123", "456", "789", "0"]:

ydb/retries.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import functools
3+
import inspect
24
import random
35
import time
46

@@ -161,3 +163,64 @@ async def retry_operation_async(callee, retry_settings=None, *args, **kwargs):
161163
return await next_opt.result
162164
except BaseException as e: # pylint: disable=W0703
163165
next_opt.set_exception(e)
166+
167+
168+
def ydb_retry(
169+
max_retries=10,
170+
max_session_acquire_timeout=None,
171+
on_ydb_error_callback=None,
172+
backoff_ceiling=6,
173+
backoff_slot_duration=1,
174+
get_session_client_timeout=5,
175+
fast_backoff_settings=None,
176+
slow_backoff_settings=None,
177+
idempotent=False,
178+
retry_cancelled=False,
179+
):
180+
"""
181+
Decorator for automatic function retry in case of YDB errors.
182+
183+
Supports both synchronous and asynchronous functions.
184+
185+
:param max_retries: Maximum number of retries (default: 10)
186+
:param max_session_acquire_timeout: Maximum session acquisition timeout (default: None)
187+
:param on_ydb_error_callback: Callback for handling YDB errors (default: None)
188+
:param backoff_ceiling: Ceiling for backoff algorithm (default: 6)
189+
:param backoff_slot_duration: Slot duration for backoff (default: 1)
190+
:param get_session_client_timeout: Session client timeout (default: 5)
191+
:param fast_backoff_settings: Fast backoff settings (default: None)
192+
:param slow_backoff_settings: Slow backoff settings (default: None)
193+
:param idempotent: Whether the operation is idempotent (default: False)
194+
:param retry_cancelled: Whether to retry cancelled operations (default: False)
195+
"""
196+
197+
def decorator(func):
198+
retry_settings = RetrySettings(
199+
max_retries=max_retries,
200+
max_session_acquire_timeout=max_session_acquire_timeout,
201+
on_ydb_error_callback=on_ydb_error_callback,
202+
backoff_ceiling=backoff_ceiling,
203+
backoff_slot_duration=backoff_slot_duration,
204+
get_session_client_timeout=get_session_client_timeout,
205+
fast_backoff_settings=fast_backoff_settings,
206+
slow_backoff_settings=slow_backoff_settings,
207+
idempotent=idempotent,
208+
retry_cancelled=retry_cancelled,
209+
)
210+
211+
if inspect.iscoroutinefunction(func):
212+
213+
@functools.wraps(func)
214+
async def async_wrapper(*args, **kwargs):
215+
return await retry_operation_async(func, retry_settings, *args, **kwargs)
216+
217+
return async_wrapper
218+
else:
219+
220+
@functools.wraps(func)
221+
def sync_wrapper(*args, **kwargs):
222+
return retry_operation_sync(func, retry_settings, *args, **kwargs)
223+
224+
return sync_wrapper
225+
226+
return decorator

ydb/topic.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@
9898
PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings,
9999
)
100100

101+
from .retries import ydb_retry
102+
101103
logger = logging.getLogger(__name__)
102104

103105

@@ -348,6 +350,7 @@ def tx_writer(
348350

349351
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
350352

353+
@ydb_retry(retry_cancelled=True)
351354
async def commit_offset(
352355
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
353356
) -> None:
@@ -645,6 +648,7 @@ def tx_writer(
645648

646649
return TopicTxWriter(tx, self._driver, settings, _parent=self)
647650

651+
@ydb_retry(retry_cancelled=True)
648652
def commit_offset(
649653
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
650654
) -> None:

0 commit comments

Comments
 (0)