lib/moped/cluster.rb in moped-1.0.0.alpha vs lib/moped/cluster.rb in moped-1.0.0.beta

- old
+ new

@@ -1,193 +1,145 @@
 module Moped
-  # @api private
-  #
-  # The internal class managing connections to both a single node and replica
-  # sets.
-  #
-  # @note Though the socket class itself *is* threadsafe, the cluster presently
-  # is not. This means that in the course of normal operations sessions can be
-  # shared across threads, but in failure modes (when a resync is required),
-  # things can possibly go wrong.
   class Cluster
-    # @return [Array] the user supplied seeds
+    # @return [Array<String>] the seeds the replica set was initialized with
     attr_reader :seeds
-    # @return [Boolean] whether this is a direct connection
-    attr_reader :direct
+    # @option options :down_interval number of seconds to wait before attempting
+    # to reconnect to a down node. (30)
+    #
+    # @option options :refresh_interval number of seconds to cache information
+    # about a node. (300)
+    def initialize(hosts, options)
+      @options = {
+        down_interval: 30,
+        refresh_interval: 300
+      }.merge(options)
-    # @return [Array] all available nodes
-    attr_reader :servers
-
-    # @return [Array] seeds gathered from cluster discovery
-    attr_reader :dynamic_seeds
-
-    # @param [Array] seeds an array of host:port pairs
-    # @param [Boolean] direct (false) whether to connect directly to the hosts
-    # provided or to find additional available nodes.
-    def initialize(seeds, direct = false)
-      @seeds = seeds
-      @direct = direct
-
-      @servers = []
-      @dynamic_seeds = []
+      @seeds = hosts
+      @nodes = hosts.map { |host| Node.new(host) }
     end
-    # @return [Array] available secondary nodes
-    def secondaries
-      servers.select(&:secondary?)
-    end
+    # Refreshes information for each of the nodes provided. The node list
+    # defaults to the list of all known nodes.
+    #
+    # If a node is successfully refreshed, any newly discovered peers will also
+    # be refreshed.
+    #
+    # @return [Array<Node>] the available nodes
+    def refresh(nodes_to_refresh = @nodes)
+      refreshed_nodes = []
+      seen = {}
-    # @return [Array] available primary nodes
-    def primaries
-      servers.select(&:primary?)
-    end
+      # Set up a recursive lambda function for refreshing a node and it's peers.
+      refresh_node = ->(node) do
+        unless seen[node]
+          seen[node] = true
-    # @return [Array] all known addresses from user supplied seeds, dynamically
-    # discovered seeds, and active servers.
-    def known_addresses
-      [].tap do |addresses|
-        addresses.concat seeds
-        addresses.concat dynamic_seeds
-        addresses.concat servers.map { |server| server.address }
-      end.uniq
-    end
+          # Add the node to the global list of known nodes.
+          @nodes << node unless @nodes.include?(node)
-    def remove(server)
-      servers.delete(server)
-    end
+          begin
+            node.refresh
-    def reconnect
-      @servers = servers.map { |server| Server.new(server.address) }
-    end
+            # This node is good, so add it to the list of nodes to return.
+            refreshed_nodes << node unless refreshed_nodes.include?(node)
-    def sync
-      known = known_addresses.shuffle
-      seen = {}
-
-      sync_seed = ->(seed) do
-        server = Server.new seed
-
-        unless seen[server.resolved_address]
-          seen[server.resolved_address] = true
-
-          hosts = sync_server(server)
-
-          hosts.each do |host|
-            sync_seed[host]
+            # Now refresh any newly discovered peer nodes.
+            (node.peers - @nodes).each &refresh_node
+          rescue Errors::ConnectionFailure
+            # We couldn't connect to the node, so don't do anything with it.
           end
         end
       end
-      known.each do |seed|
-        sync_seed[seed]
-      end
+      nodes_to_refresh.each &refresh_node
-      unless servers.empty?
-        @dynamic_seeds = servers.map(&:address)
-      end
-
-      true
+      refreshed_nodes.to_a
     end
-    def sync_server(server)
-      [].tap do |hosts|
-        socket = server.socket
+    # Returns the list of available nodes, refreshing 1) any nodes which were
+    # down and ready to be checked again and 2) any nodes whose information is
+    # out of date.
+    #
+    # @return [Array<Node>] the list of available nodes.
+    def nodes
+      # Find the nodes that were down but are ready to be refreshed, or those
+      # with stale connection information.
+      needs_refresh, available = @nodes.partition do |node|
+        (node.down? && node.down_at < (Time.new - @options[:down_interval])) ||
+          node.needs_refresh?(Time.new - @options[:refresh_interval])
+      end
-        if socket.connect
-          info = socket.simple_query Protocol::Command.new(:admin, ismaster: 1)
+      # Refresh those nodes.
+      available.concat refresh(needs_refresh)
-          if info["ismaster"]
-            server.primary = true
-          end
+      # Now return all the nodes that are available.
+      available.reject &:down?
+    end
-          if info["secondary"]
-            server.secondary = true
+    # Yields the replica set's primary node to the provided block. This method
+    # will retry the block in case of connection errors or replica set
+    # reconfiguration.
+    #
+    # @raises ConnectionFailure when no primary node can be found
+    def with_primary(retry_on_failure = true, &block)
+      if node = nodes.find(&:primary?)
+        begin
+          node.ensure_primary do
+            return yield node.apply_auth(auth)
           end
-
-          if info["primary"]
-            hosts.push info["primary"]
-          end
-
-          if info["hosts"]
-            hosts.concat info["hosts"]
-          end
-
-          if info["passives"]
-            hosts.concat info["passives"]
-          end
-
-          merge(server)
-
+        rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
+          # Fall through to the code below if our connection was dropped or the
+          # node is no longer the primary.
        end
-      end.uniq
-    end
+      end
-    def merge(server)
-      previous = servers.find { |other| other == server }
-      primary = server.primary?
-      secondary = server.secondary?
-
-      if previous
-        previous.merge(server)
+      if retry_on_failure
+        # We couldn't find a primary node, so refresh the list and try again.
+        refresh
+        with_primary(false, &block)
       else
-        servers << server
+        raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
       end
     end
-    # @param [:read, :write] mode the type of socket to return
-    # @return [Socket] a socket valid for +mode+ operations
-    def socket_for(mode)
-      sync unless primaries.any? || (secondaries.any? && mode == :read)
+    # Yields a secondary node if available, otherwise the primary node. This
+    # method will retry the block in case of connection errors.
+    #
+    # @raises ConnectionError when no secondary or primary node can be found
+    def with_secondary(retry_on_failure = true, &block)
+      available_nodes = nodes.shuffle!.partition(&:secondary?).flatten
-      server = nil
-      while primaries.any? || (secondaries.any? && mode == :read)
-        if mode == :write || secondaries.empty?
-          server = primaries.sample
-        else
-          server = secondaries.sample
+      while node = available_nodes.shift
+        begin
+          return yield node.apply_auth(auth)
+        rescue Errors::ConnectionFailure
+          # That node's no good, so let's try the next one.
+          next
        end
-
-        if server
-          socket = server.socket
-          socket.connect unless socket.connection
-
-          if socket.alive?
-            break server
-          else
-            remove server
-          end
-        end
       end
-      unless server
-        raise Errors::ConnectionFailure.new("Could not connect to any primary or secondary servers")
+      if retry_on_failure
+        # We couldn't find a secondary or primary node, so refresh the list and
+        # try again.
+        refresh
+        with_secondary(false, &block)
+      else
+        raise Errors::ConnectionFailure, "Could not connect to any secondary or primary nodes for replica set #{inspect}"
      end
-
-      socket = server.socket
-      socket.apply_auth auth
-      socket
    end
     # @return [Hash] the cached authentication credentials for this cluster.
     def auth
       @auth ||= {}
     end
-    # Log in to +database+ with +username+ and +password+. Does not perform the
-    # actual log in, but saves the credentials for later authentication on a
-    # socket.
-    def login(database, username, password)
-      auth[database.to_s] = [username, password]
-    end
+    private
-    # Log out of +database+. Does not perform the actual log out, but will log
-    # out when the socket is used next.
-    def logout(database)
-      auth.delete(database.to_s)
+    def initialize_copy(_)
+      @nodes = @nodes.map &:dup
    end
  end
-end
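For orientation, here is a minimal usage sketch of the beta API shown above. It is an illustration under stated assumptions, not code from the gem: the seed addresses and the down_interval override are made up, and node.command(:admin, ping: 1) is a hypothetical call (the diff only shows that with_primary and with_secondary yield a Node after apply_auth). The retry-once behaviour and Errors::ConnectionFailure do come from the diff.

require "moped"

# Seeds are plain "host:port" strings; peers discovered during refresh are
# added to the cluster's node list automatically.
cluster = Moped::Cluster.new(
  ["127.0.0.1:27017", "127.0.0.1:27018"], # hypothetical seed addresses
  down_interval: 10                       # recheck downed nodes sooner than the 30s default
)

# Writes go through the primary. If the connection drops or the replica set is
# reconfigured, the cluster refreshes its node list and retries the block once
# before raising Errors::ConnectionFailure.
cluster.with_primary do |node|
  node.command(:admin, ping: 1) # hypothetical Node call; the diff only shows that a Node is yielded
end

# Reads prefer a shuffled secondary and fall back to the primary, with the
# same refresh-and-retry-once policy.
cluster.with_secondary do |node|
  node.command(:admin, ismaster: 1) # hypothetical Node call
end

Compared with the alpha's socket_for(:read)/socket_for(:write), the block-based with_primary/with_secondary keeps node selection, credential application (apply_auth(auth)) and the single retry after a refresh inside the cluster, so callers never hold a raw socket across a failover; downed or stale nodes are rechecked lazily according to down_interval and refresh_interval instead of through a full resync.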