lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.pre.3 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.rc1

- old
+ new

@@ -27,11 +27,11 @@ default :codec, "plain" # Log group(s) to use as an input. If `log_group_prefix` is set # to `true`, then each member of the array is treated as a prefix - config :log_group, :validate => :string, :list => true, :required => true + config :log_group, :validate => :string, :list => true # Where to write the since database (keeps track of the date # the last handled log stream was updated). The default will write # sincedb files to some path matching "$HOME/.sincedb*" # Should be a path with filename not just a directory. @@ -42,19 +42,27 @@ config :interval, :validate => :number, :default => 60 # Decide if log_group is a prefix or an absolute name config :log_group_prefix, :validate => :boolean, :default => false + # When a new log group is encountered at initial plugin start (not already in + # sincedb), allow configuration to specify where to begin ingestion on this group. + # Valid options are: `beginning`, `end`, or an integer, representing number of + # seconds before now to read back from. + config :start_position, :default => 'beginning' + # def register public def register require "digest/md5" @logger.debug("Registering cloudwatch_logs input", :log_group => @log_group) settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil @sincedb = {} + check_start_position_validity + Aws::ConfigService::Client.new(aws_options_hash) @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash) if @sincedb_path.nil? if settings @@ -86,25 +94,37 @@ :sincedb_path => @sincedb_path, :log_group => @log_group) end end #def register + public + def check_start_position_validity + raise LogStash::ConfigurationError, "No start_position specified!" unless @start_position + + return if @start_position =~ /^(beginning|end)$/ + return if @start_position.is_a? Integer + + raise LogStash::ConfigurationError, "start_position '#{@start_position}' is invalid! Must be `beginning`, `end`, or an integer." + end # def check_start_position_validity + # def run public def run(queue) @queue = queue _sincedb_open + determine_start_position(find_log_groups) - Stud.interval(@interval) do + while !stop? groups = find_log_groups groups.each do |group| @logger.debug("calling process_group on #{group}") process_group(group) end # groups.each - end + Stud.stoppable_sleep(@interval) { stop? } + end end # def run public def find_log_groups if @log_group_prefix @@ -124,9 +144,27 @@ @logger.debug("log_group_prefix not enabled") groups = @log_group end groups end # def find_log_groups + + public + def determine_start_position(groups) + groups.each do |group| + if !@sincedb.member?(group) + case @start_position + when 'beginning' + @sincedb[group] = 0 + + when 'end' + @sincedb[group] = DateTime.now.strftime('%Q') + + else + @sincedb[group] = DateTime.now.strftime('%Q') - (@start_position * 1000) + end # case @start_position + end + end + end # def determine_start_position private def process_group(group) next_token = nil loop do