Skip to content

Commit b666884

Browse files
committed
Routing driver forgets addresses on some errors
This commit makes routing driver forget addresses and purge their connections on network and database errors. It also makes driver forget write server addresses when they fail as writers, connections remain in the pool because such machines can remain readers/routers.
1 parent f243ee6 commit b666884

File tree

8 files changed

+156
-6
lines changed

8 files changed

+156
-6
lines changed

neo4j/bolt/connection.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ class Connection(object):
144144
#: Error class used for raising connection errors
145145
Error = ServiceUnavailable
146146

147+
#: The function to handle send and receive errors
148+
error_handler = None
149+
147150
_supports_statement_reuse = False
148151

149152
_last_run_statement = None
@@ -237,6 +240,14 @@ def reset(self):
237240
self.sync()
238241

239242
def send(self):
243+
try:
244+
self._send()
245+
except Exception as error:
246+
if self.error_handler is not None:
247+
self.error_handler(error)
248+
raise error
249+
250+
def _send(self):
240251
""" Send all queued messages to the server.
241252
"""
242253
data = self.output_buffer.view()
@@ -250,6 +261,14 @@ def send(self):
250261
self.output_buffer.clear()
251262

252263
def fetch(self):
264+
try:
265+
return self._fetch()
266+
except Exception as error:
267+
if self.error_handler is not None:
268+
self.error_handler(error)
269+
raise error
270+
271+
def _fetch(self):
253272
""" Receive at least one message from the server, if available.
254273
255274
:return: 2-tuple of number of detail messages and number of summary messages fetched

neo4j/v1/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def _disconnect(self, sync):
277277
if sync:
278278
try:
279279
self._connection.sync()
280-
except ServiceUnavailable:
280+
except (SessionError, ServiceUnavailable):
281281
pass
282282
if self._connection:
283283
self._connection.in_use = False

neo4j/v1/routing.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ class RoutingConnectionPool(ConnectionPool):
251251
""" Connection pool with routing table.
252252
"""
253253

254+
FAILURE_CODES = ("Neo.TransientError.General.DatabaseUnavailable")
255+
WRITE_FAILURE_CODES = ("Neo.ClientError.Cluster.NotALeader", "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase")
256+
254257
def __init__(self, connector, initial_address, routing_context, *routers, **config):
255258
super(RoutingConnectionPool, self).__init__(connector)
256259
self.initial_address = initial_address
@@ -399,12 +402,21 @@ def acquire(self, access_mode=None):
399402
try:
400403
connection = self.acquire_direct(address) # should always be a resolved address
401404
connection.Error = SessionExpired
405+
connection.error_handler = lambda error: self._handle_connection_error(address, error)
402406
except ServiceUnavailable:
403407
self.remove(address)
404408
else:
405409
return connection
406410
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode)
407411

412+
def _handle_connection_error(self, address, error):
413+
""" Handle routing connection send or receive error.
414+
"""
415+
if isinstance(error, (SessionExpired, ServiceUnavailable)) or error.code in self.FAILURE_CODES:
416+
self.remove(address)
417+
elif error.code in self.WRITE_FAILURE_CODES:
418+
self._remove_writer(address)
419+
408420
def remove(self, address):
409421
""" Remove an address from the connection pool, if present, closing
410422
all connections to that address. Also remove from the routing table.
@@ -416,6 +428,11 @@ def remove(self, address):
416428
self.routing_table.writers.discard(address)
417429
super(RoutingConnectionPool, self).remove(address)
418430

431+
def _remove_writer(self, address):
432+
""" Remove a writer address from the routing table, if present.
433+
"""
434+
self.routing_table.writers.discard(address)
435+
419436

