Sha256: 0277a3d437ae4d6d4ba1e5e7b50bfa5b45ec6805e5a5c95c13694f9c7506018f
Contents?: true
Size: 1.39 KB
Versions: 7
Compression:
Stored size: 1.39 KB
Contents
require_relative 'input/kafka' require_relative 'input/rabbitmq' module Anschel class Input def initialize config, qsize, stats, log, leftovers=[] log.info event: 'output-loading' log.info event: 'output-config', config: config, qsize: qsize @queue = SizedQueue.new(qsize || 2000) Thread.new do leftovers ||= [] log.warn event: 'input-leftovers', leftovers_size: leftovers.size leftovers.each { |l| @queue << l } end @inputs = [] stats.create 'input' stats.get 'input' config.each do |input| case input.delete(:kind) when 'kafka' @inputs << Input::Kafka.new(@queue, input, stats, log) log.info event: 'input-loaded', kind: 'kafka' when 'rabbitmq' @inputs << Input::RabbitMQ.new(@queue, input, stats, log) log.info event: 'input-loaded', kind: 'rabbitmq' else raise 'Uknown input type' end end log.info event: 'input-fully-loaded' end def stop return @leftovers if defined? @leftovers @inputs.map &:stop @leftovers = [] @leftovers << shift until @queue.empty? end def leftovers return @leftovers if defined? @leftovers end def shift event = @queue.shift case event when String event else event.message.to_s end end end end
Version data entries
7 entries across 7 versions & 1 rubygems