Sha256: 224885f026005244d96e4517c80e559c9a3bc7d8b21b72dc4c43d0ad2ca4faa2

Contents?: true

Size: 1.02 KB

Versions: 3

Compression:

Stored size: 1.02 KB

Contents

require 'march_hare'

require_relative 'base'


module Anschel
  class Input
    class RabbitMQ < Base
      def initialize queue, config, stats, log
        default_exchange = {
          type: 'x-consistent-hash',
          durable: true
        }

        default_qconf = {
          exclusive: false,
          auto_delete: false,
          durable: true
        }

        exchange_name = config[:exchange].delete(:name)

        @threads = config[:queues].map do |qname, qconf|
          Thread.new do
            conn = ::MarchHare.connect config[:connection]
            chan = conn.create_channel

            exchange = chan.exchange exchange_name, \
              default_exchange.merge(config[:exchange])

            rmq = chan.queue qname.to_s, default_qconf.merge(qconf)
            rmq.subscribe(block: true) do |_, message|
              queue << message
            end
          end
        end
      end

      def stop
        return if @stopped
        @threads.map &:kill
        @stopped = true
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
anschel-0.6.4 lib/anschel/input/rabbitmq.rb
anschel-0.6.3 lib/anschel/input/rabbitmq.rb
anschel-0.6.2 lib/anschel/input/rabbitmq.rb