Sha256: 88e63ac1bd1679f3469d1c9f90ffde44f352e5419f7eabef16d5783a72781672

Contents?: true

Size: 1.07 KB

Versions: 1

Compression:

Stored size: 1.07 KB

Contents

module BunnyCarrot
  # Wires up a supervised set of actors and starts consuming a queue.
  class Consumer
    include BunnyCarrot::Logger

    # Builds a supervision tree, starts it, waits until every supervisor
    # reports that it is running, and then posts the queue name and worker
    # to the consumer actor.
    #
    # Tree layout: one root supervisor owns two child supervisors. The first
    # (created with restart_strategy: :one_for_all) holds the ConsumerActor;
    # the second holds every actor returned by BusinessActor.pool.
    #
    # @param queue_name forwarded unchanged to the consumer actor's +post+
    # @param worker     forwarded unchanged to the consumer actor's +post+
    # @param pool_size  handed to both BusinessActor.pool and ConsumerActor.new
    # @param block      when truthy, the caller is parked in an endless
    #                   one-second sleep loop after posting
    # @return [nil] when +block+ is falsy
    def self.subscribe(queue_name, worker, pool_size: 1, block: false)
      root      = Concurrent::Supervisor.new
      queue_sup = Concurrent::Supervisor.new(restart_strategy: :one_for_all)
      pool_sup  = Concurrent::Supervisor.new
      [queue_sup, pool_sup].each { |child| root.add_worker(child) }

      # BusinessActor.pool yields a pair: the object given to the consumer
      # actor, and the collection of individual actors to supervise.
      pool_handle, pool_members = BusinessActor.pool(pool_size)
      pool_members.each do |member|
        member.add_observer(BusinessActorObserver)
        pool_sup.add_worker(member)
      end

      consumer = ConsumerActor.new(pool_handle, pool_size)
      queue_sup.add_worker(consumer)

      root.run!
      # Poll every 100 ms until the whole tree is up; there is no timeout.
      supervisors = [root, queue_sup, pool_sup]
      sleep(0.1) until supervisors.all?(&:running?)

      # NOTE(review): +logger+ is called on the class itself here; presumably
      # BunnyCarrot::Logger makes it available at class level -- confirm.
      logger.info 'Start consuming...'

      consumer.post(queue_name, worker)
      return unless block

      loop { sleep(1) }
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
bunny_carrot-0.0.2 lib/bunny_carrot/consumer.rb