lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-0.1.2
- old
+ new
@@ -16,16 +16,20 @@
# }
# }
#
# This would create an Elasticsearch query with the following format:
# [source,json]
-# http://localhost:9200/logstash-*/_search?q='{ "query": { "match": { "statuscode": 200 } } }'&scroll=1m&size=1000
+# curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
+# "query": {
+# "match": {
+# "statuscode": 200
+# }
+# }
+# }'
#
-# TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
config_name "elasticsearch"
- milestone 1
default :codec, "json"
# List of elasticsearch hosts to use for querying.
config :hosts, :validate => :array
@@ -35,11 +39,11 @@
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"
# The query to be executed.
- config :query, :validate => :string, :default => "*"
+ config :query, :validate => :string, :default => '{"query": { "match_all": {} } }'
# Enable the Elasticsearch "scan" search type. This will disable
# sorting but increase speed and performance.
config :scan, :validate => :boolean, :default => true
@@ -49,10 +53,49 @@
# This parameter controls the keepalive time in seconds of the scrolling
# request and initiates the scrolling process. The timeout applies per
# round trip (i.e. between the previous scan scroll request, to the next).
config :scroll, :validate => :string, :default => "1m"
+ # If set, include Elasticsearch document information such as index, type, and
+ # the id in the event.
+ #
+ # It might be important to note, with regards to metadata, that if you're
+ # ingesting documents with the intent to re-index them (or just update them)
+  # that the `action` option in the elasticsearch output wants to know how to
+ # handle those things. It can be dynamically assigned with a field
+ # added to the metadata.
+ #
+ # Example
+ # [source, ruby]
+ # input {
+ # elasticsearch {
+  #      hosts => "es.production.mysite.org"
+ # index => "mydata-2018.09.*"
+ # query => "*"
+ # size => 500
+ # scroll => "5m"
+ # docinfo => true
+ # }
+ # }
+ # output {
+ # elasticsearch {
+ # index => "copy-of-production.%{[@metadata][_index]}"
+ # index_type => "%{[@metadata][_type]}"
+ # document_id => "%{[@metadata][_id]}"
+ # }
+ # }
+ #
+ config :docinfo, :validate => :boolean, :default => false
+
+  # Where to move the Elasticsearch document information. By default we use the @metadata field.
+ config :docinfo_target, :validate=> :string, :default => "@metadata"
+
+  # List of document metadata to move to the `docinfo_target` field.
+ # To learn more about Elasticsearch metadata fields read
+ # http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html
+ config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id']
+
# Basic Auth - username
config :user, :validate => :string
# Basic Auth - password
config :password, :validate => :password
@@ -66,59 +109,74 @@
public
def register
require "elasticsearch"
@options = {
- index: @index,
- body: @query,
- scroll: @scroll,
- size: @size
+ :index => @index,
+ :body => @query,
+ :scroll => @scroll,
+ :size => @size
}
@options[:search_type] = 'scan' if @scan
transport_options = {}
if @user && @password
token = Base64.strict_encode64("#{@user}:#{@password.value}")
- transport_options[:headers] = { Authorization: "Basic #{token}" }
+ transport_options[:headers] = { :Authorization => "Basic #{token}" }
end
hosts = if @ssl then
- @hosts.map {|h| { host: h, scheme: 'https' } }
+ @hosts.map { |h| { :host => h, :scheme => 'https' } }
else
@hosts
end
if @ssl && @ca_file
- transport_options[:ssl] = { ca_file: @ca_file }
+ transport_options[:ssl] = { :ca_file => @ca_file }
end
- @client = Elasticsearch::Client.new hosts: hosts, transport_options: transport_options
-
- end # def register
-
+ @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
+ end
+
public
def run(output_queue)
# get first wave of data
- r = @client.search @options
+ r = @client.search(@options)
# since 'scan' doesn't return data on the search call, do an extra scroll
if @scan
r = scroll_request(r['_scroll_id'])
end
while r['hits']['hits'].any? do
- r['hits']['hits'].each do |event|
+ r['hits']['hits'].each do |hit|
+ event = LogStash::Event.new(hit['_source'])
decorate(event)
+
+ if @docinfo
+ event[@docinfo_target] ||= {}
+
+ unless event[@docinfo_target].is_a?(Hash)
+ @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class)
+
+ raise Exception.new("Elasticsearch input: incompatible event")
+ end
+
+ @docinfo_fields.each do |field|
+ event[@docinfo_target][field] = hit[field]
+ end
+ end
+
output_queue << event
end
r = scroll_request(r['_scroll_id'])
end
end # def run
private
def scroll_request scroll_id
- @client.scroll(body: scroll_id, scroll: @scroll)
+ @client.scroll(:body => scroll_id, :scroll => @scroll)
end
end # class LogStash::Inputs::Elasticsearch