lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.12.5 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.0.0

- old
+ new

@@ -48,11 +48,10 @@ transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge( options: { reload_connections: @reload_connections, reload_on_failure: @reload_on_failure, resurrect_after: @resurrect_after, - retry_on_failure: 5, logger: @transport_logger, transport_options: { headers: { 'Content-Type' => @content_type.to_s }, request: { timeout: @request_timeout }, ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } @@ -60,20 +59,11 @@ http: { user: @user, password: @password } }), &adapter_conf) - es = Elasticsearch::Client.new transport: transport - - begin - raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})!" unless es.ping - rescue *es.transport.host_unreachable_exceptions => e - raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})! #{e.message}" - end - - log.info "Connection opened to Elasticsearch cluster => #{connection_options_description(host)}" - es + Elasticsearch::Client.new transport: transport end end def get_connection_options(con_host) raise "`password` must be present if `user` is present" if @user && !@password @@ -218,27 +208,18 @@ msgs.clear end end def send_bulk(data, host, index) - retries = 0 begin response = client(host).bulk body: data, index: index if response['errors'] log.error "Could not push log to Elasticsearch: #{response}" end - rescue *client(host).transport.host_unreachable_exceptions => e - if retries < 2 - retries += 1 - @_es = nil - log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}" - sleep 2**retries - retry - end - raise ConnectionRetryFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}" - rescue Exception + rescue => e @_es = nil if @reconnect_on_error - raise + # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead + raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(host)}): #{e.message}" end end def eval_or_val(var) return var unless var.is_a?(String)