Sha256: 0277a3d437ae4d6d4ba1e5e7b50bfa5b45ec6805e5a5c95c13694f9c7506018f

Contents?: true

Size: 1.39 KB

Versions: 7

Compression:

Stored size: 1.39 KB

Contents

require_relative 'input/kafka'
require_relative 'input/rabbitmq'


module Anschel
  # Funnels events from every configured input into one bounded queue and
  # hands them out one at a time through #shift.
  class Input
    # Queue capacity used when the caller passes a nil +qsize+.
    DEFAULT_QUEUE_SIZE = 2000

    # Builds the shared queue, re-enqueues any leftovers and constructs one
    # input object per entry in +config+.
    #
    # @param config [Array<Hash>] one hash per input; +:kind+ selects the
    #   implementation ('kafka' or 'rabbitmq') and the remaining keys are
    #   passed to that input's constructor. The hashes are not modified.
    # @param qsize [Integer, nil] queue capacity; nil means DEFAULT_QUEUE_SIZE
    # @param stats [#create, #get] stats collector, also passed to each input
    # @param log [#info, #warn] structured logger, also passed to each input
    # @param leftovers [Array, nil] events to put back on the queue first
    # @raise [RuntimeError] when an entry's +:kind+ is not recognised
    def initialize(config, qsize, stats, log, leftovers = [])
      log.info event: 'input-loading'
      log.info event: 'input-config', config: config, qsize: qsize

      @queue = SizedQueue.new(qsize || DEFAULT_QUEUE_SIZE)

      leftovers ||= []
      # Re-enqueue on a separate thread: SizedQueue#<< blocks once the queue
      # is full, and nothing can consume from it until this constructor
      # has returned.
      Thread.new do
        log.warn event: 'input-leftovers', leftovers_size: leftovers.size
        leftovers.each { |leftover| @queue << leftover }
      end

      @inputs = []

      stats.create 'input'
      # NOTE(review): the result of #get is discarded; presumably it is called
      # for a side effect of the stats object -- confirm before removing.
      stats.get 'input'

      config.each do |input|
        # Work on a shallow copy so removing :kind does not alter the
        # caller's config (which would break reuse of the same config).
        options = input.dup
        kind = options.delete(:kind)

        case kind
        when 'kafka'
          @inputs << Input::Kafka.new(@queue, options, stats, log)
          log.info event: 'input-loaded', kind: 'kafka'
        when 'rabbitmq'
          @inputs << Input::RabbitMQ.new(@queue, options, stats, log)
          log.info event: 'input-loaded', kind: 'rabbitmq'
        else
          raise "Unknown input type: #{kind.inspect}"
        end
      end

      log.info event: 'input-fully-loaded'
    end


    # Stops every input, then drains whatever is still queued.
    #
    # Only the first call does any work; later calls return the array that
    # was collected the first time.
    #
    # @return [Array<String>] events that were still queued at stop time
    def stop
      return @leftovers if defined?(@leftovers)

      @inputs.each(&:stop)
      @leftovers = []
      @leftovers << shift until @queue.empty?
      @leftovers
    end


    # @return [Array<String>, nil] the events drained by #stop, or nil when
    #   #stop has not been called yet
    def leftovers
      @leftovers if defined?(@leftovers)
    end


    # Removes the next event from the queue, blocking while it is empty.
    #
    # @return [String] the event itself when it is already a String,
    #   otherwise the string form of its +message+
    def shift
      event = @queue.shift
      event.is_a?(String) ? event : event.message.to_s
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
anschel-0.7.19 lib/anschel/input.rb
anschel-0.7.18 lib/anschel/input.rb
anschel-0.7.17 lib/anschel/input.rb
anschel-0.7.16 lib/anschel/input.rb
anschel-0.7.15 lib/anschel/input.rb
anschel-0.7.14 lib/anschel/input.rb
anschel-0.7.13 lib/anschel/input.rb