Sha256: d8026dd46aa946814e61096e2126a0e2b3f899204088e823b055efd303eeca35
Contents?: true
Size: 1.04 KB
Versions: 1
Compression:
Stored size: 1.04 KB
Contents
module Rabbithole class Worker attr_reader :number_of_threads def initialize(number_of_threads = 1) @number_of_threads = number_of_threads @channel = Connection.create_channel(number_of_threads) @channel.prefetch(number_of_threads * 5) end def listen_to_queue(queue_name) queue = Connection.queue(queue_name, @channel) start_consumer(queue) end def stop_listening @channel.consumers.values.each(&:cancel) end def join @channel.work_pool.join end private def start_consumer(queue) queue.subscribe(:ack => true, :block => false) do |delivery_info, properties, payload| data = MessagePack.unpack(payload) begin Object.const_get(data['klass']).perform(*data['args']) @channel.acknowledge(delivery_info.delivery_tag, false) rescue => e @channel.reject(delivery_info.delivery_tag, !delivery_info.redelivered) ErrorHandler.handle(e, queue.name, payload) end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rabbithole-0.0.3 | lib/rabbithole/worker.rb |