Sha256: b20faa1fd68ef93f1ff1e60dc3ec2ce8cda05891794b2280127858c6cd901301
Contents?: true
Size: 1.63 KB
Versions: 8
Compression:
Stored size: 1.63 KB
Contents
require 'thread' require 'typhoeus' require 'typhoeus/adapters/faraday' require 'elasticsearch' require_relative 'base' module Anschel class Output class Elasticsearch < Base def initialize config, stats, log default_index = config.delete(:default_index) || '.anschel' qsize = config.delete(:queue_size) || 2000 bsize = config.delete(:bulk_size) || 500 timeout = config.delete(:bulk_timeout) || 0.5 slice = timeout / bsize client = ::Elasticsearch::Client.new config client.transport.reload_connections! @queue = SizedQueue.new qsize @thread = Thread.new do loop do events = [] count = 0 start = Time.now.to_f until (Time.now.to_f - start > timeout) || ((count += 1) > bsize) begin events.push @queue.shift(true) rescue # shift returned immediately sleep slice end end next if events.empty? body = events.map do |e| index = e.delete(:_index) if index.nil? log.error \ event: 'elasticsearch-output-error', reason: 'event was not indexed', remediation: "sending to default index '#{default_index}'", raw_event: e index = default_index end { index: { _index: index, _type: e[:type], data: e } } end client.bulk body: body stats.inc 'output', body.size end end end end end end
Version data entries
8 entries across 8 versions & 1 rubygems