Skip to content

Commit 290feec

Browse files
authored
Merge pull request #177 test sync topic writer, fix timeout param
2 parents d4864cd + c0f6f7b commit 290feec

File tree

7 files changed

+96
-139
lines changed

7 files changed

+96
-139
lines changed

tests/topics/test_topic_writer.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,47 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
5050
) as writer:
5151
init_info = await writer.wait_init()
5252
assert init_info.last_seqno == last_seqno
53+
54+
55+
class TestTopicWriterSync:
56+
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
57+
writer = driver_sync.topic_client.topic_writer(
58+
topic_path, producer_and_message_group_id="test"
59+
)
60+
writer.write(ydb.TopicWriterMessage(data="123".encode()))
61+
writer.close()
62+
63+
def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
64+
with driver_sync.topic_client.topic_writer(
65+
topic_path,
66+
producer_and_message_group_id="test",
67+
auto_seqno=False,
68+
) as writer:
69+
writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
70+
71+
with driver_sync.topic_client.topic_writer(
72+
topic_path,
73+
producer_and_message_group_id="test",
74+
get_last_seqno=True,
75+
) as writer2:
76+
init_info = writer2.wait_init()
77+
assert init_info.last_seqno == 5
78+
79+
def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
80+
with driver_sync.topic_client.topic_writer(
81+
topic_path,
82+
producer_and_message_group_id="test",
83+
auto_seqno=False,
84+
) as writer:
85+
last_seqno = 0
86+
for i in range(10):
87+
last_seqno = i + 1
88+
writer.write(ydb.TopicWriterMessage(data=f"msg-{i}", seqno=last_seqno))
89+
90+
with driver_sync.topic_client.topic_writer(
91+
topic_path,
92+
producer_and_message_group_id="test",
93+
get_last_seqno=True,
94+
) as writer:
95+
init_info = writer.wait_init()
96+
assert init_info.last_seqno == last_seqno

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import abc
44
import asyncio
5+
import contextvars
56
import datetime
7+
import functools
68
import typing
79
from typing import (
810
Optional,
@@ -118,7 +120,7 @@ def __aiter__(self):
118120

119121
async def __anext__(self):
120122
try:
121-
res = await asyncio.to_thread(self._sync_iterator.__next__)
123+
res = await to_thread(self._sync_iterator.__next__)
122124
return res
123125
except StopAsyncIteration:
124126
raise StopIteration()
@@ -180,7 +182,7 @@ async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method):
180182

