Sha256: 224885f026005244d96e4517c80e559c9a3bc7d8b21b72dc4c43d0ad2ca4faa2
Contents?: true
Size: 1.02 KB
Versions: 3
Compression:
Stored size: 1.02 KB
Contents
require 'march_hare'

require_relative 'base'

module Anschel
  class Input
    # Input that consumes messages from RabbitMQ queues through March Hare and
    # appends every received message payload to the supplied queue object.
    class RabbitMQ < Base
      # Starts one consumer thread per entry in config[:queues].
      #
      # queue  - object responding to `<<`; receives each message payload.
      # config - Hash read with the keys:
      #          :connection - passed unchanged to MarchHare.connect
      #          :exchange   - Hash of exchange options; :name is the exchange
      #                        name, the rest override the defaults below
      #          :queues     - Hash of queue name => queue option overrides
      # stats  - unused here; kept for the shared input signature.
      # log    - unused here; kept for the shared input signature.
      def initialize(queue, config, stats, log)
        default_exchange = { type: 'x-consistent-hash', durable: true }
        default_qconf = { exclusive: false, auto_delete: false, durable: true }

        # Read the exchange name without deleting it: the config hash belongs
        # to the caller and must still carry :name if it is reused later.
        exchange_conf = config[:exchange]
        exchange_name = exchange_conf[:name]
        exchange_opts = default_exchange.merge(
          exchange_conf.reject { |key, _| key == :name }
        )

        @threads = config[:queues].map do |qname, qconf|
          Thread.new do
            conn = nil
            begin
              conn = ::MarchHare.connect config[:connection]
              chan = conn.create_channel

              # Declared for its side effect only; the returned exchange
              # object is not used afterwards.
              chan.exchange exchange_name, exchange_opts

              rmq = chan.queue qname.to_s, default_qconf.merge(qconf)
              rmq.subscribe(block: true) do |_, message|
                queue << message
              end
            ensure
              # Thread#kill still runs ensure clauses, so #stop (or any error
              # above) releases the connection instead of leaking it.
              conn.close if conn && conn.open?
            end
          end
        end
      end

      # Kills every consumer thread. Calling it again is a no-op.
      #
      # Returns true on the first call and nil on later calls.
      def stop
        return if @stopped
        @threads.each(&:kill)
        @stopped = true
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
anschel-0.6.4 | lib/anschel/input/rabbitmq.rb |
anschel-0.6.3 | lib/anschel/input/rabbitmq.rb |
anschel-0.6.2 | lib/anschel/input/rabbitmq.rb |