lib/moped/cluster.rb in moped-1.5.3 vs lib/moped/cluster.rb in moped-2.0.0.beta

- old
+ new

@@ -1,36 +1,60 @@ +# encoding: utf-8 +require "moped/node" + module Moped # The cluster represents a cluster of MongoDB server nodes, either a single # node, a replica set, or a mongos server. + # + # @since 1.0.0 class Cluster - # @attribute [r] options The cluster options. - # @attribute [r] seeds The seeds the cluster was initialized with. - attr_reader :options, :seeds + # The default interval that a node would be flagged as "down". + # + # @since 2.0.0 + DOWN_INTERVAL = 30 + # The default interval that a node should be refreshed in. + # + # @since 2.0.0 + REFRESH_INTERVAL = 300 + + # The default time to wait to retry an operation. + # + # @since 2.0.0 + RETRY_INTERVAL = 0.25 + + # @!attribute options + # @return [ Hash ] The refresh options. + # @!attribute peers + # @return [ Array<Node> ] The node peers. + # @!attribute seeds + # @return [ Array<Node> ] The seed nodes. + attr_reader :options, :peers, :seeds + # Get the credentials for the cluster. # - # @example Get the authentication details. - # cluster.auth + # @example Get the applied credentials. + # node.credentials # - # @return [ Hash ] the cached authentication credentials for this cluster. + # @return [ Hash ] The credentials. # - # @since 1.0.0 - def auth - @auth ||= {} + # @since 2.0.0 + def credentials + @credentials ||= {} end # Disconnects all nodes in the cluster. This should only be used in cases # where you know you're not going to use the cluster on the thread anymore # and need to force the connections to close. # # @return [ true ] True if the disconnect succeeded. # # @since 1.2.0 def disconnect - nodes(include_arbiters: true).each { |node| node.disconnect } and true + nodes.each { |node| node.disconnect } and true end # Get the interval at which a node should be flagged as down before # retrying. # @@ -39,51 +63,13 @@ # # @return [ Integer ] The down interval. # # @since 1.2.7 def down_interval - options[:down_interval] + @down_interval ||= options[:down_interval] || DOWN_INTERVAL end - # Get the number of times an operation should be retried before raising an - # error. - # - # @example Get the maximum retries. - # cluster.max_retries - # - # @return [ Integer ] The max retries. - # - # @since 1.2.7 - def max_retries - options[:max_retries] - end - - # Get the interval in which the node list should be refreshed. - # - # @example Get the refresh interval, in seconds. - # cluster.refresh_interval - # - # @return [ Integer ] The refresh interval. - # - # @since 1.2.7 - def refresh_interval - options[:refresh_interval] - end - - # Get the operation retry interval - the time to wait before retrying a - # single operation. - # - # @example Get the retry interval, in seconds. - # cluster.retry_interval - # - # @return [ Integer ] The retry interval. - # - # @since 1.2.7 - def retry_interval - options[:retry_interval] - end - # Initialize the new cluster. # # @example Initialize the cluster. # Cluster.new([ "localhost:27017" ], down_interval: 20) # @@ -96,51 +82,63 @@ # @option options [ Integer ] :timeout The time in seconds to wait for an # operation to timeout. (5) # # @since 1.0.0 def initialize(hosts, options) - @seeds = hosts - @nodes = hosts.map { |host| Node.new(host, options) } + @seeds = hosts.map{ |host| Node.new(host, options) } @peers = [] + @options = options + end - @options = { - down_interval: 30, - max_retries: 20, - refresh_interval: 300, - retry_interval: 0.25 - }.merge(options) + # Provide a pretty string for cluster inspection. + # + # @example Inspect the cluster. + # cluster.inspect + # + # @return [ String ] A nicely formatted string. + # + # @since 1.0.0 + def inspect + "#<#{self.class.name}:#{object_id} @seeds=#{seeds.inspect}>" end + # Get the number of times an operation should be retried before raising an + # error. + # + # @example Get the maximum retries. + # cluster.max_retries + # + # @return [ Integer ] The max retries. + # + # @since 1.2.7 + def max_retries + @max_retries ||= options[:max_retries] || seeds.size + end + # Returns the list of available nodes, refreshing 1) any nodes which were # down and ready to be checked again and 2) any nodes whose information is # out of date. Arbiter nodes are not returned. # # @example Get the available nodes. # cluster.nodes # # @return [ Array<Node> ] the list of available nodes. # # @since 1.0.0 - def nodes(opts = {}) - current_time = Time.new - down_boundary = current_time - down_interval - refresh_boundary = current_time - refresh_interval - + def nodes # Find the nodes that were down but are ready to be refreshed, or those # with stale connection information. - needs_refresh, available = @nodes.partition do |node| - node.down? ? (node.down_at < down_boundary) : node.needs_refresh?(refresh_boundary) + needs_refresh, available = seeds.partition do |node| + refreshable?(node) end # Refresh those nodes. - available.concat refresh(needs_refresh) + available.concat(refresh(needs_refresh)) # Now return all the nodes that are available and participating in the # replica set. - available.reject do |node| - node.down? || !member?(node) || (!opts[:include_arbiters] && node.arbiter?) - end + available.reject{ |node| node.down? } end # Refreshes information for each of the nodes provided. The node list # defaults to the list of all known nodes. # @@ -153,41 +151,61 @@ # @param [ Array<Node> ] nodes_to_refresh The nodes to refresh. # # @return [ Array<Node> ] the available nodes # # @since 1.0.0 - def refresh(nodes_to_refresh = @nodes) + def refresh(nodes_to_refresh = seeds) refreshed_nodes = [] seen = {} - # Set up a recursive lambda function for refreshing a node and it's peers. refresh_node = ->(node) do unless seen[node] seen[node] = true - # Add the node to the global list of known nodes. - @nodes << node unless @nodes.include?(node) - + seeds.push(node) unless seeds.include?(node) begin node.refresh - # This node is good, so add it to the list of nodes to return. - refreshed_nodes << node unless refreshed_nodes.include?(node) - + refreshed_nodes.push(node) unless refreshed_nodes.include?(node) # Now refresh any newly discovered peer nodes - this will also # remove nodes that are not included in the peer list. refresh_peers(node, &refresh_node) rescue Errors::ConnectionFailure # We couldn't connect to the node. end end end nodes_to_refresh.each(&refresh_node) - refreshed_nodes.to_a + refreshed_nodes end + # Get the interval in which the node list should be refreshed. + # + # @example Get the refresh interval, in seconds. + # cluster.refresh_interval + # + # @return [ Integer ] The refresh interval. + # + # @since 1.2.7 + def refresh_interval + @refresh_interval ||= options[:refresh_interval] || REFRESH_INTERVAL + end + + # Get the operation retry interval - the time to wait before retrying a + # single operation. + # + # @example Get the retry interval, in seconds. + # cluster.retry_interval + # + # @return [ Integer ] The retry interval. + # + # @since 1.2.7 + def retry_interval + @retry_interval ||= options[:retry_interval] || RETRY_INTERVAL + end + # Yields the replica set's primary node to the provided block. This method # will retry the block in case of connection errors or replica set # reconfiguration. # # @example Yield the primary to the block. @@ -200,34 +218,20 @@ # @raises [ ConnectionFailure ] When no primary node can be found # # @return [ Object ] The result of the yield. # # @since 1.0.0 - def with_primary(retries = max_retries, &block) + def with_primary(&block) if node = nodes.find(&:primary?) begin node.ensure_primary do - return yield node.apply_auth(auth) + return yield(node.apply_credentials(credentials)) end rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured - # Fall through to the code below if our connection was dropped or the - # node is no longer the primary. end end - - if retries > 0 - # We couldn't find a primary node, so refresh the list and try again. - warning(" MOPED: Retrying connection to primary for replica set #{inspect}") - sleep(retry_interval) - refresh - with_primary(retries - 1, &block) - else - raise( - Errors::ConnectionFailure, - "Could not connect to a primary node for replica set #{inspect}" - ) - end + raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}" end # Yields a secondary node if available, otherwise the primary node. This # method will retry the block in case of connection errors. # @@ -241,66 +245,95 @@ # @raises [ ConnectionFailure ] When no primary node can be found # # @return [ Object ] The result of the yield. # # @since 1.0.0 - def with_secondary(retries = max_retries, &block) - available_nodes = nodes.shuffle!.partition(&:secondary?).flatten - + def with_secondary(&block) + available_nodes = nodes.select(&:secondary?).shuffle! while node = available_nodes.shift begin - return yield node.apply_auth(auth) - rescue Errors::ConnectionFailure - warning(" MOPED: Connection failed to secondary node #{node.inspect}, trying next node.") - # That node's no good, so let's try the next one. + return yield(node.apply_credentials(credentials)) + rescue Errors::ConnectionFailure => e next - rescue Errors::ReplicaSetReconfigured - # That node's no good, so let's try the next one. - next end end - - if retries > 0 - # We couldn't find a secondary or primary node, so refresh the list and - # try again. - warning(" MOPED: Could not connect to any node in replica set #{inspect}, refreshing list.") - sleep(retry_interval) - refresh - with_secondary(retries - 1, &block) - else - raise( - Errors::ConnectionFailure, - "Could not connect to any secondary or primary nodes for replica set #{inspect}" - ) - end + raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}" end - def inspect - "<#{self.class.name} nodes=#{@nodes.inspect}>" - end - private - def initialize_copy(_) - @nodes = @nodes.map(&:dup) + # Get the boundary where a node that is down would need to be refreshed. + # + # @api private + # + # @example Get the down boundary. + # cluster.down_boundary + # + # @return [ Time ] The down boundary. + # + # @since 2.0.0 + def down_boundary + Time.new - down_interval end - def member?(node) - @peers.empty? || @peers.include?(node) + # Get the standard refresh boundary to discover new nodes. + # + # @api private + # + # @example Get the refresh boundary. + # cluster.refresh_boundary + # + # @return [ Time ] The refresh boundary. + # + # @since 2.0.0 + def refresh_boundary + Time.new - refresh_interval end - def refresh_peers(node, &block) - peers = node.peers - return if !peers || peers.empty? - peers.each do |node| - block.call(node) unless @nodes.include?(node) - @peers.push(node) unless peers.include?(node) - end + # Is the provided node refreshable? This is in the case where the refresh + # boundary has passed, or the node has been down longer than the down + # boundary. + # + # @api private + # + # @example Is the node refreshable? + # cluster.refreshable?(node) + # + # @param [ Node ] node The Node to check. + # + # @since 2.0.0 + def refreshable?(node) + node.down? ? node.down_at < down_boundary : node.needs_refresh?(refresh_boundary) end - def warning(message) - if logger = Moped.logger - logger.warn(message) + # Creating a cloned cluster requires cloning all the seed nodes. + # + # @api prviate + # + # @example Clone the cluster. + # cluster.clone + # + # @return [ Cluster ] The cloned cluster. + # + # @since 1.0.0 + def initialize_copy(_) + @seeds = seeds.map(&:dup) + end + + # Refresh the peers based on the node's peers. + # + # @api private + # + # @example Refresh the peers. + # cluster.refresh_peers(node) + # + # @param [ Node ] node The node to refresh the peers for. + # + # @since 1.0.0 + def refresh_peers(node, &block) + node.peers.each do |node| + block.call(node) unless seeds.include?(node) + peers.push(node) unless peers.include?(node) end end end end