lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.0.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.0.1
- old
+ new
@@ -1,8 +1,10 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
+require 'elasticsearch'
+require 'elasticsearch/transport/transport/http/manticore'
require "base64"
# Read from an Elasticsearch cluster, based on search query results.
# This is useful for replaying test logs, reindexing, etc.
#
@@ -99,28 +101,28 @@
config :password, :validate => :password
# SSL
config :ssl, :validate => :boolean, :default => false
- # SSL Certificate Authority file
+ # SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary
config :ca_file, :validate => :path
def register
- require "elasticsearch"
-
@options = {
:index => @index,
:body => @query,
:scroll => @scroll,
:size => @size
}
transport_options = {}
if @user && @password
- token = Base64.strict_encode64("#{@user}:#{@password.value}")
- transport_options[:headers] = { :Authorization => "Basic #{token}" }
+ transport_options[:auth] = {
+ :user => @user,
+ :password => @password.value
+ }
end
hosts = if @ssl then
@hosts.map do |h|
host, port = h.split(":")
@@ -128,14 +130,20 @@
end
else
@hosts
end
+ client_options = {
+ :hosts => hosts,
+ :transport_options => transport_options,
+ :transport_class => Elasticsearch::Transport::Transport::HTTP::Manticore
+ }
+
if @ssl && @ca_file
- transport_options[:ssl] = { :ca_file => @ca_file }
+ client_options[:ssl] = { :enabled => true, :ca_file => @ca_file }
end
-
- @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
+
+ @client = Elasticsearch::Client.new(client_options)
end
def run(output_queue)
# get first wave of data
r = @client.search(@options)