Sha256: adb43d7349fb720f98f7e0603a5e9157804cbd5f4ccb1b796e6a9439b769ba3a
Contents?: true
Size: 1.43 KB
Versions: 1
Compression:
Stored size: 1.43 KB
Contents
# encoding: UTF-8 require 'net/http' require 'date' class Fluent::ElasticsearchOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('elasticsearch', self) config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 9200 config_param :logstash_format, :bool, :default => false config_param :type_name, :string, :default => "fluentd" config_param :index_name, :string, :default => "fluentd" include Fluent::SetTagKeyMixin config_set_default :include_tag_key, false def initialize super end def configure(conf) super end def start super end def format(tag, time, record) [tag, time, record].to_msgpack end def shutdown super end def write(chunk) bulk_message = [] chunk.msgpack_each do |tag, time, record| if @logstash_format record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s}) target_index = "logstash-#{Time.at(time).getutc.strftime("%Y.%m.%d")}" else target_index = @index_name end if @include_tag_key record.merge!(@tag_key => tag) end bulk_message << { "index" => {"_index" => target_index, "_type" => type_name} }.to_json bulk_message << record.to_json end bulk_message << "" http = Net::HTTP.new(@host, @port.to_i) request = Net::HTTP::Post.new("/_bulk") request.body = bulk_message.join("\n") http.request(request) end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-elasticsearch-0.1.1 | lib/fluent/plugin/out_elasticsearch.rb |