Skip to content

Commit d4864cd

Browse files
authored
Merge pull request #170 from ydb-platform/topic-writer-flush-on-close
topic-writer: fix default flush on close parameter, renaming
2 parents 896a677 + 4db696c commit d4864cd

File tree

2 files changed

+23
-19
lines changed

2 files changed

+23
-19
lines changed

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ def __del__(self):
6565

6666
self._loop.call_soon(self.close)
6767

68-
async def close(self):
68+
async def close(self, *, flush: bool = True):
6969
if self._closed:
7070
return
7171

7272
self._closed = True
7373

74-
await self._reconnector.close()
74+
await self._reconnector.close(flush)
7575

7676
async def write_with_ack(
7777
self,
@@ -109,13 +109,13 @@ async def write_with_ack_future(
109109
For wait with timeout use asyncio.wait_for.
110110
"""
111111
if isinstance(messages, PublicMessage):
112-
futures = await self._reconnector.write_with_ack([messages])
112+
futures = await self._reconnector.write_with_ack_future([messages])
113113
return futures[0]
114114
if isinstance(messages, list):
115115
for m in messages:
116116
if not isinstance(m, PublicMessage):
117117
raise NotImplementedError()
118-
return await self._reconnector.write_with_ack(messages)
118+
return await self._reconnector.write_with_ack_future(messages)
119119
raise NotImplementedError()
120120

121121
async def write(
@@ -185,7 +185,7 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
185185
asyncio.create_task(self._connection_loop(), name="connection_loop")
186186
]
187187

188-
async def close(self, flush: bool = True):
188+
async def close(self, flush: bool):
189189
if self._closed:
190190
return
191191

@@ -223,7 +223,7 @@ async def wait_init(self) -> PublicWriterInitInfo:
223223
async def wait_stop(self) -> Exception:
224224
return await self._stop_reason
225225

226-
async def write_with_ack(
226+
async def write_with_ack_future(
227227
self, messages: List[PublicMessage]
228228
) -> List[asyncio.Future]:
229229
# todo check internal buffer limit

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
295295
seqno=2,
296296
created_at=now,
297297
)
298-
await reconnector.write_with_ack([message1, message2])
298+
await reconnector.write_with_ack_future([message1, message2])
299299

300300
# sent to first stream
301301
stream_writer = get_stream_writer()
@@ -317,7 +317,7 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
317317
assert second_sent_msg == expected_messages
318318

319319
second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2))
320-
await reconnector.close()
320+
await reconnector.close(flush=True)
321321

322322
async def test_stop_on_unexpected_exception(
323323
self, reconnector: WriterAsyncIOReconnector, get_stream_writer
@@ -337,7 +337,7 @@ class TestException(Exception):
337337

338338
async def wait_stop():
339339
while True:
340-
await reconnector.write_with_ack([message])
340+
await reconnector.write_with_ack_future([message])
341341
await asyncio.sleep(0.1)
342342

343343
await asyncio.wait_for(wait_stop(), 1)
@@ -380,7 +380,7 @@ async def test_write_message(
380380
data="123",
381381
seqno=3,
382382
)
383-
await reconnector.write_with_ack([message])
383+
await reconnector.write_with_ack_future([message])
384384

385385
sent_messages = await asyncio.wait_for(stream_writer.from_client.get(), 1)
386386
assert sent_messages == [InternalMessage(message)]
@@ -399,8 +399,8 @@ async def test_auto_seq_no(
399399

400400
reconnector = WriterAsyncIOReconnector(default_driver, settings)
401401

402-
await reconnector.write_with_ack([PublicMessage(data="123")])
403-
await reconnector.write_with_ack([PublicMessage(data="456")])
402+
await reconnector.write_with_ack_future([PublicMessage(data="123")])
403+
await reconnector.write_with_ack_future([PublicMessage(data="456")])
404404

405405
stream_writer = get_stream_writer()
406406

@@ -415,22 +415,26 @@ async def test_auto_seq_no(
415415
] == sent
416416

417417
with pytest.raises(TopicWriterError):
418-
await reconnector.write_with_ack(
418+
await reconnector.write_with_ack_future(
419419
[PublicMessage(seqno=last_seq_no + 3, data="123")]
420420
)
421421

422422
await reconnector.close(flush=False)
423423

424424
async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector):
425-
await reconnector.write_with_ack([PublicMessage(seqno=10, data="123")])
425+
await reconnector.write_with_ack_future([PublicMessage(seqno=10, data="123")])
426426

427427
with pytest.raises(TopicWriterError):
428-
await reconnector.write_with_ack([PublicMessage(seqno=9, data="123")])
428+
await reconnector.write_with_ack_future(
429+
[PublicMessage(seqno=9, data="123")]
430+
)
429431

430432
with pytest.raises(TopicWriterError):
431-
await reconnector.write_with_ack([PublicMessage(seqno=10, data="123")])
433+
await reconnector.write_with_ack_future(
434+
[PublicMessage(seqno=10, data="123")]
435+
)
432436

433-
await reconnector.write_with_ack([PublicMessage(seqno=11, data="123")])
437+
await reconnector.write_with_ack_future([PublicMessage(seqno=11, data="123")])
434438

435439
await reconnector.close(flush=False)
436440

@@ -443,7 +447,7 @@ async def test_auto_created_at(
443447
settings = copy.deepcopy(default_settings)
444448
settings.auto_created_at = True
445449
reconnector = WriterAsyncIOReconnector(default_driver, settings)
446-
await reconnector.write_with_ack([PublicMessage(seqno=4, data="123")])
450+
await reconnector.write_with_ack_future([PublicMessage(seqno=4, data="123")])
447451

448452
stream_writer = get_stream_writer()
449453
sent = await stream_writer.from_client.get()
@@ -468,7 +472,7 @@ def __init__(self):
468472
self.futures = []
469473
self.messages_writted = asyncio.Event()
470474

471-
async def write_with_ack(self, messages: typing.List[InternalMessage]):
475+
async def write_with_ack_future(self, messages: typing.List[InternalMessage]):
472476
async with self.lock:
473477
futures = [asyncio.Future() for _ in messages]
474478
self.messages.extend(messages)

0 commit comments

Comments
 (0)