lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.6.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.6.1

- old
+ new

@@ -42,25 +42,32 @@ @_es = nil unless is_existing_connection(connection_options[:hosts]) @_es ||= begin @current_config = connection_options[:hosts].clone adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } + gzip_headers = if compression + {'Content-Encoding' => 'gzip'} + else + {} + end + headers = { 'Content-Type' => @content_type.to_s, }.merge(gzip_headers) transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge( options: { reload_connections: @reload_connections, reload_on_failure: @reload_on_failure, resurrect_after: @resurrect_after, logger: @transport_logger, transport_options: { - headers: { 'Content-Type' => @content_type.to_s }, + headers: headers, request: { timeout: @request_timeout }, ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }, http: { user: @user, password: @password - } + }, + compression: compression, }), &adapter_conf) Elasticsearch::Client.new transport: transport end end @@ -208,10 +215,15 @@ end end def send_bulk(data, host, index) begin - response = client(host).bulk body: data, index: index + prepared_data = if compression + gzip(data) + else + data + end + response = client(host).bulk body: prepared_data, index: index if response['errors'] log.error "Could not push log to Elasticsearch: #{response}" end rescue => e @_es = nil if @reconnect_on_error