Sha256: 8a75d85646fe502c8e8bf905af38f76a6306eb5e90f890146d03da3b1f3f30c2

Contents?: true

Size: 1.6 KB

Versions: 2

Compression:

Stored size: 1.6 KB

Contents

require 'march_hare'

require_relative 'base'


module Anschel
  class Input
    class RabbitMQ < Base
      def initialize output, config, stats, log
        connection_defaults = {}

        exchange_defaults = {
          type: 'x-consistent-hash',
          durable: true
        }

        queue_defaults = {
          exclusive: false,
          auto_delete: false,
          durable: true
        }

        subscription_defaults = {
          block: true,
          ack: true,
          manual_ack: true
        }

        connection = ::MarchHare.connect \
          connection_defaults.merge(config[:connection] || {})

        exchange_name = config[:exchange].delete(:name)

        @threads = config[:queues].map do |queue_name, queue_config|
          Thread.new do
            channel = connection.create_channel

            exchange = channel.exchange exchange_name, \
              exchange_defaults.merge(config[:exchange])

            subscription = subscription_defaults.merge \
              (config[:subscription] || {})

            queue = channel.queue queue_name.to_s, \
              queue_defaults.merge(queue_config)

            log.debug \
              event: 'input-rabbitmq-connecting-queue',
              queue: queue_name

            queue.subscribe(subscription) do |meta, message|
              output << message
              stats.inc 'input'
              channel.ack meta.delivery_tag if subscription[:manual_ack]
            end
          end
        end
      end

      def stop
        return if @stopped
        @threads.map(&:kill)
        @stopped = true
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
anschel-0.7.10 lib/anschel/input/rabbitmq.rb
anschel-0.7.9 lib/anschel/input/rabbitmq.rb