Sha256: 0503e7aaccaf912864d41c28660500f6d926b55365d41f51fef8d9f5d74cd1b9

Contents?: true

Size: 1.59 KB

Versions: 1

Compression:

Stored size: 1.59 KB

Contents

class Fluent::DdOutput < Fluent::BufferedOutput
  include Fluent::SetTimeKeyMixin
  include Fluent::SetTagKeyMixin

  Fluent::Plugin.register_output('dd', self)

  unless method_defined?(:log)
    define_method('log') { $log }
  end

  config_set_default :include_time_key, true
  config_set_default :include_tag_key, true

  config_param :dd_api_key, :string
  config_param :host, :string, :default => nil

  def initialize
    super
    require 'dogapi'
    require 'socket'
    require 'time'
  end

  def start
    super
  end

  def shutdown
    super
  end

  def configure(conf)
    super

    unless @dd_api_key
      raise Fluent::ConfigError, '`dd_api_key` is required'
    end

    unless @host
      @host = %x[hostname -f 2> /dev/null].strip
      @host = Socket.gethostname if @host.empty?
    end

    @dog = Dogapi::Client.new(@dd_api_key, nil, @host)
  end

  def format(tag, time, record)
    record.to_msgpack
  end

  def write(chunk)
    enum = chunk.to_enum(:msgpack_each)

    enum.select {|record|
      unless record['metric']
        log.warn("`metric` key does not exist: #{record.inspect}")
      end

      record['metric']
    }.chunk {|record|
      record.values_at('metric', 'tag', 'host', 'type')
    }.each {|i, records|
      metric, tag, host, type = i

      points = records.map do |record|
        time = Time.parse(record['time'])
        value = record['value']
        [time, value]
      end

      options = {}
      options['tags'] = [tag] if tag
      options['host'] = host if host
      options['type'] = type if type

      @dog.emit_points(metric, points, options)
    }
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-dd-0.1.0 lib/fluent/plugin/out_dd.rb