lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.6.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.6.1
- old
+ new
@@ -42,25 +42,32 @@
@_es = nil unless is_existing_connection(connection_options[:hosts])
@_es ||= begin
@current_config = connection_options[:hosts].clone
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
+ gzip_headers = if compression
+ {'Content-Encoding' => 'gzip'}
+ else
+ {}
+ end
+ headers = { 'Content-Type' => @content_type.to_s, }.merge(gzip_headers)
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
options: {
reload_connections: @reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
logger: @transport_logger,
transport_options: {
- headers: { 'Content-Type' => @content_type.to_s },
+ headers: headers,
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
},
http: {
user: @user,
password: @password
- }
+ },
+ compression: compression,
}), &adapter_conf)
Elasticsearch::Client.new transport: transport
end
end
@@ -208,10 +215,15 @@
end
end
def send_bulk(data, host, index)
begin
- response = client(host).bulk body: data, index: index
+ prepared_data = if compression
+ gzip(data)
+ else
+ data
+ end
+ response = client(host).bulk body: prepared_data, index: index
if response['errors']
log.error "Could not push log to Elasticsearch: #{response}"
end
rescue => e
@_es = nil if @reconnect_on_error