Skip to content

Commit f602d4c

Browse files
committed
fix handling of maxConnecting
1 parent 6623261 commit f602d4c

File tree

4 files changed

+46
-72
lines changed

4 files changed

+46
-72
lines changed

pymongo/asynchronous/pool.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,6 @@ def __init__(
788788
# Enforces: maxConnecting
789789
# Also used for: clearing the wait queue
790790
self._max_connecting_cond = _async_create_condition(self.lock)
791-
self._max_connecting = self.opts.max_connecting
792791
self._pending = 0
793792
self._client_id = client_id
794793
self._backoff = 0
@@ -931,6 +930,11 @@ async def _reset(
931930
for conn in sockets:
932931
await conn.close_conn(ConnectionClosedReason.STALE)
933932

933+
@property
934+
def max_connecting(self) -> int:
935+
"""The current max connecting limit for the pool."""
936+
return 1 if self._backoff else self.opts.max_connecting
937+
934938
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
935939
"""Updates the is_writable attribute on all sockets currently in the
936940
Pool.
@@ -997,8 +1001,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
9971001
async with self._max_connecting_cond:
9981002
# If maxConnecting connections are already being created
9991003
# by this pool then try again later instead of waiting.
1000-
max_connecting = 1 if self._backoff else self._max_connecting
1001-
if self._pending >= max_connecting:
1004+
if self._pending >= self.max_connecting:
10021005
return
10031006
self._pending += 1
10041007
incremented = True
@@ -1297,13 +1300,12 @@ async def _get_conn(
12971300
# to be checked back into the pool.
12981301
async with self._max_connecting_cond:
12991302
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1300-
max_connecting = 1 if self._backoff else self._max_connecting
1301-
while not (self.conns or self._pending < max_connecting):
1303+
while not (self.conns or self._pending < self.max_connecting):
13021304
timeout = deadline - time.monotonic() if deadline else None
13031305
if not await _async_cond_wait(self._max_connecting_cond, timeout):
13041306
# Timed out, notify the next thread to ensure a
13051307
# timeout doesn't consume the condition.
1306-
if self.conns or self._pending < max_connecting:
1308+
if self.conns or self._pending < self.max_connecting:
13071309
self._max_connecting_cond.notify()
13081310
emitted_event = True
13091311
self._raise_wait_queue_timeout(checkout_started_time)

pymongo/synchronous/pool.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,6 @@ def __init__(
786786
# Enforces: maxConnecting
787787
# Also used for: clearing the wait queue
788788
self._max_connecting_cond = _create_condition(self.lock)
789-
self._max_connecting = self.opts.max_connecting
790789
self._pending = 0
791790
self._client_id = client_id
792791
self._backoff = 0
@@ -929,6 +928,11 @@ def _reset(
929928
for conn in sockets:
930929
conn.close_conn(ConnectionClosedReason.STALE)
931930

931+
@property
932+
def max_connecting(self) -> int:
933+
"""The current max connecting limit for the pool."""
934+
return 1 if self._backoff else self.opts.max_connecting
935+
932936
def update_is_writable(self, is_writable: Optional[bool]) -> None:
933937
"""Updates the is_writable attribute on all sockets currently in the
934938
Pool.
@@ -993,8 +997,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
993997
with self._max_connecting_cond:
994998
# If maxConnecting connections are already being created
995999
# by this pool then try again later instead of waiting.
996-
max_connecting = 1 if self._backoff else self._max_connecting
997-
if self._pending >= max_connecting:
1000+
if self._pending >= self.max_connecting:
9981001
return
9991002
self._pending += 1
10001003
incremented = True
@@ -1293,13 +1296,12 @@ def _get_conn(
12931296
# to be checked back into the pool.
12941297
with self._max_connecting_cond:
12951298
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1296-
max_connecting = 1 if self._backoff else self._max_connecting
1297-
while not (self.conns or self._pending < max_connecting):
1299+
while not (self.conns or self._pending < self.max_connecting):
12981300
timeout = deadline - time.monotonic() if deadline else None
12991301
if not _cond_wait(self._max_connecting_cond, timeout):
13001302
# Timed out, notify the next thread to ensure a
13011303
# timeout doesn't consume the condition.
1302-
if self.conns or self._pending < max_connecting:
1304+
if self.conns or self._pending < self.max_connecting:
13031305
self._max_connecting_cond.notify()
13041306
emitted_event = True
13051307
self._raise_wait_queue_timeout(checkout_started_time)

test/asynchronous/test_pooling.py

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -395,52 +395,40 @@ async def test_checkout_more_than_max_pool_size(self):
395395
await asyncio.sleep(0.05)
396396
await pool.close()
397397

398-
async def _check_maxConnecting(
399-
self, client: AsyncMongoClient, backoff=False
400-
) -> tuple[int, int]:
401-
coll = client.test.test.with_options(read_preference=ReadPreference.PRIMARY)
402-
await coll.insert_one({})
403-
398+
async def test_maxConnecting(self):
399+
client = await self.async_rs_or_single_client()
400+
await self.client.test.test.insert_one({})
401+
self.addAsyncCleanup(self.client.test.test.delete_many, {})
404402
pool = await async_get_pool(client)
405-
if backoff:
406-
pool._backoff = 1
407403
docs = []
408404

409405
# Run 50 short running operations
410406
async def find_one():
411-
docs.append(await coll.find_one({}))
407+
docs.append(await client.test.test.find_one({}))
412408

413409
tasks = [ConcurrentRunner(target=find_one) for _ in range(50)]
414410
for task in tasks:
415411
await task.start()
416412
for task in tasks:
417413
await task.join(10)
418414

419-
await coll.delete_many({})
420-
421-
return len(docs), len(pool.conns)
422-
423-
async def test_maxConnecting(self):
424-
client = await self.async_rs_or_single_client()
425-
num_docs, num_conns = await self._check_maxConnecting(client)
426-
427-
self.assertEqual(num_docs, 50)
428-
self.assertLessEqual(num_conns, 50)
415+
self.assertEqual(len(docs), 50)
416+
self.assertLessEqual(len(pool.conns), 50)
429417
# TLS and auth make connection establishment more expensive than
430418
# the query which leads to more threads hitting maxConnecting.
431419
# The end result is fewer total connections and better latency.
432420
if async_client_context.tls and async_client_context.auth_enabled:
433-
self.assertLessEqual(num_conns, 30)
421+
self.assertLessEqual(len(pool.conns), 30)
434422
else:
435-
self.assertLessEqual(num_conns, 50)
423+
self.assertLessEqual(len(pool.conns), 50)
436424
# MongoDB 4.4.1 with auth + ssl:
437425
# maxConnecting = 2: 6 connections in ~0.231+ seconds
438426
# maxConnecting = unbounded: 50 connections in ~0.642+ seconds
439427
#
440428
# MongoDB 4.4.1 with no-auth no-ssl Python 3.8:
441429
# maxConnecting = 2: 15-22 connections in ~0.108+ seconds
442430
# maxConnecting = unbounded: 30+ connections in ~0.140+ seconds
443-
print(num_conns)
431+
print(len(pool.conns))
444432

445433
@async_client_context.require_failCommand_appName
446434
async def test_csot_timeout_message(self):
@@ -588,17 +576,13 @@ async def test_pool_backoff_preserves_existing_connections(self):
588576
await pool.close()
589577

590578
async def test_pool_backoff_limits_maxConnecting(self):
591-
client = await self.async_rs_or_single_client(maxConnecting=30)
592-
_, baseline_conns = await self._check_maxConnecting(client)
593-
await client.close()
594-
595-
client = await self.async_rs_or_single_client(maxConnecting=30)
596-
_, backoff_conns = await self._check_maxConnecting(client, backoff=True)
579+
client = await self.async_rs_or_single_client(maxConnecting=10)
580+
pool = await async_get_pool(client)
581+
assert pool.maxConnecting == 10
582+
pool._backoff = 1
583+
assert pool.maxConnecting == 1
597584
await client.close()
598585

599-
# We should have created less conns due to limiting maxConnecting.
600-
self.assertLess(backoff_conns, baseline_conns)
601-
602586

603587
class TestPoolMaxSize(_TestPoolingBase):
604588
async def test_max_pool_size(self):

test/test_pooling.py

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -395,50 +395,40 @@ def test_checkout_more_than_max_pool_size(self):
395395
time.sleep(0.05)
396396
pool.close()
397397

398-
def _check_maxConnecting(self, client: MongoClient, backoff=False) -> tuple[int, int]:
399-
coll = client.test.test.with_options(read_preference=ReadPreference.PRIMARY)
400-
coll.insert_one({})
401-
398+
def test_maxConnecting(self):
399+
client = self.rs_or_single_client()
400+
self.client.test.test.insert_one({})
401+
self.addCleanup(self.client.test.test.delete_many, {})
402402
pool = get_pool(client)
403-
if backoff:
404-
pool._backoff = 1
405403
docs = []
406404

407405
# Run 50 short running operations
408406
def find_one():
409-
docs.append(coll.find_one({}))
407+
docs.append(client.test.test.find_one({}))
410408

411409
tasks = [ConcurrentRunner(target=find_one) for _ in range(50)]
412410
for task in tasks:
413411
task.start()
414412
for task in tasks:
415413
task.join(10)
416414

417-
coll.delete_many({})
418-
419-
return len(docs), len(pool.conns)
420-
421-
def test_maxConnecting(self):
422-
client = self.rs_or_single_client()
423-
num_docs, num_conns = self._check_maxConnecting(client)
424-
425-
self.assertEqual(num_docs, 50)
426-
self.assertLessEqual(num_conns, 50)
415+
self.assertEqual(len(docs), 50)
416+
self.assertLessEqual(len(pool.conns), 50)
427417
# TLS and auth make connection establishment more expensive than
428418
# the query which leads to more threads hitting maxConnecting.
429419
# The end result is fewer total connections and better latency.
430420
if client_context.tls and client_context.auth_enabled:
431-
self.assertLessEqual(num_conns, 30)
421+
self.assertLessEqual(len(pool.conns), 30)
432422
else:
433-
self.assertLessEqual(num_conns, 50)
423+
self.assertLessEqual(len(pool.conns), 50)
434424
# MongoDB 4.4.1 with auth + ssl:
435425
# maxConnecting = 2: 6 connections in ~0.231+ seconds
436426
# maxConnecting = unbounded: 50 connections in ~0.642+ seconds
437427
#
438428
# MongoDB 4.4.1 with no-auth no-ssl Python 3.8:
439429
# maxConnecting = 2: 15-22 connections in ~0.108+ seconds
440430
# maxConnecting = unbounded: 30+ connections in ~0.140+ seconds
441-
print(num_conns)
431+
print(len(pool.conns))
442432

443433
@client_context.require_failCommand_appName
444434
def test_csot_timeout_message(self):
@@ -584,17 +574,13 @@ def test_pool_backoff_preserves_existing_connections(self):
584574
pool.close()
585575

586576
def test_pool_backoff_limits_maxConnecting(self):
587-
client = self.rs_or_single_client(maxConnecting=30)
588-
_, baseline_conns = self._check_maxConnecting(client)
589-
client.close()
590-
591-
client = self.rs_or_single_client(maxConnecting=30)
592-
_, backoff_conns = self._check_maxConnecting(client, backoff=True)
577+
client = self.rs_or_single_client(maxConnecting=10)
578+
pool = get_pool(client)
579+
assert pool.maxConnecting == 10
580+
pool._backoff = 1
581+
assert pool.maxConnecting == 1
593582
client.close()
594583

595-
# We should have created less conns due to limiting maxConnecting.
596-
self.assertLess(backoff_conns, baseline_conns)
597-
598584

599585
class TestPoolMaxSize(_TestPoolingBase):
600586
def test_max_pool_size(self):

0 commit comments

Comments
 (0)