Skip to content

Commit 8026e7a

Browse files
committed
Flag implicit sessions used by retryable operations that also use CommandCursors
1 parent d65fe45 commit 8026e7a

File tree

6 files changed

+40
-8
lines changed

6 files changed

+40
-8
lines changed

pymongo/asynchronous/client_session.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ def __init__(
514514
self._implicit = implicit
515515
self._transaction = _Transaction(None, client)
516516
self._attached_to_cursor = False
517+
self._leave_alive = False
517518

518519
async def end_session(self) -> None:
519520
"""Finish this session. If a transaction has started, abort it.
@@ -536,7 +537,7 @@ async def _end_session(self, lock: bool) -> None:
536537

537538
def _end_implicit_session(self) -> None:
538539
# Implicit sessions can't be part of transactions or pinned connections
539-
if self._server_session is not None:
540+
if not self._leave_alive and self._server_session is not None:
540541
# print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}")
541542
self._client._return_server_session(self._server_session)
542543
self._server_session = None
@@ -606,6 +607,18 @@ def attached_to_cursor(self) -> bool:
606607
def attached_to_cursor(self, value: bool) -> None:
607608
self._attached_to_cursor = value
608609

610+
@property
611+
def leave_alive(self) -> bool:
612+
"""Whether to leave this session alive when it is
613+
no longer in use.
614+
Typically used for implicit sessions that are used for multiple operations within a single larger operation.
615+
"""
616+
return self._leave_alive
617+
618+
@leave_alive.setter
619+
def leave_alive(self, value: bool) -> None:
620+
self._leave_alive = value
621+
609622
def _inherit_option(self, name: str, val: _T) -> _T:
610623
"""Return the inherited TransactionOption value."""
611624
if val:

pymongo/asynchronous/command_cursor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__(
6565
max_await_time_ms: Optional[int] = None,
6666
session: Optional[AsyncClientSession] = None,
6767
comment: Any = None,
68+
leave_session_alive: bool = False,
6869
) -> None:
6970
"""Create a new command cursor."""
7071
self._sock_mgr: Any = None
@@ -83,6 +84,7 @@ def __init__(
8384
self._session.attached_to_cursor = True
8485
self._killed = self._id == 0
8586
self._comment = comment
87+
self._leave_session_alive = leave_session_alive
8688
if self._killed:
8789
self._end_session()
8890

@@ -240,8 +242,9 @@ async def _die_lock(self) -> None:
240242
self._sock_mgr = None
241243

242244
def _end_session(self) -> None:
243-
if self._session:
244-
if self._session.implicit:
245+
if self._session and self._session.implicit:
246+
self._session.attached_to_cursor = False
247+
if not self._leave_session_alive:
245248
# print(f"Ending session {self}, session: {self._session}")
246249
self._session._end_implicit_session()
247250
self._session = None

pymongo/asynchronous/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,7 @@ async def create_collection(
611611
common.validate_is_mapping("clusteredIndex", clustered_index)
612612

613613
async with self._client._tmp_session(session) as s:
614+
s.leave_alive = True
614615
# Skip this check in a transaction where listCollections is not
615616
# supported.
616617
if (
@@ -705,7 +706,6 @@ async def aggregate(
705706
AsyncCommandCursor,
706707
pipeline,
707708
kwargs,
708-
session is not None,
709709
user_fields={"cursor": {"firstBatch": 1}},
710710
)
711711
return await self.client._retryable_read(

pymongo/synchronous/client_session.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ def __init__(
513513
self._implicit = implicit
514514
self._transaction = _Transaction(None, client)
515515
self._attached_to_cursor = False
516+
self._leave_alive = False
516517

517518
def end_session(self) -> None:
518519
"""Finish this session. If a transaction has started, abort it.
@@ -535,7 +536,7 @@ def _end_session(self, lock: bool) -> None:
535536

536537
def _end_implicit_session(self) -> None:
537538
# Implicit sessions can't be part of transactions or pinned connections
538-
if self._server_session is not None:
539+
if not self._leave_alive and self._server_session is not None:
539540
# print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}")
540541
self._client._return_server_session(self._server_session)
541542
self._server_session = None
@@ -605,6 +606,18 @@ def attached_to_cursor(self) -> bool:
605606
def attached_to_cursor(self, value: bool) -> None:
606607
self._attached_to_cursor = value
607608

609+
@property
610+
def leave_alive(self) -> bool:
611+
"""Whether to leave this session alive when it is
612+
no longer in use.
613+
Typically used for implicit sessions that are used for multiple operations within a single larger operation.
614+
"""
615+
return self._leave_alive
616+
617+
@leave_alive.setter
618+
def leave_alive(self, value: bool) -> None:
619+
self._leave_alive = value
620+
608621
def _inherit_option(self, name: str, val: _T) -> _T:
609622
"""Return the inherited TransactionOption value."""
610623
if val:

pymongo/synchronous/command_cursor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__(
6565
max_await_time_ms: Optional[int] = None,
6666
session: Optional[ClientSession] = None,
6767
comment: Any = None,
68+
leave_session_alive: bool = False,
6869
) -> None:
6970
"""Create a new command cursor."""
7071
self._sock_mgr: Any = None
@@ -83,6 +84,7 @@ def __init__(
8384
self._session.attached_to_cursor = True
8485
self._killed = self._id == 0
8586
self._comment = comment
87+
self._leave_session_alive = leave_session_alive
8688
if self._killed:
8789
self._end_session()
8890

@@ -240,8 +242,9 @@ def _die_lock(self) -> None:
240242
self._sock_mgr = None
241243

242244
def _end_session(self) -> None:
243-
if self._session:
244-
if self._session.implicit:
245+
if self._session and self._session.implicit:
246+
self._session.attached_to_cursor = False
247+
if not self._leave_session_alive:
245248
# print(f"Ending session {self}, session: {self._session}")
246249
self._session._end_implicit_session()
247250
self._session = None

pymongo/synchronous/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,7 @@ def create_collection(
611611
common.validate_is_mapping("clusteredIndex", clustered_index)
612612

613613
with self._client._tmp_session(session) as s:
614+
s.leave_alive = True
614615
# Skip this check in a transaction where listCollections is not
615616
# supported.
616617
if (
@@ -705,7 +706,6 @@ def aggregate(
705706
CommandCursor,
706707
pipeline,
707708
kwargs,
708-
session is not None,
709709
user_fields={"cursor": {"firstBatch": 1}},
710710
)
711711
return self.client._retryable_read(

0 commit comments

Comments
 (0)