lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.12.5 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.0.0
- old
+ new
@@ -48,11 +48,10 @@
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
options: {
reload_connections: @reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
- retry_on_failure: 5,
logger: @transport_logger,
transport_options: {
headers: { 'Content-Type' => @content_type.to_s },
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
@@ -60,20 +59,11 @@
http: {
user: @user,
password: @password
}
}), &adapter_conf)
- es = Elasticsearch::Client.new transport: transport
-
- begin
- raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})!" unless es.ping
- rescue *es.transport.host_unreachable_exceptions => e
- raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})! #{e.message}"
- end
-
- log.info "Connection opened to Elasticsearch cluster => #{connection_options_description(host)}"
- es
+ Elasticsearch::Client.new transport: transport
end
end
def get_connection_options(con_host)
raise "`password` must be present if `user` is present" if @user && !@password
@@ -218,27 +208,18 @@
msgs.clear
end
end
def send_bulk(data, host, index)
- retries = 0
begin
response = client(host).bulk body: data, index: index
if response['errors']
log.error "Could not push log to Elasticsearch: #{response}"
end
- rescue *client(host).transport.host_unreachable_exceptions => e
- if retries < 2
- retries += 1
- @_es = nil
- log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
- sleep 2**retries
- retry
- end
- raise ConnectionRetryFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
- rescue Exception
+ rescue => e
@_es = nil if @reconnect_on_error
- raise
+ # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
+ raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(host)}): #{e.message}"
end
end
def eval_or_val(var)
return var unless var.is_a?(String)