lib/moped/cluster.rb in moped-1.0.0.alpha vs lib/moped/cluster.rb in moped-1.0.0.beta
- old
+ new
@@ -1,193 +1,145 @@
module Moped
- # @api private
- #
- # The internal class managing connections to both a single node and replica
- # sets.
- #
- # @note Though the socket class itself *is* threadsafe, the cluster presently
- # is not. This means that in the course of normal operations sessions can be
- # shared across threads, but in failure modes (when a resync is required),
- # things can possibly go wrong.
class Cluster
- # @return [Array] the user supplied seeds
+ # @return [Array<String>] the seeds the replica set was initialized with
attr_reader :seeds
- # @return [Boolean] whether this is a direct connection
- attr_reader :direct
+ # @option options :down_interval number of seconds to wait before attempting
+ # to reconnect to a down node. (30)
+ #
+ # @option options :refresh_interval number of seconds to cache information
+ # about a node. (300)
+ def initialize(hosts, options)
+ @options = {
+ down_interval: 30,
+ refresh_interval: 300
+ }.merge(options)
- # @return [Array] all available nodes
- attr_reader :servers
-
- # @return [Array] seeds gathered from cluster discovery
- attr_reader :dynamic_seeds
-
- # @param [Array] seeds an array of host:port pairs
- # @param [Boolean] direct (false) whether to connect directly to the hosts
- # provided or to find additional available nodes.
- def initialize(seeds, direct = false)
- @seeds = seeds
- @direct = direct
-
- @servers = []
- @dynamic_seeds = []
+ @seeds = hosts
+ @nodes = hosts.map { |host| Node.new(host) }
end
- # @return [Array] available secondary nodes
- def secondaries
- servers.select(&:secondary?)
- end
+ # Refreshes information for each of the nodes provided. The node list
+ # defaults to the list of all known nodes.
+ #
+ # If a node is successfully refreshed, any newly discovered peers will also
+ # be refreshed.
+ #
+ # @return [Array<Node>] the available nodes
+ def refresh(nodes_to_refresh = @nodes)
+ refreshed_nodes = []
+ seen = {}
- # @return [Array] available primary nodes
- def primaries
- servers.select(&:primary?)
- end
+ # Set up a recursive lambda function for refreshing a node and it's peers.
+ refresh_node = ->(node) do
+ unless seen[node]
+ seen[node] = true
- # @return [Array] all known addresses from user supplied seeds, dynamically
- # discovered seeds, and active servers.
- def known_addresses
- [].tap do |addresses|
- addresses.concat seeds
- addresses.concat dynamic_seeds
- addresses.concat servers.map { |server| server.address }
- end.uniq
- end
+ # Add the node to the global list of known nodes.
+ @nodes << node unless @nodes.include?(node)
- def remove(server)
- servers.delete(server)
- end
+ begin
+ node.refresh
- def reconnect
- @servers = servers.map { |server| Server.new(server.address) }
- end
+ # This node is good, so add it to the list of nodes to return.
+ refreshed_nodes << node unless refreshed_nodes.include?(node)
- def sync
- known = known_addresses.shuffle
- seen = {}
-
- sync_seed = ->(seed) do
- server = Server.new seed
-
- unless seen[server.resolved_address]
- seen[server.resolved_address] = true
-
- hosts = sync_server(server)
-
- hosts.each do |host|
- sync_seed[host]
+ # Now refresh any newly discovered peer nodes.
+ (node.peers - @nodes).each &refresh_node
+ rescue Errors::ConnectionFailure
+ # We couldn't connect to the node, so don't do anything with it.
end
end
end
- known.each do |seed|
- sync_seed[seed]
- end
+ nodes_to_refresh.each &refresh_node
- unless servers.empty?
- @dynamic_seeds = servers.map(&:address)
- end
-
- true
+ refreshed_nodes.to_a
end
- def sync_server(server)
- [].tap do |hosts|
- socket = server.socket
+ # Returns the list of available nodes, refreshing 1) any nodes which were
+ # down and ready to be checked again and 2) any nodes whose information is
+ # out of date.
+ #
+ # @return [Array<Node>] the list of available nodes.
+ def nodes
+ # Find the nodes that were down but are ready to be refreshed, or those
+ # with stale connection information.
+ needs_refresh, available = @nodes.partition do |node|
+ (node.down? && node.down_at < (Time.new - @options[:down_interval])) ||
+ node.needs_refresh?(Time.new - @options[:refresh_interval])
+ end
- if socket.connect
- info = socket.simple_query Protocol::Command.new(:admin, ismaster: 1)
+ # Refresh those nodes.
+ available.concat refresh(needs_refresh)
- if info["ismaster"]
- server.primary = true
- end
+ # Now return all the nodes that are available.
+ available.reject &:down?
+ end
- if info["secondary"]
- server.secondary = true
+ # Yields the replica set's primary node to the provided block. This method
+ # will retry the block in case of connection errors or replica set
+ # reconfiguration.
+ #
+ # @raises ConnectionFailure when no primary node can be found
+ def with_primary(retry_on_failure = true, &block)
+ if node = nodes.find(&:primary?)
+ begin
+ node.ensure_primary do
+ return yield node.apply_auth(auth)
end
-
- if info["primary"]
- hosts.push info["primary"]
- end
-
- if info["hosts"]
- hosts.concat info["hosts"]
- end
-
- if info["passives"]
- hosts.concat info["passives"]
- end
-
- merge(server)
-
+ rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
+ # Fall through to the code below if our connection was dropped or the
+ # node is no longer the primary.
end
- end.uniq
- end
+ end
- def merge(server)
- previous = servers.find { |other| other == server }
- primary = server.primary?
- secondary = server.secondary?
-
- if previous
- previous.merge(server)
+ if retry_on_failure
+ # We couldn't find a primary node, so refresh the list and try again.
+ refresh
+ with_primary(false, &block)
else
- servers << server
+ raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
end
end
- # @param [:read, :write] mode the type of socket to return
- # @return [Socket] a socket valid for +mode+ operations
- def socket_for(mode)
- sync unless primaries.any? || (secondaries.any? && mode == :read)
+ # Yields a secondary node if available, otherwise the primary node. This
+ # method will retry the block in case of connection errors.
+ #
+ # @raises ConnectionError when no secondary or primary node can be found
+ def with_secondary(retry_on_failure = true, &block)
+ available_nodes = nodes.shuffle!.partition(&:secondary?).flatten
- server = nil
- while primaries.any? || (secondaries.any? && mode == :read)
- if mode == :write || secondaries.empty?
- server = primaries.sample
- else
- server = secondaries.sample
+ while node = available_nodes.shift
+ begin
+ return yield node.apply_auth(auth)
+ rescue Errors::ConnectionFailure
+ # That node's no good, so let's try the next one.
+ next
end
-
- if server
- socket = server.socket
- socket.connect unless socket.connection
-
- if socket.alive?
- break server
- else
- remove server
- end
- end
end
- unless server
- raise Errors::ConnectionFailure.new("Could not connect to any primary or secondary servers")
+ if retry_on_failure
+ # We couldn't find a secondary or primary node, so refresh the list and
+ # try again.
+ refresh
+ with_secondary(false, &block)
+ else
+ raise Errors::ConnectionFailure, "Could not connect to any secondary or primary nodes for replica set #{inspect}"
end
-
- socket = server.socket
- socket.apply_auth auth
- socket
end
# @return [Hash] the cached authentication credentials for this cluster.
def auth
@auth ||= {}
end
- # Log in to +database+ with +username+ and +password+. Does not perform the
- # actual log in, but saves the credentials for later authentication on a
- # socket.
- def login(database, username, password)
- auth[database.to_s] = [username, password]
- end
+ private
- # Log out of +database+. Does not perform the actual log out, but will log
- # out when the socket is used next.
- def logout(database)
- auth.delete(database.to_s)
+ def initialize_copy(_)
+ @nodes = @nodes.map &:dup
end
end
-
end