lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.pre.3 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.rc1
- old
+ new
@@ -27,11 +27,11 @@
default :codec, "plain"
# Log group(s) to use as an input. If `log_group_prefix` is set
# to `true`, then each member of the array is treated as a prefix
- config :log_group, :validate => :string, :list => true, :required => true
+ config :log_group, :validate => :string, :list => true
# Where to write the since database (keeps track of the date
# the last handled log stream was updated). The default will write
# sincedb files to some path matching "$HOME/.sincedb*"
# Should be a path with filename not just a directory.
@@ -42,19 +42,27 @@
config :interval, :validate => :number, :default => 60
# Decide if log_group is a prefix or an absolute name
config :log_group_prefix, :validate => :boolean, :default => false
+ # When a new log group is encountered at initial plugin start (not already in
+ # sincedb), allow configuration to specify where to begin ingestion on this group.
+ # Valid options are: `beginning`, `end`, or an integer, representing number of
+ # seconds before now to read back from.
+ config :start_position, :default => 'beginning'
+
# def register
public
def register
require "digest/md5"
@logger.debug("Registering cloudwatch_logs input", :log_group => @log_group)
settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil
@sincedb = {}
+ check_start_position_validity
+
Aws::ConfigService::Client.new(aws_options_hash)
@cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash)
if @sincedb_path.nil?
if settings
@@ -86,25 +94,37 @@
:sincedb_path => @sincedb_path, :log_group => @log_group)
end
end #def register
+ public
+ def check_start_position_validity
+ raise LogStash::ConfigurationError, "No start_position specified!" unless @start_position
+
+ return if @start_position =~ /^(beginning|end)$/
+ return if @start_position.is_a? Integer
+
+ raise LogStash::ConfigurationError, "start_position '#{@start_position}' is invalid! Must be `beginning`, `end`, or an integer."
+ end # def check_start_position_validity
+
# def run
public
def run(queue)
@queue = queue
_sincedb_open
+ determine_start_position(find_log_groups)
- Stud.interval(@interval) do
+ while !stop?
groups = find_log_groups
groups.each do |group|
@logger.debug("calling process_group on #{group}")
process_group(group)
end # groups.each
- end
+ Stud.stoppable_sleep(@interval) { stop? }
+ end
end # def run
public
def find_log_groups
if @log_group_prefix
@@ -124,9 +144,27 @@
@logger.debug("log_group_prefix not enabled")
groups = @log_group
end
groups
end # def find_log_groups
+
+ public
+ def determine_start_position(groups)
+ groups.each do |group|
+ if !@sincedb.member?(group)
+ case @start_position
+ when 'beginning'
+ @sincedb[group] = 0
+
+ when 'end'
+ @sincedb[group] = DateTime.now.strftime('%Q')
+
+ else
+ @sincedb[group] = DateTime.now.strftime('%Q') - (@start_position * 1000)
+ end # case @start_position
+ end
+ end
+ end # def determine_start_position
private
def process_group(group)
next_token = nil
loop do