# Copyright (C) 2014-2017 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. module Mongo # The client is the entry point to the driver and is the main object that # will be interacted with. # # @since 2.0.0 class Client extend Forwardable include Loggable # The options that do not affect the behaviour of a cluster and its # subcomponents. # # @since 2.1.0 CRUD_OPTIONS = [ :database, :read, :write ].freeze # Valid client options. # # @since 2.1.2 VALID_OPTIONS = [ :app_name, :auth_mech, :auth_mech_properties, :auth_source, :connect, :connect_timeout, :compressors, :database, :heartbeat_frequency, :id_generator, :local_threshold, :logger, :max_idle_time, :max_pool_size, :max_read_retries, :min_pool_size, :monitoring, :password, :platform, :read, :read_retry_interval, :replica_set, :retry_writes, :server_selection_timeout, :socket_timeout, :ssl, :ssl_ca_cert, :ssl_ca_cert_string, :ssl_ca_cert_object, :ssl_cert, :ssl_cert_string, :ssl_cert_object, :ssl_key, :ssl_key_string, :ssl_key_object, :ssl_key_pass_phrase, :ssl_verify, :truncate_logs, :user, :wait_queue_timeout, :write, :zlib_compression_level ].freeze # The compression algorithms supported by the driver. # # @since 2.5.0 VALID_COMPRESSORS = [ Mongo::Protocol::Compressed::ZLIB ].freeze # @return [ Mongo::Cluster ] cluster The cluster of servers for the client. attr_reader :cluster # @return [ Mongo::Database ] database The database the client is operating on. attr_reader :database # @return [ Hash ] options The configuration options. attr_reader :options # Delegate command and collections execution to the current database. def_delegators :@database, :command, :collections # Delegate subscription to monitoring. def_delegators :@monitoring, :subscribe, :unsubscribe # Determine if this client is equivalent to another object. # # @example Check client equality. # client == other # # @param [ Object ] other The object to compare to. # # @return [ true, false ] If the objects are equal. # # @since 2.0.0 def ==(other) return false unless other.is_a?(Client) cluster == other.cluster && options == other.options end alias_method :eql?, :== # Get a collection object for the provided collection name. # # @example Get the collection. # client[:users] # # @param [ String, Symbol ] collection_name The name of the collection. # @param [ Hash ] options The options to the collection. # # @return [ Mongo::Collection ] The collection. # # @since 2.0.0 def [](collection_name, options = {}) database[collection_name, options] end # Get the hash value of the client. # # @example Get the client hash value. # client.hash # # @return [ Integer ] The client hash value. # # @since 2.0.0 def hash [cluster, options].hash end # Instantiate a new driver client. # # @example Instantiate a single server or mongos client. # Mongo::Client.new([ '127.0.0.1:27017' ]) # # @example Instantiate a client for a replica set. # Mongo::Client.new([ '127.0.0.1:27017', '127.0.0.1:27021' ]) # # @param [ Array, String ] addresses_or_uri The array of server addresses in the # form of host:port or a MongoDB URI connection string. # @param [ Hash ] options The options to be used by the client. # # @option options [ Symbol ] :auth_mech The authentication mechanism to # use. One of :mongodb_cr, :mongodb_x509, :plain, :scram # @option options [ String ] :auth_source The source to authenticate from. # @option options [ Symbol ] :connect The connection method to use. This # forces the cluster to behave in the specified way instead of # auto-discovering. One of :direct, :replica_set, :sharded # @option options [ String ] :database The database to connect to. # @option options [ Hash ] :auth_mech_properties # @option options [ Float ] :heartbeat_frequency The number of seconds for # the server monitor to refresh it's description via ismaster. # @option options [ Integer ] :local_threshold The local threshold boundary # in seconds for selecting a near server for an operation. # @option options [ Integer ] :server_selection_timeout The timeout in seconds # for selecting a server for an operation. # @option options [ String ] :password The user's password. # @option options [ Integer ] :max_idle_time The maximum seconds a socket can remain idle # since it has been checked in to the pool. # @option options [ Integer ] :max_pool_size The maximum size of the # connection pool. # @option options [ Integer ] :min_pool_size The minimum size of the # connection pool. # @option options [ Float ] :wait_queue_timeout The time to wait, in # seconds, in the connection pool for a connection to be checked in. # @option options [ Float ] :connect_timeout The timeout, in seconds, to # attempt a connection. # @option options [ Array ] :compressors A list of potential compressors to use, in order of preference. # The driver chooses the first compressor that is also supported by the server. Currently the driver only # supports 'zlib'. # @option options [ Hash ] :read The read preference options. They consist of a # mode specified as a symbol, an array of hashes known as tag_sets, # and local_threshold. # :mode can be one of :secondary, :secondary_preferred, :primary, # :primary_preferred, :nearest. # @option options [ Symbol ] :replica_set The name of the replica set to # connect to. Servers not in this replica set will be ignored. # @option options [ true, false ] :ssl Whether to use SSL. # @option options [ String ] :ssl_cert The certificate file used to identify # the connection against MongoDB. This option, if present, takes precedence # over the values of :ssl_cert_string and :ssl_cert_object # @option options [ String ] :ssl_cert_string A string containing the PEM-encoded # certificate used to identify the connection against MongoDB. This option, if present, # takes precedence over the value of :ssl_cert_object # @option options [ OpenSSL::X509::Certificate ] :ssl_cert_object The OpenSSL::X509::Certificate # used to identify the connection against MongoDB # @option options [ String ] :ssl_key The private keyfile used to identify the # connection against MongoDB. Note that even if the key is stored in the same # file as the certificate, both need to be explicitly specified. This option, # if present, takes precedence over the values of :ssl_key_string and :ssl_key_object # @option options [ String ] :ssl_key_string A string containing the PEM-encoded private key # used to identify the connection against MongoDB. This parameter, if present, # takes precedence over the value of option :ssl_key_object # @option options [ OpenSSL::PKey ] :ssl_key_object The private key used to identify the # connection against MongoDB # @option options [ String ] :ssl_key_pass_phrase A passphrase for the private key. # @option options [ true, false ] :ssl_verify Whether or not to do peer certification # validation. # @option options [ String ] :ssl_ca_cert The file containing a set of concatenated # certification authority certifications used to validate certs passed from the # other end of the connection. One of :ssl_ca_cert, :ssl_ca_cert_string or # :ssl_ca_cert_object (in order of priority) is required for :ssl_verify. # @option options [ String ] :ssl_ca_cert_string A string containing a set of concatenated # certification authority certifications used to validate certs passed from the # other end of the connection. One of :ssl_ca_cert, :ssl_ca_cert_string or # :ssl_ca_cert_object (in order of priority) is required for :ssl_verify. # @option options [ Array ] :ssl_ca_cert_object An array of # OpenSSL::X509::Certificate representing the certification authority certifications used # to validate certs passed from the other end of the connection. One of :ssl_ca_cert, # :ssl_ca_cert_string or :ssl_ca_cert_object (in order of priority) is required for :ssl_verify. # @option options [ Float ] :socket_timeout The timeout, in seconds, to # execute operations on a socket. # @option options [ String ] :user The user name. # @option options [ Hash ] :write The write concern options. Can be :w => # Integer|String, :fsync => Boolean, :j => Boolean. # @option options [ true, false ] :monitoring Initializes a client without # any default monitoring if false is provided. # @option options [ Logger ] :logger A custom logger if desired. # @option options [ true, false ] :truncate_logs Whether to truncate the # logs at the default 250 characters. # @option options [ Integer ] :max_read_retries The maximum number of read # retries on mongos query failures. # @option options [ Float ] :read_retry_interval The interval, in seconds, # in which reads on a mongos are retried. # @option options [ Object ] :id_generator A custom object to generate ids # for documents. Must respond to #generate. # @option options [ String, Symbol ] :app_name Application name that is printed to the # mongod logs upon establishing a connection in server versions >= 3.4. # @option options [ String ] :platform Platform information to include in the # metadata printed to the mongod logs upon establishing a connection in server versions >= 3.4. # @option options [ Integer ] :zlib_compression_level The Zlib compression level to use, if using compression. # See Ruby's Zlib module for valid levels. # @option options [ true, false ] :retry_writes Retry writes once when connected to a replica set # or sharded cluster versions 3.6 and up. # # @since 2.0.0 def initialize(addresses_or_uri, options = Options::Redacted.new) @monitoring = Monitoring.new(options) if addresses_or_uri.is_a?(::String) create_from_uri(addresses_or_uri, validate_options!(options)) else create_from_addresses(addresses_or_uri, validate_options!(options)) end yield(self) if block_given? end # Get an inspection of the client as a string. # # @example Inspect the client. # client.inspect # # @return [ String ] The inspection string. # # @since 2.0.0 def inspect "#" end # Get the server selector. It either uses the read preference defined in the client options # or defaults to a Primary server selector. # # @example Get the server selector. # client.server_selector # # @return [ Mongo::ServerSelector ] The server selector using the user-defined read preference # or a Primary server selector default. # # @since 2.5.0 def server_selector @server_selector ||= ServerSelector.get(read_preference || ServerSelector::PRIMARY) end # Get the read preference from the options passed to the client. # # @example Get the read preference. # client.read_preference # # @return [ BSON::Document ] The user-defined read preference. # # @since 2.0.0 def read_preference @read_preference ||= options[:read] end # Use the database with the provided name. This will switch the current # database the client is operating on. # # @example Use the provided database. # client.use(:users) # # @param [ String, Symbol ] name The name of the database to use. # # @return [ Mongo::Client ] The new client with new database. # # @since 2.0.0 def use(name) with(database: name) end # Provides a new client with the passed options merged over the existing # options of this client. Useful for one-offs to change specific options # without altering the original client. # # @example Get a client with changed options. # client.with(:read => { :mode => :primary_preferred }) # # @param [ Hash ] new_options The new options to use. # # @return [ Mongo::Client ] A new client instance. # # @since 2.0.0 def with(new_options = Options::Redacted.new) clone.tap do |client| opts = validate_options!(new_options) client.options.update(opts) Database.create(client) # We can't use the same cluster if some options that would affect it # have changed. if cluster_modifying?(opts) Cluster.create(client) end end end # Get the write concern for this client. If no option was provided, then a # default single server acknowledgement will be used. # # @example Get the client write concern. # client.write_concern # # @return [ Mongo::WriteConcern ] The write concern. # # @since 2.0.0 def write_concern @write_concern ||= WriteConcern.get(options[:write]) end # Close all connections. # # @example Disconnect the client. # client.close # # @return [ true ] Always true. # # @since 2.1.0 def close @cluster.disconnect! and true end # Reconnect the client. # # @example Reconnect the client. # client.reconnect # # @return [ true ] Always true. # # @since 2.1.0 def reconnect @cluster.reconnect! and true end # Get the names of all databases. # # @example Get the database names. # client.database_names # # @param [ Hash ] filter The filter criteria for getting a list of databases. # @param [ Hash ] opts The command options. # # @return [ Array ] The names of the databases. # # @since 2.0.5 def database_names(filter = {}, opts = {}) list_databases(filter, true, opts).collect{ |info| info[Database::NAME] } end # Get info for each database. # # @example Get the info for each database. # client.list_databases # # @param [ Hash ] filter The filter criteria for getting a list of databases. # @param [ true, false ] name_only Whether to only return each database name without full metadata. # @param [ Hash ] opts The command options. # # @return [ Array ] The info for each database. # # @since 2.0.5 def list_databases(filter = {}, name_only = false, opts = {}) cmd = { listDatabases: 1 } cmd[:nameOnly] = !!name_only cmd[:filter] = filter unless filter.empty? use(Database::ADMIN).command(cmd, opts).first[Database::DATABASES] end # Returns a list of Mongo::Database objects. # # @example Get a list of Mongo::Database objects. # client.list_mongo_databases # # @param [ Hash ] filter The filter criteria for getting a list of databases. # @param [ Hash ] opts The command options. # # @return [ Array ] The list of database objects. # # @since 2.5.0 def list_mongo_databases(filter = {}, opts = {}) database_names(filter, opts).collect do |name| Database.new(self, name, options) end end # Start a session. # # @example Start a session. # client.start_session(causal_consistency: true) # # @param [ Hash ] options The session options. # # @note A Session cannot be used by multiple threads at once; session objects are not # thread-safe. # # @return [ Session ] The session. # # @since 2.5.0 def start_session(options = {}) cluster.send(:get_session, options.merge(implicit: false)) || (raise Error::InvalidSession.new(Session::SESSIONS_NOT_SUPPORTED)) end private def get_session(options = {}) cluster.send(:get_session, options) end def with_session(options = {}, &block) cluster.send(:with_session, options, &block) end def create_from_addresses(addresses, opts = Options::Redacted.new) @options = Database::DEFAULT_OPTIONS.merge(opts).freeze @cluster = Cluster.new(addresses, @monitoring, options) @database = Database.new(self, options[:database], options) end def create_from_uri(connection_string, opts = Options::Redacted.new) uri = URI.get(connection_string, opts) @options = validate_options!(Database::DEFAULT_OPTIONS.merge(uri.client_options.merge(opts))).freeze @cluster = Cluster.new(uri.servers, @monitoring, options) @database = Database.new(self, options[:database], options) end def initialize_copy(original) @options = original.options.dup @monitoring = Monitoring.new(@options) @database = nil @read_preference = nil @write_concern = nil end def cluster_modifying?(new_options) cluster_options = new_options.reject do |name| CRUD_OPTIONS.include?(name.to_sym) end cluster_options.any? do |name, value| options[name] != value end end def validate_options!(opts = Options::Redacted.new) return Options::Redacted.new unless opts opts.each.inject(Options::Redacted.new) do |_options, (k, v)| key = k.to_sym if VALID_OPTIONS.include?(key) validate_max_min_pool_size!(key, opts) if key == :compressors compressors = valid_compressors(v) _options[key] = compressors unless compressors.empty? else _options[key] = v end else log_warn("Unsupported client option '#{k}'. It will be ignored.") end _options end end def valid_compressors(compressors) compressors.select do |compressor| if !VALID_COMPRESSORS.include?(compressor) log_warn("Unsupported compressor '#{compressor}' in list '#{compressors}'. " + "This compressor will not be used.") false else true end end end def validate_max_min_pool_size!(option, opts) if option == :min_pool_size && opts[:min_pool_size] max = opts[:max_pool_size] || Server::ConnectionPool::Queue::MAX_SIZE raise Error::InvalidMinPoolSize.new(opts[:min_pool_size], max) unless opts[:min_pool_size] <= max end true end end end