Skip to content

Commit 3033bcb

Browse files
committed
Timeout and metadata for explicit transactions
1 parent de3f1ea commit 3033bcb

File tree

2 files changed

+56
-6
lines changed

2 files changed

+56
-6
lines changed

neo4j/__init__.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -450,12 +450,20 @@ def fail(_):
450450
"server": server,
451451
"protocol_version": protocol_version,
452452
}
453+
454+
def done(summary_metadata):
455+
metadata.update(summary_metadata)
456+
bookmark = metadata.get("bookmark")
457+
if bookmark:
458+
self._bookmarks_in = tuple([bookmark])
459+
self._bookmark_out = bookmark
460+
453461
self._last_result = result = BoltStatementResult(self, hydrant, metadata)
454462
cx.run(statement, parameters, on_success=metadata.update, on_failure=fail)
455463
cx.pull_all(
456464
on_records=lambda records: result._records.extend(
457465
hydrant.hydrate_records(result.keys(), records)),
458-
on_success=metadata.update,
466+
on_success=done,
459467
on_failure=fail,
460468
on_summary=lambda: result.detach(sync=False),
461469
)
@@ -550,12 +558,14 @@ def has_transaction(self):
550558
def _close_transaction(self):
551559
self._transaction = None
552560

553-
def begin_transaction(self, bookmark=None):
561+
def begin_transaction(self, bookmark=None, metadata=None, timeout=None):
554562
""" Create a new :class:`.Transaction` within this session.
555563
Calling this method with a bookmark is equivalent to
556564
557565
:param bookmark: a bookmark to which the server should
558566
synchronise before beginning the transaction
567+
:param metadata:
568+
:param timeout:
559569
:returns: new :class:`.Transaction` instance.
560570
:raise: :class:`.TransactionError` if a transaction is already open
561571
"""
@@ -572,13 +582,13 @@ def begin_transaction(self, bookmark=None):
572582
_warned_about_transaction_bookmarks = True
573583
self._bookmarks_in = tuple([bookmark])
574584

575-
self._open_transaction()
585+
self._open_transaction(metadata=metadata, timeout=timeout)
576586
return self._transaction
577587

578-
def _open_transaction(self, access_mode=None, metadata=None):
588+
def _open_transaction(self, access_mode=None, metadata=None, timeout=None):
579589
self._transaction = Transaction(self, on_close=self._close_transaction)
580590
self._connect(access_mode)
581-
self._connection.begin(self._bookmarks_in)
591+
self._connection.begin(bookmarks=self._bookmarks_in, metadata=metadata, timeout=timeout)
582592

583593
def commit_transaction(self):
584594
""" Commit the current transaction.

test/integration/test_session.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
# limitations under the License.
2020

2121

22+
from time import sleep
2223
from unittest import SkipTest
2324
from uuid import uuid4
2425

2526
from neo4j import \
2627
READ_ACCESS, WRITE_ACCESS, \
2728
CypherError, SessionError, TransactionError, unit_of_work
2829
from neo4j.types.graph import Node, Relationship, Path
29-
from neo4j.exceptions import CypherSyntaxError
30+
from neo4j.exceptions import CypherSyntaxError, TransientError
3031

3132
from test.integration.tools import DirectIntegrationTestCase
3233

@@ -62,6 +63,24 @@ def test_can_run_simple_statement_with_params(self):
6263
session.close()
6364
assert count == 1
6465

66+
def test_autocommit_transactions_use_bookmarks(self):
67+
if self.protocol_version() < 3:
68+
raise SkipTest("Test requires Bolt v3")
69+
bookmarks = []
70+
# Generate an initial bookmark
71+
with self.driver.session() as session:
72+
session.run("CREATE ()").consume()
73+
bookmark = session.last_bookmark()
74+
self.assertIsNotNone(bookmark)
75+
bookmarks.append(bookmark)
76+
# Propagate into another session
77+
with self.driver.session(bookmarks=bookmarks) as session:
78+
self.assertEqual(list(session.next_bookmarks()), bookmarks)
79+
session.run("CREATE ()").consume()
80+
bookmark = session.last_bookmark()
81+
self.assertIsNotNone(bookmark)
82+
self.assertNotIn(bookmark, bookmarks)
83+
6584
def test_fails_on_bad_syntax(self):
6685
session = self.driver.session()
6786
with self.assertRaises(CypherError):
@@ -369,6 +388,27 @@ def test_last_run_statement_should_be_cleared_on_failure(self):
369388
assert connection_2._last_run_statement is None
370389
tx.close()
371390

391+
def test_transaction_metadata(self):
392+
if self.protocol_version() < 3:
393+
raise SkipTest("Test requires Bolt v3")
394+
with self.driver.session() as session:
395+
metadata_in = {"foo": "bar"}
396+
with session.begin_transaction(metadata=metadata_in) as tx:
397+
metadata_out = tx.run("CALL dbms.getTXMetaData").single().value()
398+
self.assertEqual(metadata_in, metadata_out)
399+
400+
def test_transaction_timeout(self):
401+
if self.protocol_version() < 3:
402+
raise SkipTest("Test requires Bolt v3")
403+
with self.driver.session() as s1:
404+
s1.run("CREATE (a:Node)").consume()
405+
with self.driver.session() as s2:
406+
tx1 = s1.begin_transaction()
407+
tx1.run("MATCH (a:Node) SET a.property = 1").consume()
408+
tx2 = s2.begin_transaction(timeout=0.25)
409+
with self.assertRaises(TransientError):
410+
tx2.run("MATCH (a:Node) SET a.property = 2").consume()
411+
372412

373413
class BookmarkingTestCase(DirectIntegrationTestCase):
374414

0 commit comments

Comments
 (0)