Sha256: f47a969f1b32106da8011725710785cd48a76df0a7a1625fa0eb536e82812376

Contents?: true

Size: 1018 Bytes

Versions: 3

Compression:

Stored size: 1018 Bytes

Contents

module Larva
  class WorkerPool
    def self.start(processors, queue_suffix)
      new(processors, queue_suffix).start
    end

    attr_reader :processors, :queue_suffix, :workers
    def initialize(processors, queue_suffix)
      @processors = processors
      @queue_suffix = queue_suffix
    end

    def start
      start_workers
      keep_workers_alive if workers.count > 0
    end

    private
    def start_workers
      logger.info "Starting threads."
      @workers = processors.map do |topic, processor|
        Thread.new { start_worker(topic, processor) }
      end
      logger.info "Threads Started."
    end

    def start_worker(topic, processor)
      Larva::Listener.listen(topic, processor, queue_suffix)
    rescue => e
      logger.error "Unexpected listener termination: #{e} #{e.backtrace}"
    end

    def keep_workers_alive
      sleep(1) while workers.all? { |t| t.alive?  }
      logger.error "Some threads have died"
    end

    def logger
      Propono.config.logger
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
larva-0.4.1 lib/larva/worker_pool.rb
larva-0.4.0 lib/larva/worker_pool.rb
larva-0.3.0 lib/larva/worker_pool.rb