lib/mongo/cluster.rb in mongo-2.4.0.rc1 vs lib/mongo/cluster.rb in mongo-2.4.0

- old
+ new

@@ -22,10 +22,11 @@ # replica set, or a single or multiple mongos. # # @since 2.0.0 class Cluster extend Forwardable + include Monitoring::Publishable include Event::Subscriber include Loggable # The default number of mongos read retries. # @@ -35,23 +36,32 @@ # The default mongos read retry interval, in seconds. # # @since 2.1.1 READ_RETRY_INTERVAL = 5 + # How often an idle primary writes a no-op to the oplog. + # + # @since 2.4.0 + IDLE_WRITE_PERIOD_SECONDS = 10 + # @return [ Hash ] The options hash. attr_reader :options + # @return [ Monitoring ] monitoring The monitoring. + attr_reader :monitoring + # @return [ Object ] The cluster topology. attr_reader :topology # @return [ Mongo::Cluster::AppMetadata ] The application metadata, used for connection # handshakes. # # @since 2.4.0 attr_reader :app_metadata - def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown? + def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, + :single?, :unknown?, :member_discovered def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor # Determine if this cluster of servers is equal to another object. Checks the # servers currently in the cluster, not what was configured. # @@ -82,19 +92,46 @@ # @since 2.0.0 def add(host) address = Address.new(host) if !addresses.include?(address) if addition_allowed?(address) - log_debug("Adding #{address.to_s} to the cluster.") @update_lock.synchronize { @addresses.push(address) } server = Server.new(address, self, @monitoring, event_listeners, options) @update_lock.synchronize { @servers.push(server) } server end end end + # Determine if the cluster would select a readable server for the + # provided read preference. + # + # @example Is a readable server present? + # topology.has_readable_server?(server_selector) + # + # @param [ ServerSelector ] server_selector The server + # selector. + # + # @return [ true, false ] If a readable server is present. + # + # @since 2.4.0 + def has_readable_server?(server_selector = nil) + topology.has_readable_server?(self, server_selector) + end + + # Determine if the cluster would select a writable server. + # + # @example Is a writable server present? + # topology.has_writable_server? + # + # @return [ true, false ] If a writable server is present. + # + # @since 2.4.0 + def has_writable_server? + topology.has_writable_server?(self) + end + # Instantiate the new cluster. # # @api private # # @example Instantiate the cluster. @@ -112,20 +149,30 @@ @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new @options = options.freeze @app_metadata ||= AppMetadata.new(self) - @topology = Topology.initial(seeds, options) @update_lock = Mutex.new @pool_lock = Mutex.new + @topology = Topology.initial(seeds, monitoring, options) + publish_sdam_event( + Monitoring::TOPOLOGY_OPENING, + Monitoring::Event::TopologyOpening.new(@topology) + ) + subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) - subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self)) + subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self)) seeds.each{ |seed| add(seed) } + publish_sdam_event( + Monitoring::TOPOLOGY_CHANGED, + Monitoring::Event::TopologyChanged.new(@topology, @topology) + ) if @servers.size > 1 + @cursor_reaper = CursorReaper.new @cursor_reaper.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools)) end @@ -257,14 +304,17 @@ # # @param [ String ] host The host/port or socket address. # # @since 2.0.0 def remove(host) - log_debug("#{host} being removed from the cluster.") address = Address.new(host) removed_servers = @servers.select { |s| s.address == address } @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers + publish_sdam_event( + Monitoring::SERVER_CLOSED, + Monitoring::Event::ServerClosed.new(address, topology) + ) @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end # Force a scan of all known servers in the cluster. #