Sha256: ab16c262ab1ab95a884ed8aef9bf7a93b34c5fdc1310a35d7c6d4d7addb9e7c6

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

# Fluentd buffered output plugin, registered under the name 'dd', that
# forwards buffered metric records through a Dogapi client.
#
# Record keys read by this plugin:
#   'metric' - metric name; records without it are logged and skipped
#   'value'  - the point value sent alongside the event time
#   'tag'    - optional comma-separated tag list (overrides the event tag)
#   'host'   - optional, passed through as the 'host' option
#   'type'   - optional, passed through as the 'type' option
class Fluent::DdOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('dd', self)

  # Provide a #log accessor backed by the global $log only when the base
  # class does not already define one.
  define_method('log') { $log } unless method_defined?(:log)

  config_param :dd_api_key, :string
  config_param :host, :string, :default => nil

  # Libraries are loaded at instantiation time rather than at file load.
  def initialize
    super
    require 'dogapi'
    require 'socket'
  end

  def start
    super
  end

  def shutdown
    super
  end

  # Validates the configuration, resolves the reporting host and builds
  # the Dogapi client stored in @dog.
  #
  # Raises Fluent::ConfigError when `dd_api_key` is not set.
  def configure(conf)
    super

    raise Fluent::ConfigError, '`dd_api_key` is required' unless @dd_api_key

    unless @host
      # Prefer the name reported by `hostname -f`; fall back to the socket
      # library when the command prints nothing.
      detected = %x[hostname -f 2> /dev/null].strip
      @host = detected.empty? ? Socket.gethostname : detected
    end

    @dog = Dogapi::Client.new(@dd_api_key, nil, @host)
  end

  # Serializes one event as a msgpack-encoded [tag, time, record] triple.
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Sends the events of a buffer chunk via @dog.emit_points.
  #
  # Events lacking a 'metric' key are reported with a warning and dropped.
  # The remaining events are grouped into runs of consecutive events that
  # share the same (tag, metric, host, type) key, and each run is emitted
  # with a single emit_points call.
  def write(chunk)
    with_metric = chunk.to_enum(:msgpack_each).select do |tag, time, record|
      metric_name = record['metric']
      log.warn("`metric` key does not exist: #{[tag, time, record].inspect}") unless metric_name
      metric_name
    end

    # The record's own 'tag' field takes precedence over the event tag.
    runs = with_metric.chunk do |tag, _time, record|
      [record['tag'] || tag] + record.values_at('metric', 'host', 'type')
    end

    runs.each do |(dd_tag, metric, host, type), entries|
      points = entries.map { |_tag, time, record| [Time.at(time), record['value']] }

      options = {}
      options['tags'] = dd_tag.split(',').map(&:strip) if dd_tag
      options['host'] = host if host
      options['type'] = type if type

      @dog.emit_points(metric, points, options)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-dd-0.1.1 lib/fluent/plugin/out_dd.rb