lib/mongo/cluster.rb in mongo-2.0.5 vs lib/mongo/cluster.rb in mongo-2.0.6
- old
+ new
@@ -23,13 +23,10 @@
class Cluster
extend Forwardable
include Event::Subscriber
include Loggable
- # @return [ Array<String> ] The provided seed addresses.
- attr_reader :addresses
-
# @return [ Hash ] The options hash.
attr_reader :options
# @return [ Object ] The cluster topology.
attr_reader :topology
@@ -67,14 +64,13 @@
def add(host)
address = Address.new(host)
if !addresses.include?(address)
if addition_allowed?(address)
log_debug([ "Adding #{address.to_s} to the cluster." ])
- addresses.push(address)
- server = Server.new(address, event_listeners,
- options.merge(slave_ok: @slave_ok))
- @servers.push(server)
+ @update_lock.synchronize { @addresses.push(address) }
+ server = Server.new(address, self, event_listeners, options)
+ @update_lock.synchronize { @servers.push(server) }
server
end
end
end
@@ -91,14 +87,14 @@
@addresses = []
@servers = []
@event_listeners = Event::Listeners.new
@options = options.freeze
@topology = Topology.initial(seeds, options)
- @slave_ok = @topology.single? unless options[:read]
+ @update_lock = Mutex.new
- subscribe_to(Event::SERVER_ADDED, Event::ServerAdded.new(self))
- subscribe_to(Event::SERVER_REMOVED, Event::ServerRemoved.new(self))
+ subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
+ subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
seeds.each{ |seed| add(seed) }
end
@@ -136,14 +132,27 @@
#
# @return [ Topology ] The cluster topology.
#
# @since 2.0.0
def elect_primary!(description)
- @topology = topology.elect_primary(description, @servers)
+ @topology = topology.elect_primary(description, servers_list)
end
- # Removed the server from the cluster for the provided address, if it
+ # Notify the cluster that a standalone server was discovered so that the
+ # topology can be updated accordingly.
+ #
+ # @example Notify the cluster that a standalone server was discovered.
+ # cluster.standalone_discovered
+ #
+ # @return [ Topology ] The cluster topology.
+ #
+ # @since 2.0.6
+ def standalone_discovered
+ @topology = topology.standalone_discovered
+ end
+
+ # Remove the server from the cluster for the provided address, if it
# exists.
#
# @example Remove the server from the cluster.
# server.remove('127.0.0.1:27017')
#
@@ -151,13 +160,14 @@
#
# @since 2.0.0
def remove(host)
log_debug([ "#{host} being removed from the cluster." ])
address = Address.new(host)
- removed_servers = @servers.reject!{ |server| server.address == address }
+ removed_servers = @servers.select { |s| s.address == address }
+ @update_lock.synchronize { @servers = @servers - removed_servers }
removed_servers.each{ |server| server.disconnect! } if removed_servers
- addresses.reject!{ |addr| addr == address }
+ @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end
# Force a scan of all known servers in the cluster.
#
# @example Force a full cluster scan.
@@ -168,11 +178,11 @@
#
# @return [ true ] Always true.
#
# @since 2.0.0
def scan!
- @servers.each{ |server| server.scan! } and true
+ servers_list.each{ |server| server.scan! } and true
end
# Get a list of server candidates from the cluster that can have operations
# executed on them.
#
@@ -181,13 +191,43 @@
#
# @return [ Array<Server> ] The candidate servers.
#
# @since 2.0.0
def servers
- topology.servers(@servers.compact).compact
+ topology.servers(servers_list.compact).compact
end
+ # Add hosts in a description to the cluster.
+ #
+ # @example Add hosts in a description to the cluster.
+ # cluster.add_hosts(description)
+ #
+ # @param [ Mongo::Server::Description ] description The description.
+ #
+ # @since 2.0.6
+ def add_hosts(description)
+ if topology.add_hosts?(description, servers_list)
+ description.servers.each { |s| add(s) }
+ end
+ end
+
+ # Remove hosts in a description from the cluster.
+ #
+ # @example Remove hosts in a description from the cluster.
+ # cluster.remove_hosts(description)
+ #
+ # @param [ Mongo::Server::Description ] description The description.
+ #
+ # @since 2.0.6
+ def remove_hosts(description)
+ if topology.remove_hosts?(description)
+ servers_list.each do |s|
+ remove(s.address.to_s) if topology.remove_server?(description, s)
+ end
+ end
+ end
+
# Create a cluster for the provided client, for use when we don't want the
# client's original cluster instance to be the same.
#
# @api private
#
@@ -202,16 +242,44 @@
def self.create(client)
cluster = Cluster.new(client.cluster.addresses.map(&:to_s), client.options)
client.instance_variable_set(:@cluster, cluster)
end
+ # The addresses in the cluster.
+ #
+ # @example Get the addresses in the cluster.
+ # cluster.addresses
+ #
+ # @return [ Array<Mongo::Address> ] The addresses.
+ #
+ # @since 2.0.6
+ def addresses
+ addresses_list
+ end
+
private
def direct_connection?(address)
address.seed == @topology.seed
end
def addition_allowed?(address)
!@topology.single? || direct_connection?(address)
+ end
+
+ def servers_list
+ @update_lock.synchronize do
+ @servers.reduce([]) do |servers, server|
+ servers << server
+ end
+ end
+ end
+
+ def addresses_list
+ @update_lock.synchronize do
+ @addresses.reduce([]) do |addresses, address|
+ addresses << address
+ end
+ end
end
end
end