lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-1.0.16 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-1.0.17
- old
+ new
@@ -16,11 +16,11 @@
attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol
attr_accessor :serializer, :sniffer, :logger, :tracer,
:reload_connections, :reload_after,
:resurrect_after, :max_retries
- # Creates a new transport object.
+ # Creates a new transport object
#
# @param arguments [Hash] Settings and options for the transport
# @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object
#
# @option arguments [Array] :hosts An Array of normalized hosts information
@@ -69,11 +69,11 @@
@counter_mtx.synchronize { @counter += 1 }
reload_connections! if reload_connections && counter % reload_after == 0
connections.get_connection(options)
end
- # Reloads and replaces the connection collection based on cluster information.
+ # Reloads and replaces the connection collection based on cluster information
#
# @see Sniffer#hosts
#
def reload_connections!
hosts = sniffer.hosts
@@ -82,40 +82,86 @@
rescue SnifferTimeoutError
logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
self
end
- # Tries to "resurrect" all eligible dead connections.
+ # Tries to "resurrect" all eligible dead connections
#
# @see Connections::Connection#resurrect!
#
def resurrect_dead_connections!
connections.dead.each { |c| c.resurrect! }
end
- # Replaces the connections collection.
+ # Rebuilds the connections collection in the transport.
#
+ # The methods *adds* new connections from the passed hosts to the collection,
+ # and *removes* all connections not contained in the passed hosts.
+ #
+ # @return [Connections::Collection]
# @api private
#
def __rebuild_connections(arguments={})
@state_mutex.synchronize do
@hosts = arguments[:hosts] || []
@options = arguments[:options] || {}
+
__close_connections
- @connections = __build_connections
+
+ new_connections = __build_connections
+ stale_connections = @connections.select { |c| ! new_connections.include?(c) }
+ new_connections = new_connections.reject { |c| @connections.include?(c) }
+
+ @connections.remove(stale_connections)
+ @connections.add(new_connections)
+ @connections
end
end
- # Closes the connections collection.
+ # Builds and returns a collection of connections
#
+ # The adapters have to implement the {Base#__build_connection} method.
+ #
+ # @return [Connections::Collection]
+ # @api private
+ #
+ def __build_connections
+ Connections::Collection.new \
+ :connections => hosts.map { |host|
+ host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
+ host[:port] ||= options[:port] || options[:http][:scheme] || DEFAULT_PORT
+ if (options[:user] || options[:http][:user]) && !host[:user]
+ host[:user] ||= options[:user] || options[:http][:user]
+ host[:password] ||= options[:password] || options[:http][:password]
+ end
+
+ __build_connection(host, (options[:transport_options] || {}), @block)
+ },
+ :selector_class => options[:selector_class],
+ :selector => options[:selector]
+ end
+
+ # @abstract Build and return a connection.
+ # A transport implementation *must* implement this method.
+ # See {HTTP::Faraday#__build_connection} for an example.
+ #
+ # @return [Connections::Connection]
+ # @api private
+ #
+ def __build_connection(host, options={}, block=nil)
+ raise NoMethodError, "Implement this method in your class"
+ end
+
+ # Closes the connections collection
+ #
# @api private
#
def __close_connections
- # to be implemented by specific transports
+ # A hook point for specific adapters when they need to close connections
end
- # Log request and response information.
+ # Log request and response information
#
# @api private
#
def __log(method, path, params, body, url, response, json, took, duration)
sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
@@ -123,20 +169,22 @@
"[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
logger.debug "> #{__convert_to_json(body)}" if body
logger.debug "< #{response.body}"
end
- # Log failed request.
+ # Log failed request
#
# @api private
+ #
def __log_failed(response)
logger.fatal "[#{response.status}] #{response.body}"
end
- # Trace the request in the `curl` format.
+ # Trace the request in the `curl` format
#
# @api private
+ #
def __trace(method, path, params, body, url, response, json, took, duration)
trace_url = "http://localhost:9200/#{path}?pretty" +
( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
tracer.info "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n"
@@ -145,18 +193,20 @@
end
# Raise error specific for the HTTP response status or a generic server error
#
# @api private
+ #
def __raise_transport_error(response)
error = ERRORS[response.status] || ServerError
raise error.new "[#{response.status}] #{response.body}"
end
# Converts any non-String object to JSON
#
# @api private
+ #
def __convert_to_json(o=nil, options={})
o = o.is_a?(String) ? o : serializer.dump(o, options)
end
# Returns a full URL based on information from host
@@ -278,18 +328,9 @@
#
# @return [Array]
#
def host_unreachable_exceptions
[Errno::ECONNREFUSED]
- end
-
- # @abstract A transport implementation must implement this method.
- # See {HTTP::Faraday#__build_connections} for an example.
- #
- # @return [Connections::Collection]
- # @api private
- def __build_connections
- raise NoMethodError, "Implement this method in your class"
end
end
end
end
end