Sha256: b9836489a59f5bfa832ba872b429192852a82f521f413e7fe410e071ea65ab9c
Contents?: true
Size: 1.38 KB
Versions: 4
Compression:
Stored size: 1.38 KB
Contents
require 'bunny'
require 'ears/consumer'

module Ears
  # Helper for declaring exchanges, queues and consumers.
  #
  # Exchanges and queues are built on the channel returned by +Ears.channel+;
  # every consumer instance gets a channel of its own, opened from
  # +Ears.connection+.
  class Setup
    # Builds a Bunny exchange on +Ears.channel+.
    #
    # @param name the exchange name, passed to +Bunny::Exchange.new+
    # @param type the exchange type, passed to +Bunny::Exchange.new+
    # @param opts [Hash] options forwarded unchanged to +Bunny::Exchange.new+
    # @return [Bunny::Exchange]
    def exchange(name, type, opts = {})
      shared_channel = Ears.channel
      Bunny::Exchange.new(shared_channel, type, name, opts)
    end

    # Builds a Bunny queue on +Ears.channel+.
    #
    # @param name the queue name, passed to +Bunny::Queue.new+
    # @param opts [Hash] options forwarded unchanged to +Bunny::Queue.new+
    # @return [Bunny::Queue]
    def queue(name, opts = {})
      shared_channel = Ears.channel
      Bunny::Queue.new(shared_channel, name, opts)
    end

    # Subscribes +threads+ instances of +consumer_class+ to +queue+.
    #
    # Each instance gets its own channel and its own queue object (built from
    # the name and options of +queue+), and is numbered starting at 1.
    #
    # @param queue the queue whose +name+ and +options+ are reused
    # @param consumer_class the class instantiated once per thread
    # @param threads [Integer] how many consumer instances to create
    # @param args [Hash] read for +:prefetch+ when configuring the channel and
    #   also handed to the consumer constructor
    # @return [Integer] +threads+ (the result of +Integer#times+)
    def consumer(queue, consumer_class, threads = 1, args = {})
      threads.times do |index|
        consumer_queue = create_consumer_queue(queue, args)
        instance =
          create_consumer(consumer_queue, consumer_class, args, index + 1)

        # Route every delivery to the instance's own process_delivery.
        instance.on_delivery do |delivery_info, metadata, payload|
          instance.process_delivery(delivery_info, metadata, payload)
        end

        consumer_queue.subscribe_with(instance)
      end
    end

    private

    # Instantiates +consumer_class+ for +queue+ on the queue's channel, using
    # "<class name>-<number>" as the third constructor argument.
    #
    # NOTE(review): the two +false+ positionals presumably map to
    # Bunny::Consumer's +no_ack+ and +exclusive+ parameters - confirm against
    # the consumer class constructor.
    def create_consumer(queue, consumer_class, args, number)
      tag = "#{consumer_class.name}-#{number}"
      consumer_class.new(queue.channel, queue, tag, false, false, args)
    end

    # Opens a new channel on +Ears.connection+ and configures it.
    #
    # Prefetch is taken from +args[:prefetch]+ and falls back to 1. Any error
    # reported through +on_uncaught_exception+ is raised on the main thread.
    #
    # NOTE(review): the positional arguments (nil, 1, true) look like Bunny's
    # channel id, consumer pool size and abort-on-exception flag - confirm
    # against the Bunny version in use.
    #
    # @return the configured channel
    def create_consumer_channel(args)
      channel = Ears.connection.create_channel(nil, 1, true)
      channel.prefetch(args.fetch(:prefetch, 1))
      channel.on_uncaught_exception { |error| Thread.main.raise(error) }
      channel
    end

    # Builds a queue object with the same name and options as +queue+, bound
    # to a freshly created consumer channel.
    #
    # @return [Bunny::Queue]
    def create_consumer_queue(queue, args)
      dedicated_channel = create_consumer_channel(args)
      Bunny::Queue.new(dedicated_channel, queue.name, queue.options)
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygem
Version | Path |
---|---|
ears-0.3.0 | lib/ears/setup.rb |
ears-0.2.1 | lib/ears/setup.rb |
ears-0.2.0 | lib/ears/setup.rb |
ears-0.1.0 | lib/ears/setup.rb |