3232from select import select
3333from socket import socket , SOL_SOCKET , SO_KEEPALIVE , SHUT_RDWR , error as SocketError , timeout as SocketTimeout , AF_INET , AF_INET6
3434from struct import pack as struct_pack , unpack as struct_unpack
35- from threading import RLock
35+ from threading import RLock , Condition
3636
37+ from neo4j .v1 import ClientError
3738from neo4j .addressing import SocketAddress , is_ip_address
3839from neo4j .bolt .cert import KNOWN_HOSTS
3940from neo4j .bolt .response import InitResponse , AckFailureResponse , ResetResponse
4849ChunkedOutputBuffer = _import_best ("neo4j.bolt._io" , "neo4j.bolt.io" ).ChunkedOutputBuffer
4950
5051
51- INFINITE_CONNECTION_LIFETIME = - 1
52- DEFAULT_MAX_CONNECTION_LIFETIME = INFINITE_CONNECTION_LIFETIME
52+ INFINITE = - 1
53+ DEFAULT_MAX_CONNECTION_LIFETIME = INFINITE
54+ DEFAULT_MAX_CONNECTION_POOL_SIZE = INFINITE
5355DEFAULT_CONNECTION_TIMEOUT = 5.0
56+ DEFAULT_CONNECTION_ACQUISITION_TIMEOUT = 60
5457DEFAULT_PORT = 7687
5558DEFAULT_USER_AGENT = "neo4j-python/%s" % version
5659
@@ -405,11 +408,14 @@ class ConnectionPool(object):
405408
406409 _closed = False
407410
408- def __init__ (self , connector , connection_error_handler ):
411+ def __init__ (self , connector , connection_error_handler , ** config ):
409412 self .connector = connector
410413 self .connection_error_handler = connection_error_handler
411414 self .connections = {}
412415 self .lock = RLock ()
416+ self .cond = Condition (self .lock )
417+ self ._max_connection_pool_size = config .get ("max_connection_pool_size" , DEFAULT_MAX_CONNECTION_POOL_SIZE )
418+ self ._connection_acquisition_timeout = config .get ("connection_acquisition_timeout" , DEFAULT_CONNECTION_ACQUISITION_TIMEOUT )
413419
414420 def __enter__ (self ):
415421 return self
@@ -433,23 +439,42 @@ def acquire_direct(self, address):
433439 connections = self .connections [address ]
434440 except KeyError :
435441 connections = self .connections [address ] = deque ()
436- for connection in list (connections ):
437- if connection .closed () or connection .defunct () or connection .timedout ():
438- connections .remove (connection )
439- continue
440- if not connection .in_use :
441- connection .in_use = True
442- return connection
443- try :
444- connection = self .connector (address , self .connection_error_handler )
445- except ServiceUnavailable :
446- self .remove (address )
447- raise
448- else :
449- connection .pool = self
450- connection .in_use = True
451- connections .append (connection )
452- return connection
442+
443+ connection_acquisition_start_timestamp = clock ()
444+ while True :
445+ # try to find a free connection in pool
446+ for connection in list (connections ):
447+ if connection .closed () or connection .defunct () or connection .timedout ():
448+ connections .remove (connection )
449+ continue
450+ if not connection .in_use :
451+ connection .in_use = True
452+ return connection
453+ # all connections in pool are in-use
454+ can_create_new_connection = self ._max_connection_pool_size == INFINITE or len (connections ) < self ._max_connection_pool_size
455+ if can_create_new_connection :
456+ try :
457+ connection = self .connector (address , self .connection_error_handler )
458+ except ServiceUnavailable :
459+ self .remove (address )
460+ raise
461+ else :
462+ connection .pool = self
463+ connection .in_use = True
464+ connections .append (connection )
465+ return connection
466+
467+ # failed to obtain a connection from pool because the pool is full and no free connection in the pool
468+ span_timeout = self ._connection_acquisition_timeout - (clock () - connection_acquisition_start_timestamp )
469+ if span_timeout > 0 :
470+ self .cond .wait (span_timeout )
471+ # if timed out, then we throw error. This time computation is needed, as with python 2.7, we cannot
472+ # tell if the condition is notified or timed out when we come to this line
473+ if self ._connection_acquisition_timeout <= (clock () - connection_acquisition_start_timestamp ):
474+ raise ClientError ("Failed to obtain a connection from pool within {!r}s" .format (
475+ self ._connection_acquisition_timeout ))
476+ else :
477+ raise ClientError ("Failed to obtain a connection from pool within {!r}s" .format (self ._connection_acquisition_timeout ))
453478
454479 def acquire (self , access_mode = None ):
455480 """ Acquire a connection to a server that can satisfy a set of parameters.
@@ -463,6 +488,7 @@ def release(self, connection):
463488 """
464489 with self .lock :
465490 connection .in_use = False
491+ self .cond .notify_all ()
466492
467493 def in_use_connection_count (self , address ):
468494 """ Count the number of connections currently in use to a given
0 commit comments