lib/mongo/cluster.rb in mongo-2.4.0.rc1 vs lib/mongo/cluster.rb in mongo-2.4.0
- old
+ new
@@ -22,10 +22,11 @@
# replica set, or a single or multiple mongos.
#
# @since 2.0.0
class Cluster
extend Forwardable
+ include Monitoring::Publishable
include Event::Subscriber
include Loggable
# The default number of mongos read retries.
#
@@ -35,23 +36,32 @@
# The default mongos read retry interval, in seconds.
#
# @since 2.1.1
READ_RETRY_INTERVAL = 5
+ # How often an idle primary writes a no-op to the oplog.
+ #
+ # @since 2.4.0
+ IDLE_WRITE_PERIOD_SECONDS = 10
+
# @return [ Hash ] The options hash.
attr_reader :options
+ # @return [ Monitoring ] monitoring The monitoring.
+ attr_reader :monitoring
+
# @return [ Object ] The cluster topology.
attr_reader :topology
# @return [ Mongo::Cluster::AppMetadata ] The application metadata, used for connection
# handshakes.
#
# @since 2.4.0
attr_reader :app_metadata
- def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
+ def_delegators :topology, :replica_set?, :replica_set_name, :sharded?,
+ :single?, :unknown?, :member_discovered
def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor
# Determine if this cluster of servers is equal to another object. Checks the
# servers currently in the cluster, not what was configured.
#
@@ -82,19 +92,46 @@
# @since 2.0.0
def add(host)
address = Address.new(host)
if !addresses.include?(address)
if addition_allowed?(address)
- log_debug("Adding #{address.to_s} to the cluster.")
@update_lock.synchronize { @addresses.push(address) }
server = Server.new(address, self, @monitoring, event_listeners, options)
@update_lock.synchronize { @servers.push(server) }
server
end
end
end
+ # Determine if the cluster would select a readable server for the
+ # provided read preference.
+ #
+ # @example Is a readable server present?
+ # topology.has_readable_server?(server_selector)
+ #
+ # @param [ ServerSelector ] server_selector The server
+ # selector.
+ #
+ # @return [ true, false ] If a readable server is present.
+ #
+ # @since 2.4.0
+ def has_readable_server?(server_selector = nil)
+ topology.has_readable_server?(self, server_selector)
+ end
+
+ # Determine if the cluster would select a writable server.
+ #
+ # @example Is a writable server present?
+ # topology.has_writable_server?
+ #
+ # @return [ true, false ] If a writable server is present.
+ #
+ # @since 2.4.0
+ def has_writable_server?
+ topology.has_writable_server?(self)
+ end
+
# Instantiate the new cluster.
#
# @api private
#
# @example Instantiate the cluster.
@@ -112,20 +149,30 @@
@servers = []
@monitoring = monitoring
@event_listeners = Event::Listeners.new
@options = options.freeze
@app_metadata ||= AppMetadata.new(self)
- @topology = Topology.initial(seeds, options)
@update_lock = Mutex.new
@pool_lock = Mutex.new
+ @topology = Topology.initial(seeds, monitoring, options)
+ publish_sdam_event(
+ Monitoring::TOPOLOGY_OPENING,
+ Monitoring::Event::TopologyOpening.new(@topology)
+ )
+
subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
- subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
+ subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))
seeds.each{ |seed| add(seed) }
+ publish_sdam_event(
+ Monitoring::TOPOLOGY_CHANGED,
+ Monitoring::Event::TopologyChanged.new(@topology, @topology)
+ ) if @servers.size > 1
+
@cursor_reaper = CursorReaper.new
@cursor_reaper.run!
ObjectSpace.define_finalizer(self, self.class.finalize(pools))
end
@@ -257,14 +304,17 @@
#
# @param [ String ] host The host/port or socket address.
#
# @since 2.0.0
def remove(host)
- log_debug("#{host} being removed from the cluster.")
address = Address.new(host)
removed_servers = @servers.select { |s| s.address == address }
@update_lock.synchronize { @servers = @servers - removed_servers }
removed_servers.each{ |server| server.disconnect! } if removed_servers
+ publish_sdam_event(
+ Monitoring::SERVER_CLOSED,
+ Monitoring::Event::ServerClosed.new(address, topology)
+ )
@update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end
# Force a scan of all known servers in the cluster.
#