Sha256: e6c6a8c4e43c0b982cf6798bd4c84d634d77b7b661ee85c478106e7d12ebf56d

Contents?: true

Size: 1.35 KB

Versions: 4

Compression:

Stored size: 1.35 KB

Contents

require_relative 'input/kafka'
require_relative 'input/rabbitmq'


module Anschel
  # Fans several input sources into a single bounded queue.
  #
  # Each configured source receives the same SizedQueue at construction time;
  # consumers pull events off that queue one at a time via #shift.
  class Input
    # Builds the shared queue, re-queues any leftover events and instantiates
    # one input object per entry in +config+.
    #
    # @param config [Array<Hash>] one hash per input; the +:kind+ key selects
    #   the implementation ('kafka' or 'rabbitmq') and the remaining keys are
    #   passed to that implementation. The hashes are not modified.
    # @param qsize [Integer, nil] queue capacity; 2000 is used when nil
    # @param stats [Object] passed through to each input untouched
    # @param log [Object] logger responding to +info+, +debug+ and +trace+
    # @param leftovers [Array, nil] events to put back on the queue
    # @raise [RuntimeError] when an entry has an unrecognised +:kind+
    def initialize(config, qsize, stats, log, leftovers = [])
      log.info event: 'input-loading'
      log.debug event: 'input-config', config: config, qsize: qsize

      @queue = SizedQueue.new(qsize || 2000)

      # Pushing onto a full SizedQueue blocks, so the leftovers are re-queued
      # on their own thread (presumably so construction never stalls when
      # there are more leftovers than queue slots).
      pending = leftovers || []
      Thread.new do
        log.trace event: 'input-leftovers', leftovers_size: pending.size
        pending.each { |leftover| @queue << leftover }
      end

      @inputs = []

      config.each do |input|
        # Work on a copy: removing :kind from the caller's hash would make the
        # same config unusable for building a second Input.
        options = input.dup
        kind = options.delete(:kind)

        case kind
        when 'kafka'
          @inputs << Input::Kafka.new(@queue, options, stats, log)
          log.trace event: 'input-loaded', kind: 'kafka'
        when 'rabbitmq'
          @inputs << Input::RabbitMQ.new(@queue, options, stats, log)
          log.trace event: 'input-loaded', kind: 'rabbitmq'
        else
          raise "Unknown input type: #{kind.inspect}"
        end
      end

      log.info event: 'input-fully-loaded'
    end


    # Stops every input and drains whatever is still sitting on the queue.
    #
    # The drained events are remembered, so calling this again is a no-op that
    # hands back the same array.
    #
    # @return [Array<String>] events that were queued but never consumed
    def stop
      return @leftovers if defined? @leftovers

      @inputs.each(&:stop)
      @leftovers = []
      @leftovers << shift until @queue.empty?
      # An `until` modifier loop evaluates to nil, so return the array
      # explicitly to match what later calls return.
      @leftovers
    end


    # @return [Array<String>, nil] the events drained by #stop, or nil when
    #   #stop has not been called yet
    def leftovers
      @leftovers if defined? @leftovers
    end


    # Takes the next event off the queue, waiting until one is available.
    #
    # @return [String] the event itself when it is already a String, otherwise
    #   the string form of its +message+
    def shift
      event = @queue.shift
      case event
      when String
        event
      else
        event.message.to_s
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
anschel-0.6.5 lib/anschel/input.rb
anschel-0.6.4 lib/anschel/input.rb
anschel-0.6.3 lib/anschel/input.rb
anschel-0.6.2 lib/anschel/input.rb