lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.0 vs lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.1

- old
+ new

@@ -1,26 +1,19 @@ class Fluent::DdOutput < Fluent::BufferedOutput - include Fluent::SetTimeKeyMixin - include Fluent::SetTagKeyMixin - Fluent::Plugin.register_output('dd', self) unless method_defined?(:log) define_method('log') { $log } end - config_set_default :include_time_key, true - config_set_default :include_tag_key, true - config_param :dd_api_key, :string config_param :host, :string, :default => nil def initialize super require 'dogapi' require 'socket' - require 'time' end def start super end @@ -43,34 +36,35 @@ @dog = Dogapi::Client.new(@dd_api_key, nil, @host) end def format(tag, time, record) - record.to_msgpack + [tag, time, record].to_msgpack end def write(chunk) enum = chunk.to_enum(:msgpack_each) - enum.select {|record| + enum.select {|tag, time, record| unless record['metric'] - log.warn("`metric` key does not exist: #{record.inspect}") + log.warn("`metric` key does not exist: #{[tag, time, record].inspect}") end record['metric'] - }.chunk {|record| - record.values_at('metric', 'tag', 'host', 'type') + }.chunk {|tag, time, record| + tag = record['tag'] || tag + [tag] + record.values_at('metric', 'host', 'type') }.each {|i, records| - metric, tag, host, type = i + tag, metric, host, type = i - points = records.map do |record| - time = Time.parse(record['time']) + points = records.map do |tag, time, record| + time = Time.at(time) value = record['value'] [time, value] end options = {} - options['tags'] = [tag] if tag + options['tags'] = tag.split(',').map {|i| i.strip } if tag options['host'] = host if host options['type'] = type if type @dog.emit_points(metric, points, options) }