Sha256: ba8e0e66d6c26880e65aa46c5645f2ece1a5927091b390368316ec36bcf8d72f
Contents?: true
Size: 1.15 KB
Versions: 1
Compression:
Stored size: 1.15 KB
Contents
module BunnyCarrot class ConsumerActor < LoggingActor def initialize(business_actor, pool_size) super() @business_actor = business_actor @pool_size = pool_size logger.info 'Consumer actor initialized' end def act(queue_name, worker) logger.info "Consumer is acting..." queue = channel.queue(queue_name, durable: true) queue.subscribe(block: true, ack: true) do |delivery_info, properties, payload| acknowledge_proc = lambda { channel.ack(delivery_info.delivery_tag) } message_hash = Hamster.hash({ queue_name: queue_name, payload: JSON.parse(payload), message_headers: Hamster.hash(properties.headers || {}), acknowledge_proc: acknowledge_proc, worker: worker }) @business_actor.post(message_hash) end end private def channel @channel ||= RabbitHole.get_subscribe_channel.tap { |c| c.prefetch(@pool_size); logger.info 'Ready to consume!' } end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
bunny_carrot-0.0.2 | lib/bunny_carrot/consumer_actor.rb |