lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.4 vs lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.5
- old
+ new
@@ -7,10 +7,11 @@
config_param :dd_api_key, :string
config_param :host, :string, :default => nil
config_param :use_fluentd_tag_for_datadog_tag, :bool, :default => false
config_param :emit_in_background, :bool, :default => false
+ config_param :concurrency, :integer, :default => nil
def initialize
super
require 'dogapi'
require 'socket'
@@ -21,35 +22,46 @@
super
if @emit_in_background
@queue = Queue.new
- @thread = Thread.start do
- while(chunk = @queue.pop)
- emit_points(chunk)
- Thread.pass
+ @threads = @concurrency.times.map do
+ Thread.start do
+ while (job = @queue.pop)
+ emit_points(*job)
+ Thread.pass
+ end
end
end
end
end
def shutdown
super
if @emit_in_background
- @queue.push(false)
- @thread.join
+ @threads.size.times do
+ @queue.push(false)
+ end
+ @threads.each do |thread|
+ thread.join
+ end
end
end
def configure(conf)
super
unless @dd_api_key
raise Fluent::ConfigError, '`dd_api_key` is required'
end
+ if !@emit_in_background && @concurrency
+ raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`'
+ end
+ @concurrency ||= 1
+
unless @host
@host = %x[hostname -f 2> /dev/null].strip
@host = Socket.gethostname if @host.empty?
end
@@ -59,20 +71,28 @@
def format(tag, time, record)
[tag, time, record].to_msgpack
end
def write(chunk)
- if @emit_in_background
- @queue.push(chunk)
- else
- emit_points(chunk)
+ jobs = chunk_to_jobs(chunk)
+
+ jobs.each do |job|
+ if @emit_in_background
+ @queue.push(job)
+ else
+ emit_points(*job)
+ end
end
end
private
- def emit_points(chunk)
+ def emit_points(metric, points, options)
+ @dog.emit_points(metric, points, options)
+ end
+
+ def chunk_to_jobs(chunk)
enum = chunk.to_enum(:msgpack_each)
enum.select {|tag, time, record|
unless record['metric']
log.warn("`metric` key does not exist: #{[tag, time, record].inspect}")
@@ -85,11 +105,11 @@
if not dd_tag and @use_fluentd_tag_for_datadog_tag
dd_tag = tag
end
[dd_tag] + record.values_at('metric', 'host', 'type')
- }.each {|i, records|
+ }.map {|i, records|
tag, metric, host, type = i
points = records.map do |tag, time, record|
time = Time.at(time)
value = record['value']
@@ -99,9 +119,9 @@
options = {}
options[:tags] = tag.split(',').map {|i| i.strip } if tag
options[:host] = host if host
options[:type] = type if type
- @dog.emit_points(metric, points, options)
+ [metric, points, options]
}
end
end