Skip to content

Commit f28d1e2

Browse files
committed
RUBY-475 refactor read preference logic
Move read_pool into Mongo::ReadPreference Add #read_preference to produce read preference hash
1 parent ef88563 commit f28d1e2

File tree

7 files changed

+68
-69
lines changed

7 files changed

+68
-69
lines changed

lib/mongo/cursor.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ class Cursor
66
include Mongo::Constants
77
include Mongo::Conversions
88
include Mongo::Logging
9+
include Mongo::ReadPreference
910

1011
attr_reader :collection, :selector, :fields,
1112
:order, :hint, :snapshot, :timeout,
1213
:full_collection_name, :transformer,
1314
:options, :cursor_id, :show_disk_loc,
14-
:comment, :read, :tag_sets
15+
:comment, :read, :tag_sets, :acceptable_latency
1516

1617
# Create a new cursor.
1718
#
@@ -530,9 +531,9 @@ def checkout_socket_from_connection
530531
if @pool
531532
socket = @pool.checkout
532533
elsif @command && !Mongo::Support::secondary_ok?(@selector)
533-
socket = @connection.checkout_reader(:primary)
534+
socket = @connection.checkout_reader({:mode => :primary})
534535
else
535-
socket = @connection.checkout_reader(@read, @tag_sets, @acceptable_latency)
536+
socket = @connection.checkout_reader(read_preference)
536537
end
537538
rescue SystemStackError, NoMemoryError, SystemCallError => ex
538539
@connection.close

lib/mongo/mongo_client.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,14 @@ def [](db_name)
351351
def refresh
352352
end
353353

354-
def pin_pool(pool)
354+
def pinned_pool
355+
@primary_pool
356+
end
357+
358+
def pin_pool(pool, read_prefs)
355359
end
356360

357-
def unpin_pool(pool)
361+
def unpin_pool
358362
end
359363

360364
# Drop a database.
@@ -521,7 +525,7 @@ def max_message_size
521525

522526
# Checkout a socket for reading (i.e., a secondary node).
523527
# Note: this is overridden in MongoReplicaSetClient.
524-
def checkout_reader(mode=:primary, tag_sets={}, acceptable_latency=15)
528+
def checkout_reader(read_preference)
525529
connect unless connected?
526530
@primary_pool.checkout
527531
end

lib/mongo/mongo_replica_set_client.rb

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module Mongo
22

33
# Instantiates and manages connections to a MongoDB replica set.
44
class MongoReplicaSetClient < MongoClient
5+
include ReadPreference
56
include ThreadLocalVariableManager
67

