11import asyncio
22import threading
33import time
4+ from cgitb import small
45
56import pytest
67
1617 DescribeLockResult ,
1718)
1819
20+ @pytest .fixture
21+ def sync_coordination_node (driver_sync ):
22+ client = CoordinationClient (driver_sync )
23+ node_path = "/local/test_node"
1924
20- class TestCoordination :
21- def test_coordination_node_lifecycle ( self , driver_sync : ydb . Driver ):
22- client = CoordinationClient ( driver_sync )
23- node_path = "/local/test_node_lifecycle"
25+ try :
26+ client . delete_node ( node_path )
27+ except ydb . SchemeError :
28+ pass
2429
25- try :
26- client .delete_node (node_path )
27- except ydb .SchemeError :
28- pass
30+ config = NodeConfig (
31+ session_grace_period_millis = 1000 ,
32+ attach_consistency_mode = ConsistencyMode .STRICT ,
33+ read_consistency_mode = ConsistencyMode .STRICT ,
34+ rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
35+ self_check_period_millis = 0 ,
36+ )
37+ client .create_node (node_path , config )
2938
30- with pytest .raises (ydb .SchemeError ):
31- client .describe_node (node_path )
39+ yield client , node_path , config
3240
33- initial_config = NodeConfig (
34- session_grace_period_millis = 1000 ,
35- attach_consistency_mode = ConsistencyMode .STRICT ,
36- read_consistency_mode = ConsistencyMode .STRICT ,
37- rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
38- self_check_period_millis = 0 ,
39- )
40- client .create_node (node_path , initial_config )
41+ try :
42+ client .delete_node (node_path )
43+ except ydb .SchemeError :
44+ pass
45+
46+ @pytest .fixture
47+ async def async_coordination_node (aio_connection ):
48+ client = aio .CoordinationClient (aio_connection )
49+ node_path = "/local/test_node"
50+
51+ try :
52+ await client .delete_node (node_path )
53+ except ydb .SchemeError :
54+ pass
55+
56+ config = NodeConfig (
57+ session_grace_period_millis = 1000 ,
58+ attach_consistency_mode = ConsistencyMode .STRICT ,
59+ read_consistency_mode = ConsistencyMode .STRICT ,
60+ rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
61+ self_check_period_millis = 0 ,
62+ )
63+ await client .create_node (node_path , config )
64+
65+ yield client , node_path , config
66+
67+ try :
68+ await client .delete_node (node_path )
69+ except ydb .SchemeError :
70+ pass
71+
72+
73+ class TestCoordination :
74+ def test_coordination_node_lifecycle (self , sync_coordination_node ):
75+ client , node_path , initial_config = sync_coordination_node
4176
4277 node_conf = client .describe_node (node_path )
4378 assert node_conf == initial_config
@@ -59,26 +94,8 @@ def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver):
5994 with pytest .raises (ydb .SchemeError ):
6095 client .describe_node (node_path )
6196
62- async def test_coordination_node_lifecycle_async (self , aio_connection ):
63- client = aio .CoordinationClient (aio_connection )
64- node_path = "/local/test_node_lifecycle"
65-
66- try :
67- await client .delete_node (node_path )
68- except ydb .SchemeError :
69- pass
70-
71- with pytest .raises (ydb .SchemeError ):
72- await client .describe_node (node_path )
73-
74- initial_config = NodeConfig (
75- session_grace_period_millis = 1000 ,
76- attach_consistency_mode = ConsistencyMode .STRICT ,
77- read_consistency_mode = ConsistencyMode .STRICT ,
78- rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
79- self_check_period_millis = 0 ,
80- )
81- await client .create_node (node_path , initial_config )
97+ async def test_coordination_node_lifecycle_async (self , async_coordination_node ):
98+ client , node_path , initial_config = async_coordination_node
8299
83100 node_conf = await client .describe_node (node_path )
84101 assert node_conf == initial_config
@@ -100,52 +117,86 @@ async def test_coordination_node_lifecycle_async(self, aio_connection):
100117 with pytest .raises (ydb .SchemeError ):
101118 await client .describe_node (node_path )
102119
103- async def test_coordination_lock_full_lifecycle (self , aio_connection ):
104- client = aio .CoordinationClient (aio_connection )
105-
106- node_path = "/local/test_lock_full_lifecycle"
107-
108- try :
109- await client .delete_node (node_path )
110- except ydb .SchemeError :
111- pass
112-
113- await client .create_node (
114- node_path ,
115- NodeConfig (
116- session_grace_period_millis = 1000 ,
117- attach_consistency_mode = ConsistencyMode .STRICT ,
118- read_consistency_mode = ConsistencyMode .STRICT ,
119- rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
120- self_check_period_millis = 0 ,
121- ),
122- )
120+ async def test_coordination_lock_describe_full_async (self , async_coordination_node ):
121+ client , node_path , config = async_coordination_node
123122
124123 lock = client .lock ("test_lock" , node_path )
125124
126- create_resp : CreateSemaphoreResult = await lock .create (init_limit = 1 , init_data = b"init-data" )
127- assert create_resp .status == StatusCode .SUCCESS
125+ create = await lock .create (init_limit = 1 , init_data = b"hello" )
126+ assert create .status == StatusCode .SUCCESS
127+
128+ desc = await lock .describe ()
129+ assert desc .status == StatusCode .SUCCESS
130+ assert desc .name == "test_lock"
131+ assert desc .data == b"hello"
132+ assert desc .count == 0
133+ assert desc .ephemeral is False
134+ assert list (desc .owners ) == []
135+ assert list (desc .waiters ) == []
136+
137+ upd = await lock .update (new_data = b"world" )
138+ assert upd .status == StatusCode .SUCCESS
139+
140+ desc2 = await lock .describe ()
141+ assert desc2 .status == StatusCode .SUCCESS
142+ assert desc2 .name == "test_lock"
143+ assert desc2 .data == b"world"
144+ assert desc2 .count == 0
145+ assert desc2 .ephemeral is False
146+ assert list (desc2 .owners ) == []
147+ assert list (desc2 .waiters ) == []
148+
149+
150+ delete = await lock .delete ()
151+ assert delete .status == StatusCode .SUCCESS
152+
153+ desc_after = await lock .describe ()
154+ assert desc_after .status == StatusCode .NOT_FOUND
155+
156+ def test_coordination_lock_describe_full_sync (self , sync_coordination_node ):
157+ client , node_path , config = sync_coordination_node
158+
159+ lock = client .lock ("test_lock" , node_path )
160+
161+ create = lock .create (init_limit = 1 , init_data = b"hello" )
162+ assert create .status == StatusCode .SUCCESS
163+
164+ desc = lock .describe ()
165+ assert desc .status == StatusCode .SUCCESS
166+ assert desc .name == "test_lock"
167+ assert desc .data == b"hello"
168+ assert desc .count == 0
169+ assert desc .ephemeral is False
170+ assert list (desc .owners ) == []
171+ assert list (desc .waiters ) == []
172+ upd = lock .update (new_data = b"world" )
173+ assert upd .status == StatusCode .SUCCESS
174+
175+ desc2 = lock .describe ()
176+ assert desc2 .status == StatusCode .SUCCESS
177+ assert desc2 .name == "test_lock"
178+ assert desc2 .data == b"world"
179+ assert desc2 .count == 0
180+ assert desc2 .ephemeral is False
181+ assert list (desc2 .owners ) == []
182+ assert list (desc2 .waiters ) == []
183+
184+ delete = lock .delete ()
185+ assert delete .status == StatusCode .SUCCESS
186+
187+ desc_after = lock .describe ()
188+ assert desc_after .status == StatusCode .NOT_FOUND
189+
190+ async def test_coordination_lock_racing_async (self , async_coordination_node ):
191+ client , node_path , initial_config = async_coordination_node
192+ small_timeout = 0.5
193+
194+ lock = client .lock ("test_lock" , node_path )
195+
196+ await lock .create (init_limit = 1 , init_data = b"init-data" )
128197
129198 describe_resp : DescribeLockResult = await lock .describe ()
130199 assert describe_resp .status == StatusCode .SUCCESS
131- assert describe_resp .name == "test_lock"
132- assert describe_resp .data == b"init-data"
133- assert describe_resp .count == 0
134- assert describe_resp .ephemeral is False
135- assert list (describe_resp .owners ) == []
136- assert list (describe_resp .waiters ) == []
137-
138- update_resp = await lock .update (new_data = b"updated-data" )
139- assert update_resp .status == StatusCode .SUCCESS
140-
141- describe_resp2 : DescribeLockResult = await lock .describe ()
142- assert describe_resp2 .status == StatusCode .SUCCESS
143- assert describe_resp2 .name == "test_lock"
144- assert describe_resp2 .data == b"updated-data"
145- assert describe_resp2 .count == 0
146- assert describe_resp2 .ephemeral is False
147- assert list (describe_resp2 .owners ) == []
148- assert list (describe_resp2 .waiters ) == []
149200
150201 lock2_started = asyncio .Event ()
151202 lock2_acquired = asyncio .Event ()
@@ -154,58 +205,30 @@ async def second_lock_task():
154205 lock2_started .set ()
155206 async with client .lock ("test_lock" , node_path ):
156207 lock2_acquired .set ()
157- await asyncio .sleep (0.5 )
208+ await asyncio .sleep (small_timeout )
158209
159210 async with client .lock ("test_lock" , node_path ) as lock1 :
160211
161212 resp : DescribeLockResult = await lock1 .describe ()
162213 assert resp .status == StatusCode .SUCCESS
163- assert resp .name == "test_lock"
164- assert resp .data == b"updated-data"
165- assert resp .count == 1
166- assert resp .ephemeral is False
167- assert len (list (resp .owners )) == 1
168- assert list (resp .waiters ) == []
169214
170215 t2 = asyncio .create_task (second_lock_task ())
171216 await lock2_started .wait ()
172217
173- await asyncio .sleep (0.5 )
218+ await asyncio .sleep (small_timeout )
174219
175- await asyncio .wait_for (lock2_acquired .wait (), timeout = 5 )
176- await asyncio .wait_for (t2 , timeout = 5 )
177-
178- async with client .lock ("test_lock" , node_path ) as lock3 :
179-
180- resp3 : DescribeLockResult = await lock3 .describe ()
181- assert resp3 .status == StatusCode .SUCCESS
182- assert resp3 .count == 1
220+ await asyncio .wait_for (lock2_acquired .wait (), timeout = small_timeout )
221+ await asyncio .wait_for (t2 , timeout = small_timeout )
183222
184223 delete_resp = await lock .delete ()
185224 assert delete_resp .status == StatusCode .SUCCESS
186225
187226 describe_after_delete : DescribeLockResult = await lock .describe ()
188227 assert describe_after_delete .status == StatusCode .NOT_FOUND
189228
190- def test_coordination_lock_full_lifecycle_sync (self , driver_sync ):
191- client = CoordinationClient (driver_sync )
192- node_path = "/local/test_lock_full_lifecycle"
193-
194- try :
195- client .delete_node (node_path )
196- except ydb .SchemeError :
197- pass
198-
199- client .create_node (
200- node_path ,
201- NodeConfig (
202- session_grace_period_millis = 1000 ,
203- attach_consistency_mode = ConsistencyMode .STRICT ,
204- read_consistency_mode = ConsistencyMode .STRICT ,
205- rate_limiter_counters_mode = RateLimiterCountersMode .UNSET ,
206- self_check_period_millis = 0 ,
207- ),
208- )
229+ def test_coordination_lock_racing_sync (self , sync_coordination_node ):
230+ client , node_path , initial_config = sync_coordination_node
231+ small_timeout = 0.5
209232
210233 lock = client .lock ("test_lock" , node_path )
211234
@@ -214,15 +237,9 @@ def test_coordination_lock_full_lifecycle_sync(self, driver_sync):
214237
215238 describe_resp : DescribeLockResult = lock .describe ()
216239 assert describe_resp .status == StatusCode .SUCCESS
217- assert describe_resp .data == b"init-data"
218-
219- update_resp = lock .update (new_data = b"updated-data" )
220- assert update_resp .status == StatusCode .SUCCESS
221- assert lock .describe ().data == b"updated-data"
222240
223241 lock2_ready = threading .Event ()
224242 lock2_acquired = threading .Event ()
225- thread_exc = {"err" : None }
226243
227244 def second_lock_task ():
228245 try :
@@ -239,30 +256,14 @@ def second_lock_task():
239256 with client .lock ("test_lock" , node_path ) as lock1 :
240257 resp = lock1 .describe ()
241258 assert resp .status == StatusCode .SUCCESS
242- assert resp .count == 1
243-
244259 t2 .start ()
245- started = lock2_ready .wait (timeout = 2.0 )
246- assert started , "Second thread did not signal readiness to acquire lock"
247-
248- acquired = lock2_acquired .wait (timeout = 10.0 )
249- t2 .join (timeout = 5.0 )
250-
251- if not acquired :
252- if thread_exc ["err" ]:
253- raise AssertionError (f"Second thread raised exception: { thread_exc ['err' ]!r} " ) from thread_exc ["err" ]
254- else :
255- raise AssertionError ("Second thread did not acquire the lock in time. Check logs for details." )
256-
257- assert not t2 .is_alive (), "Second thread did not finish after acquiring lock"
260+ lock2_ready .wait (timeout = small_timeout )
258261
259- with client .lock ("test_lock" , node_path ) as lock3 :
260- resp3 : DescribeLockResult = lock3 .describe ()
261- assert resp3 .status == StatusCode .SUCCESS
262- assert resp3 .count == 1
262+ lock2_acquired .wait (timeout = small_timeout )
263+ t2 .join (timeout = small_timeout )
263264
264265 delete_resp = lock .delete ()
265266 assert delete_resp .status == StatusCode .SUCCESS
266- time .sleep (0.1 )
267+ time .sleep (small_timeout )
267268 describe_after_delete : DescribeLockResult = lock .describe ()
268269 assert describe_after_delete .status == StatusCode .NOT_FOUND
0 commit comments