|
70 | 70 | ) |
71 | 71 | from pymongo.hello import Hello, HelloCompat |
72 | 72 | from pymongo.helpers_shared import _check_command_response, _check_write_command_response |
73 | | -from pymongo.monitoring import ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent |
| 73 | +from pymongo.monitoring import ( |
| 74 | + ConnectionCheckOutFailedEvent, |
| 75 | + PoolClearedEvent, |
| 76 | + ServerHeartbeatFailedEvent, |
| 77 | + ServerHeartbeatStartedEvent, |
| 78 | +) |
74 | 79 | from pymongo.server_description import SERVER_TYPE, ServerDescription |
75 | 80 | from pymongo.topology_description import TOPOLOGY_TYPE |
76 | 81 |
|
@@ -446,6 +451,66 @@ async def mock_close(self, reason): |
446 | 451 | AsyncConnection.close_conn = original_close |
447 | 452 |
|
448 | 453 |
|
| 454 | +class TestPoolBackpressure(AsyncIntegrationTest): |
| 455 | + # @async_client_context.require_version_min(8, 0, 0) |
| 456 | + async def test_connection_pool_is_not_cleared(self): |
| 457 | + listener = CMAPListener() |
| 458 | + |
| 459 | + # Create a client that listens to CMAP events, with maxConnecting=100. |
| 460 | + client = await self.async_rs_or_single_client(maxConnecting=100, event_listeners=[listener]) |
| 461 | + |
| 462 | + # Use an admin client for test setup and teardown to enable and disable the ingress rate limiter. |
| 463 | + admin_client = self.client |
| 464 | + await admin_client.admin.command( |
| 465 | + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True |
| 466 | + ) |
| 467 | + await admin_client.admin.command( |
| 468 | + "setParameter", 1, ingressConnectionEstablishmentRatePerSec=30 |
| 469 | + ) |
| 470 | + await admin_client.admin.command( |
| 471 | + "setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1 |
| 472 | + ) |
| 473 | + await admin_client.admin.command( |
| 474 | + "setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1 |
| 475 | + ) |
| 476 | + |
| 477 | + # Disable the ingress rate limiter on teardown. |
| 478 | + async def teardown(): |
| 479 | + await admin_client.admin.command( |
| 480 | + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False |
| 481 | + ) |
| 482 | + |
| 483 | + self.addAsyncCleanup(teardown) |
| 484 | + |
| 485 | + # Seed the collection with a document for us to query with a regex. |
| 486 | + await client.test.test.delete_many({}) |
| 487 | + await client.test.test.insert_one({"str0": "abcdefg"}) |
| 488 | + |
| 489 | + # Run a regex operation to slow down the query. |
| 490 | + async def target(): |
| 491 | + query = {"str0": {"$regex": "abcd"}} |
| 492 | + try: |
| 493 | + await client.test.test.find_one(query) |
| 494 | + except OperationFailure: |
| 495 | + pass |
| 496 | + |
| 497 | + # Warm the pool with 10 tasks so there are existing connections. |
| 498 | + tasks = [] |
| 499 | + for i in range(10): |
| 500 | + tasks.append(asyncio.create_task(target())) |
| 501 | + await asyncio.wait(tasks) |
| 502 | + |
| 503 | + # Run 100 parallel operations that contend for connections. |
| 504 | + tasks = [] |
| 505 | + for i in range(100): |
| 506 | + tasks.append(asyncio.create_task(target())) |
| 507 | + await asyncio.wait(tasks) |
| 508 | + |
| 509 | + # Verify there were at least 10 connection checkout failed event but no pool cleared events. |
| 510 | + self.assertGreater(len(listener.events_by_type(ConnectionCheckOutFailedEvent)), 10) |
| 511 | + self.assertEqual(len(listener.events_by_type(PoolClearedEvent)), 0) |
| 512 | + |
| 513 | + |
449 | 514 | class TestServerMonitoringMode(AsyncIntegrationTest): |
450 | 515 | @async_client_context.require_no_load_balancer |
451 | 516 | async def asyncSetUp(self): |
|
0 commit comments