lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-1.0.16 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-1.0.17

- old
+ new

@@ -16,11 +16,11 @@ attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol attr_accessor :serializer, :sniffer, :logger, :tracer, :reload_connections, :reload_after, :resurrect_after, :max_retries - # Creates a new transport object. + # Creates a new transport object # # @param arguments [Hash] Settings and options for the transport # @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object # # @option arguments [Array] :hosts An Array of normalized hosts information @@ -69,11 +69,11 @@ @counter_mtx.synchronize { @counter += 1 } reload_connections! if reload_connections && counter % reload_after == 0 connections.get_connection(options) end - # Reloads and replaces the connection collection based on cluster information. + # Reloads and replaces the connection collection based on cluster information # # @see Sniffer#hosts # def reload_connections! hosts = sniffer.hosts @@ -82,40 +82,86 @@ rescue SnifferTimeoutError logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger self end - # Tries to "resurrect" all eligible dead connections. + # Tries to "resurrect" all eligible dead connections # # @see Connections::Connection#resurrect! # def resurrect_dead_connections! connections.dead.each { |c| c.resurrect! } end - # Replaces the connections collection. + # Rebuilds the connections collection in the transport. # + # The methods *adds* new connections from the passed hosts to the collection, + # and *removes* all connections not contained in the passed hosts. + # + # @return [Connections::Collection] # @api private # def __rebuild_connections(arguments={}) @state_mutex.synchronize do @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} + __close_connections - @connections = __build_connections + + new_connections = __build_connections + stale_connections = @connections.select { |c| ! new_connections.include?(c) } + new_connections = new_connections.reject { |c| @connections.include?(c) } + + @connections.remove(stale_connections) + @connections.add(new_connections) + @connections end end - # Closes the connections collection. + # Builds and returns a collection of connections # + # The adapters have to implement the {Base#__build_connection} method. + # + # @return [Connections::Collection] + # @api private + # + def __build_connections + Connections::Collection.new \ + :connections => hosts.map { |host| + host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL + host[:port] ||= options[:port] || options[:http][:scheme] || DEFAULT_PORT + if (options[:user] || options[:http][:user]) && !host[:user] + host[:user] ||= options[:user] || options[:http][:user] + host[:password] ||= options[:password] || options[:http][:password] + end + + __build_connection(host, (options[:transport_options] || {}), @block) + }, + :selector_class => options[:selector_class], + :selector => options[:selector] + end + + # @abstract Build and return a connection. + # A transport implementation *must* implement this method. + # See {HTTP::Faraday#__build_connection} for an example. + # + # @return [Connections::Connection] + # @api private + # + def __build_connection(host, options={}, block=nil) + raise NoMethodError, "Implement this method in your class" + end + + # Closes the connections collection + # # @api private # def __close_connections - # to be implemented by specific transports + # A hook point for specific adapters when they need to close connections end - # Log request and response information. + # Log request and response information # # @api private # def __log(method, path, params, body, url, response, json, took, duration) sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') @@ -123,20 +169,22 @@ "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" logger.debug "> #{__convert_to_json(body)}" if body logger.debug "< #{response.body}" end - # Log failed request. + # Log failed request # # @api private + # def __log_failed(response) logger.fatal "[#{response.status}] #{response.body}" end - # Trace the request in the `curl` format. + # Trace the request in the `curl` format # # @api private + # def __trace(method, path, params, body, url, response, json, took, duration) trace_url = "http://localhost:9200/#{path}?pretty" + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' tracer.info "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n" @@ -145,18 +193,20 @@ end # Raise error specific for the HTTP response status or a generic server error # # @api private + # def __raise_transport_error(response) error = ERRORS[response.status] || ServerError raise error.new "[#{response.status}] #{response.body}" end # Converts any non-String object to JSON # # @api private + # def __convert_to_json(o=nil, options={}) o = o.is_a?(String) ? o : serializer.dump(o, options) end # Returns a full URL based on information from host @@ -278,18 +328,9 @@ # # @return [Array] # def host_unreachable_exceptions [Errno::ECONNREFUSED] - end - - # @abstract A transport implementation must implement this method. - # See {HTTP::Faraday#__build_connections} for an example. - # - # @return [Connections::Collection] - # @api private - def __build_connections - raise NoMethodError, "Implement this method in your class" end end end end end