module Refinery #:nodoc:
  # A daemon is a thread that polls a "waiting" queue and hands every
  # message it receives to a freshly instantiated worker.
  class Daemon < Thread
    include Refinery::Loggable
    include Refinery::Configurable
    include Refinery::Utilities
    include Refinery::Queueable

    # State value held while the polling loop should keep going.
    RUNNING = 'running'
    # State value that makes the polling loop exit.
    STOPPED = 'stopped'

    # The name of the daemon. It is also the base name of the worker
    # source file and the default base queue name.
    attr_reader :name

    # The settings hash given to the constructor.
    attr_reader :settings

    # The base queue name (prefix included). The suffixes "_waiting",
    # "_done" and "_error" are appended to it to address the queues.
    attr_reader :queue_name

    # Set the daemon state.
    attr_writer :state

    # Ask the daemon to stop. The flag is only examined at the top of
    # the outer polling loop, so the thread finishes its current pass
    # (including the sleep) before exiting.
    def stop
      self.state = STOPPED
    end

    # Return the daemon state, defaulting to RUNNING when none was set.
    def state
      @state ||= RUNNING
    end
    protected :state

    # Return true if the daemon state is running.
    def running?
      state == RUNNING
    end

    # Initialize the daemon and immediately start its thread.
    #
    # * processor: The processor instance; its server's workers_directory
    #   is where worker source files are looked up.
    # * name: The daemon name.
    # * queue_prefix: A string prepended to the base queue name.
    # * settings: The settings hash from the config.
    #
    # The settings hash may contain the following options:
    # * queue: The base queue name to use instead of +name+.
    # * visibility: The time in seconds that the message is hidden
    #   in the queue (passed to the waiting queue's receive call).
    # * sleep: Seconds to sleep after the waiting queue has been drained
    #   (5 when not given).
    def initialize(processor, name, queue_prefix='', settings={})
      logger.debug "Starting daemon"
      @processor = processor
      @name = name
      @settings = settings

      base_name = settings['queue'] || name
      @queue_name = "#{queue_prefix}#{base_name}"
      logger.debug "Using queue #{@queue_name}"

      # Bare super forwards the constructor arguments to Thread; the block
      # becomes the thread body. Anything that escapes execute is logged
      # rather than left to terminate the thread unreported.
      super do
        begin
          execute
        rescue Exception => e
          logger.error e
        end
      end
    end

    private

    # Thread body: while the daemon is running, drain the waiting queue,
    # then sleep before polling it again.
    def execute
      logger.debug "Running daemon thread: #{name} (settings: #{settings.inspect})"
      while running?
        #logger.debug "Checking #{queue_name}_waiting"
        with_queue("#{queue_name}_waiting") do |waiting_queue|
          while (message = waiting_queue.receive(settings['visibility']))
            process_message(message)
          end
          sleep(settings['sleep'] || 5)
        end
      end
      logger.debug "Exiting daemon thread"
    end

    # Run a new worker against one message. A truthy result posts a
    # 'done' message and deletes the original; any exception raised while
    # running or reporting posts an 'error' message and deletes the
    # original. A falsy result leaves the message untouched.
    def process_message(message)
      # Worker loading/instantiation sits outside the rescue on purpose:
      # a failure here propagates to the caller instead of being reported
      # to the error queue.
      worker = load_worker_class(name).new(self)
      begin
        result, run_time = worker.run(decode_message(message.body))
        return unless result

        post_done_message(message, run_time)
        logger.debug "Deleting message from queue"
        message.delete
      rescue Exception => e
        post_error_message(message, e)
        message.delete
      end
    end

    # Send a 'done' notification for the given message to the done queue.
    def post_done_message(message, run_time)
      with_queue("#{queue_name}_done") do |done_q|
        payload = {
          'host_info' => host_info,
          'original' => message.body,
          'run_time' => run_time
        }
        logger.debug "Sending 'done' message to #{done_q.name}"
        done_q.send_message(encode_message(payload))
      end
    end

    # Send an 'error' notification describing +error+ to the error queue.
    def post_error_message(message, error)
      with_queue("#{queue_name}_error") do |error_q|
        payload = {
          'error' => {
            'message' => error.message,
            'class' => error.class.name
          },
          'host_info' => host_info,
          'original' => message.body
        }
        logger.error "Sending 'error' message to #{error_q.name}: #{error.message}"
        error_q.send_message(encode_message(payload))
      end
    end

    # A hash mapping worker names to the modification time of the source
    # file at the moment it was last loaded.
    def workers
      @workers ||= {}
    end

    # Load the appropriate worker class, (re)loading its source file when
    # the file's modification time differs from the one recorded at the
    # previous load. Raises SourceFileNotFound when the file is missing.
    def load_worker_class(name)
      source_file = "#{@processor.server.workers_directory}/#{name}.rb"
      unless File.exist?(source_file)
        raise SourceFileNotFound, "Source file not found: #{source_file}"
      end

      modified_at = File.mtime(source_file)
      unless workers[name] == modified_at
        logger.debug "Loading #{source_file}"
        load(source_file)
        workers[name] = modified_at
      end

      Object.const_get(camelize("#{name}_worker"))
    end
  end
end