Sha256: 2046ef44d83524918a82001e1719fff93ea1f372aaf85313f857178807021f8e

Contents?: true

Size: 1.57 KB

Versions: 5

Compression:

Stored size: 1.57 KB

Contents

require 'march_hare'

require_relative 'base'


module Anschel
  class Input
    # Input that consumes messages from RabbitMQ queues through March Hare
    # and appends every received message to +output+.
    #
    # One consumer thread, each with its own channel, is started for every
    # entry in +config[:queues]+. All threads share a single connection,
    # which is closed again by #stop.
    class RabbitMQ < Base
      # Connects to RabbitMQ and starts one consumer thread per queue.
      #
      # @param output [#<<] sink that receives each message payload
      # @param config [Hash] settings read by this input:
      #   * +:connection+   - options merged over the (empty) connection
      #     defaults and passed to +MarchHare.connect+ (optional)
      #   * +:exchange+     - exchange options; +:name+ is taken as the
      #     exchange name, the rest is merged over the exchange defaults
      #   * +:queues+       - mapping of queue name to queue options
      #     (options may be nil, in which case only the defaults apply)
      #   * +:subscription+ - options merged over the subscription defaults
      #     and passed to +Queue#subscribe+ (optional)
      #   The hash is only read; it is never modified.
      # @param stats [#inc] counter; +'input'+ is incremented per message
      # @param log [#debug] logger used to report each queue subscription
      def initialize output, config, stats, log
        connection_defaults = {}

        exchange_defaults = {
          type: 'x-consistent-hash',
          durable: true
        }

        queue_defaults = {
          exclusive: false,
          auto_delete: false,
          durable: true
        }

        subscription_defaults = {
          block: true,
          ack: true
        }

        # Kept in an instance variable so that #stop can close it.
        @connection = ::MarchHare.connect \
          connection_defaults.merge(config[:connection] || {})

        # Merge first, then pull :name out of the merged copy. Deleting it
        # from config[:exchange] directly would strip the name from the
        # caller's hash and break any later reuse of the same config.
        exchange_options = exchange_defaults.merge(config[:exchange])
        exchange_name    = exchange_options.delete(:name)

        subscription = subscription_defaults.merge \
          (config[:subscription] || {})
        manual_ack = subscription[:ack]

        connection = @connection

        @threads = config[:queues].map do |queue_name, queue_config|
          Thread.new do
            channel = connection.create_channel

            # Declare the exchange on this channel; the returned object is
            # not needed. Each thread hands the library its own copy of the
            # options so no hash is shared between threads.
            # NOTE(review): nothing here binds the queue to the exchange --
            # presumably bindings are set up elsewhere; confirm.
            channel.exchange exchange_name, exchange_options.dup

            queue = channel.queue queue_name.to_s, \
              queue_defaults.merge(queue_config || {})

            log.debug \
              event: 'input-rabbitmq-connecting-queue',
              queue: queue_name

            queue.subscribe(subscription.dup) do |meta, message|
              output << message
              stats.inc 'input'
              # Acknowledge only after the message was handed to output;
              # `false` acks this single delivery rather than all earlier ones.
              channel.ack meta.delivery_tag, false if manual_ack
            end
          end
        end
      end

      # Stops all consumer threads and closes the RabbitMQ connection.
      # Calling it again after a successful stop is a no-op.
      def stop
        return if @stopped
        @threads.each(&:kill)
        # Only close a connection that is still open; the broker or a
        # network failure may already have shut it down.
        @connection.close if @connection.open?
        @stopped = true
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
anschel-0.7.4 lib/anschel/input/rabbitmq.rb
anschel-0.7.3 lib/anschel/input/rabbitmq.rb
anschel-0.7.2 lib/anschel/input/rabbitmq.rb
anschel-0.7.1 lib/anschel/input/rabbitmq.rb
anschel-0.7.0 lib/anschel/input/rabbitmq.rb