Sha256: 397aeaf10e4ec140313026c197da5c8316b7b3379dfe3860af0b5d8075c641cc

Contents?: true

Size: 1.59 KB

Versions: 4

Compression:

Stored size: 1.59 KB

Contents

require 'thread'

require 'typhoeus'
require 'typhoeus/adapters/faraday'
require 'elasticsearch'

require_relative 'base'


module Anschel
  class Output
    class Elasticsearch < Base
      def initialize config, stats, log
        default_index = config.delete(:default_index) || '.anschel'
        qsize   = config.delete(:queue_size) || 2000
        bsize   = config.delete(:bulk_size) || 500
        timeout = config.delete(:bulk_timeout) || 0.5
        slice   = timeout / bsize
        client  = ::Elasticsearch::Client.new config
        client.transport.reload_connections!

        @queue = SizedQueue.new qsize

        @thread = Thread.new do
          loop do
            events = []
            count  = 0
            start  = Time.now.to_f
            until (Time.now.to_f - start > timeout) || ((count += 1) > bsize)
              begin
                events.push @queue.shift(true)
              rescue # shift returned immediately
                sleep slice
              end
            end

            next if events.empty?

            body = events.map do |e|
              index = e.delete(:_index)
              if index.nil?
                log.error \
                  event: 'elasticsearch-output-error',
                  reason: 'event was not indexed',
                  remediation: "sending to default index '#{default_index}'",
                  raw_event: e
                index = default_index
              end
              { index: { _index: index, _type: e[:type], data: e } }
            end

            client.bulk body: body
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
anschel-0.6.5 lib/anschel/output/elasticsearch.rb
anschel-0.6.4 lib/anschel/output/elasticsearch.rb
anschel-0.6.3 lib/anschel/output/elasticsearch.rb
anschel-0.6.2 lib/anschel/output/elasticsearch.rb