lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.1
- old
+ new
@@ -1,37 +1,36 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
-require "logstash/util/socket_peer"
-require "logstash/json"
+require "base64"
# Read from an Elasticsearch cluster, based on search query results.
# This is useful for replaying test logs, reindexing, etc.
#
# Example:
-#
+# [source,ruby]
# input {
# # Read all documents from Elasticsearch matching the given query
# elasticsearch {
# host => "localhost"
-# query => "ERROR"
+# query => '{ "query": { "match": { "statuscode": 200 } } }'
# }
# }
#
# This would create an Elasticsearch query with the following format:
+# [source,json]
+#     curl 'http://localhost:9200/logstash-*/_search?scroll=1m&size=1000' -d '{ "query": { "match": { "statuscode": 200 } } }'
#
-# http://localhost:9200/logstash-*/_search?q=ERROR&scroll=1m&size=1000
-#
-# * TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
+# TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
config_name "elasticsearch"
milestone 1
default :codec, "json"
- # The IP address or hostname of your Elasticsearch server.
- config :host, :validate => :string, :required => true
+ # List of elasticsearch hosts to use for querying.
+ config :hosts, :validate => :array
# The HTTP port of your Elasticsearch server's REST interface.
config :port, :validate => :number, :default => 9200
# The index or alias to search.
@@ -50,85 +49,76 @@
# This parameter controls the keepalive time in seconds of the scrolling
# request and initiates the scrolling process. The timeout applies per
# round trip (i.e. between the previous scan scroll request, to the next).
config :scroll, :validate => :string, :default => "1m"

# Basic Auth - username for HTTP Basic authentication against the cluster.
config :user, :validate => :string

# Basic Auth - password (the :password validator keeps it masked in logs).
config :password, :validate => :password

# SSL - when true, connect to the cluster over https.
config :ssl, :validate => :boolean, :default => false

# SSL Certificate Authority file used to verify the server certificate.
config :ca_file, :validate => :path
public
# Build the search options and the Elasticsearch client from the plugin
# configuration. Called once by LogStash before #run.
def register
  require "elasticsearch"

  # Options for the initial search request; all values come straight from
  # the plugin's config declarations. The query is sent as the request
  # body, not as a URL parameter.
  @options = {
    index: @index,
    body: @query,
    scroll: @scroll,
    size: @size
  }
  # A 'scan' search returns no hits in its first response, only a scroll
  # id; #run compensates with one extra scroll request.
  @options[:search_type] = 'scan' if @scan

  transport_options = {}

  # HTTP Basic auth: precompute the Authorization header once so every
  # request reuses it.
  if @user && @password
    credentials = Base64.strict_encode64("#{@user}:#{@password.value}")
    transport_options[:headers] = { Authorization: "Basic #{credentials}" }
  end

  # Force the https scheme on every host when SSL is enabled.
  # NOTE(review): the :port config is not applied here, so hosts entries
  # must carry their own port when 9200 is not desired — confirm intended.
  hosts =
    if @ssl
      @hosts.map { |address| { host: address, scheme: 'https' } }
    else
      @hosts
    end

  # Only verify against the CA file when SSL is actually in use.
  transport_options[:ssl] = { ca_file: @ca_file } if @ssl && @ca_file

  @client = Elasticsearch::Client.new(hosts: hosts, transport_options: transport_options)
end # def register
- # Get the scroll id from the previous result set and use it for getting the next data set
- scroll_id = result["_scroll_id"]
public
# Stream documents out of Elasticsearch into the pipeline queue, scrolling
# until a page of results comes back empty.
def run(output_queue)
  # Get the first wave of data.
  response = @client.search @options

  # Since 'scan' doesn't return data on the search call, do an extra
  # scroll request to fetch the first page.
  response = scroll_request(response['_scroll_id']) if @scan

  until response['hits']['hits'].empty?
    response['hits']['hits'].each do |event|
      # NOTE(review): 'event' here is the raw hit hash from the client,
      # not a LogStash::Event — confirm decorate/queue consumers accept it.
      decorate(event)
      output_queue << event
    end
    # Continue with the scroll id returned by the previous round trip.
    response = scroll_request(response['_scroll_id'])
  end
end # def run
+
private
# Issue one scroll continuation request for the given scroll id, reusing
# the configured keepalive window.
def scroll_request(scroll_id)
  @client.scroll(body: scroll_id, scroll: @scroll)
end
end # class LogStash::Inputs::Elasticsearch