module Refinery #:nodoc:
# A daemon provides a thread to run workers in.
class Daemon
include Refinery::Loggable
include Refinery::Configurable
include Refinery::Utilities
RUNNING = 'running'
STOPPED = 'stopped'
# The daemon's thread
attr_reader :thread
# The name of the daemon
attr_reader :name
# The queue for incoming messages to process
attr_reader :waiting_queue
# The queue for outgoing messages once they've been processed
attr_reader :done_queue
# The queue for error messages
attr_reader :error_queue
# Stop the daemon
def stop
self.state = STOPPED
end
# Return the daemon state
def state
@state ||= RUNNING
end
# Set the daemon state.
def state=(state)
@state = state
end
protected :state
# Return true if the daemon state is running.
def running?
state == RUNNING
end
# Initialize the daemon.
#
# * server: The server instance
# * name: The processor name
# * waiting_queue: The waiting queue that provides messages to be processed
# * error_queue: The queue where errors are posted.
# * done_queue: The queue for messages that have been processed.
# * settings: The settings hash from the config.
#
# The settings hash may contain the following options:
# * visibility: The time in seconds that the message is hidden
# in the queue.
def initialize(server, name, waiting_queue, error_queue, done_queue, settings={})
Refinery::Server.logger.debug "Starting daemon"
@server = server
@name = name
@waiting_queue = waiting_queue
@error_queue = error_queue
@done_queue = done_queue
@thread = Thread.new(self) do |daemon|
logger.debug "Running daemon thread: #{name} (settings: #{settings.inspect})"
while(running?)
begin
while (message = waiting_queue.receive(settings['visibility']))
worker = load_worker_class(name).new(self)
begin
result, run_time = worker.run(decode_message(message.body))
if result
done_message = {
'host_info' => host_info,
'original' => message.body,
'run_time' => run_time
}
logger.debug "Sending 'done' message to #{done_queue.name}"
done_queue.send_message(encode_message(done_message))
logger.debug "Deleting message from queue"
message.delete()
end
rescue Exception => e
error_message = {
'error' => {
'message' => e.message,
'class' => e.class.name
},
'host_info' => host_info,
'original' => message.body
}
error_queue.send_message(encode_message(error_message))
message.delete()
end
end
sleep(settings['sleep'] || 5)
rescue Exception => e
logger.error "An error occurred while receiving from the waiting queue: #{e.message}"
sleep(30) # try to let things recover a bit
end
end
logger.debug "Exiting daemon thread"
end
end
# A hash of worker classes
def workers
@workers ||= {}
end
private
# Load the appropriate worker class
def load_worker_class(name)
source_file = "#{@server.workers_directory}/#{name}.rb"
if File.exist?(source_file)
modified_at = File.mtime(source_file)
if workers[name] != modified_at
logger.debug "Loading #{source_file}"
load(source_file)
workers[name] = modified_at
end
else
raise SourceFileNotFound, "Source file not found: #{source_file}"
end
Object.const_get(camelize("#{name}_worker"))
end
end
end