lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.0.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.0.1

- old
+ new

@@ -1,8 +1,10 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" +require 'elasticsearch' +require 'elasticsearch/transport/transport/http/manticore' require "base64" # Read from an Elasticsearch cluster, based on search query results. # This is useful for replaying test logs, reindexing, etc. # @@ -99,28 +101,28 @@ config :password, :validate => :password # SSL config :ssl, :validate => :boolean, :default => false - # SSL Certificate Authority file + # SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary config :ca_file, :validate => :path def register - require "elasticsearch" - @options = { :index => @index, :body => @query, :scroll => @scroll, :size => @size } transport_options = {} if @user && @password - token = Base64.strict_encode64("#{@user}:#{@password.value}") - transport_options[:headers] = { :Authorization => "Basic #{token}" } + transport_options[:auth] = { + :user => @user, + :password => @password.value + } end hosts = if @ssl then @hosts.map do |h| host, port = h.split(":") @@ -128,14 +130,20 @@ end else @hosts end + client_options = { + :hosts => hosts, + :transport_options => transport_options, + :transport_class => Elasticsearch::Transport::Transport::HTTP::Manticore + } + if @ssl && @ca_file - transport_options[:ssl] = { :ca_file => @ca_file } + client_options[:ssl] = { :enabled => true, :ca_file => @ca_file } end - - @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options) + + @client = Elasticsearch::Client.new(client_options) end def run(output_queue) # get first wave of data r = @client.search(@options)