lib/qs/daemon.rb in qs-0.1.0 vs lib/qs/daemon.rb in qs-0.2.0
- old
+ new
@@ -4,20 +4,23 @@
require 'system_timer'
require 'thread'
require 'qs'
require 'qs/client'
require 'qs/daemon_data'
+require 'qs/io_pipe'
require 'qs/logger'
require 'qs/payload_handler'
require 'qs/redis_item'
module Qs
module Daemon
InvalidError = Class.new(ArgumentError)
+ SIGNAL = '.'.freeze
+
def self.included(klass)
klass.class_eval do
extend ClassMethods
include InstanceMethods
end
@@ -92,24 +95,25 @@
def process(redis_item)
Qs::PayloadHandler.new(self.daemon_data, redis_item).run
end
def work_loop
- self.logger.debug "Starting work loop..."
+ log "Starting work loop", :debug
setup_redis_and_ios
@worker_pool = build_worker_pool
process_inputs while @signal.start?
- self.logger.debug "Stopping work loop..."
- shutdown_worker_pool
+ log "Stopping work loop", :debug
rescue StandardError => exception
- self.logger.error "Exception occurred, stopping daemon!"
- self.logger.error "#{exception.class}: #{exception.message}"
- self.logger.error exception.backtrace.join("\n")
+ @signal.set :stop
+ log "Error occurred while running the daemon, exiting", :error
+ log "#{exception.class}: #{exception.message}", :error
+ log exception.backtrace.join("\n"), :error
ensure
+ shutdown_worker_pool
@worker_available_io.teardown
@work_loop_thread = nil
- self.logger.debug "Stopped work loop"
+ log "Stopped work loop", :debug
end
def setup_redis_and_ios
# clear any signals that are already on the signals redis list
@client.clear(self.signals_redis_key)
@@ -120,72 +124,96 @@
wp = DatWorkerPool.new(
self.daemon_data.min_workers,
self.daemon_data.max_workers
){ |redis_item| process(redis_item) }
wp.on_worker_error do |worker, exception, redis_item|
- handle_worker_exception(redis_item)
+ handle_worker_exception(exception, redis_item)
end
- wp.on_worker_sleep{ @worker_available_io.signal }
+ wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) }
wp.start
wp
end
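
The pool wiring above is what ties DatWorkerPool back to the work loop: a worker error is now forwarded along with the exception itself, and an idle worker writes one SIGNAL byte to the availability pipe so the loop knows it can dequeue again. Below is a compressed sketch of that lifecycle using only the DatWorkerPool calls that appear in this diff; the require name and the stand-in values are assumptions, not the daemon's actual wiring.

    require 'dat-worker-pool'   # require name assumed for the dat-worker-pool gem

    SIGNAL  = '.'.freeze
    io_pipe = Queue.new         # stand-in for the worker-availability IOPipe

    pool = DatWorkerPool.new(1, 2){ |item| puts "working #{item}" }   # min/max workers + work block
    pool.on_worker_error{ |worker, exception, item| warn "#{exception.class} on #{item.inspect}" }
    pool.on_worker_sleep{ io_pipe << SIGNAL }   # idle worker => signal the work loop
    pool.start

    pool.add_work('a-serialized-job') if pool.worker_available?
    pool.shutdown(5)    # wait up to 5 seconds for in-flight work, like shutdown_worker_pool does
    pool.work_items     # whatever is still unfinished afterwards (requeued by the daemon)
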
# * Shuffle the queue redis keys to avoid queue starvation. Redis will
# pull jobs off queues in the order they are passed to the command; by
# shuffling them we ensure the keys are randomly ordered, so every queue
# gets a chance.
# * Use 0 for the brpop timeout which means block indefinitely.
+ # * Rescue runtime errors so the daemon thread doesn't fail if redis is
+ # temporarily down. Sleep for a second to keep the thread from thrashing
+ # by repeatedly erroring if redis is down.
def process_inputs
wait_for_available_worker
return unless @worker_pool.worker_available? && @signal.start?
- args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
- redis_key, serialized_payload = @client.block_dequeue(*args)
- if redis_key != @signals_redis_key
- @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload))
+ begin
+ args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
+ redis_key, serialized_payload = @client.block_dequeue(*args)
+ if redis_key != @signals_redis_key
+ @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload))
+ end
+ rescue RuntimeError => exception
+ log "Error dequeueing #{exception.message.inspect}", :error
+ log exception.backtrace.join("\n"), :error
+ sleep 1
end
end
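
The begin/rescue added above wraps a single blocking dequeue whose first key is the signals list, followed by the shuffled queue keys. block_dequeue lives in qs/client and isn't part of this diff, but at the redis level it is presumably a BRPOP; here is a rough illustration with redis-rb and made-up key names:

    require 'redis'

    redis      = Redis.new
    queue_keys = ['qs:queue_a', 'qs:queue_b', 'qs:queue_c']   # hypothetical queue keys
    keys       = ['qs:signals'] + queue_keys.shuffle          # signals key is always checked first

    # BRPOP returns [key_that_had_an_item, popped_value]; a timeout of 0 blocks indefinitely.
    redis_key, serialized_payload = redis.brpop(*keys, timeout: 0)

Redis serves BRPOP keys in the order they are given, so listing the signals key first lets a wakeup signal through even when every queue has work, and shuffling the queue keys keeps one busy queue from starving the rest.
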
def wait_for_available_worker
if !@worker_pool.worker_available? && @signal.start?
- @worker_available_io.wait
+ @worker_available_io.wait.read
end
end
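
@worker_available_io is the IOPipe that now lives in qs/io_pipe (the inline class it replaces is removed near the bottom of this diff): worker threads write a single '.' byte when they go idle, and the work loop blocks in IO.select until a byte arrives, then reads it off so stale signals don't pile up in the pipe. A minimal sketch of that self-pipe pattern, assuming the extracted class keeps roughly the shape of the removed one (the wait-then-read split is inferred from the wait.read call above):

    class MiniIOPipe                  # illustrative stand-in, not the real Qs::IOPipe
      SIGNAL = '.'.freeze

      def setup
        @reader, @writer = ::IO.pipe
      end

      def write(signal)               # called from worker threads (on_worker_sleep)
        @writer.write_nonblock(signal)
      end

      def wait                        # block until at least one signal byte is readable
        ::IO.select([@reader])
        self
      end

      def read                        # consume one byte so select doesn't wake for it again
        @reader.read_nonblock(SIGNAL.bytesize)
      end

      def teardown
        @reader.close
        @writer.close
      end
    end

    pipe = MiniIOPipe.new
    pipe.setup
    Thread.new{ pipe.write(MiniIOPipe::SIGNAL) }   # a "worker" signals availability
    pipe.wait.read                                 # the "work loop" wakes up and clears it
    pipe.teardown
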
def shutdown_worker_pool
- self.logger.debug "Shutting down worker pool"
+ return unless @worker_pool
timeout = @signal.stop? ? self.daemon_data.shutdown_timeout : 0
+ if timeout
+ log "Shutting down, waiting up to #{timeout} seconds for work to finish"
+ else
+ log "Shutting down, waiting for work to finish"
+ end
@worker_pool.shutdown(timeout)
+ log "Requeueing #{@worker_pool.work_items.size} job(s)"
@worker_pool.work_items.each do |ri|
@client.prepend(ri.queue_redis_key, ri.serialized_payload)
end
end
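
A subtle point in the branch above: 0 is truthy in Ruby, so the else arm only runs when the configured shutdown_timeout is nil. A stop uses the configured value (nil meaning wait for in-flight work with no time limit, per the log messages), while any other shutdown state uses 0 and shuts the pool down immediately. A tiny sketch of the three cases with hypothetical values:

    # Mirrors `timeout = @signal.stop? ? self.daemon_data.shutdown_timeout : 0`
    def shutdown_timeout_for(stopping, configured)
      stopping ? configured : 0
    end

    shutdown_timeout_for(true,  10)    # => 10   "waiting up to 10 seconds for work to finish"
    shutdown_timeout_for(true,  nil)   # => nil  "waiting for work to finish" (no time limit)
    shutdown_timeout_for(false, 10)    # => 0    halting: don't wait for work at all
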
def wait_for_shutdown
@work_loop_thread.join if @work_loop_thread
end
def wakeup_work_loop_thread
- @client.append(self.signals_redis_key, '.')
- @worker_available_io.signal
+ @client.append(self.signals_redis_key, SIGNAL)
+ @worker_available_io.write(SIGNAL)
end
# * This only catches errors that happen outside of running the payload
# handler. The only known use-case for this is dat worker pool's
# hard-shutdown errors.
# * If there isn't a redis item (this can happen when an idle worker is
# being forced to exit) then we don't need to do anything.
# * If we never started processing the redis item, it's safe to requeue it.
# Otherwise it happened while processing (so the payload handler caught it)
# or after the payload handler ran, which we don't care about.
- def handle_worker_exception(redis_item)
+ def handle_worker_exception(exception, redis_item)
return if redis_item.nil?
if !redis_item.started
+ log "Worker error, requeueing job because it hasn't started", :error
@client.prepend(redis_item.queue_redis_key, redis_item.serialized_payload)
+ else
+ log "Worker error after job was processed, ignoring", :error
end
+ log "#{exception.class}: #{exception.message}", :error
+ log exception.backtrace.join("\n"), :error
end
+ def log(message, level = :info)
+ self.logger.send(level, "[Qs] #{message}")
+ end
+
end
module ClassMethods
def configuration
@@ -285,41 +313,9 @@
if self.queues.empty? || !self.required_set?
raise InvalidError, "a name and queue must be configured"
end
self.routes.each(&:validate!)
@valid = true
- end
- end
-
- class IOPipe
- NULL = File.open('/dev/null', 'w')
- SIGNAL = '.'.freeze
-
- attr_reader :reader, :writer
-
- def initialize
- @reader = NULL
- @writer = NULL
- end
-
- def wait
- ::IO.select([@reader])
- @reader.read_nonblock(SIGNAL.bytesize)
- end
-
- def signal
- @writer.write_nonblock(SIGNAL)
- end
-
- def setup
- @reader, @writer = ::IO.pipe
- end
-
- def teardown
- @reader.close unless @reader === NULL
- @writer.close unless @writer === NULL
- @reader = NULL
- @writer = NULL
end
end
class Signal
def initialize(value)