Sha256: 30d1cb800d0d3157d99aecf859e04a441815810335ae9ceeaf5414d0e819ed27

Contents?: true

Size: 1.23 KB

Versions: 2

Compression:

Stored size: 1.23 KB

Contents

module Refinery #:nodoc:
  # This class is used to monitor all of the threads for a single
  # processor.
  class Processor < Thread
    include Refinery::Configurable
    include Refinery::Loggable
    
    attr_reader :server
    attr_reader :key
    attr_reader :settings
    attr_reader :daemons
    
    # Initialize the processor.
    def initialize(server, key, settings={})
      @server = server
      @key = key
      @settings = settings
      @daemons = []
      super do
        execute
      end
    end
    
    private
    # Execute the processor
    def execute
      queue_prefix = config['prefix'] || ''
      
      logger.debug "Creating daemons for #{key}"
      1.upto(settings['workers']['initial']) do
        daemons << Daemon.new(self, key, queue_prefix, settings)
      end
      
      logger.debug "Running #{daemons.length} daemons"
      
      wait = ThreadsWait.new(*daemons)
      wait.all_waits do |daemon|
        puts "a #{daemon.name} just died"
        daemons.remove(daemon)
        puts "starting a new #{key} daemon"
        daemon = Daemon.new(self, key, queue_prefix, settings)
        daemons << daemon
        wait.join(daemon)
      end
      
      logger.debug "Processor #{key} is exiting"
    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
aeden-refinery-0.10.0 lib/refinery/processor.rb
refinery-0.10.0 lib/refinery/processor.rb