lib/moped/cluster.rb in moped-1.5.3 vs lib/moped/cluster.rb in moped-2.0.0.beta
- old
+ new
@@ -1,36 +1,60 @@
+# encoding: utf-8
+require "moped/node"
+
module Moped
# The cluster represents a cluster of MongoDB server nodes, either a single
# node, a replica set, or a mongos server.
+ #
+ # @since 1.0.0
class Cluster
- # @attribute [r] options The cluster options.
- # @attribute [r] seeds The seeds the cluster was initialized with.
- attr_reader :options, :seeds
+ # The default interval, in seconds, that a node stays flagged as "down"
+ # before it is retried.
+ #
+ # @since 2.0.0
+ DOWN_INTERVAL = 30
+ # The default interval, in seconds, at which a node's information should be
+ # refreshed.
+ #
+ # @since 2.0.0
+ REFRESH_INTERVAL = 300
+
+ # The default time, in seconds, to wait before retrying an operation.
+ #
+ # @since 2.0.0
+ RETRY_INTERVAL = 0.25
+
+ # @!attribute options
+ # @return [ Hash ] The refresh options.
+ # @!attribute peers
+ # @return [ Array<Node> ] The node peers.
+ # @!attribute seeds
+ # @return [ Array<Node> ] The seed nodes.
+ attr_reader :options, :peers, :seeds
+
# Get the credentials for the cluster.
#
- # @example Get the authentication details.
- # cluster.auth
+ # @example Get the applied credentials.
+ # cluster.credentials
#
- # @return [ Hash ] the cached authentication credentials for this cluster.
+ # @return [ Hash ] The credentials.
#
- # @since 1.0.0
- def auth
- @auth ||= {}
+ # @since 2.0.0
+ def credentials
+ @credentials ||= {}
end
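# A minimal sketch of the rename above, assuming only the readers defined in
# this file: the cached hash moves from Cluster#auth (1.5.3) to
# Cluster#credentials (2.0.0.beta) and is lazily initialized to an empty hash.
#
#   cluster = Moped::Cluster.new([ "127.0.0.1:27017" ], {})
#   cluster.credentials                              # => {}
#   cluster.credentials.equal?(cluster.credentials)  # => true (memoized)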
# Disconnects all nodes in the cluster. This should only be used in cases
# where you know you're not going to use the cluster on the thread anymore
# and need to force the connections to close.
#
# @return [ true ] True if the disconnect succeeded.
#
# @since 1.2.0
def disconnect
- nodes(include_arbiters: true).each { |node| node.disconnect } and true
+ nodes.each { |node| node.disconnect } and true
end
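# Usage sketch for the simplified disconnect, based on the body above: every
# node returned by #nodes is disconnected and true is returned. Note that
# 1.5.3 passed include_arbiters: true here, while the 2.0.0.beta #nodes takes
# no options at all.
#
#   cluster.disconnect # => true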
# Get the interval at which a node should be flagged as down before
# retrying.
#
@@ -39,51 +63,13 @@
#
# @return [ Integer ] The down interval.
#
# @since 1.2.7
def down_interval
- options[:down_interval]
+ @down_interval ||= options[:down_interval] || DOWN_INTERVAL
end
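# Sketch of how the interval readers now resolve, using only the constants and
# options shown in this file: an explicit option wins, otherwise the constant
# is the fallback.
#
#   Moped::Cluster.new([ "127.0.0.1:27017" ], {}).down_interval
#   # => 30 (DOWN_INTERVAL)
#   Moped::Cluster.new([ "127.0.0.1:27017" ], down_interval: 20).down_interval
#   # => 20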
- # Get the number of times an operation should be retried before raising an
- # error.
- #
- # @example Get the maximum retries.
- # cluster.max_retries
- #
- # @return [ Integer ] The max retries.
- #
- # @since 1.2.7
- def max_retries
- options[:max_retries]
- end
-
- # Get the interval in which the node list should be refreshed.
- #
- # @example Get the refresh interval, in seconds.
- # cluster.refresh_interval
- #
- # @return [ Integer ] The refresh interval.
- #
- # @since 1.2.7
- def refresh_interval
- options[:refresh_interval]
- end
-
- # Get the operation retry interval - the time to wait before retrying a
- # single operation.
- #
- # @example Get the retry interval, in seconds.
- # cluster.retry_interval
- #
- # @return [ Integer ] The retry interval.
- #
- # @since 1.2.7
- def retry_interval
- options[:retry_interval]
- end
-
# Initialize the new cluster.
#
# @example Initialize the cluster.
# Cluster.new([ "localhost:27017" ], down_interval: 20)
#
@@ -96,51 +82,63 @@
# @option options [ Integer ] :timeout The time in seconds to wait for an
# operation to timeout. (5)
#
# @since 1.0.0
def initialize(hosts, options)
- @seeds = hosts
- @nodes = hosts.map { |host| Node.new(host, options) }
+ @seeds = hosts.map{ |host| Node.new(host, options) }
@peers = []
+ @options = options
+ end
- @options = {
- down_interval: 30,
- max_retries: 20,
- refresh_interval: 300,
- retry_interval: 0.25
- }.merge(options)
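# Sketch of the constructor change, using only what this hunk shows: 1.5.3
# kept raw host strings in @seeds and merged literal defaults into @options,
# whereas 2.0.0.beta wraps each host in a Node up front and leaves the
# defaults to the *_INTERVAL constants and the lazy readers.
#
#   cluster = Moped::Cluster.new([ "127.0.0.1:27017" ], {})
#   cluster.seeds.first # => a Moped::Node for 127.0.0.1:27017
#   cluster.options     # => {} (defaults are no longer merged in)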
+ # Provide a pretty string for cluster inspection.
+ #
+ # @example Inspect the cluster.
+ # cluster.inspect
+ #
+ # @return [ String ] A nicely formatted string.
+ #
+ # @since 1.0.0
+ def inspect
+ "#<#{self.class.name}:#{object_id} @seeds=#{seeds.inspect}>"
end
+ # Get the number of times an operation should be retried before raising an
+ # error.
+ #
+ # @example Get the maximum retries.
+ # cluster.max_retries
+ #
+ # @return [ Integer ] The max retries.
+ #
+ # @since 1.2.7
+ def max_retries
+ @max_retries ||= options[:max_retries] || seeds.size
+ end
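# Sketch of the new default, from the reader above: without a :max_retries
# option the retry count now equals the number of seeds rather than the fixed
# 20 that 1.5.3 merged into the options.
#
#   hosts = [ "127.0.0.1:27017", "127.0.0.1:27018", "127.0.0.1:27019" ]
#   Moped::Cluster.new(hosts, {}).max_retries             # => 3
#   Moped::Cluster.new(hosts, max_retries: 5).max_retries # => 5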
+
# Returns the list of available nodes, refreshing 1) any nodes which were
# down and ready to be checked again and 2) any nodes whose information is
# out of date. Arbiter nodes are not returned.
#
# @example Get the available nodes.
# cluster.nodes
#
# @return [ Array<Node> ] the list of available nodes.
#
# @since 1.0.0
- def nodes(opts = {})
- current_time = Time.new
- down_boundary = current_time - down_interval
- refresh_boundary = current_time - refresh_interval
-
+ def nodes
# Find the nodes that were down but are ready to be refreshed, or those
# with stale connection information.
- needs_refresh, available = @nodes.partition do |node|
- node.down? ? (node.down_at < down_boundary) : node.needs_refresh?(refresh_boundary)
+ needs_refresh, available = seeds.partition do |node|
+ refreshable?(node)
end
# Refresh those nodes.
- available.concat refresh(needs_refresh)
+ available.concat(refresh(needs_refresh))
# Now return all the nodes that are available and participating in the
# replica set.
- available.reject do |node|
- node.down? || !member?(node) || (!opts[:include_arbiters] && node.arbiter?)
- end
+ available.reject{ |node| node.down? }
end
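# Usage sketch for the reworked #nodes, assuming the private helpers defined
# later in this file (refreshable?, refresh): stale or down-but-due nodes are
# refreshed first, then anything still down is rejected. The options hash and
# the 1.5.3 arbiter / membership filtering are gone.
#
#   cluster.nodes.each do |node|
#     # node has been refreshed if it was due, and is not currently down
#   end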
# Refreshes information for each of the nodes provided. The node list
# defaults to the list of all known nodes.
#
@@ -153,41 +151,61 @@
# @param [ Array<Node> ] nodes_to_refresh The nodes to refresh.
#
# @return [ Array<Node> ] the available nodes
#
# @since 1.0.0
- def refresh(nodes_to_refresh = @nodes)
+ def refresh(nodes_to_refresh = seeds)
refreshed_nodes = []
seen = {}
-
# Set up a recursive lambda function for refreshing a node and its peers.
refresh_node = ->(node) do
unless seen[node]
seen[node] = true
-
# Add the node to the global list of known nodes.
- @nodes << node unless @nodes.include?(node)
-
+ seeds.push(node) unless seeds.include?(node)
begin
node.refresh
-
# This node is good, so add it to the list of nodes to return.
- refreshed_nodes << node unless refreshed_nodes.include?(node)
-
+ refreshed_nodes.push(node) unless refreshed_nodes.include?(node)
# Now refresh any newly discovered peer nodes - this will also
# remove nodes that are not included in the peer list.
refresh_peers(node, &refresh_node)
rescue Errors::ConnectionFailure
# We couldn't connect to the node.
end
end
end
nodes_to_refresh.each(&refresh_node)
- refreshed_nodes.to_a
+ refreshed_nodes
end
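# Usage sketch, based on the method above: #refresh walks the given nodes
# (the seeds by default), follows newly discovered peers recursively, appends
# unknown nodes to the seed list, and returns the nodes it could reach.
#
#   reachable = cluster.refresh
#   reachable.all? { |node| cluster.seeds.include?(node) } # => true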
+ # Get the interval in which the node list should be refreshed.
+ #
+ # @example Get the refresh interval, in seconds.
+ # cluster.refresh_interval
+ #
+ # @return [ Integer ] The refresh interval.
+ #
+ # @since 1.2.7
+ def refresh_interval
+ @refresh_interval ||= options[:refresh_interval] || REFRESH_INTERVAL
+ end
+
+ # Get the operation retry interval - the time to wait before retrying a
+ # single operation.
+ #
+ # @example Get the retry interval, in seconds.
+ # cluster.retry_interval
+ #
+ # @return [ Integer ] The retry interval.
+ #
+ # @since 1.2.7
+ def retry_interval
+ @retry_interval ||= options[:retry_interval] || RETRY_INTERVAL
+ end
+
# Yields the replica set's primary node to the provided block. This method
# will retry the block in case of connection errors or replica set
# reconfiguration.
#
# @example Yield the primary to the block.
@@ -200,34 +218,20 @@
# @raises [ ConnectionFailure ] When no primary node can be found
#
# @return [ Object ] The result of the yield.
#
# @since 1.0.0
- def with_primary(retries = max_retries, &block)
+ def with_primary(&block)
if node = nodes.find(&:primary?)
begin
node.ensure_primary do
- return yield node.apply_auth(auth)
+ return yield(node.apply_credentials(credentials))
end
rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
- # Fall through to the code below if our connection was dropped or the
- # node is no longer the primary.
end
end
-
- if retries > 0
- # We couldn't find a primary node, so refresh the list and try again.
- warning(" MOPED: Retrying connection to primary for replica set #{inspect}")
- sleep(retry_interval)
- refresh
- with_primary(retries - 1, &block)
- else
- raise(
- Errors::ConnectionFailure,
- "Could not connect to a primary node for replica set #{inspect}"
- )
- end
+ raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
end
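# Usage sketch, assuming the credentials reader above: the block still
# receives the primary with credentials applied, but 2.0.0.beta raises
# ConnectionFailure immediately instead of sleeping retry_interval, refreshing
# and recursing as 1.5.3 did, so retrying presumably moves to the caller.
#
#   cluster.with_primary do |node|
#     node # the current primary
#   end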
# Yields a secondary node if available, otherwise the primary node. This
# method will retry the block in case of connection errors.
#
@@ -241,66 +245,95 @@
# @raises [ ConnectionFailure ] When no primary node can be found
#
# @return [ Object ] The result of the yield.
#
# @since 1.0.0
- def with_secondary(retries = max_retries, &block)
- available_nodes = nodes.shuffle!.partition(&:secondary?).flatten
-
+ def with_secondary(&block)
+ available_nodes = nodes.select(&:secondary?).shuffle!
while node = available_nodes.shift
begin
- return yield node.apply_auth(auth)
- rescue Errors::ConnectionFailure
- warning(" MOPED: Connection failed to secondary node #{node.inspect}, trying next node.")
- # That node's no good, so let's try the next one.
+ return yield(node.apply_credentials(credentials))
+ rescue Errors::ConnectionFailure => e
next
- rescue Errors::ReplicaSetReconfigured
- # That node's no good, so let's try the next one.
- next
end
end
-
- if retries > 0
- # We couldn't find a secondary or primary node, so refresh the list and
- # try again.
- warning(" MOPED: Could not connect to any node in replica set #{inspect}, refreshing list.")
- sleep(retry_interval)
- refresh
- with_secondary(retries - 1, &block)
- else
- raise(
- Errors::ConnectionFailure,
- "Could not connect to any secondary or primary nodes for replica set #{inspect}"
- )
- end
+ raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}"
end
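# Usage sketch for the new body: only nodes reporting secondary? are tried, in
# random order, and ConnectionFailure is raised once the list is exhausted.
# The 1.5.3 fallback to the primary (and the ReplicaSetReconfigured rescue) is
# no longer here, even though the comment above still describes that behaviour.
#
#   cluster.with_secondary do |node|
#     node # a reachable secondary
#   end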
- def inspect
- "<#{self.class.name} nodes=#{@nodes.inspect}>"
- end
-
private
- def initialize_copy(_)
- @nodes = @nodes.map(&:dup)
+ # Get the boundary before which a down node is considered due for a recheck.
+ #
+ # @api private
+ #
+ # @example Get the down boundary.
+ # cluster.down_boundary
+ #
+ # @return [ Time ] The down boundary.
+ #
+ # @since 2.0.0
+ def down_boundary
+ Time.new - down_interval
end
- def member?(node)
- @peers.empty? || @peers.include?(node)
+ # Get the standard refresh boundary to discover new nodes.
+ #
+ # @api private
+ #
+ # @example Get the refresh boundary.
+ # cluster.refresh_boundary
+ #
+ # @return [ Time ] The refresh boundary.
+ #
+ # @since 2.0.0
+ def refresh_boundary
+ Time.new - refresh_interval
end
- def refresh_peers(node, &block)
- peers = node.peers
- return if !peers || peers.empty?
- peers.each do |node|
- block.call(node) unless @nodes.include?(node)
- @peers.push(node) unless peers.include?(node)
- end
+ # Is the provided node refreshable? This is the case when the refresh
+ # boundary has passed, or the node has been down since before the down
+ # boundary.
+ #
+ # @api private
+ #
+ # @example Is the node refreshable?
+ # cluster.refreshable?(node)
+ #
+ # @param [ Node ] node The Node to check.
+ #
+ # @since 2.0.0
+ def refreshable?(node)
+ node.down? ? node.down_at < down_boundary : node.needs_refresh?(refresh_boundary)
end
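# Sketch of how the private helpers combine, using only the definitions above:
# a node is refreshable when it has been down since before the down boundary,
# or when it is up but its information predates the refresh boundary.
#
#   cluster.send(:refreshable?, node)
#   # is equivalent to:
#   if node.down?
#     node.down_at < (Time.new - cluster.down_interval)
#   else
#     node.needs_refresh?(Time.new - cluster.refresh_interval)
#   end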
- def warning(message)
- if logger = Moped.logger
- logger.warn(message)
+ # Creating a cloned cluster requires cloning all the seed nodes.
+ #
+ # @api private
+ #
+ # @example Clone the cluster.
+ # cluster.clone
+ #
+ # @return [ Cluster ] The cloned cluster.
+ #
+ # @since 1.0.0
+ def initialize_copy(_)
+ @seeds = seeds.map(&:dup)
+ end
+
+ # Refresh the peers based on the node's peers.
+ #
+ # @api private
+ #
+ # @example Refresh the peers.
+ # cluster.refresh_peers(node)
+ #
+ # @param [ Node ] node The node to refresh the peers for.
+ #
+ # @since 1.0.0
+ def refresh_peers(node, &block)
+ node.peers.each do |node|
+ block.call(node) unless seeds.include?(node)
+ peers.push(node) unless peers.include?(node)
end
end
end
end