lib/mongo/cluster.rb in mongo-2.0.5 vs lib/mongo/cluster.rb in mongo-2.0.6

- old
+ new

@@ -23,13 +23,10 @@ class Cluster extend Forwardable include Event::Subscriber include Loggable - # @return [ Array<String> ] The provided seed addresses. - attr_reader :addresses - # @return [ Hash ] The options hash. attr_reader :options # @return [ Object ] The cluster topology. attr_reader :topology @@ -67,14 +64,13 @@ def add(host) address = Address.new(host) if !addresses.include?(address) if addition_allowed?(address) log_debug([ "Adding #{address.to_s} to the cluster." ]) - addresses.push(address) - server = Server.new(address, event_listeners, - options.merge(slave_ok: @slave_ok)) - @servers.push(server) + @update_lock.synchronize { @addresses.push(address) } + server = Server.new(address, self, event_listeners, options) + @update_lock.synchronize { @servers.push(server) } server end end end @@ -91,14 +87,14 @@ @addresses = [] @servers = [] @event_listeners = Event::Listeners.new @options = options.freeze @topology = Topology.initial(seeds, options) - @slave_ok = @topology.single? unless options[:read] + @update_lock = Mutex.new - subscribe_to(Event::SERVER_ADDED, Event::ServerAdded.new(self)) - subscribe_to(Event::SERVER_REMOVED, Event::ServerRemoved.new(self)) + subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) + subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self)) seeds.each{ |seed| add(seed) } end @@ -136,14 +132,27 @@ # # @return [ Topology ] The cluster topology. # # @since 2.0.0 def elect_primary!(description) - @topology = topology.elect_primary(description, @servers) + @topology = topology.elect_primary(description, servers_list) end - # Removed the server from the cluster for the provided address, if it + # Notify the cluster that a standalone server was discovered so that the + # topology can be updated accordingly. + # + # @example Notify the cluster that a standalone server was discovered. + # cluster.standalone_discovered + # + # @return [ Topology ] The cluster topology. + # + # @since 2.0.6 + def standalone_discovered + @topology = topology.standalone_discovered + end + + # Remove the server from the cluster for the provided address, if it # exists. # # @example Remove the server from the cluster. # server.remove('127.0.0.1:27017') # @@ -151,13 +160,14 @@ # # @since 2.0.0 def remove(host) log_debug([ "#{host} being removed from the cluster." ]) address = Address.new(host) - removed_servers = @servers.reject!{ |server| server.address == address } + removed_servers = @servers.select { |s| s.address == address } + @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers - addresses.reject!{ |addr| addr == address } + @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end # Force a scan of all known servers in the cluster. # # @example Force a full cluster scan. @@ -168,11 +178,11 @@ # # @return [ true ] Always true. # # @since 2.0.0 def scan! - @servers.each{ |server| server.scan! } and true + servers_list.each{ |server| server.scan! } and true end # Get a list of server candidates from the cluster that can have operations # executed on them. # @@ -181,13 +191,43 @@ # # @return [ Array<Server> ] The candidate servers. # # @since 2.0.0 def servers - topology.servers(@servers.compact).compact + topology.servers(servers_list.compact).compact end + # Add hosts in a description to the cluster. + # + # @example Add hosts in a description to the cluster. + # cluster.add_hosts(description) + # + # @param [ Mongo::Server::Description ] description The description. + # + # @since 2.0.6 + def add_hosts(description) + if topology.add_hosts?(description, servers_list) + description.servers.each { |s| add(s) } + end + end + + # Remove hosts in a description from the cluster. + # + # @example Remove hosts in a description from the cluster. + # cluster.remove_hosts(description) + # + # @param [ Mongo::Server::Description ] description The description. + # + # @since 2.0.6 + def remove_hosts(description) + if topology.remove_hosts?(description) + servers_list.each do |s| + remove(s.address.to_s) if topology.remove_server?(description, s) + end + end + end + # Create a cluster for the provided client, for use when we don't want the # client's original cluster instance to be the same. # # @api private # @@ -202,16 +242,44 @@ def self.create(client) cluster = Cluster.new(client.cluster.addresses.map(&:to_s), client.options) client.instance_variable_set(:@cluster, cluster) end + # The addresses in the cluster. + # + # @example Get the addresses in the cluster. + # cluster.addresses + # + # @return [ Array<Mongo::Address> ] The addresses. + # + # @since 2.0.6 + def addresses + addresses_list + end + private def direct_connection?(address) address.seed == @topology.seed end def addition_allowed?(address) !@topology.single? || direct_connection?(address) + end + + def servers_list + @update_lock.synchronize do + @servers.reduce([]) do |servers, server| + servers << server + end + end + end + + def addresses_list + @update_lock.synchronize do + @addresses.reduce([]) do |addresses, address| + addresses << address + end + end end end end