78
REPL_SET_OPTS = [
@@ -268,7 +269,7 @@ def nodes
268269
#
269270
# @return [Boolean]
270271
def read_primary?
271-
@manager.read_pool == @manager.primary_pool
272+
read_pool == primary_pool
272273
end
273274
alias :primary? :read_primary?
274275

@@ -335,9 +336,9 @@ def checkout
335336
end
336337
end
337338

338-
def checkout_reader(mode=@read, tag_sets=@tag_sets, acceptable_latency=@acceptable_latency)
339+
def checkout_reader(read_pref={})
339340
checkout do
340-
pool = read_pool(mode, tag_sets, acceptable_latency)
341+
pool = read_pool(read_pref)
341342
get_socket_from_pool(pool)
342343
end
343344
end
@@ -361,11 +362,20 @@ def ensure_manager
361362
thread_local[:managers][self] = @manager
362363
end
363364

364-
def pin_pool(pool)
365-
thread_local[:pinned_pools][@manager.object_id] = pool if @manager
365+
def pinned_pool
366+
thread_local[:pinned_pools][@manager.object_id]
366367
end
367368

368-
def unpin_pool(pool)
369+
def pin_pool(pool, read_preference)
370+
if @manager
371+
thread_local[:pinned_pools][@manager.object_id] = {
372+
:pool => pool,
373+
:read_preference => read_preference
374+
}
375+
end
376+
end
377+
378+
def unpin_pool
369379
thread_local[:pinned_pools].delete @manager.object_id if @manager
370380
end
371381

@@ -402,10 +412,6 @@ def primary_pool
402412
local_manager ? local_manager.primary_pool : nil
403413
end
404414

405-
def read_pool(mode=@read, tags=@tag_sets, acceptable_latency=@acceptable_latency)
406-
local_manager ? local_manager.read_pool(mode, tags, acceptable_latency) : nil
407-
end
408-
409415
def secondary_pool
410416
local_manager ? local_manager.secondary_pool : nil
411417
end

lib/mongo/util/pool.rb

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,25 +85,6 @@ def up?
8585
!@closed
8686
end
8787

88-
def matches_mode(mode)
89-
if mode == :primary && @node.secondary? ||
90-
mode == :secondary && @node.primary?
91-
false
92-
else
93-
true
94-
end
95-
end
96-
97-
def matches_tag_set(tag_set)
98-
tag_set.all? do |tag, value|
99-
tags.has_key?(tag) && tags[tag] == value
100-
end
101-
end
102-
103-
def matches_tag_sets(tag_sets)
104-
tag_sets.all? {|set| matches_tag_set(set)}
105-
end
106-
10788
def inspect
10889
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
10990
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available " +

lib/mongo/util/pool_manager.rb

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
module Mongo
22
class PoolManager
3-
include Mongo::ReadPreference
43
include ThreadLocalVariableManager
54

65
attr_reader :client,
@@ -114,26 +113,6 @@ def read
114113
read_pool.host_port
115114
end
116115

117-
def read_pool(mode=@client.read,
118-
tags=@client.tag_sets,
119-
acceptable_latency=@client.acceptable_latency)
120-
121-
pinned = thread_local[:pinned_pools][self.object_id]
122-
123-
if pinned && pinned.matches_mode(mode) && pinned.matches_tag_sets(tags) && pinned.up?
124-
pool = pinned
125-
else
126-
pool = select_pool(mode, tags, acceptable_latency)
127-
end
128-
129-
unless pool
130-
raise ConnectionFailure, "No replica set member available for query " +
131-
"with read preference matching mode #{mode} and tags matching #{tags}."
132-
end
133-
134-
pool
135-
end
136-
137116
def max_bson_size
138117
@max_bson_size ||= config_min('maxBsonObjectSize', DEFAULT_MAX_BSON_SIZE)
139118
end

lib/mongo/util/read_preference.rb

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,56 @@ def self.validate(value)
3333
end
3434
end
3535

36-
def select_pool(mode, tags, latency)
37-
return primary_pool if @client.mongos?
36+
def read_preference
37+
{
38+
:mode => @read,
39+
:tags => @tag_sets,
40+
:latency => @acceptable_latency
41+
}
42+
end
43+
44+
def read_pool(read_preference_override={})
45+
return primary_pool if mongos?
46+
47+
read_pref = read_preference.merge read_preference_override
48+
49+
if pinned_pool && pinned_pool[:read_preference] == read_pref
50+
pool = pinned_pool[:pool]
51+
else
52+
unpin_pool
53+
pool = select_pool(read_pref)
54+
end
55+
56+
unless pool
57+
raise ConnectionFailure, "No replica set member available for query " +
58+
"with read preference matching mode #{read_pref[:mode]} and tags " +
59+
"matching #{read_pref[:tags]}."
60+
end
61+
62+
pool
63+
end
3864

39-
if mode == :primary && !tags.empty?
65+
def select_pool(read_pref)
66+
if read_pref[:mode] == :primary && !read_pref[:tags].empty?
4067
raise MongoArgumentError, "Read preference :primary cannot be combined with tags"
4168
end
4269

43-
case mode
70+
case read_pref[:mode]
4471
when :primary
4572
primary_pool
4673
when :primary_preferred
47-
primary_pool || select_secondary_pool(secondary_pools, tags, latency)
74+
primary_pool || select_secondary_pool(secondary_pools, read_pref)
4875
when :secondary
49-
select_secondary_pool(secondary_pools, tags, latency)
76+
select_secondary_pool(secondary_pools, read_pref)
5077
when :secondary_preferred
51-
select_secondary_pool(secondary_pools, tags, latency) || primary_pool
78+
select_secondary_pool(secondary_pools, read_pref) || primary_pool
5279
when :nearest
53-
select_secondary_pool(pools, tags, latency)
80+
select_secondary_pool(pools, read_pref)
5481
end
5582
end
5683

57-
def select_secondary_pool(candidates, tag_sets, latency)
58-
tag_sets = [tag_sets] unless tag_sets.is_a?(Array)
84+
def select_secondary_pool(candidates, read_pref)
85+
tag_sets = read_pref[:tags]
5986

6087
if !tag_sets.empty?
6188
matches = []
@@ -70,10 +97,11 @@ def select_secondary_pool(candidates, tag_sets, latency)
7097
matches = candidates
7198
end
7299

73-
matches.empty? ? nil : select_near_pool(matches, latency)
100+
matches.empty? ? nil : select_near_pool(matches, read_pref)
74101
end
75102

76-
def select_near_pool(candidates, latency)
103+
def select_near_pool(candidates, read_pref)
104+
latency = read_pref[:latency]
77105
nearest_pool = candidates.min_by { |candidate| candidate.ping_time }
78106
near_pools = candidates.select do |candidate|
79107
(candidate.ping_time - nearest_pool.ping_time) <= latency

test/replica_set/basic_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def test_accessors
6868
assert_equal 2, client.secondaries.length
6969
assert_equal 2, client.secondary_pools.length
7070
assert_equal @rs.repl_set_name, client.replica_set_name
71-
assert client.secondary_pools.include?(client.read_pool(:secondary))
71+
assert client.secondary_pools.include?(client.read_pool({:mode => :secondary}))
7272
assert_equal 90, client.refresh_interval
7373
assert_equal client.refresh_mode, false
7474
client.close

0 commit comments

Comments
 (0)