Sha256: 25e933c9687bba8fa46397a3b438c14761f5ab4f266613fa6f5a083ba7198845

Contents?: true

Size: 1.38 KB

Versions: 10

Compression:

Stored size: 1.38 KB

Contents

module Larva
  class WorkerPool
    def self.start(processors)
      new(processors).start
    end

    attr_reader :processors, :workers
    def initialize(processors)
      @processors = processors
    end

    def start
      @running = true
      start_workers
      keep_workers_alive if workers.count > 0 
    end
   
    def stop
      logger.info "Request to stop worker pool accepted"
      @running = false
    end

    private
    def start_workers
      logger.info "Starting #{processors.count} threads."
      @workers = processors.map do |topic, processor|
        worker = Thread.new { start_worker(topic, processor) }
        worker[:name] = "Listener for #{topic}"
        worker
      end
      logger.info "#{workers.count} threads started."
    end

    def start_worker(topic, processor)
      Larva::Listener.listen(topic, processor)
    rescue => e
      logger.error "Unexpected listener termination: #{e} #{e.backtrace}"
    end

    def keep_workers_alive
      while @running && workers.all? { |t| t.alive?  }
        logger.info 'All threads are alive.'
        sleep(60) 
      end

      logger.error 'Some threads have died:'
      workers.each do |worker|
        logger.error "#{worker[:name]} was #{worker.alive? ? 'alive' : 'dead'}"
      end
      raise StandardError.new('Some threads have died') if @running
    end

    def logger
      Propono.config.logger
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
larva-1.3.0 lib/larva/worker_pool.rb
larva-1.2.0 lib/larva/worker_pool.rb
larva-1.1.3 lib/larva/worker_pool.rb
larva-1.1.2 lib/larva/worker_pool.rb
larva-1.1.1 lib/larva/worker_pool.rb
larva-1.1.0 lib/larva/worker_pool.rb
larva-1.0.1 lib/larva/worker_pool.rb
larva-1.0.0 lib/larva/worker_pool.rb
larva-0.9.2 lib/larva/worker_pool.rb
larva-0.9.1 lib/larva/worker_pool.rb