11import asyncio
22import contextlib
3- import logging
43from typing import Optional , Set
54
65import ydb
76from ydb import issues , _apis
87from ydb ._grpc .grpcwrapper .common_utils import IToProto , GrpcWrapperAsyncIO
98from ydb ._grpc .grpcwrapper .ydb_coordination import FromServer , Ping , SessionStart
109
11- logger = logging .getLogger (__name__ )
12-
1310
1411class CoordinationStream :
1512 def __init__ (self , driver : "ydb.aio.Driver" ):
1613 self ._driver = driver
17- self ._stream : Optional [ GrpcWrapperAsyncIO ] = None
14+ self ._stream : GrpcWrapperAsyncIO = GrpcWrapperAsyncIO ( FromServer . from_proto )
1815 self ._background_tasks : Set [asyncio .Task ] = set ()
1916 self ._incoming_queue : asyncio .Queue = asyncio .Queue ()
2017 self ._closed = False
2118 self ._started = False
22- self ._first_error : asyncio .Future = asyncio .get_running_loop ().create_future ()
2319 self .session_id : Optional [int ] = None
24- self ._state_changed = asyncio .Event ()
2520
2621 async def start_session (self , path : str , timeout_millis : int ):
2722 if self ._started :
2823 raise issues .Error ("CoordinationStream already started" )
2924
3025 self ._started = True
31- self ._stream = GrpcWrapperAsyncIO (FromServer .from_proto )
3226 await self ._stream .start (self ._driver , _apis .CoordinationService .Stub , _apis .CoordinationService .Session )
3327
34- start_msg : IToProto = SessionStart (path = path , timeout_millis = timeout_millis )
35- self ._stream .write (start_msg )
28+ self ._stream .write (SessionStart (path = path , timeout_millis = timeout_millis ))
3629
37- try :
38- async for resp in self . _stream . from_server_grpc :
39- fs = FromServer . from_proto ( resp )
40- if fs .session_started :
41- self .session_id = fs .session_started
30+ while True :
31+ try :
32+ resp = await self . _stream . receive ( timeout = 3 )
33+ if resp .session_started :
34+ self .session_id = resp .session_started
4235 break
43- else :
44- raise issues . Error ( "Failed to start coordination session" )
45- except asyncio .TimeoutError :
46- raise issues .Error ("Timeout waiting for SessionStart response" )
47- except Exception as e :
48- raise issues .Error (f"Failed to start session: { e } " )
36+ else :
37+ continue
38+ except asyncio .TimeoutError :
39+ raise issues .Error ("Timeout waiting for SessionStart response" )
40+ except Exception as e :
41+ raise issues .Error (f"Failed to start session: { e } " )
4942
5043 task = asyncio .get_running_loop ().create_task (self ._reader_loop ())
5144 self ._background_tasks .add (task )
52- logger .debug ("CoordinationStream: started reader task %r" , task )
5345
5446 async def _reader_loop (self ):
55- try :
56- async for resp in self ._stream .from_server_grpc :
47+ while True :
48+ try :
49+ resp = await self ._stream .receive (timeout = 3 )
5750 if self ._closed :
5851 break
5952
@@ -62,27 +55,18 @@ async def _reader_loop(self):
6255 try :
6356 self ._stream .write (Ping (fs .opaque ))
6457 except Exception :
65- self . _set_first_error ( RuntimeError ( "Failed to write Ping" ) )
58+ raise issues . Error ( "Failed to write Ping" )
6659 else :
6760 await self ._incoming_queue .put (resp )
68- self ._state_changed .set ()
69- except asyncio .CancelledError :
70- logger .debug ("CoordinationStream: reader loop cancelled" )
71- pass
72- except Exception as exc :
73- logger .exception ("CoordinationStream: reader loop error" )
74- self ._set_first_error (exc )
61+ except asyncio .CancelledError :
62+ break
7563
7664 async def send (self , req : IToProto ):
77- self ._check_error ()
7865 if self ._closed :
7966 raise issues .Error ("Stream closed" )
80- if not isinstance (req , IToProto ):
81- raise TypeError (f"Cannot write object of type { type (req ).__name__ } , must implement IToProto" )
8267 self ._stream .write (req )
8368
8469 async def receive (self , timeout : Optional [float ] = None ):
85- self ._check_error ()
8670 if self ._closed :
8771 raise issues .Error ("Stream closed" )
8872
@@ -99,7 +83,6 @@ async def close(self):
9983 return
10084 self ._closed = True
10185
102- logger .debug ("CoordinationStream: closing, cancelling %d background tasks" , len (self ._background_tasks ))
10386 for task in list (self ._background_tasks ):
10487 task .cancel ()
10588
@@ -112,24 +95,7 @@ async def close(self):
11295 try :
11396 self ._stream .close ()
11497 except Exception :
115- logger . exception ( "CoordinationStream: error closing underlying stream" )
98+ pass
11699 self ._stream = None
117100
118101 self .session_id = None
119- self ._state_changed .set ()
120- logger .debug ("CoordinationStream: closed" )
121-
122- def _set_first_error (self , exc : Exception ):
123- if not self ._first_error .done ():
124- self ._first_error .set_result (exc )
125- self ._state_changed .set ()
126-
127- def _get_first_error (self ):
128- if self ._first_error .done ():
129- return self ._first_error .result ()
130- return None
131-
132- def _check_error (self ):
133- err = self ._get_first_error ()
134- if err :
135- raise err
0 commit comments