Skip to content

Commit 4482b36

Browse files
committed
RUBY-475 pin/unpin logic added to cursor
1 parent f28d1e2 commit 4482b36

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

lib/mongo/cursor.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ def send_initial_query
472472
nil, @options & OP_QUERY_EXHAUST != 0)
473473
rescue ConnectionFailure => ex
474474
socket.close if socket
475+
@connection.unpin_pool
475476
@connection.refresh
476477
if tries < 3 && !@socket && (!@command || Mongo::Support::secondary_ok?(@selector))
477478
tries += 1
@@ -484,6 +485,9 @@ def send_initial_query
484485
ensure
485486
socket.checkin unless @socket || socket.nil?
486487
end
488+
if !@socket && !@command
489+
@connection.pin_pool(socket.pool, read_preference)
490+
end
487491
@returned += @n_received
488492
@cache += results
489493
@query_run = true

lib/mongo/mongo_replica_set_client.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ def close(opts={})
283283

284284
# Clear the reference to this object.
285285
thread_local[:managers].delete(self)
286+
unpin_pool
286287

287288
@connected = false
288289
end
@@ -363,7 +364,7 @@ def ensure_manager
363364
end
364365

365366
def pinned_pool
366-
thread_local[:pinned_pools][@manager.object_id]
367+
thread_local[:pinned_pools][@manager.object_id] if @manager
367368
end
368369

369370
def pin_pool(pool, read_preference)

test/replica_set/pinning_test.rb

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
require 'test_helper'
2+
3+
class ReplicaSetPinningTest < Test::Unit::TestCase
4+
def setup
5+
ensure_cluster(:rs)
6+
@client = MongoReplicaSetClient.new(@rs.repl_set_seeds, :name => @rs.repl_set_name)
7+
@db = @client.db(MONGO_TEST_DB)
8+
@coll = @db.collection("test-sets")
9+
@coll.insert({:a => 1})
10+
end
11+
12+
def test_unpinning
13+
# pin primary
14+
@coll.find_one
15+
assert_equal @client.pinned_pool[:pool], @client.primary_pool
16+
17+
# pin secondary
18+
@coll.find_one({}, :read => :secondary_preferred)
19+
assert @client.secondary_pools.include? @client.pinned_pool[:pool]
20+
21+
# repin primary
22+
@coll.find_one({}, :read => :primary_preferred)
23+
assert_equal @client.pinned_pool[:pool], @client.primary_pool
24+
end
25+
26+
def test_pinned_pool_is_local_to_thread
27+
threads = []
28+
30.times do |i|
29+
threads << Thread.new do
30+
if i % 2 == 0
31+
@coll.find_one({}, :read => :secondary_preferred)
32+
assert @client.secondary_pools.include? @client.pinned_pool[:pool]
33+
else
34+
@coll.find_one({}, :read => :primary_preferred)
35+
assert_equal @client.pinned_pool[:pool], @client.primary_pool
36+
end
37+
end
38+
end
39+
threads.each(&:join)
40+
end
41+
end

0 commit comments

Comments
 (0)