lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.1

- lines prefixed with "-" belong to the old version (0.1.0)
+ lines prefixed with "+" belong to the new version (0.1.1)

@@ -1,37 +1,36 @@
 # encoding: utf-8
 require "logstash/inputs/base"
 require "logstash/namespace"
-require "logstash/util/socket_peer"
-require "logstash/json"
+require "base64"
 
 # Read from an Elasticsearch cluster, based on search query results.
 # This is useful for replaying test logs, reindexing, etc.
 #
 # Example:
-#
+# [source,ruby]
 #     input {
 #       # Read all documents from Elasticsearch matching the given query
 #       elasticsearch {
 #         host => "localhost"
-#         query => "ERROR"
+#         query => '{ "query": { "match": { "statuscode": 200 } } }'
 #       }
 #     }
 #
 # This would create an Elasticsearch query with the following format:
+# [source,json]
+#     http://localhost:9200/logstash-*/_search?q='{ "query": { "match": { "statuscode": 200 } } }'&scroll=1m&size=1000
 #
-#     http://localhost:9200/logstash-*/_search?q=ERROR&scroll=1m&size=1000
-#
-# * TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
+# TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
 class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   config_name "elasticsearch"
   milestone 1
 
   default :codec, "json"
 
-  # The IP address or hostname of your Elasticsearch server.
-  config :host, :validate => :string, :required => true
+  # List of elasticsearch hosts to use for querying.
+  config :hosts, :validate => :array
 
   # The HTTP port of your Elasticsearch server's REST interface.
   config :port, :validate => :number, :default => 9200
 
   # The index or alias to search.
@@ -50,85 +49,76 @@
   # This parameter controls the keepalive time in seconds of the scrolling
   # request and initiates the scrolling process. The timeout applies per
   # round trip (i.e. between the previous scan scroll request, to the next).
   config :scroll, :validate => :string, :default => "1m"
 
+  # Basic Auth - username
+  config :user, :validate => :string
+
+  # Basic Auth - password
+  config :password, :validate => :password
+
+  # SSL
+  config :ssl, :validate => :boolean, :default => false
+
+  # SSL Certificate Authority file
+  config :ca_file, :validate => :path
+
   public
   def register
-    require "ftw"
-    @agent = FTW::Agent.new
+    require "elasticsearch"
 
-    params = {
-      "q" => @query,
-      "scroll" => @scroll,
-      "size" => "#{@size}",
+    @options = {
+      index: @index,
+      body: @query,
+      scroll: @scroll,
+      size: @size
     }
-    params['search_type'] = "scan" if @scan
 
-    @search_url = "http://#{@host}:#{@port}/#{@index}/_search?#{encode(params)}"
-    @scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode({"scroll" => @scroll})}"
-  end # def register
+    @options[:search_type] = 'scan' if @scan
 
-  private
-  def encode(hash)
-    return hash.collect do |key, value|
-      CGI.escape(key) + "=" + CGI.escape(value)
-    end.join("&")
-  end # def encode
+    transport_options = {}
 
-  private
-  def execute_search_request
-    response = @agent.get!(@search_url)
-    json = ""
-    response.read_body { |c| json << c }
-    json
-  end
+    if @user && @password
+      token = Base64.strict_encode64("#{@user}:#{@password.value}")
+      transport_options[:headers] = { Authorization: "Basic #{token}" }
+    end
 
-  private
-  def execute_scroll_request(scroll_id)
-    response = @agent.post!(@scroll_url, :body => scroll_id)
-    json = ""
-    response.read_body { |c| json << c }
-    json
-  end
+    hosts = if @ssl then
+      @hosts.map {|h| { host: h, scheme: 'https' } }
+    else
+      @hosts
+    end
 
-  public
-  def run(output_queue)
-    result = LogStash::Json.load(execute_search_request)
-    scroll_id = result["_scroll_id"]
-
-    # When using the search_type=scan we don't get an initial result set.
-    # So we do it here.
-    if @scan
-      result = LogStash::Json.load(execute_scroll_request(scroll_id))
+    if @ssl && @ca_file
+      transport_options[:ssl] = { ca_file: @ca_file }
     end
 
-    loop do
-      break if result.nil?
-      hits = result["hits"]["hits"]
-      break if hits.empty?
+    @client = Elasticsearch::Client.new hosts: hosts, transport_options: transport_options
 
-      hits.each do |hit|
-        # Hack to make codecs work
-        @codec.decode(LogStash::Json.dump(hit["_source"])) do |event|
-          decorate(event)
-          output_queue << event
-        end
-      end
+  end # def register
 
-      # Get the scroll id from the previous result set and use it for getting the next data set
-      scroll_id = result["_scroll_id"]
+  public
+  def run(output_queue)
 
-      # Fetch the next result set
-      result = LogStash::Json.load(execute_scroll_request(scroll_id))
+    # get first wave of data
+    r = @client.search @options
 
-      if result["error"]
-        @logger.warn(result["error"], :request => scroll_url)
-        # TODO(sissel): raise an error instead of breaking
-        break
-      end
+    # since 'scan' doesn't return data on the search call, do an extra scroll
+    if @scan
+      r = scroll_request(r['_scroll_id'])
+    end
+
+    while r['hits']['hits'].any? do
+      r['hits']['hits'].each do |event|
+        decorate(event)
+        output_queue << event
+      end
+      r = scroll_request(r['_scroll_id'])
     end
-  rescue LogStash::ShutdownSignal
-    # Do nothing, let us quit.
   end # def run
+
+  private
+  def scroll_request scroll_id
+    @client.scroll(body: scroll_id, scroll: @scroll)
+  end
 end # class LogStash::Inputs::Elasticsearch