# Copyright (C) 2009-2013 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
# Instantiates and manages connections to a MongoDB replica set.
class MongoReplicaSetClient < MongoClient
include ReadPreference
include ThreadLocalVariableManager
REPL_SET_OPTS = [
:refresh_mode,
:refresh_interval,
:read_secondary,
:rs_name,
:name
]
attr_reader :replica_set_name,
:seeds,
:refresh_interval,
:refresh_mode,
:refresh_version,
:manager
# Create a connection to a MongoDB replica set.
#
# If no args are provided, it will check ENV["MONGODB_URI"]
.
#
# Once connected to a replica set, you can find out which nodes are primary, secondary, and
# arbiters with the corresponding accessors: MongoClient#primary, MongoClient#secondaries, and
# MongoClient#arbiters. This is useful if your application needs to connect manually to nodes other
# than the primary.
#
# @overload initialize(seeds=ENV["MONGODB_URI"], opts={})
# @param [Array, Array] seeds
#
# @option opts [String, Integer, Symbol] :w (1) Set default number of nodes to which a write
# should be acknowledged.
# @option opts [Integer] :wtimeout (nil) Set replica set acknowledgement timeout.
# @option opts [Boolean] :j (false) If true, block until write operations have been committed
# to the journal. Cannot be used in combination with 'fsync'. Prior to MongoDB 2.6 this option was
# ignored if the server was running without journaling. Starting with MongoDB 2.6, write operations will
# fail with an exception if this option is used when the server is running without journaling.
# @option opts [Boolean] :fsync (false) If true, and the server is running without journaling, blocks until
# the server has synced all data files to disk. If the server is running with journaling, this acts the same as
# the 'j' option, blocking until write operations have been committed to the journal.
# Cannot be used in combination with 'j'.
#
# Notes about write concern options:
# Write concern options are propagated to objects instantiated from this MongoReplicaSetClient.
# These defaults can be overridden upon instantiation of any object by explicitly setting an options hash
# on initialization.
# @option opts [:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest] :read (:primary)
# A "read preference" determines the candidate replica set members to which a query or command can be sent.
# [:primary]
# * Read from primary only.
# * Cannot be combined with tags.
# [:primary_preferred]
# * Read from primary if available, otherwise read from a secondary.
# [:secondary]
# * Read from secondary if available.
# [:secondary_preferred]
# * Read from a secondary if available, otherwise read from the primary.
# [:nearest]
# * Read from any member.
# @option opts [Array Tag Value }>] :tag_sets ([])
# Read from replica-set members with these tags.
# @option opts [Integer] :secondary_acceptable_latency_ms (15) The acceptable
# nearest available member for a member to be considered "near".
# @option opts [Logger] :logger (nil) Logger instance to receive driver operation log.
# @option opts [Integer] :pool_size (1) The maximum number of socket connections allowed per
# connection pool. Note: this setting is relevant only for multi-threaded applications.
# @option opts [Float] :pool_timeout (5.0) When all of the connections a pool are checked out,
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
# Note: this setting is relevant only for multi-threaded applications.
# @option opts [Float] :op_timeout (DEFAULT_OP_TIMEOUT) The number of seconds to wait for a read operation to time out.
# Set to DEFAULT_OP_TIMEOUT (20) by default. A value of nil may be specified explicitly.
# @option opts [Float] :connect_timeout (30) The number of seconds to wait before timing out a
# connection attempt.
# @option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL.
# @option opts [String] :ssl_cert (nil) The certificate file used to identify the local connection against MongoDB.
# @option opts [String] :ssl_key (nil) The private keyfile used to identify the local connection against MongoDB.
# Note that even if the key is stored in the same file as the certificate, both need to be explicitly specified.
# @option opts [String] :ssl_key_pass_phrase (nil) A passphrase for the private key.
# @option opts [Boolean] :ssl_verify (nil) Specifies whether or not peer certification validation should occur.
# @option opts [String] :ssl_ca_cert (nil) The ca_certs file contains a set of concatenated "certification authority"
# certificates, which are used to validate certificates passed from the other end of the connection.
# Required for :ssl_verify.
# @option opts [Boolean] :refresh_mode (false) Set this to :sync to periodically update the
# state of the connection every :refresh_interval seconds. Replica set connection failures
# will always trigger a complete refresh. This option is useful when you want to add new nodes
# or remove replica set nodes not currently in use by the driver.
# @option opts [Integer] :refresh_interval (90) If :refresh_mode is enabled, this is the number of seconds
# between calls to check the replica set's state.
# @note the number of seed nodes does not have to be equal to the number of replica set members.
# The purpose of seed nodes is to permit the driver to find at least one replica set member even if a member is down.
#
# @example Connect to a replica set and provide two seed nodes.
# MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])
#
# @example Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named 'prod':
# MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')
#
# @example Connect to a replica set providing two seed nodes and allowing reads from a secondary node:
# MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)
#
# @see http://api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby
#
# @raise [MongoArgumentError] This is raised for usage errors.
#
# @raise [ConnectionFailure] This is raised for the various connection failures.
def initialize(*args)
opts = args.last.is_a?(Hash) ? args.pop : {}
nodes = args.shift || []
raise MongoArgumentError, "Too many arguments" unless args.empty?
# This is temporary until support for the old format is dropped
@seeds = nodes.collect do |node|
if node.is_a?(Array)
warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
node
elsif node.is_a?(String)
Support.normalize_seeds(node)
else
raise MongoArgumentError "Bad seed format!"
end
end
if @seeds.empty? && ENV.has_key?('MONGODB_URI')
parser = URIParser.new ENV['MONGODB_URI']
if parser.direct?
raise MongoArgumentError,
"ENV['MONGODB_URI'] implies a direct connection."
end
opts = parser.connection_options.merge! opts
@seeds = parser.nodes
end
if @seeds.length.zero?
raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
end
@seeds.freeze
# Refresh
@last_refresh = Time.now
@refresh_version = 0
# No connection manager by default.
@manager = nil
# Lock for request ids.
@id_lock = Mutex.new
@connected = false
@connect_mutex = Mutex.new
@mongos = false
check_opts(opts)
setup(opts.dup)
end
def valid_opts
super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
def inspect
""
end
# Initiate a connection to the replica set.
def connect(force = !connected?)
return unless force
log(:info, "Connecting...")
# Prevent recursive connection attempts from the same thread.
# This is done rather than using a Monitor to prevent potentially recursing
# infinitely while attempting to connect and continually failing. Instead, fail fast.
raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting] == true
current_version = @refresh_version
@connect_mutex.synchronize do
# don't try to connect if another thread has done so while we were waiting for the lock
return unless current_version == @refresh_version
begin
thread_local[:locks][:connecting] = true
if @manager
ensure_manager
@manager.refresh!(@seeds)
else
@manager = PoolManager.new(self, @seeds)
ensure_manager
@manager.connect
end
ensure
thread_local[:locks][:connecting] = false
end
@refresh_version += 1
if @manager.pools.empty?
close
raise ConnectionFailure, "Failed to connect to any node."
end
check_wire_version_in_range
@connected = true
end
end
# Determine whether a replica set refresh is
# required. If so, run a hard refresh. You can
# force a hard refresh by running
# MongoReplicaSetClient#hard_refresh!
#
# @return [Boolean] +true+ unless a hard refresh
# is run and the refresh lock can't be acquired.
def refresh(opts={})
if !connected?
log(:info, "Trying to check replica set health but not " +
"connected...")
return hard_refresh!
end
log(:debug, "Checking replica set connection health...")
ensure_manager
@manager.check_connection_health
if @manager.refresh_required?
return hard_refresh!
end
return true
end
# Force a hard refresh of this connection's view
# of the replica set.
#
# @return [Boolean] +true+ if hard refresh
# occurred. +false+ is returned when unable
# to get the refresh lock.
def hard_refresh!
log(:info, "Initiating hard refresh...")
connect(true)
return true
end
def connected?
@connected && !@manager.pools.empty?
end
# @deprecated
def connecting?
warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
false
end
# The replica set primary's host name.
#
# @return [String]
def host
@manager.primary_pool.host
end
# The replica set primary's port.
#
# @return [Integer]
def port
@manager.primary_pool.port
end
def nodes
warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
"Please use MongoReplicaSetClient#seeds instead."
@seeds
end
# Determine whether we're reading from a primary node. If false,
# this connection connects to a secondary node and @read_secondaries is true.
#
# @return [Boolean]
def read_primary?
read_pool == primary_pool
end
alias :primary? :read_primary?
# Close the connection to the database.
def close(opts={})
if opts[:soft]
@manager.close(:soft => true) if @manager
else
@manager.close if @manager
end
# Clear the reference to this object.
thread_local[:managers].delete(self)
unpin_pool
@connected = false
end
# If a ConnectionFailure is raised, this method will be called
# to close the connection and reset connection values.
# @deprecated
def reset_connection
close
warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
"Use MongoReplicaSetClient#close instead."
end
# Returns +true+ if it's okay to read from a secondary node.
#
# This method exist primarily so that Cursor objects will
# generate query messages with a slaveOkay value of +true+.
#
# @return [Boolean] +true+
def slave_ok?
@read != :primary
end
# Generic socket checkout
# Takes a block that returns a socket from pool
def checkout
ensure_manager
connected? ? sync_refresh : connect
begin
socket = yield
rescue => ex
checkin(socket) if socket
raise ex
end
if socket
return socket
else
@connected = false
raise ConnectionFailure.new("Could not checkout a socket.")
end
end
def checkout_reader(read_pref={})
checkout do
pool = read_pool(read_pref)
get_socket_from_pool(pool)
end
end
# Checkout a socket for writing (i.e., a primary node).
def checkout_writer
checkout do
get_socket_from_pool(primary_pool)
end
end
# Checkin a socket used for reading.
def checkin(socket)
if socket && socket.pool
socket.checkin
end
sync_refresh
end
def ensure_manager
thread_local[:managers][self] = @manager
end
def pinned_pool
thread_local[:pinned_pools][@manager.object_id] if @manager
end
def pin_pool(pool, read_preference)
if @manager
thread_local[:pinned_pools][@manager.object_id] = {
:pool => pool,
:read_preference => read_preference
}
end
end
def unpin_pool
thread_local[:pinned_pools].delete @manager.object_id if @manager
end
def get_socket_from_pool(pool)
begin
pool.checkout if pool
rescue ConnectionFailure
nil
end
end
def local_manager
thread_local[:managers][self]
end
def arbiters
local_manager.arbiters.nil? ? [] : local_manager.arbiters
end
def primary
local_manager ? local_manager.primary : nil
end
# Note: might want to freeze these after connecting.
def secondaries
local_manager ? local_manager.secondaries : []
end
def hosts
local_manager ? local_manager.hosts : []
end
def primary_pool
local_manager ? local_manager.primary_pool : nil
end
def secondary_pool
local_manager ? local_manager.secondary_pool : nil
end
def secondary_pools
local_manager ? local_manager.secondary_pools : []
end
def pools
local_manager ? local_manager.pools : []
end
def tag_map
local_manager ? local_manager.tag_map : {}
end
def max_bson_size
return local_manager.max_bson_size if local_manager
DEFAULT_MAX_BSON_SIZE
end
def max_message_size
return local_manager.max_message_size if local_manager
max_bson_size * MESSAGE_SIZE_FACTOR
end
def max_wire_version
return local_manager.max_wire_version if local_manager
0
end
def min_wire_version
return local_manager.min_wire_version if local_manager
0
end
def primary_wire_version_feature?(feature)
local_manager && local_manager.primary_pool && local_manager.primary_pool.node.wire_version_feature?(feature)
end
def max_write_batch_size
local_manager && local_manager.primary_pool && local_manager.primary_pool.node.max_write_batch_size ||
DEFAULT_MAX_WRITE_BATCH_SIZE
end
private
# Parse option hash
def setup(opts)
# Refresh
@refresh_mode = opts.delete(:refresh_mode) || false
@refresh_interval = opts.delete(:refresh_interval) || 90
if @refresh_mode && @refresh_interval < 60
@refresh_interval = 60 unless ENV['TEST_MODE'] = 'TRUE'
end
if @refresh_mode == :async
warn ":async refresh mode has been deprecated. Refresh
mode will be disabled."
elsif ![:sync, false].include?(@refresh_mode)
raise MongoArgumentError,
"Refresh mode must be either :sync or false."
end
if opts[:read_secondary]
warn ":read_secondary options has now been deprecated and will " +
"be removed in driver v2.0. Use the :read option instead."
@read_secondary = opts.delete(:read_secondary) || false
end
# Replica set name
if opts[:rs_name]
warn ":rs_name option has been deprecated and will be removed in v2.0. " +
"Please use :name instead."
@replica_set_name = opts.delete(:rs_name)
else
@replica_set_name = opts.delete(:name)
end
super opts
end
def sync_refresh
if @refresh_mode == :sync &&
((Time.now - @last_refresh) > @refresh_interval)
@last_refresh = Time.now
refresh
end
end
end
end