lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.0 vs lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.1
- old
+ new
@@ -1,26 +1,19 @@
class Fluent::DdOutput < Fluent::BufferedOutput
- include Fluent::SetTimeKeyMixin
- include Fluent::SetTagKeyMixin
-
Fluent::Plugin.register_output('dd', self)
unless method_defined?(:log)
define_method('log') { $log }
end
- config_set_default :include_time_key, true
- config_set_default :include_tag_key, true
-
config_param :dd_api_key, :string
config_param :host, :string, :default => nil
def initialize
super
require 'dogapi'
require 'socket'
- require 'time'
end
def start
super
end
@@ -43,34 +36,35 @@
@dog = Dogapi::Client.new(@dd_api_key, nil, @host)
end
def format(tag, time, record)
- record.to_msgpack
+ [tag, time, record].to_msgpack
end
def write(chunk)
enum = chunk.to_enum(:msgpack_each)
- enum.select {|record|
+ enum.select {|tag, time, record|
unless record['metric']
- log.warn("`metric` key does not exist: #{record.inspect}")
+ log.warn("`metric` key does not exist: #{[tag, time, record].inspect}")
end
record['metric']
- }.chunk {|record|
- record.values_at('metric', 'tag', 'host', 'type')
+ }.chunk {|tag, time, record|
+ tag = record['tag'] || tag
+ [tag] + record.values_at('metric', 'host', 'type')
}.each {|i, records|
- metric, tag, host, type = i
+ tag, metric, host, type = i
- points = records.map do |record|
- time = Time.parse(record['time'])
+ points = records.map do |tag, time, record|
+ time = Time.at(time)
value = record['value']
[time, value]
end
options = {}
- options['tags'] = [tag] if tag
+ options['tags'] = tag.split(',').map {|i| i.strip } if tag
options['host'] = host if host
options['type'] = type if type
@dog.emit_points(metric, points, options)
}