Sha256: 157b732c7bc503d0305ee688beae29628006bf4259c62ecf89f57345e53dfd96

Contents?: true

Size: 937 Bytes

Versions: 6

Compression:

Stored size: 937 Bytes

Contents

require_relative 'core_ext'
require_relative 'processor'
require_relative 'worker'

module Upperkut
  # Owns a pool of Processor objects for one worker class. It starts
  # +concurrency+ processors, replaces any processor that reports itself as
  # killed, and stops replacing them once #stop has been called.
  class Manager
    attr_accessor :worker, :redis
    attr_reader :stopped, :logger

    # @param opts [Hash] configuration options
    # @option opts [#constantize] :worker (required) reference that is
    #   resolved to the worker via +constantize+
    # @option opts [Integer] :concurrency (25) number of processors started
    #   by #run
    # @option opts [Object] :logger (Upperkut::Logging.logger) logger exposed
    #   through #logger
    # @raise [KeyError] if +:worker+ is missing from +opts+
    def initialize(opts = {})
      self.worker = opts.fetch(:worker).constantize
      self.redis  = worker.setup.redis
      @concurrency = opts.fetch(:concurrency, 25)
      # Block form: the default logger is only looked up when the caller did
      # not provide one.
      @logger = opts.fetch(:logger) { Upperkut::Logging.logger }

      @stopped = false
      @processors = []
    end

    # Starts +concurrency+ processors and registers each one in the pool.
    def run
      @concurrency.times { spawn_processor }
    end

    # Marks the manager as stopped. From this point on, processors that
    # report themselves as killed are no longer replaced. Running processors
    # are not touched here.
    def stop
      @stopped = true
    end

    # Sends +kill+ to every processor that is registered at the moment of the
    # call.
    #
    # Iterates over a snapshot because #notify_killed_processor removes from
    # and appends to the live pool; if a processor calls back while it is
    # being killed, walking the live array would skip entries or visit
    # freshly spawned replacements.
    def kill
      @processors.dup.each(&:kill)
    end

    # Callback for a processor that has died: removes it from the pool and,
    # unless the manager has been stopped, starts a replacement.
    #
    # @param processor [Processor] the processor to drop from the pool
    def notify_killed_processor(processor)
      @processors.delete(processor)
      return if @stopped

      spawn_processor
    end

    private

    # Builds a processor bound to this manager, registers it in the pool and
    # starts it. Returns whatever Processor#run returns.
    def spawn_processor
      processor = Processor.new(self)
      @processors << processor
      processor.run
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
upperkut-0.5.0 lib/upperkut/manager.rb
upperkut-0.4.6 lib/upperkut/manager.rb
upperkut-0.4.5 lib/upperkut/manager.rb
upperkut-0.4.4 lib/upperkut/manager.rb
upperkut-0.4.3 lib/upperkut/manager.rb
upperkut-0.4.2 lib/upperkut/manager.rb