Skip to content

Commit 8b442d8

Browse files
authored
Merge pull request #834 from estolfo/RUBY-1096-sdam-monitoring
RUBY-1096 Implement SDAM Monitoring spec
2 parents 2c1a537 + 004c871 commit 8b442d8

File tree

66 files changed

+2299
-208
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2299
-208
lines changed

lib/mongo/cluster.rb

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ module Mongo
2424
# @since 2.0.0
2525
class Cluster
2626
extend Forwardable
27+
include Monitoring::Publishable
2728
include Event::Subscriber
2829
include Loggable
2930

@@ -45,6 +46,9 @@ class Cluster
4546
# @return [ Hash ] The options hash.
4647
attr_reader :options
4748

49+
# @return [ Monitoring ] monitoring The monitoring.
50+
attr_reader :monitoring
51+
4852
# @return [ Object ] The cluster topology.
4953
attr_reader :topology
5054

@@ -54,7 +58,8 @@ class Cluster
5458
# @since 2.4.0
5559
attr_reader :app_metadata
5660

57-
def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
61+
def_delegators :topology, :replica_set?, :replica_set_name, :sharded?,
62+
:single?, :unknown?, :member_discovered
5863
def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor
5964

6065
# Determine if this cluster of servers is equal to another object. Checks the
@@ -89,7 +94,6 @@ def add(host)
8994
address = Address.new(host)
9095
if !addresses.include?(address)
9196
if addition_allowed?(address)
92-
log_debug("Adding #{address.to_s} to the cluster.")
9397
@update_lock.synchronize { @addresses.push(address) }
9498
server = Server.new(address, self, @monitoring, event_listeners, options)
9599
@update_lock.synchronize { @servers.push(server) }
@@ -98,6 +102,34 @@ def add(host)
98102
end
99103
end
100104

105+
# Determine if the cluster would select a readable server for the
106+
# provided read preference.
107+
#
108+
# @example Is a readable server present?
109+
# topology.has_readable_server?(server_selector)
110+
#
111+
# @param [ ServerSelector ] server_selector The server
112+
# selector.
113+
#
114+
# @return [ true, false ] If a readable server is present.
115+
#
116+
# @since 2.4.0
117+
def has_readable_server?(server_selector = nil)
118+
topology.has_readable_server?(self, server_selector)
119+
end
120+
121+
# Determine if the cluster would select a writable server.
122+
#
123+
# @example Is a writable server present?
124+
# topology.has_writable_server?
125+
#
126+
# @return [ true, false ] If a writable server is present.
127+
#
128+
# @since 2.4.0
129+
def has_writable_server?
130+
topology.has_writable_server?(self)
131+
end
132+
101133
# Instantiate the new cluster.
102134
#
103135
# @api private
@@ -119,16 +151,26 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
119151
@event_listeners = Event::Listeners.new
120152
@options = options.freeze
121153
@app_metadata ||= AppMetadata.new(self)
122-
@topology = Topology.initial(seeds, options)
123154
@update_lock = Mutex.new
124155
@pool_lock = Mutex.new
156+
@topology = Topology.initial(seeds, monitoring, options)
157+
158+
publish_sdam_event(
159+
Monitoring::TOPOLOGY_OPENING,
160+
Monitoring::Event::TopologyOpening.new(@topology)
161+
)
125162

126163
subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
127164
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
128-
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
165+
subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))
129166

130167
seeds.each{ |seed| add(seed) }
131168

169+
publish_sdam_event(
170+
Monitoring::TOPOLOGY_CHANGED,
171+
Monitoring::Event::TopologyChanged.new(@topology, @topology)
172+
) if @servers.size > 1
173+
132174
@cursor_reaper = CursorReaper.new
133175
@cursor_reaper.run!
134176

