Sha256: 91565f4e10f09e5bc370bbf20531941f2c22a1130458c309a6aa8333cd210d28
Contents?: true
Size: 1.07 KB
Versions: 1
Compression:
Stored size: 1.07 KB
Contents
# encoding: UTF-8

# Buffered Fluentd output plugin that sends buffered events to Elasticsearch
# with a single bulk request per chunk.
#
# Configuration parameters (declared below with config_param):
#   host  - string,  default 'localhost'
#   port  - integer, default 9200
#   index - string,  default "fluentd"; used as the target index of every item
class Fluent::ElasticsearchOutput < Fluent::BufferedOutput
  # register plugin first.
  # `es` exposes the client built in #start; it is nil before #start runs.
  attr_reader :es
  Fluent::Plugin.register_output('elasticsearch', self)

  config_param :host, :string, :default => 'localhost'
  config_param :port, :integer, :default => 9200
  config_param :index, :string, :default => "fluentd"

  # Loads the `elasticsearch` library when the plugin is instantiated rather
  # than when this file is loaded, then defers to the parent initializer.
  def initialize
    require 'elasticsearch'
    super
  end

  # No plugin-specific validation; only the parent's configuration runs.
  #
  # @param conf the configuration object handed over by the caller
  def configure(conf)
    super
  end

  # Runs the parent start-up and then builds the Elasticsearch client for the
  # configured "host:port" address.
  def start
    super
    es_url = "#{host}:#{port}"
    @es = Elasticsearch::Client.new hosts: [es_url]
  end

  # Nothing to release here beyond what the parent class does.
  def shutdown
    super
  end

  # Serializes one event as a msgpack-encoded [tag, time, record] triple.
  # NOTE(review): relies on Array#to_msgpack being available (presumably
  # loaded by the host framework) - confirm.
  #
  # @param tag    the event tag
  # @param time   the event time
  # @param record the event payload
  # @return the msgpack encoding of [tag, time, record]
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Decodes every event stored in +chunk+ and indexes them all with one bulk
  # request. Each event becomes an `index` action whose type is the event tag
  # and whose document carries the tag, time and record.
  #
  # @param chunk object responding to #msgpack_each, yielding tag, time, record
  # @return the result of the bulk call, or nil when the chunk held no events
  def write(chunk)
    target_index = index # read the setting once for the whole flush

    bulk_items = []
    chunk.msgpack_each do |tag, time, record|
      bulk_items << {
        :index => {
          :_index => target_index,
          :_type => tag,
          :data => { :tag => tag, :time => time, :record => record }
        }
      }
    end

    # A bulk request with no actions is rejected by the server, so skip the
    # round trip instead of turning an empty flush into an error.
    return if bulk_items.empty?

    ## now bulk index
    @es.bulk :index => target_index, :body => bulk_items
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-elasticsearch-ruby-0.0.3 | lib/fluent/plugin/out_elasticsearch.rb |