@@ -97,7 +97,11 @@ def initialize(host=nil, port=nil, options={})
9797
9898 # Mutex for synchronizing pool access
9999 @connection_mutex = Mutex . new
100- @safe_mutex = Mutex . new
100+
101+
102+ # Create a mutex when a new key, in this case a socket,
103+ # is added to the hash.
104+ @safe_mutexes = Hash . new { |h , k | h [ k ] = Mutex . new }
101105
102106 # Condition variable for signal and wait
103107 @queue = ConditionVariable . new
@@ -368,7 +372,7 @@ def send_message_with_safe_check(operation, message, db_name, log_message=nil, l
368372 sock = checkout
369373 packed_message = message_with_headers . append! ( message_with_check ) . to_s
370374 docs = num_received = cursor_id = ''
371- @safe_mutex . synchronize do
375+ @safe_mutexes [ sock ] . synchronize do
372376 send_message_on_socket ( packed_message , sock )
373377 docs , num_received , cursor_id = receive ( sock )
374378 end
@@ -398,7 +402,7 @@ def receive_message(operation, message, log_message=nil, socket=nil)
398402 sock = socket || checkout
399403
400404 result = ''
401- @safe_mutex . synchronize do
405+ @safe_mutexes [ sock ] . synchronize do
402406 send_message_on_socket ( packed_message , sock )
403407 result = receive ( sock )
404408 end
0 commit comments