@@ -264,11 +306,14 @@ def standalone_discovered
264306
#
265307
# @since 2.0.0
266308
def remove(host)
267-
log_debug("#{host} being removed from the cluster.")
268309
address = Address.new(host)
269310
removed_servers = @servers.select { |s| s.address == address }
270311
@update_lock.synchronize { @servers = @servers - removed_servers }
271312
removed_servers.each{ |server| server.disconnect! } if removed_servers
313+
publish_sdam_event(
314+
Monitoring::SERVER_CLOSED,
315+
Monitoring::Event::ServerClosed.new(address, topology)
316+
)
272317
@update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
273318
end
274319

lib/mongo/cluster/topology.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,33 +26,34 @@ class Cluster
2626
module Topology
2727
extend self
2828

29-
# The 2 various topologies for server selection.
29+
# The various topologies for server selection.
3030
#
3131
# @since 2.0.0
3232
OPTIONS = {
3333
replica_set: ReplicaSet,
3434
sharded: Sharded,
3535
direct: Single
36-
}
36+
}.freeze
3737

3838
# Get the initial cluster topology for the provided options.
3939
#
4040
# @example Get the initial cluster topology.
4141
# Topology.initial(topology: :replica_set)
4242
#
4343
# @param [ Array<String> ] seeds The addresses of the configured servers.
44+
# @param [ Monitoring ] monitoring The monitoring.
4445
# @param [ Hash ] options The cluster options.
4546
#
4647
# @return [ ReplicaSet, Sharded, Single ] The topology.
4748
#
4849
# @since 2.0.0
49-
def initial(seeds, options)
50+
def initial(seeds, monitoring, options)
5051
if options.has_key?(:connect)
51-
OPTIONS.fetch(options[:connect]).new(options, seeds)
52+
OPTIONS.fetch(options[:connect]).new(options, monitoring, seeds)
5253
elsif options.has_key?(:replica_set)
53-
ReplicaSet.new(options, seeds)
54+
ReplicaSet.new(options, monitoring, options)
5455
else
55-
Unknown.new(options, seeds)
56+
Unknown.new(options, monitoring, seeds)
5657
end
5758
end
5859
end

lib/mongo/cluster/topology/replica_set.rb

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ module Topology
2121
# @since 2.0.0
2222
class ReplicaSet
2323
include Loggable
24+
include Monitoring::Publishable
2425

2526
# Constant for the replica set name configuration option.
2627
#
@@ -30,6 +31,9 @@ class ReplicaSet
3031
# @return [ Hash ] options The options.
3132
attr_reader :options
3233

34+
# @return [ Monitoring ] monitoring The monitoring.
35+
attr_reader :monitoring
36+
3337
# The display name for the topology.
3438
#
3539
# @since 2.0.0
@@ -61,7 +65,6 @@ def display_name
6165
def elect_primary(description, servers)
6266
if description.replica_set_name == replica_set_name
6367
unless detect_stale_primary!(description)
64-
log_debug("Server #{description.address.to_s} elected as primary in #{replica_set_name}.")
6568
servers.each do |server|
6669
if server.primary? && server.address != description.address
6770
server.description.unknown!
@@ -79,16 +82,51 @@ def elect_primary(description, servers)
7982
self
8083
end
8184

85+
# Determine if the topology would select a readable server for the
86+
# provided candidates and read preference.
87+
#
88+
# @example Is a readable server present?
89+
# topology.has_readable_server?(cluster, server_selector)
90+
#
91+
# @param [ Cluster ] cluster The cluster.
92+
# @param [ ServerSelector ] server_selector The server
93+
# selector.
94+
#
95+
# @return [ true, false ] If a readable server is present.
96+
#
97+
# @since 2.4.0
98+
def has_readable_server?(cluster, server_selector = nil)
99+
(server_selector || ServerSelector.get(mode: :primary)).candidates(cluster).any?
100+
end
101+
102+
# Determine if the topology would select a writable server for the
103+
# provided candidates.
104+
#
105+
# @example Is a writable server present?
106+
# topology.has_writable_server?(servers)
107+
#
108+
# @param [ Cluster ] cluster The cluster.
109+
#
110+
# @return [ true, false ] If a writable server is present.
111+
#
112+
# @since 2.4.0
113+
def has_writable_server?(cluster)
114+
cluster.servers.any?{ |server| server.primary? }
115+
end
116+
82117
# Initialize the topology with the options.
83118
#
84119
# @example Initialize the topology.
85120
# ReplicaSet.new(options)
86121
#
87122
# @param [ Hash ] options The options.
123+
# @param [ Monitoring ] monitoring The monitoring.
124+
# @param [ Array<String> ] seeds The seeds.
88125
#
89126
# @since 2.0.0
90-
def initialize(options, seeds = [])
127+
def initialize(options, monitoring, seeds = [])
91128
@options = options
129+
@monitoring = monitoring
92130
@max_election_id = nil
93131
@max_set_version = nil
94132
end
@@ -222,6 +260,14 @@ def unknown?; false; end
222260
# @since 2.0.6
223261
def standalone_discovered; self; end
224262

