lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.1.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.2.0
- old
+ new
@@ -13,10 +13,12 @@
#
# ================================================================================
#
# Read from an Elasticsearch cluster, based on search query results.
# This is useful for replaying test logs, reindexing, etc.
+# It also supports periodically scheduling lookup enrichments
+# using a cron syntax (see `schedule` setting).
#
# Example:
# [source,ruby]
# input {
# # Read all documents from Elasticsearch matching the given query
@@ -35,10 +37,28 @@
# }
# },
# "sort": [ "_doc" ]
# }'
#
+# ==== Scheduling
+#
+# Input from this plugin can be scheduled to run periodically according to a specific
+# schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler].
+# The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
+#
+# Examples:
+#
+# |==========================================================
+# | `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
+# | `0 * * * *` | will execute on the 0th minute of every hour every day.
+# | `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
+# |==========================================================
+#
+#
+# Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
+#
+#
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
config_name "elasticsearch"
default :codec, "json"
@@ -112,12 +132,20 @@
config :ssl, :validate => :boolean, :default => false
# SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary
config :ca_file, :validate => :path
+ # Schedule of when to periodically run the query, in Cron format
+ # for example: "* * * * *" (execute query every minute, on the minute)
+ #
+ # There is no schedule by default. If no schedule is given, then the query is run
+ # exactly once.
+ config :schedule, :validate => :string
+
def register
require "elasticsearch"
+ require "rufus/scheduler"
@options = {
:index => @index,
:body => @query,
:scroll => @scroll,
@@ -145,11 +173,31 @@
end
@client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
end
+
+ # Plugin entry point: pushes search results onto +output_queue+ via do_run.
+ #
+ # With a `schedule` configured, a Rufus scheduler is created and do_run is
+ # invoked from the cron block; this method then waits on the scheduler.
+ # Without a schedule, do_run is invoked exactly once and the method returns.
+ def run(output_queue)
+ if @schedule
+ # NOTE(review): a single work thread presumably keeps scheduled runs from
+ # overlapping -- confirm against the rufus-scheduler documentation.
+ @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
+ @scheduler.cron @schedule do
+ do_run(output_queue)
+ end
+
+ # Keep this thread parked on the scheduler; `stop` is what shuts it down.
+ @scheduler.join
+ else
+ do_run(output_queue)
+ end
+ end
+
+ # Stops the scheduler when one exists. @scheduler is only assigned by `run`
+ # when a `schedule` is configured, so this does nothing for one-shot runs.
+ def stop
+ @scheduler.stop if @scheduler
+ end
+
+ private
+
+ def do_run(output_queue)
# get first wave of data
r = @client.search(@options)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
has_hits = r['hits']['hits'].any?
@@ -157,11 +205,9 @@
while has_hits && !stop?
r = process_next_scroll(output_queue, r['_scroll_id'])
has_hits = r['has_hits']
end
end
-
- private
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
{'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']}