lib/qs/daemon.rb in qs-0.1.0 vs lib/qs/daemon.rb in qs-0.2.0

- old
+ new

@@ -4,20 +4,23 @@
 require 'system_timer'
 require 'thread'
 require 'qs'
 require 'qs/client'
 require 'qs/daemon_data'
+require 'qs/io_pipe'
 require 'qs/logger'
 require 'qs/payload_handler'
 require 'qs/redis_item'

 module Qs

   module Daemon

     InvalidError = Class.new(ArgumentError)

+    SIGNAL = '.'.freeze
+
     def self.included(klass)
       klass.class_eval do
         extend ClassMethods
         include InstanceMethods
       end
@@ -92,24 +95,25 @@
       def process(redis_item)
         Qs::PayloadHandler.new(self.daemon_data, redis_item).run
       end

       def work_loop
-        self.logger.debug "Starting work loop..."
+        log "Starting work loop", :debug
         setup_redis_and_ios
         @worker_pool = build_worker_pool
         process_inputs while @signal.start?
-        self.logger.debug "Stopping work loop..."
-        shutdown_worker_pool
+        log "Stopping work loop", :debug
       rescue StandardError => exception
-        self.logger.error "Exception occurred, stopping daemon!"
-        self.logger.error "#{exception.class}: #{exception.message}"
-        self.logger.error exception.backtrace.join("\n")
+        @signal.set :stop
+        log "Error occurred while running the daemon, exiting", :error
+        log "#{exception.class}: #{exception.message}", :error
+        log exception.backtrace.join("\n"), :error
       ensure
+        shutdown_worker_pool
        @worker_available_io.teardown
         @work_loop_thread = nil
-        self.logger.debug "Stopped work loop"
+        log "Stopped work loop", :debug
       end

       def setup_redis_and_ios
         # clear any signals that are already on the signals redis list
         @client.clear(self.signals_redis_key)
@@ -120,72 +124,96 @@
         wp = DatWorkerPool.new(
           self.daemon_data.min_workers,
           self.daemon_data.max_workers
         ){ |redis_item| process(redis_item) }
         wp.on_worker_error do |worker, exception, redis_item|
-          handle_worker_exception(redis_item)
+          handle_worker_exception(exception, redis_item)
         end
-        wp.on_worker_sleep{ @worker_available_io.signal }
+        wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) }
         wp.start
         wp
       end

       # * Shuffle the queue redis keys to avoid queue starvation. Redis will
       #   pull jobs off queues in the order they are passed to the command, by
       #   shuffling we ensure they are randomly ordered so every queue should
       #   get a chance.
       # * Use 0 for the brpop timeout which means block indefinitely.
+      # * Rescue runtime errors so the daemon thread doesn't fail if redis is
+      #   temporarily down. Sleep for a second to keep the thread from thrashing
+      #   by repeatedly erroring if redis is down.
       def process_inputs
         wait_for_available_worker
         return unless @worker_pool.worker_available? && @signal.start?
-        args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
-        redis_key, serialized_payload = @client.block_dequeue(*args)
-        if redis_key != @signals_redis_key
-          @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload))
+        begin
+          args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
+          redis_key, serialized_payload = @client.block_dequeue(*args)
+          if redis_key != @signals_redis_key
+            @worker_pool.add_work(RedisItem.new(redis_key, serialized_payload))
+          end
+        rescue RuntimeError => exception
+          log "Error dequeueing #{exception.message.inspect}", :error
+          log exception.backtrace.join("\n"), :error
+          sleep 1
         end
       end

       def wait_for_available_worker
         if !@worker_pool.worker_available? && @signal.start?
-          @worker_available_io.wait
+          @worker_available_io.wait.read
         end
       end

       def shutdown_worker_pool
-        self.logger.debug "Shutting down worker pool"
+        return unless @worker_pool
         timeout = @signal.stop? ? self.daemon_data.shutdown_timeout : 0
+        if timeout
+          log "Shutting down, waiting up to #{timeout} seconds for work to finish"
+        else
+          log "Shutting down, waiting for work to finish"
+        end
         @worker_pool.shutdown(timeout)
+        log "Requeueing #{@worker_pool.work_items.size} job(s)"
         @worker_pool.work_items.each do |ri|
           @client.prepend(ri.queue_redis_key, ri.serialized_payload)
         end
       end

       def wait_for_shutdown
         @work_loop_thread.join if @work_loop_thread
       end

       def wakeup_work_loop_thread
-        @client.append(self.signals_redis_key, '.')
-        @worker_available_io.signal
+        @client.append(self.signals_redis_key, SIGNAL)
+        @worker_available_io.write(SIGNAL)
       end

       # * This only catches errors that happen outside of running the payload
       #   handler. The only known use-case for this is dat worker pools
       #   hard-shutdown errors.
       # * If there isn't a redis item (this can happen when an idle worker is
       #   being forced to exit) then we don't need to do anything.
       # * If we never started processing the redis item, its safe to requeue it.
       #   Otherwise it happened while processing so the payload handler caught
       #   it or it happened after the payload handler which we don't care about.
-      def handle_worker_exception(redis_item)
+      def handle_worker_exception(exception, redis_item)
         return if redis_item.nil?
         if !redis_item.started
+          log "Worker error, requeueing job because it hasn't started", :error
           @client.prepend(redis_item.queue_redis_key, redis_item.serialized_payload)
+        else
+          log "Worker error after job was processed, ignoring", :error
         end
+        log "#{exception.class}: #{exception.message}", :error
+        log exception.backtrace.join("\n"), :error
       end

+      def log(message, level = :info)
+        self.logger.send(level, "[Qs] #{message}")
+      end
+
     end

     module ClassMethods

       def configuration
@@ -285,41 +313,9 @@
         if self.queues.empty? || !self.required_set?
           raise InvalidError, "a name and queue must be configured"
         end
         self.routes.each(&:validate!)
         @valid = true
       end
     end

-    class IOPipe
-      NULL = File.open('/dev/null', 'w')
-      SIGNAL = '.'.freeze
-
-      attr_reader :reader, :writer
-
-      def initialize
-        @reader = NULL
-        @writer = NULL
-      end
-
-      def wait
-        ::IO.select([@reader])
-        @reader.read_nonblock(SIGNAL.bytesize)
-      end
-
-      def signal
-        @writer.write_nonblock(SIGNAL)
-      end
-
-      def setup
-        @reader, @writer = ::IO.pipe
-      end
-
-      def teardown
-        @reader.close unless @reader === NULL
-        @writer.close unless @writer === NULL
-        @reader = NULL
-        @writer = NULL
-      end
-    end
-
     class Signal
       def initialize(value)
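
Note on the wakeup change: 0.2.0 drops the inline IOPipe class (moved out to qs/io_pipe, required at the top of the file) and switches callers from signal/wait to write(SIGNAL)/wait.read. Both versions rely on the same self-pipe trick: the work loop blocks on IO.select against a pipe reader and is woken by a single byte written to the pipe. A minimal standalone sketch of that mechanism, assuming nothing about the extracted Qs::IOPipe beyond what the removed class above shows (DemoPipe is an illustrative name, not the qs API):

    # Minimal self-pipe sketch: one thread blocks on IO.select until another
    # thread writes a single byte. DemoPipe is illustrative, not Qs::IOPipe.
    class DemoPipe
      SIGNAL = '.'.freeze

      def setup
        @reader, @writer = ::IO.pipe
      end

      def wait
        ::IO.select([@reader]) # block until at least one byte is readable
        self
      end

      def read
        @reader.read_nonblock(SIGNAL.bytesize)
      end

      def write(byte)
        @writer.write_nonblock(byte)
      end

      def teardown
        @reader.close
        @writer.close
      end
    end

    pipe = DemoPipe.new
    pipe.setup
    waiter = Thread.new{ pipe.wait.read }  # like @worker_available_io.wait.read
    pipe.write(DemoPipe::SIGNAL)           # like wp.on_worker_sleep{ ...write(SIGNAL) }
    puts waiter.value.inspect              # => "."
    pipe.teardown

The wakeup_work_loop_thread change in the diff follows the same pattern: appending SIGNAL to the signals redis list unblocks the blocking dequeue, and writing SIGNAL to the pipe unblocks wait_for_available_worker.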
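Note on the dequeue comments: the signals list is always passed first, the queue keys are shuffled so no queue starves, and a timeout of 0 makes the blocking pop wait indefinitely. A rough sketch of that call pattern using the plain redis-rb gem; the key names are made up, and qs itself goes through its own client's block_dequeue rather than calling redis directly:

    require 'redis'

    redis       = Redis.new
    signals_key = "signals:demo"                   # illustrative key names
    queue_keys  = ["queues:low", "queues:high"]

    redis.lpush("queues:high", '{"job":"demo"}')   # seed a job so BRPOP returns

    # signals first, shuffled queues after, timeout 0 means block indefinitely
    keys = [signals_key, *queue_keys.shuffle]
    redis_key, serialized_payload = redis.brpop(*keys, :timeout => 0)

    if redis_key == signals_key
      puts "woken up by a signal, nothing to work"
    else
      puts "dequeued #{serialized_payload.inspect} from #{redis_key}"
    end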