Sha256: f793efd3f532fd086ffbc4f2d98ded971f36e231593b4854167295d975b273d9

Contents?: true

Size: 911 Bytes

Versions: 3

Compression:

Stored size: 911 Bytes

Contents

require_relative 'core_ext'
require_relative 'processor'

module Upperkut
  class Manager
    attr_accessor :worker, :redis
    attr_reader :stopped, :logger

    def initialize(opts = {})
      self.worker = opts.fetch(:worker).constantize
      self.redis  = worker.setup.redis
      @concurrency = opts.fetch(:concurrency, 25)
      @logger = opts.fetch(:logger, Upperkut::Logging.logger)

      @stopped = false
      @processors = []
    end

    def run
      @concurrency.times do
        processor = Processor.new(self)
        @processors << processor
        processor.run
      end
    end

    def stop
      @stopped = true
    end

    def kill
      @processors.each(&:kill)
    end

    def notify_killed_processor(processor)
      @processors.delete(processor)
      return if @stopped

      processor = Processor.new(self)
      @processors << processor
      processor.run
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
upperkut-0.4.1 lib/upperkut/manager.rb
upperkut-0.4.0 lib/upperkut/manager.rb
upperkut-0.3.0 lib/upperkut/manager.rb