Sha256: adb43d7349fb720f98f7e0603a5e9157804cbd5f4ccb1b796e6a9439b769ba3a

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

# encoding: UTF-8
require 'net/http'
require 'date'

class Fluent::ElasticsearchOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('elasticsearch', self)

  config_param :host, :string,  :default => 'localhost'
  config_param :port, :integer, :default => 9200
  config_param :logstash_format, :bool, :default => false
  config_param :type_name, :string, :default => "fluentd"
  config_param :index_name, :string, :default => "fluentd"

  include Fluent::SetTagKeyMixin
  config_set_default :include_tag_key, false

  def initialize
    super
  end

  def configure(conf)
    super
  end

  def start
    super
  end

  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  def shutdown
    super
  end

  def write(chunk)
    bulk_message = []

    chunk.msgpack_each do |tag, time, record|
      if @logstash_format
        record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s})
        target_index = "logstash-#{Time.at(time).getutc.strftime("%Y.%m.%d")}"
      else
        target_index = @index_name
      end

      if @include_tag_key
        record.merge!(@tag_key => tag)
      end

      bulk_message << { "index" => {"_index" => target_index, "_type" => type_name} }.to_json
      bulk_message << record.to_json
    end
    bulk_message << ""

    http = Net::HTTP.new(@host, @port.to_i)
    request = Net::HTTP::Post.new("/_bulk")
    request.body = bulk_message.join("\n")
    http.request(request)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-elasticsearch-0.1.1 lib/fluent/plugin/out_elasticsearch.rb