lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.4 vs lib/fluent/plugin/out_dd.rb in fluent-plugin-dd-0.1.5

- old
+ new

@@ -7,10 +7,11 @@ config_param :dd_api_key, :string config_param :host, :string, :default => nil config_param :use_fluentd_tag_for_datadog_tag, :bool, :default => false config_param :emit_in_background, :bool, :default => false + config_param :concurrency, :integer, :default => nil def initialize super require 'dogapi' require 'socket' @@ -21,35 +22,46 @@ super if @emit_in_background @queue = Queue.new - @thread = Thread.start do - while(chunk = @queue.pop) - emit_points(chunk) - Thread.pass + @threads = @concurrency.times.map do + Thread.start do + while (job = @queue.pop) + emit_points(*job) + Thread.pass + end end end end end def shutdown super if @emit_in_background - @queue.push(false) - @thread.join + @threads.size.times do + @queue.push(false) + end + @threads.each do |thread| + thread.join + end end end def configure(conf) super unless @dd_api_key raise Fluent::ConfigError, '`dd_api_key` is required' end + if !@emit_in_background && @concurrency + raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`' + end + @concurrency ||= 1 + unless @host @host = %x[hostname -f 2> /dev/null].strip @host = Socket.gethostname if @host.empty? end @@ -59,20 +71,28 @@ def format(tag, time, record) [tag, time, record].to_msgpack end def write(chunk) - if @emit_in_background - @queue.push(chunk) - else - emit_points(chunk) + jobs = chunk_to_jobs(chunk) + + jobs.each do |job| + if @emit_in_background + @queue.push(job) + else + emit_points(*job) + end end end private - def emit_points(chunk) + def emit_points(metric, points, options) + @dog.emit_points(metric, points, options) + end + + def chunk_to_jobs(chunk) enum = chunk.to_enum(:msgpack_each) enum.select {|tag, time, record| unless record['metric'] log.warn("`metric` key does not exist: #{[tag, time, record].inspect}") @@ -85,11 +105,11 @@ if not dd_tag and @use_fluentd_tag_for_datadog_tag dd_tag = tag end [dd_tag] + record.values_at('metric', 'host', 'type') - }.each {|i, records| + }.map {|i, records| tag, metric, host, type = i points = records.map do |tag, time, record| time = Time.at(time) value = record['value'] @@ -99,9 +119,9 @@ options = {} options[:tags] = tag.split(',').map {|i| i.strip } if tag options[:host] = host if host options[:type] = type if type - @dog.emit_points(metric, points, options) + [metric, points, options] } end end