Sha256: e6c6a8c4e43c0b982cf6798bd4c84d634d77b7b661ee85c478106e7d12ebf56d
Contents?: true
Size: 1.35 KB
Versions: 4
Compression:
Stored size: 1.35 KB
Contents
require_relative 'input/kafka'
require_relative 'input/rabbitmq'

module Anschel
  # Owns a bounded event queue and the set of configured input sources
  # (Kafka and/or RabbitMQ) that are handed that queue at construction.
  class Input
    # Builds the queue, re-enqueues any leftovers and instantiates one input
    # object per entry in +config+.
    #
    # @param config [Array<Hash>] one hash per input; its +:kind+ key
    #   ('kafka' or 'rabbitmq') selects the input class. NOTE: +:kind+ is
    #   deleted from each hash before the rest is passed to the input class.
    # @param qsize [Integer, nil] queue capacity; 2000 is used when nil
    # @param stats [Object] passed through untouched to every input
    # @param log [Object] logger; must respond to +info+, +debug+ and +trace+
    # @param leftovers [Array, nil] events to put back on the queue (for
    #   example the result of a previous #stop); nil is treated as empty
    # @raise [RuntimeError] if an entry's +:kind+ is not a known input type
    def initialize(config, qsize, stats, log, leftovers = [])
      log.info event: 'input-loading'
      log.debug event: 'input-config', config: config, qsize: qsize

      @queue = SizedQueue.new(qsize || 2000)

      leftovers ||= []
      # Refill from a separate thread: pushing onto a full SizedQueue blocks,
      # so doing this inline could stall the constructor when there are more
      # leftovers than queue slots.
      Thread.new do
        log.trace event: 'input-leftovers', leftovers_size: leftovers.size
        leftovers.each { |leftover| @queue << leftover }
      end

      @inputs = []
      config.each do |input|
        kind = input.delete(:kind)
        input_class =
          case kind
          when 'kafka'    then Input::Kafka
          when 'rabbitmq' then Input::RabbitMQ
          else raise "Unknown input type: #{kind.inspect}"
          end

        @inputs << input_class.new(@queue, input, stats, log)
        log.trace event: 'input-loaded', kind: kind
      end

      log.info event: 'input-fully-loaded'
    end

    # Stops every input, then drains whatever is still queued into the
    # leftovers list. Calling it again returns the same list without
    # stopping or draining a second time.
    #
    # @return [Array<String>] the events that were still queued
    def stop
      return @leftovers if defined? @leftovers

      @inputs.each(&:stop)
      @leftovers = []
      @leftovers << shift until @queue.empty?
      @leftovers
    end

    # @return [Array<String>, nil] the events drained by #stop, or nil when
    #   #stop has not been called yet
    def leftovers
      @leftovers if defined? @leftovers
    end

    # Removes the next event from the queue, waiting if the queue is empty.
    #
    # @return [String] the event itself when it is already a String;
    #   otherwise +event.message.to_s+ (non-String events are presumably the
    #   message objects pushed by the inputs -- confirm against input/kafka)
    def shift
      event = @queue.shift
      case event
      when String then event
      else event.message.to_s
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
anschel-0.6.5 | lib/anschel/input.rb |
anschel-0.6.4 | lib/anschel/input.rb |
anschel-0.6.3 | lib/anschel/input.rb |
anschel-0.6.2 | lib/anschel/input.rb |