263+
# Notify the topology that a member was discovered.
264+
#
265+
# @example Notify the topology that a member was discovered.
266+
# topology.member_discovered
267+
#
268+
# @since 2.4.0
269+
def member_discovered; end;
270+
225271
private
226272

227273
def update_max_election_id(description)

lib/mongo/cluster/topology/sharded.rb

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,19 @@ module Topology
2020
#
2121
# @since 2.0.0
2222
class Sharded
23+
include Monitoring::Publishable
2324

2425
# The display name for the topology.
2526
#
2627
# @since 2.0.0
2728
NAME = 'Sharded'.freeze
2829

30+
# @return [ Hash ] options The options.
31+
attr_reader :options
32+
33+
# @return [ Monitoring ] monitoring The monitoring.
34+
attr_reader :monitoring
35+
2936
# Get the display name.
3037
#
3138
# @example Get the display name.
@@ -51,16 +58,47 @@ def display_name
5158
# @return [ Sharded ] The topology.
5259
def elect_primary(description, servers); self; end
5360

61+
# Determine if the topology would select a readable server for the
62+
# provided candidates and read preference.
63+
#
64+
# @example Is a readable server present?
65+
# topology.has_readable_server?(cluster, server_selector)
66+
#
67+
# @param [ Cluster ] cluster The cluster.
68+
# @param [ ServerSelector ] server_selector The server
69+
# selector.
70+
#
71+
# @return [ true ] A Sharded cluster always has a readable server.
72+
#
73+
# @since 2.4.0
74+
def has_readable_server?(cluster, server_selector = nil); true; end
75+
76+
# Determine if the topology would select a writable server for the
77+
# provided candidates.
78+
#
79+
# @example Is a writable server present?
80+
# topology.has_writable_server?(servers)
81+
#
82+
# @param [ Cluster ] cluster The cluster.
83+
#
84+
# @return [ true ] A Sharded cluster always has a writable server.
85+
#
86+
# @since 2.4.0
87+
def has_writable_server?(cluster); true; end
88+
5489
# Initialize the topology with the options.
5590
#
5691
# @example Initialize the topology.
5792
# Sharded.new(options)
5893
#
5994
# @param [ Hash ] options The options.
95+
# @param [ Monitoring ] monitoring The monitoring.
96+
# @param [ Array<String> ] seeds The seeds.
6097
#
6198
# @since 2.0.0
62-
def initialize(options, seeds = [])
99+
def initialize(options, monitoring, seeds = [])
63100
@options = options
101+
@monitoring = monitoring
64102
end
65103

66104
# A sharded topology is not a replica set.
@@ -181,6 +219,14 @@ def unknown?; false; end
181219
# @since 2.0.6
182220
def standalone_discovered; self; end
183221

222+
# Notify the topology that a member was discovered.
223+
#
224+
# @example Notify the cluster that a member was discovered.
225+
# topology.member_discovered
226+
#
227+
# @since 2.4.0
228+
def member_discovered; end;
229+
184230
private
185231

186232
def remove_self?(description, server)

0 commit comments

Comments
 (0)