module Refinery #:nodoc:
  # A daemon provides a thread to run workers in.
  #
  # The thread polls the waiting queue, instantiates a worker for every
  # message it receives and reports the outcome on the done queue or on the
  # error queue.
  class Daemon
    include Refinery::Loggable
    include Refinery::Configurable
    include Refinery::Utilities

    RUNNING = 'running'
    STOPPED = 'stopped'

    # The daemon's thread
    attr_reader :thread
    # The name of the daemon
    attr_reader :name
    # The queue for incoming messages to process
    attr_reader :waiting_queue
    # The queue for outgoing messages once they've been processed
    attr_reader :done_queue
    # The queue for error messages
    attr_reader :error_queue

    # Stop the daemon.
    #
    # This only flips the state flag. The thread checks the flag at the top
    # of its outer polling loop, so it exits once the waiting queue stops
    # yielding messages and the sleep that follows has finished.
    def stop
      self.state = STOPPED
    end

    # Return the daemon state (defaults to RUNNING until a state is set).
    def state
      @state ||= RUNNING
    end

    # Set the daemon state.
    def state=(state)
      @state = state
    end
    # Only the reader is protected; the writer above stays public.
    protected :state

    # Return true if the daemon state is running.
    def running?
      state == RUNNING
    end

    # Initialize the daemon and start its processing thread.
    #
    # * server: The server instance
    # * name: The processor name
    # * waiting_queue: The waiting queue that provides messages to be processed
    # * error_queue: The queue where errors are posted.
    # * done_queue: The queue for messages that have been processed.
    # * settings: The settings hash from the config.
    #
    # The settings hash may contain the following options:
    # * visibility: The time in seconds that the message is hidden
    #   in the queue (passed to waiting_queue.receive).
    # * sleep: The time in seconds to pause once the waiting queue yields no
    #   more messages. Defaults to 5.
    def initialize(server, name, waiting_queue, error_queue, done_queue, settings={})
      Refinery::Server.logger.debug "Starting daemon"

      @server = server
      @name = name
      @waiting_queue = waiting_queue
      @error_queue = error_queue
      @done_queue = done_queue

      # NOTE: inside the block below, name, waiting_queue, error_queue,
      # done_queue and settings are the method parameters captured by the
      # closure, not the attr_readers of the same name.
      @thread = Thread.new(self) do |_daemon|
        logger.debug "Running daemon thread: #{name} (settings: #{settings.inspect})"
        while running?
          begin
            # Drain the queue: keep going for as long as receive returns a message.
            while (message = waiting_queue.receive(settings['visibility']))
              # The worker class is resolved for every message so that a
              # modified source file is reloaded (see load_worker_class).
              worker = load_worker_class(name).new(self)
              begin
                result, run_time = worker.run(decode_message(message.body))
                # A falsy result leaves the message alone: nothing is sent
                # and the message is not deleted.
                if result
                  done_message = {
                    'host_info' => host_info,
                    'original' => message.body,
                    'run_time' => run_time
                  }
                  logger.debug "Sending 'done' message to #{done_queue.name}"
                  done_queue.send_message(encode_message(done_message))

                  logger.debug "Deleting message from queue"
                  message.delete
                end
              rescue Exception => e
                # Deliberately broad: whatever the worker raises is reported
                # on the error queue instead of terminating the thread.
                error_message = {
                  'error' => {
                    'message' => e.message,
                    'class' => e.class.name
                  },
                  'host_info' => host_info,
                  'original' => message.body
                }
                logger.error "Sending 'error' message to #{error_queue.name}: #{e.message}"
                error_queue.send_message(encode_message(error_message))
                message.delete
              end
            end
            sleep(settings['sleep'] || 5)
          rescue Exception => e
            # Deliberately broad, so the daemon keeps polling. Besides
            # receive failures this also catches errors raised while loading
            # the worker class or while posting to the error queue, because
            # those happen outside the inner begin block.
            logger.error "An error occurred while receiving from the waiting queue: #{e.message}"
            # delay to try to get past the issue with the queue
            sleep(30)
            # Assign a new queue instance. The captured local must be
            # replaced as well as the instance variable: the loop above
            # polls the local, so updating only @waiting_queue would leave
            # it polling the old instance forever.
            waiting_queue = @waiting_queue = queue(waiting_queue.name)
          end
        end
        logger.debug "Exiting daemon thread"
      end
    end

    # A hash mapping each worker name to the modification time of its source
    # file as of the last load. Used to decide when a reload is needed.
    def workers
      @workers ||= {}
    end

    private

    # Load the appropriate worker class.
    #
    # The source file is "<workers_directory>/<name>.rb", where the directory
    # comes from the server. It is (re)loaded whenever its modification time
    # differs from the one recorded in #workers.
    #
    # Returns the constant named camelize("<name>_worker"), looked up on Object.
    # Raises SourceFileNotFound when the source file does not exist.
    def load_worker_class(name)
      source_file = "#{@server.workers_directory}/#{name}.rb"
      raise SourceFileNotFound, "Source file not found: #{source_file}" unless File.exist?(source_file)

      modified_at = File.mtime(source_file)
      if workers[name] != modified_at
        logger.debug "Loading #{source_file}"
        load(source_file)
        workers[name] = modified_at
      end

      Object.const_get(camelize("#{name}_worker"))
    end
  end
end