lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.10.3 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.pre
- old
+ new
@@ -2,15 +2,14 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/aws_config"
require "logstash/timestamp"
require "time"
-require "tmpdir"
require "stud/interval"
-require "stud/temporary"
require "aws-sdk"
-require "logstash/inputs/cloudwatch/patch"
+require "logstash/inputs/cloudwatch_logs/patch"
+require "fileutils"
Aws.eager_autoload!
# Stream events from CloudWatch Logs streams.
#
@@ -26,13 +25,13 @@
config_name "cloudwatch_logs"
default :codec, "plain"
- # Log group to pull logs from for this plugin. Will pull in all
- # streams inside of this log group.
- config :log_group, :validate => :string, :required => true
+ # Log group(s) to use as an input. If `log_group_prefix` is set
+ # to `true`, then each member of the array is treated as a prefix
+ config :log_group, :validate => :array, :required => true
# Where to write the since database (keeps track of the date
# the last handled log stream was updated). The default will write
# sincedb files to some path matching "$HOME/.sincedb*"
# Should be a path with filename not just a directory.
@@ -48,207 +47,169 @@
# def register
public
def register
require "digest/md5"
+ @logger.trace("Registering cloudwatch_logs input", :log_group => @log_group)
+ settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil
+ @sincedb = {}
- @logger.info("Registering cloudwatch_logs input", :log_group => @log_group)
-
Aws::ConfigService::Client.new(aws_options_hash)
-
@cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash)
+
+ if @sincedb_path.nil?
+ if settings
+ datapath = File.join(settings.get_value("path.data"), "plugins", "inputs", "cloudwatch_logs")
+ # Ensure that the filepath exists before writing, since it's deeply nested.
+ FileUtils::mkdir_p datapath
+ @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(",")))
+ end
+ end
+
+ # This section is going to be deprecated eventually, as path.data will be
+ # the default, not an environment variable (SINCEDB_DIR or HOME)
+ if @sincedb_path.nil? # If it is _still_ nil...
+ if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
+ @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
+ "to keep track of the files I'm watching. Either set " \
+ "HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \
+ "in your Logstash config for the file input with " \
+ "path '#{@path.inspect}'")
+ raise
+ end
+
+ #pick SINCEDB_DIR if available, otherwise use HOME
+ sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]
+
+ @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(",")))
+
+ @logger.info("No sincedb_path set, generating one based on the log_group setting",
+ :sincedb_path => @sincedb_path, :log_group => @log_group)
+ end
+
end #def register
# def run
public
def run(queue)
- while !stop?
- process_group(queue)
- Stud.stoppable_sleep(@interval)
+ @queue = queue
+ _sincedb_open
+
+ Stud.interval(@interval) do
+ groups = find_log_groups
+
+ groups.each do |group|
+ @logger.debug("calling process_group on #{group}")
+ process_group(group)
+ end # groups.each
end
+
end # def run
- # def list_new_streams
public
- def list_new_streams()
+ def find_log_groups
if @log_group_prefix
- log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group)
- groups = log_groups.log_groups.map {|n| n.log_group_name}
- while log_groups.next_token
- log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group, next_token: log_groups.next_token)
- groups += log_groups.log_groups.map {|n| n.log_group_name}
+ @logger.debug("log_group prefix is enabled, searching for log groups")
+ groups = []
+ next_token = nil
+ @log_group.each do |group|
+ loop do
+ log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: group, next_token: next_token)
+ groups += log_groups.log_groups.map {|n| n.log_group_name}
+ next_token = log_groups.next_token
+ @logger.debug("found #{log_groups.log_groups.length} log groups matching prefix #{group}")
+ break if next_token.nil?
+ end
end
else
- groups = [@log_group]
+ @logger.debug("log_group_prefix not enabled")
+ groups = @log_group
end
- objects = []
- for log_group in groups
- objects.concat(list_new_streams_for_log_group(log_group))
- end
- objects
- end
+ groups
+ end # def find_log_groups
- # def list_new_streams_for_log_group
- public
- def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback=0)
- params = {
- :log_group_name => log_group,
- :order_by => "LastEventTime",
- :descending => false
- }
+ private
+ def process_group(group)
+ next_token = nil
+ loop do
+ if !@sincedb.member?(group)
+ @sincedb[group] = 0
+ end
+ params = {
+ :log_group_name => group,
+ :start_time => @sincedb[group],
+ :limit => 10,
+ :interleaved => true,
+ :next_token => next_token
+ }
+ resp = @cloudwatch.filter_log_events(params)
- @logger.debug("CloudWatch Logs for log_group #{log_group}")
+ resp.events.each do |event|
+ process_log(event, group)
+ end
- if token != nil
- params[:next_token] = token
- end
+ _sincedb_write
- begin
- streams = @cloudwatch.describe_log_streams(params)
- rescue Aws::CloudWatchLogs::Errors::ThrottlingException
- @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
- sleep(2 ** stepback * 60)
- stepback += 1
- @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token)
- return list_new_streams_for_log_group(log_group, token=token, objects=objects, stepback=stepback)
+ next_token = resp.next_token
+ break if next_token.nil?
end
+ end #def process_group
- objects.push(*streams.log_streams)
- if streams.next_token == nil
- @logger.debug("CloudWatch Logs hit end of tokens for streams")
- objects
- else
- @logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token)
- list_new_streams_for_log_group(log_group, streams.next_token, objects)
- end
- end # def list_new_streams_for_log_group
-
# def process_log
private
- def process_log(queue, log, stream)
+ def process_log(log, group)
@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
- event.set("[cloudwatch][ingestion_time]", parse_time(log.ingestion_time))
- event.set("[cloudwatch][log_group]", stream.arn.split(/:/)[6])
- event.set("[cloudwatch][log_stream]", stream.log_stream_name)
+ event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time))
+ event.set("[cloudwatch_logs][log_group]", group)
+ event.set("[cloudwatch_logs][log_stream]", log.log_stream_name)
+ event.set("[cloudwatch_logs][event_id]", log.event_id)
decorate(event)
- queue << event
+ @queue << event
+ @sincedb[group] = log.timestamp + 1
end
- end
- # def process_log
+ end # def process_log
# def parse_time
private
def parse_time(data)
LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000)
end # def parse_time
- # def process_group
- public
- def process_group(queue)
- objects = list_new_streams
-
- last_read = sincedb.read
- current_window = DateTime.now.strftime('%Q')
-
- if last_read < 0
- last_read = 1
- end
-
- objects.each do |stream|
- if stream.last_ingestion_time && stream.last_ingestion_time > last_read
- process_log_stream(queue, stream, last_read, current_window)
+ private
+ def _sincedb_open
+ begin
+ File.open(@sincedb_path) do |db|
+ @logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}")
+ db.each do |line|
+ group, pos = line.split(" ", 2)
+ @logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}")
+ @sincedb[group] = pos.to_i
+ end
end
+ rescue
+ #No existing sincedb to load
+ @logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}")
end
+ end # def _sincedb_open
- sincedb.write(current_window)
- end # def process_group
-
- # def process_log_stream
private
- def process_log_stream(queue, stream, last_read, current_window, token = nil, stepback=0)
- @logger.debug("CloudWatch Logs processing stream",
- :log_stream => stream.log_stream_name,
- :log_group => stream.arn.split(":")[6],
- :lastRead => last_read,
- :currentWindow => current_window,
- :token => token
- )
-
- params = {
- :log_group_name => stream.arn.split(":")[6],
- :log_stream_name => stream.log_stream_name,
- :start_from_head => true
- }
-
- if token != nil
- params[:next_token] = token
- end
-
-
+ def _sincedb_write
begin
- logs = @cloudwatch.get_log_events(params)
- rescue Aws::CloudWatchLogs::Errors::ThrottlingException
- @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
- sleep(2 ** stepback * 60)
- stepback += 1
- @logger.debug("CloudWatch Logs repeating process_log_stream again with token", :token => token)
- return process_log_stream(queue, stream, last_read, current_window, token, stepback)
+ IO.write(@sincedb_path, serialize_sincedb, 0)
+ rescue Errno::EACCES
+ # probably no file handles free
+ # maybe it will work next time
+ @logger.debug? && @logger.debug("_sincedb_write: error: #{@sincedb_path}: #{$!}")
end
+ end # def _sincedb_write
- logs.events.each do |log|
- if log.ingestion_time > last_read
- process_log(queue, log, stream)
- end
- end
- # if there are more pages, continue
- if logs.events.count != 0 && logs.next_forward_token != nil
- process_log_stream(queue, stream, last_read, current_window, logs.next_forward_token)
- end
- end # def process_log_stream
-
private
- def sincedb
- @sincedb ||= if @sincedb_path.nil?
- @logger.info("Using default generated file for the sincedb", :filename => sincedb_file)
- SinceDB::File.new(sincedb_file)
- else
- @logger.info("Using the provided sincedb_path",
- :sincedb_path => @sincedb_path)
- SinceDB::File.new(@sincedb_path)
- end
- end
-
- private
- def sincedb_file
- File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@log_group}"))
- end
-
- module SinceDB
- class File
- def initialize(file)
- @sincedb_path = file
- end
-
- def newer?(date)
- date > read
- end
-
- def read
- if ::File.exists?(@sincedb_path)
- since = ::File.read(@sincedb_path).chomp.strip.to_i
- else
- since = 1
- end
- return since
- end
-
- def write(since = nil)
- since = DateTime.now.strftime('%Q') if since.nil?
- ::File.open(@sincedb_path, 'w') { |file| file.write(since.to_s) }
- end
- end
+ def serialize_sincedb
+ @sincedb.map do |group, pos|
+ [group, pos].join(" ")
+ end.join("\n") + "\n"
end
end # class LogStash::Inputs::CloudWatch_Logs