lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.1.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.2.0

- old
+ new

@@ -13,10 +13,12 @@ # # ================================================================================ # # Read from an Elasticsearch cluster, based on search query results. # This is useful for replaying test logs, reindexing, etc. +# It also supports periodically scheduling lookup enrichments +# using a cron syntax (see `schedule` setting). # # Example: # [source,ruby] # input { # # Read all documents from Elasticsearch matching the given query @@ -35,10 +37,28 @@ # } # }, # "sort": [ "_doc" ] # }' # +# ==== Scheduling +# +# Input from this plugin can be scheduled to run periodically according to a specific +# schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler]. +# The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support). +# +# Examples: +# +# |========================================================== +# | `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March. +# | `0 * * * *` | will execute on the 0th minute of every hour every day. +# | `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day. +# |========================================================== +# +# +# Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here]. +# +# class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base config_name "elasticsearch" default :codec, "json" @@ -112,12 +132,20 @@ config :ssl, :validate => :boolean, :default => false # SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary config :ca_file, :validate => :path + # Schedule of when to periodically run the query, in Cron format + # for example: "* * * * *" (execute query every minute, on the minute) + # + # There is no schedule by default. If no schedule is given, then the query is run + # exactly once. 
+ config :schedule, :validate => :string + def register require "elasticsearch" + require "rufus/scheduler" @options = { :index => @index, :body => @query, :scroll => @scroll, @@ -145,11 +173,31 @@ end @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options) end + def run(output_queue) + if @schedule + @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) + @scheduler.cron @schedule do + do_run(output_queue) + end + + @scheduler.join + else + do_run(output_queue) + end + end + + def stop + @scheduler.stop if @scheduler + end + + private + + def do_run(output_queue) # get first wave of data r = @client.search(@options) r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } has_hits = r['hits']['hits'].any? @@ -157,11 +205,9 @@ while has_hits && !stop? r = process_next_scroll(output_queue, r['_scroll_id']) has_hits = r['has_hits'] end end - - private def process_next_scroll(output_queue, scroll_id) r = scroll_request(scroll_id) r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']}