Sha256: 88e63ac1bd1679f3469d1c9f90ffde44f352e5419f7eabef16d5783a72781672
Contents?: true
Size: 1.07 KB
Versions: 1
Compression:
Stored size: 1.07 KB
Contents
module BunnyCarrot class Consumer include BunnyCarrot::Logger def self.subscribe(queue_name, worker, pool_size: 1, block: false) root_supervisor = Concurrent::Supervisor.new queue_supervisor = Concurrent::Supervisor.new(restart_strategy: :one_for_all) business_pool_supervisor = Concurrent::Supervisor.new root_supervisor.add_worker(queue_supervisor) root_supervisor.add_worker(business_pool_supervisor) business_actor, business_pool = BusinessActor.pool(pool_size) business_pool.each do |actor| actor.add_observer(BusinessActorObserver) business_pool_supervisor.add_worker(actor) end consumer_actor = ConsumerActor.new(business_actor, pool_size) queue_supervisor.add_worker(consumer_actor) root_supervisor.run! until root_supervisor.running? && queue_supervisor.running? && business_pool_supervisor.running? sleep(0.1) end logger.info 'Start consuming...' consumer_actor.post(queue_name, worker) loop { sleep(1) } if block end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
bunny_carrot-0.0.2 | lib/bunny_carrot/consumer.rb |