181183
async def _start_sync_driver(self, driver: ydb.Driver, stub, method):
182184
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
183-
stream_call = await asyncio.to_thread(
185+
stream_call = await to_thread(
184186
driver,
185187
requests_iterator,
186188
stub,
@@ -257,6 +259,25 @@ def callback_from_asyncio(
257259
return loop.run_in_executor(None, callback)
258260

259261

262+
async def to_thread(func, /, *args, **kwargs):
263+
"""Asynchronously run function *func* in a separate thread.
264+
265+
Any *args and **kwargs supplied for this function are directly passed
266+
to *func*. Also, the current :class:`contextvars.Context` is propagated,
267+
allowing context variables from the main thread to be accessed in the
268+
separate thread.
269+
270+
Return a coroutine that can be awaited to get the eventual result of *func*.
271+
272+
copy to_thread from 3.10
273+
"""
274+
275+
loop = asyncio.get_running_loop()
276+
ctx = contextvars.copy_context()
277+
func_call = functools.partial(ctx.run, func, *args, **kwargs)
278+
return await loop.run_in_executor(None, func_call)
279+
280+
260281
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> ProtoDuration:
261282
if t is None:
262283
return None

ydb/_topic_common/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
55

66
TokenGetterFuncType = typing.Optional[typing.Callable[[], str]]
7+
TimeoutType = typing.Union[int, float]
78

89

910
def wrap_operation(rpc_state, response_pb, driver=None):

ydb/_topic_writer/topic_writer.py

Lines changed: 1 addition & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import concurrent.futures
21
import datetime
32
import enum
43
from dataclasses import dataclass
@@ -12,126 +11,7 @@
1211
from .._grpc.grpcwrapper.common_utils import IToProto
1312

1413

15-
class Writer:
16-
@property
17-
def last_seqno(self) -> int:
18-
raise NotImplementedError()
19-
20-
def __init__(self, db: ydb.Driver):
21-
pass
22-
23-
def __enter__(self):
24-
return self
25-
26-
def __exit__(self, exc_type, exc_val, exc_tb):
27-
self.close()
28-
29-
def close(self):
30-
pass
31-
32-
MessageType = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
33-
34-
def write(
35-
self,
36-
message: Union[MessageType, List[MessageType]],
37-
*args: Optional[MessageType],
38-
timeout: [float, None] = None,
39-
):
40-
"""
41-
send one or number of messages to server.
42-
it fast put message to internal buffer, without wait message result
43-
return None
44-
45-
message will send independent of wait/no wait result
46-
47-
timeout - time for waiting for put message into internal queue.
48-
if 0 or negative - non block calls
49-
if None or not set - infinite wait
50-
It will raise TimeoutError() exception if it can't put message to internal queue by limits during timeout.
51-
"""
52-
raise NotImplementedError()
53-
54-
def async_write_with_ack(
55-
self,
56-
message: Union[MessageType, List[MessageType]],
57-
*args: Optional[MessageType],
58-
timeout: [float, None] = None,
59-
) -> concurrent.futures.Future:
60-
"""
61-
send one or number of messages to server.
62-
return feature, which can be waited for check send result: ack/duplicate/error
63-
64-
Usually it is fast method, but can wait if internal buffer is full.
65-
66-
timeout - time for waiting for put message into internal queue.
67-
The method can be blocked up to timeout seconds before return future.
68-
69-
if 0 or negative - non block calls
70-
if None or not set - infinite wait
71-
It will raise TimeoutError() exception if it can't put message to internal queue by limits during timeout.
72-
"""
73-
raise NotImplementedError()
74-
75-
def write_with_ack(
76-
self,
77-
message: Union[MessageType, List[MessageType]],
78-
*args: Optional[MessageType],
79-
buffer_timeout: [float, None] = None,
80-
) -> Union["MessageWriteStatus", List["MessageWriteStatus"]]:
81-
"""
82-
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
83-
It is recommended to use write with optionally flush or async_write_with_ack and receive acks by wait future.
84-
85-
send one or number of messages to server.
86-
blocked until receive server ack for the message/messages.
87-
88-
message will send independent of wait/no wait result
89-
90-
buffer_timeout - time for send message to server and receive ack.
91-
if 0 or negative - non block calls
92-
if None or not set - infinite wait
93-
It will raise TimeoutError() exception if it isn't receive ack in timeout
94-
"""
95-
raise NotImplementedError()
96-
97-
def async_flush(self):
98-
"""
99-
Force send all messages from internal buffer and wait acks from server for all
100-
messages.
101-
102-
flush starts of flush process, and return Future for wait result.
103-
messages will be flushed independent of future waiting.
104-
"""
105-
raise NotImplementedError()
106-
107-
def flush(self, timeout: Optional[float] = None) -> concurrent.futures.Future:
108-
"""
109-
Force send all messages from internal buffer and wait acks from server for all
110-
messages.
111-
112-
timeout - time for waiting for send all messages and receive server ack.
113-
if 0 or negative - non block calls
114-
if None or not set - infinite wait
115-
It will raise TimeoutError() exception if it isn't receive ack in timeout
116-
"""
117-
raise NotImplementedError()
118-
119-
def async_wait_init(self) -> concurrent.futures.Future:
120-
"""
121-
Return feature, which done when underling connection established
122-
"""
123-
raise NotImplementedError()
124-
125-
def wait_init(self, timeout: Optional[float] = None):
126-
"""
127-
Wait until underling connection established
128-
129-
timeout - time for waiting for send all messages and receive server ack.
130-
if 0 or negative - non block calls
131-
if None or not set - infinite wait
132-
It will raise TimeoutError() exception if it isn't receive ack in timeout
133-
"""
134-
raise NotImplementedError()
14+
MessageType = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
13515

13616

13717
@dataclass

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
from .topic_writer import (
88
PublicWriterSettings,
99
WriterSettings,
10-
Writer,
1110
PublicMessage,
1211
PublicWriterInitInfo,
1312
InternalMessage,
1413
TopicWriterStopped,
1514
TopicWriterError,
1615
messages_to_proto_requests,
1716
PublicWriteResultTypes,
17+
MessageType,
1818
)
1919
from .. import (
2020
_apis,
@@ -75,8 +75,8 @@ async def close(self, *, flush: bool = True):
7575

7676
async def write_with_ack(
7777
self,
78-
messages: Union[Writer.MessageType, List[Writer.MessageType]],
79-
*args: Optional[Writer.MessageType],
78+
messages: Union[MessageType, List[MessageType]],
79+
*args: Optional[MessageType],
8080
) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]:
8181
"""
8282
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
@@ -97,8 +97,8 @@ async def write_with_ack(
9797

9898
async def write_with_ack_future(
9999
self,
100-
messages: Union[Writer.MessageType, List[Writer.MessageType]],
101-
*args: Optional[Writer.MessageType],
100+
messages: Union[MessageType, List[MessageType]],
101+
*args: Optional[MessageType],
102102
) -> Union[asyncio.Future, List[asyncio.Future]]:
103103
"""
104104
send one or number of messages to server.
@@ -120,8 +120,8 @@ async def write_with_ack_future(
120120

121121
async def write(
122122
self,
123-
messages: Union[Writer.MessageType, List[Writer.MessageType]],
124-
*args: Optional[Writer.MessageType],
123+
messages: Union[MessageType, List[MessageType]],
124+
*args: Optional[MessageType],
125125
):
126126
"""
127127
send one or number of messages to server.

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
TopicWriterError,
1212
PublicWriterInitInfo,
1313
PublicMessage,
14-
Writer,
1514
PublicWriteResult,
15+
MessageType,
1616
)
1717

1818
from .topic_writer_asyncio import WriterAsyncIO
19+
from .._topic_common.common import TimeoutType
1920

2021
_shared_event_loop_lock = threading.Lock()
2122
_shared_event_loop = None # type: Optional[asyncio.AbstractEventLoop]
@@ -78,6 +79,12 @@ async def create_async_writer():
7879
create_async_writer(), self._loop
7980
).result()
8081

82+
def __enter__(self):
83+
return self
84+
85+
def __exit__(self, exc_type, exc_val, exc_tb):
86+
self.close()
87+
8188
def _call(self, coro, *args, **kwargs):
8289
if self._closed:
8390
raise TopicWriterError("writer is closed")
@@ -87,7 +94,7 @@ def _call(self, coro, *args, **kwargs):
8794
def _call_sync(self, coro: Coroutine, timeout, *args, **kwargs):
8895
f = self._call(coro, *args, **kwargs)
8996
try:
90-
return f.result()
97+
return f.result(timeout=timeout)
9198
except TimeoutError:
9299
f.cancel()
93100
raise
@@ -111,7 +118,7 @@ def flush(self, timeout=None):
111118
def async_wait_init(self) -> Future[PublicWriterInitInfo]:
112119
return self._call(self._async_writer.wait_init())
113120

114-
def wait_init(self, timeout) -> PublicWriterInitInfo:
121+
def wait_init(self, timeout: Optional[TimeoutType] = None) -> PublicWriterInitInfo:
115122
return self._call_sync(self._async_writer.wait_init(), timeout)
116123

117124
def write(
@@ -124,15 +131,15 @@ def write(
124131

125132
def async_write_with_ack(
126133
self,
127-
messages: Union[Writer.MessageType, List[Writer.MessageType]],
128-
*args: Optional[Writer.MessageType],
134+
messages: Union[MessageType, List[MessageType]],
135+
*args: Optional[MessageType],
129136
) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]:
130137
return self._call(self._async_writer.write_with_ack(messages, *args))
131138

132139
def write_with_ack(
133140
self,
134-
messages: Union[Writer.MessageType, List[Writer.MessageType]],
135-
*args: Optional[Writer.MessageType],
141+
messages: Union[MessageType, List[MessageType]],
142+
*args: Optional[MessageType],
136143
timeout: Union[float, None] = None,
137144
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
138145
return self._call_sync(

ydb/topic.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
)
2020

2121
from ._topic_writer.topic_writer import ( # noqa: F401
22-
Writer as TopicWriter,
2322
PublicWriterSettings as TopicWriterSettings,
2423
PublicMessage as TopicWriterMessage,
2524
RetryPolicy as TopicWriterRetryPolicy,
2625
)
2726

27+
from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter
28+
2829
from ._topic_common.common import (
2930
wrap_operation as _wrap_operation,
3031
create_result_wrapper as _create_result_wrapper,
@@ -278,7 +279,10 @@ def topic_writer(
278279
get_last_seqno: bool = False,
279280
retry_policy: Union["TopicWriterRetryPolicy", None] = None,
280281
) -> TopicWriter:
281-
raise NotImplementedError()
282+
args = locals()
283+
del args["self"]
284+
settings = TopicWriterSettings(**args)
285+
return TopicWriter(self._driver, settings)
282286

283287

284288
class TopicClientSettings:

0 commit comments

Comments
 (0)