Sha256: 91565f4e10f09e5bc370bbf20531941f2c22a1130458c309a6aa8333cd210d28

Contents?: true

Size: 1.07 KB

Versions: 1

Compression:

Stored size: 1.07 KB

Contents

# encoding: UTF-8

# Buffered Fluentd output that sends events to Elasticsearch using the bulk
# API of the `elasticsearch` client gem.
#
# Configuration parameters:
#   host  - Elasticsearch host name (default 'localhost')
#   port  - Elasticsearch port (default 9200)
#   index - index every event is written to (default 'fluentd')
class Fluent::ElasticsearchOutput < Fluent::BufferedOutput
  # Register the plugin under the name 'elasticsearch'.
  Fluent::Plugin.register_output('elasticsearch', self)

  # The Elasticsearch client; only assigned once #start has run.
  attr_reader :es

  config_param :host,  :string,  :default => 'localhost'
  config_param :port,  :integer, :default => 9200
  config_param :index, :string,  :default => 'fluentd'

  # Loads the `elasticsearch` gem when the plugin is instantiated rather than
  # when this file is loaded.
  def initialize
    require 'elasticsearch'
    super
  end

  # No plugin-specific configuration handling; defers to the parent class.
  def configure(conf)
    super
  end

  # Builds the Elasticsearch client from the configured host and port after
  # the parent class has started.
  def start
    super
    es_url = "#{host}:#{port}"
    @es = Elasticsearch::Client.new hosts: [es_url]
  end

  # No plugin-specific teardown; defers to the parent class.
  def shutdown
    super
  end

  # Serializes one event for buffering as a msgpack-encoded
  # [tag, time, record] triple; #write reads it back with msgpack_each.
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Sends every event in +chunk+ to Elasticsearch in a single bulk request.
  #
  # Each event becomes one `index` action targeting the configured index,
  # with the event tag as the document type and a document made of the tag,
  # the time and the record.
  #
  # Returns the client's bulk response, or nil when the chunk had no events.
  def write(chunk)
    bulk_items = []

    chunk.msgpack_each do |tag, time, record|
      bulk_items << {
        :index => {
          :_index => index,
          :_type  => tag,
          :data   => {
            :tag    => tag,
            :time   => time,
            :record => record
          }
        }
      }
    end

    # A bulk request without any action is rejected by Elasticsearch, so an
    # empty chunk is skipped instead of raising and being retried.
    return if bulk_items.empty?

    # NOTE(review): the bulk response is not inspected here, so per-item
    # failures reported in it go unnoticed -- confirm this is intended.
    @es.bulk :index => index, :body => bulk_items
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-elasticsearch-ruby-0.0.3 lib/fluent/plugin/out_elasticsearch.rb