Sha256: 1fee815850ee4066c2a6fc26b560891e98eccb856d0c04477a95eaa3cad2a7e9

Contents?: true

Size: 1.34 KB

Versions: 9

Compression:

Stored size: 1.34 KB

Contents

module Refinery #:nodoc:
  # A thread that supervises the daemons of a single processor.
  #
  # When started it creates <tt>settings['workers']['initial']</tt> daemons
  # for its key and then watches them: every time a daemon terminates it is
  # removed from #daemons and, after a 60 second pause, replaced by a newly
  # created daemon.
  class Processor < Thread
    include Refinery::Configurable
    include Refinery::Loggable

    # The server object given to the constructor.
    attr_reader :server
    # The key given to the constructor; passed to every daemon and used in
    # log messages.
    attr_reader :key
    # The settings hash given to the constructor; passed to every daemon.
    attr_reader :settings
    # The daemons currently tracked by this processor.
    attr_reader :daemons

    # Initialize the processor and start its thread, which runs #execute.
    #
    # * <tt>server</tt>: stored and exposed through #server
    # * <tt>key</tt>: identifies the processor; forwarded to each daemon
    # * <tt>settings</tt>: hash forwarded to each daemon; #execute reads
    #   <tt>settings['workers']['initial']</tt> from it to decide how many
    #   daemons to create
    def initialize(server, key, settings={})
      # Assign all state before calling super: Thread#initialize starts
      # running the block as soon as the thread is created.
      @server = server
      @key = key
      @settings = settings
      @daemons = []
      super do
        execute
      end
    end

    private

    # Execute the processor: create the initial daemons, then replace each
    # one as it terminates.
    #
    # NOTE(review): +config+ and +logger+ are presumably supplied by the
    # included Configurable and Loggable modules, and Daemon is presumably a
    # Thread subclass (ThreadsWait operates on threads) -- confirm.
    def execute
      queue_prefix = config['prefix'] || ''

      logger.debug "Creating daemons for #{key}"
      1.upto(settings['workers']['initial']) do
        daemons << Daemon.new(self, key, queue_prefix, settings)
      end

      logger.debug "Running #{daemons.length} daemons"

      wait = ThreadsWait.new(*daemons)
      # all_waits yields each watched thread as it terminates and returns
      # once no watched threads remain.
      wait.all_waits do |dead_daemon|
        logger.debug "a #{dead_daemon.name} just died"
        daemons.delete(dead_daemon)
        logger.debug "waiting for 60 seconds before starting a new #{key} daemon"
        sleep(60)
        logger.debug "starting a new #{key} daemon"
        replacement = Daemon.new(self, key, queue_prefix, settings)
        daemons << replacement
        # Register the replacement without blocking. ThreadsWait#join would
        # also wait for the next thread to terminate and return it; because
        # that return value was discarded here, that daemon's death went
        # unnoticed and it was never replaced.
        wait.join_nowait(replacement)
      end

      logger.debug "Processor #{key} is exiting"
    end
  end
end

Version data entries

9 entries across 9 versions & 2 rubygems

Version Path
aeden-refinery-0.10.2 lib/refinery/processor.rb
aeden-refinery-0.10.3 lib/refinery/processor.rb
aeden-refinery-0.10.5 lib/refinery/processor.rb
aeden-refinery-0.10.6 lib/refinery/processor.rb
refinery-0.10.2 lib/refinery/processor.rb
refinery-0.10.3 lib/refinery/processor.rb
refinery-0.10.4 lib/refinery/processor.rb
refinery-0.10.5 lib/refinery/processor.rb
refinery-0.10.6 lib/refinery/processor.rb