# Fluentd input plugin that polls CloudWatch metric statistics and emits the
# most recent datapoint of every configured <metric> element.
#
# Each <metric> element supplies `namespace`, `statistics`, `dimensions` and
# `metric_name`; those values are passed through to
# AWS::Cloudwatch::Base#get_metric_statistics.
class Fluent::CloudwatchInput < Fluent::Input
  Fluent::Plugin.register_input("cloudwatch", self)

  config_param :tag,        :string
  config_param :access_key, :string
  config_param :secret_key, :string
  config_param :endpoint,   :string, :default => "monitoring.amazonaws.com"

  # Libraries are required here, when the plugin is instantiated, rather than
  # when this file is loaded.
  def initialize
    super
    require 'rubygems'
    require 'AWS'
    require 'time'
  end

  # Builds the CloudWatch client and collects the metric definitions from the
  # <metric> child elements of the plugin configuration.
  def configure(conf)
    super
    @mon = AWS::Cloudwatch::Base.new(
      :access_key_id     => access_key,
      :secret_access_key => secret_key,
      :server            => endpoint
    )
    metric_elements = conf.elements.select { |e| e.name == 'metric' }
    @metrics = metric_elements.map { |e|
      {
        :namespace    => e['namespace'],
        :statistics   => e['statistics'],
        :dimensions   => e['dimensions'],
        :measure_name => e['metric_name']
      }
    }
  end

  # Starts the background polling thread.
  def start
    super
    @watcher = Thread.new(&method(:watch))
  end

  # Stops the polling thread. Does nothing extra when #start was never called.
  def shutdown
    super
    return unless @watcher
    @watcher.terminate
    @watcher.join
  end

  private

  # Polling loop: emits the collected datapoints, then sleeps five minutes.
  # Errors raised by a single poll are reported and the loop keeps running, so
  # one failed request does not end collection for the rest of the process.
  def watch
    loop do
      begin
        records = output
        # Nothing to report when no metric returned a datapoint.
        Fluent::Engine.emit(tag, Fluent::Engine.now, records) unless records.empty?
      rescue => e
        report_error(e)
      end
      sleep 60 * 5
    end
  end

  # Queries every configured metric over the last hour.
  #
  # Returns an Array holding the latest datapoint of each metric; metrics
  # without any datapoint in that window are left out.
  def output
    end_time   = Time.now
    start_time = end_time - 60 * 60
    @metrics.map { |m|
      # Work on a copy so the configured metric definitions are not modified
      # by every poll.
      query = m.merge(:start_time => start_time, :end_time => end_time)
      format(query, @mon.get_metric_statistics(query))
    }.compact
  end

  # Picks the newest datapoint out of response `s` and annotates it with the
  # metric name and statistic taken from `m`. The value stored under the
  # statistic's own key is moved to "Value".
  #
  # Returns the annotated datapoint Hash, or nil when `s` has no datapoints.
  def format(m, s)
    d = datapoints(s).max_by { |point| time(point) }
    return nil unless d
    d["MetricName"] = m[:measure_name]
    d["Statistics"] = m[:statistics]
    d["Value"]      = d.delete(m[:statistics])
    d
  end

  # Extracts the datapoint list from response `s`, always as an Array.
  def datapoints(s)
    result  = s.respond_to?(:[]) ? s["GetMetricStatisticsResult"] : nil
    points  = result.respond_to?(:[]) ? result["Datapoints"] : nil
    members = points.respond_to?(:[]) ? points["member"] : nil
    case members
    when nil  then []
    # NOTE(review): assumes a lone datapoint could arrive as a bare Hash
    # rather than a one-element Array - confirm against the AWS client.
    when Hash then [members]
    else members
    end
  end

  # Parses the "Timestamp" field of datapoint `d` into a Time.
  def time(d)
    Time.parse d["Timestamp"]
  end

  # Reports a polling failure through $log when it is set, otherwise through
  # Kernel#warn.
  # NOTE(review): $log is presumed to be Fluentd's global logger - confirm for
  # the Fluentd version in use.
  def report_error(e)
    message = "cloudwatch input: #{e.class}: #{e.message}"
    if $log.respond_to?(:error)
      $log.error message
    else
      warn message
    end
  end
end