420437
class RoutingDriver(Driver):
421438
""" A :class:`.RoutingDriver` is created from a ``bolt+routing`` URI. The
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO ACK_FAILURE
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO RUN "COMMIT" {}
8+
9+
C: RUN "RETURN 1" {}
10+
C: PULL_ALL
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"}
12+
S: IGNORED
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO ACK_FAILURE
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO RUN "COMMIT" {}
8+
9+
C: RUN "CREATE (n {name:'Bob'})" {}
10+
C: PULL_ALL
11+
S: FAILURE {"code": "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase", "message": "Unable to write"}
12+
S: IGNORED
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO ACK_FAILURE
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO RUN "COMMIT" {}
8+
9+
C: RUN "CREATE (n {name:'Bob'})" {}
10+
C: PULL_ALL
11+
S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "Leader switched has happened"}
12+
S: IGNORED
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
4+
C: RUN "RETURN 1" {}
5+
PULL_ALL
6+
S: <EXIT>
7+

test/stub/test_routingdriver.py

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from neo4j.v1 import GraphDatabase, READ_ACCESS, WRITE_ACCESS, SessionExpired, \
2323
RoutingDriver, RoutingConnectionPool, LeastConnectedLoadBalancingStrategy, LOAD_BALANCING_STRATEGY_ROUND_ROBIN, \
24-
RoundRobinLoadBalancingStrategy
24+
RoundRobinLoadBalancingStrategy, TransientError, ClientError
2525
from neo4j.bolt import ProtocolError, ServiceUnavailable
2626

2727
from test.stub.tools import StubTestCase, StubCluster
@@ -236,8 +236,79 @@ def test_can_select_round_robin_load_balancing_strategy(self):
236236
self.assertIsInstance(driver._pool.load_balancing_strategy, RoundRobinLoadBalancingStrategy)
237237

238238
def test_no_other_load_balancing_strategies_are_available(self):
239-
with StubCluster({9001: "router.script"}):
239+
uri = "bolt+routing://127.0.0.1:9001"
240+
with self.assertRaises(ValueError):
241+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False, load_balancing_strategy=-1):
242+
pass
243+
244+
def test_forgets_address_on_not_a_leader_error(self):
245+
with StubCluster({9001: "router.script", 9006: "not_a_leader.script"}):
240246
uri = "bolt+routing://127.0.0.1:9001"
241-
with self.assertRaises(ValueError):
242-
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False, load_balancing_strategy=-1):
243-
pass
247+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
248+
with driver.session(WRITE_ACCESS) as session:
249+
with self.assertRaises(ClientError):
250+
_ = session.run("CREATE (n {name:'Bob'})")
251+
252+
pool = driver._pool
253+
table = pool.routing_table
254+
255+
# address might still have connections in the pool, failed instance just can't serve writes
256+
assert ('127.0.0.1', 9006) in pool.connections
257+
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
258+
assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)}
259+
# writer 127.0.0.1:9006 should've been forgotten because of an error
260+
assert len(table.writers) == 0
261+
262+
def test_forgets_address_on_forbidden_on_read_only_database_error(self):
263+
with StubCluster({9001: "router.script", 9006: "forbidden_on_read_only_database.script"}):
264+
uri = "bolt+routing://127.0.0.1:9001"
265+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
266+
with driver.session(WRITE_ACCESS) as session:
267+
with self.assertRaises(ClientError):
268+
_ = session.run("CREATE (n {name:'Bob'})")
269+
270+
pool = driver._pool
271+
table = pool.routing_table
272+
273+
# address might still have connections in the pool, failed instance just can't serve writes
274+
assert ('127.0.0.1', 9006) in pool.connections
275+
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
276+
assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)}
277+
# writer 127.0.0.1:9006 should've been forgotten because of an error
278+
assert len(table.writers) == 0
279+
280+
def test_forgets_address_on_service_unavailable_error(self):
281+
with StubCluster({9001: "router.script", 9004: "rude_reader.script"}):
282+
uri = "bolt+routing://127.0.0.1:9001"
283+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
284+
with driver.session(READ_ACCESS) as session:
285+
with self.assertRaises(SessionExpired):
286+
_ = session.run("RETURN 1")
287+
288+
pool = driver._pool
289+
table = pool.routing_table
290+
291+
# address should not have connections in the pool, it has failed
292+
assert ('127.0.0.1', 9004) not in pool.connections
293+
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
294+
# reader 127.0.0.1:9004 should've been forgotten because of an error
295+
assert table.readers == {('127.0.0.1', 9005)}
296+
assert table.writers == {('127.0.0.1', 9006)}
297+
298+
def test_forgets_address_on_database_unavailable_error(self):
299+
with StubCluster({9001: "router.script", 9004: "database_unavailable.script"}):
300+
uri = "bolt+routing://127.0.0.1:9001"
301+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
302+
with driver.session(READ_ACCESS) as session:
303+
with self.assertRaises(TransientError):
304+
_ = session.run("RETURN 1")
305+
306+
pool = driver._pool
307+
table = pool.routing_table
308+
309+
# address should not have connections in the pool, it has failed
310+
assert ('127.0.0.1', 9004) not in pool.connections
311+
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
312+
# reader 127.0.0.1:9004 should've been forgotten because of an error
313+
assert table.readers == {('127.0.0.1', 9005)}
314+
assert table.writers == {('127.0.0.1', 9006)}

0 commit comments

Comments
 (0)