lib/qs/daemon.rb in qs-0.5.0 vs lib/qs/daemon.rb in qs-0.6.0

- old
+ new

@@ -1,61 +1,70 @@
 require 'dat-worker-pool'
+require 'much-plugin'
 require 'ns-options'
 require 'pathname'
 require 'system_timer'
 require 'thread'
 require 'qs'
 require 'qs/client'
 require 'qs/daemon_data'
-require 'qs/io_pipe'
 require 'qs/logger'
-require 'qs/payload_handler'
 require 'qs/queue_item'
+require 'qs/worker'

 module Qs

   module Daemon
+    include MuchPlugin

     InvalidError = Class.new(ArgumentError)

     SIGNAL = '.'.freeze

-    def self.included(klass)
-      klass.class_eval do
-        extend ClassMethods
-        include InstanceMethods
-      end
+    plugin_included do
+      extend ClassMethods
+      include InstanceMethods
     end

     module InstanceMethods

       attr_reader :daemon_data, :logger
       attr_reader :signals_redis_key, :queue_redis_keys

-      # * Set the size of the client to the max workers + 1. This ensures we
-      #   have 1 connection for fetching work from redis and at least 1
-      #   connection for each worker to requeue its message when hard-shutdown.
+      # set the size of the client to the num workers + 1, this ensures we have
+      # 1 connection for fetching work from redis and at least 1 connection for
+      # each worker to requeue its message when hard-shutdown
       def initialize
         self.class.configuration.validate!
         Qs.init
         @daemon_data = DaemonData.new(self.class.configuration.to_hash)
-        @logger = @daemon_data.logger
+        @logger      = @daemon_data.logger

         @client = QsClient.new(Qs.redis_config.merge({
           :timeout => 1,
-          :size    => self.daemon_data.max_workers + 1
+          :size    => self.daemon_data.num_workers + 1
         }))
         @queue_redis_keys = self.daemon_data.queue_redis_keys

-        @work_loop_thread = nil
-        @worker_pool = nil
-
         @signals_redis_key = "signals:#{@daemon_data.name}-" \
                              "#{Socket.gethostname}-#{::Process.pid}"

-        @worker_available_io = IOPipe.new
-        @signal = Signal.new(:stop)
+        @worker_available = WorkerAvailable.new
+
+        @worker_pool = DatWorkerPool.new(self.daemon_data.worker_class, {
+          :num_workers   => self.daemon_data.num_workers,
+          :logger        => self.daemon_data.dwp_logger,
+          :worker_params => self.daemon_data.worker_params.merge({
+            :qs_daemon_data      => self.daemon_data,
+            :qs_client           => @client,
+            :qs_worker_available => @worker_available,
+            :qs_logger           => @logger
+          })
+        })
+
+        @thread = nil
+        @state  = State.new(:stop)
       rescue InvalidError => exception
         exception.set_backtrace(caller)
         raise exception
       end
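One change in this hunk worth a note: the `self.included(klass)` override is replaced by much-plugin's `plugin_included` hook. A rough sketch of the effect on an including class, using a hypothetical `MyDaemon`; the only behavior assumed is what the hunk above shows (the block extending `ClassMethods` and including `InstanceMethods` on the includer):

    require 'qs/daemon'

    # hypothetical including class; `include Qs::Daemon` now fires the
    # plugin_included block, which mixes in ClassMethods/InstanceMethods just
    # like the old self.included(klass) override did
    class MyDaemon
      include Qs::Daemon
    end

    MyDaemon.ancestors.include?(Qs::Daemon::InstanceMethods)    # => true
    MyDaemon.singleton_class.include?(Qs::Daemon::ClassMethods) # => true
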
- log "Stopping work loop", :debug + setup + fetch_messages while @state.run? rescue StandardError => exception - @signal.set :stop + @state.set :stop log "Error occurred while running the daemon, exiting", :error log "#{exception.class}: #{exception.message}", :error - log exception.backtrace.join("\n"), :error + (exception.backtrace || []).each{ |l| log(l, :error) } ensure - shutdown_worker_pool - @worker_available_io.teardown - @work_loop_thread = nil - log "Stopped work loop", :debug + teardown end - def setup_redis_and_ios - # clear any signals that are already on the signals redis list + # clear any signals that are already on the signals list in redis + def setup @client.clear(self.signals_redis_key) - @worker_available_io.setup + @worker_pool.start end - def build_worker_pool - wp = DatWorkerPool.new( - self.daemon_data.min_workers, - self.daemon_data.max_workers - ){ |queue_item| process(queue_item) } - - # add internal callbacks - wp.on_worker_error do |worker, exception, queue_item| - handle_worker_exception(exception, queue_item) + # shuffle the queue redis keys to avoid queue starvation, redis will pull + # messages off queues in the order they are passed to the command, by + # shuffling we ensure they are randomly ordered so every queue should get + # a chance; use 0 for the brpop timeout which means block indefinitely; + # rescue runtime errors so the daemon thread doesn't fail if redis is + # temporarily down, sleep for a second to keep the thread from thrashing + # by repeatedly erroring if redis is down + def fetch_messages + if !@worker_pool.worker_available? && @state.run? + @worker_available.wait end - wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) } + return unless @worker_pool.worker_available? && @state.run? - # add any configured callbacks - self.daemon_data.worker_start_procs.each{ |cb| wp.on_worker_start(&cb) } - self.daemon_data.worker_shutdown_procs.each{ |cb| wp.on_worker_shutdown(&cb) } - self.daemon_data.worker_sleep_procs.each{ |cb| wp.on_worker_sleep(&cb) } - self.daemon_data.worker_wakeup_procs.each{ |cb| wp.on_worker_wakeup(&cb) } - - wp.start - wp - end - - # * Shuffle the queue redis keys to avoid queue starvation. Redis will - # pull messages off queues in the order they are passed to the command, - # by shuffling we ensure they are randomly ordered so every queue should - # get a chance. - # * Use 0 for the brpop timeout which means block indefinitely. - # * Rescue runtime errors so the daemon thread doesn't fail if redis is - # temporarily down. Sleep for a second to keep the thread from thrashing - # by repeatedly erroring if redis is down. - def process_inputs - wait_for_available_worker - return unless @worker_pool.worker_available? && @signal.start? - begin args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten redis_key, encoded_payload = @client.block_dequeue(*args) if redis_key != @signals_redis_key - @worker_pool.add_work(QueueItem.new(redis_key, encoded_payload)) + @worker_pool.push(QueueItem.new(redis_key, encoded_payload)) end rescue RuntimeError => exception - log "Error dequeueing #{exception.message.inspect}", :error - log exception.backtrace.join("\n"), :error + log "Error occurred while dequeueing", :error + log "#{exception.class}: #{exception.message}", :error + (exception.backtrace || []).each{ |l| log(l, :error) } sleep 1 end end - def wait_for_available_worker - if !@worker_pool.worker_available? && @signal.start? 
@@ -244,39 +193,25 @@
       def pid_file(*args)
         self.configuration.pid_file(*args)
       end

-      def min_workers(*args)
-        self.configuration.min_workers(*args)
+      def worker_class(new_worker_class = nil)
+        self.configuration.worker_class = new_worker_class if new_worker_class
+        self.configuration.worker_class
       end

-      def max_workers(*args)
-        self.configuration.max_workers(*args)
+      def worker_params(new_worker_params = nil)
+        self.configuration.worker_params = new_worker_params if new_worker_params
+        self.configuration.worker_params
       end

-      def workers(*args)
-        self.min_workers(*args)
-        self.max_workers(*args)
+      def num_workers(*args)
+        self.configuration.num_workers(*args)
       end
+      alias :workers :num_workers

-      def on_worker_start(&block)
-        self.configuration.worker_start_procs << block
-      end
-
-      def on_worker_shutdown(&block)
-        self.configuration.worker_shutdown_procs << block
-      end
-
-      def on_worker_sleep(&block)
-        self.configuration.worker_sleep_procs << block
-      end
-
-      def on_worker_wakeup(&block)
-        self.configuration.worker_wakeup_procs << block
-      end
-
       def verbose_logging(*args)
         self.configuration.verbose_logging(*args)
       end

       def logger(*args)

@@ -305,48 +240,41 @@
       include NsOptions::Proxy

       option :name, String, :required => true
       option :pid_file, Pathname

-      option :min_workers, Integer, :default => 1
-      option :max_workers, Integer, :default => 4
+      option :num_workers, Integer, :default => 4

       option :verbose_logging, :default => true
       option :logger, :default => proc{ Qs::NullLogger.new }

       option :shutdown_timeout

-      attr_accessor :process_label
       attr_accessor :init_procs, :error_procs
+      attr_accessor :worker_class, :worker_params
       attr_accessor :queues
-      attr_reader :worker_start_procs, :worker_shutdown_procs
-      attr_reader :worker_sleep_procs, :worker_wakeup_procs

       def initialize(values = nil)
         super(values)
-        @process_label = !(v = ENV['QS_PROCESS_LABEL'].to_s).empty? ? v : self.name
         @init_procs, @error_procs = [], []
-        @worker_start_procs, @worker_shutdown_procs = [], []
-        @worker_sleep_procs, @worker_wakeup_procs = [], []
+        @worker_class  = DefaultWorker
+        @worker_params = nil
         @queues = []
-        @valid = nil
+        @valid  = nil
       end

       def routes
         @queues.map(&:routes).flatten
       end

       def to_hash
         super.merge({
-          :process_label         => self.process_label,
-          :error_procs           => self.error_procs,
-          :worker_start_procs    => self.worker_start_procs,
-          :worker_shutdown_procs => self.worker_shutdown_procs,
-          :worker_sleep_procs    => self.worker_sleep_procs,
-          :worker_wakeup_procs   => self.worker_wakeup_procs,
-          :routes                => self.routes,
-          :queue_redis_keys      => self.queues.map(&:redis_key)
+          :error_procs      => self.error_procs,
+          :worker_class     => self.worker_class,
+          :worker_params    => self.worker_params,
+          :routes           => self.routes,
+          :queue_redis_keys => self.queues.map(&:redis_key)
         })
       end

       def valid?
         !!@valid
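With the callback DSL gone, a 0.6.0 daemon describes its worker up front. A hypothetical definition using the settings from the two hunks above; the worker body, the `name` call, and any queue wiring are assumptions, since they live outside these hunks:

    require 'qs'

    # hypothetical worker; validate! (next hunk) rejects classes that don't
    # include Qs::Worker, and DefaultWorker is used when none is configured
    class MyWorker
      include Qs::Worker
    end

    class MyDaemon
      include Qs::Daemon

      name 'my-daemon'                   # assumed unchanged from 0.5.x
      workers 2                          # alias for num_workers (default 4)
      worker_class  MyWorker             # replaces the on_worker_* callbacks
      worker_params :api_key => 'abc123' # handed to each worker via the pool
    end
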
@@ -356,35 +284,33 @@
         return @valid if !@valid.nil?
         self.init_procs.each(&:call)
         if self.queues.empty? || !self.required_set?
           raise InvalidError, "a name and queue must be configured"
         end
+        if !self.worker_class.kind_of?(Class) || !self.worker_class.include?(Qs::Worker)
+          raise InvalidError, "worker class must include `#{Qs::Worker}`"
+        end
         self.routes.each(&:validate!)
         @valid = true
       end

     end

-    class Signal
-      def initialize(value)
-        @value = value
-        @mutex = Mutex.new
-      end
+    DefaultWorker = Class.new{ include Qs::Worker }

-      def set(value)
-        @mutex.synchronize{ @value = value }
+    class WorkerAvailable
+      def initialize
+        @mutex    = Mutex.new
+        @cond_var = ConditionVariable.new
       end

-      def start?
-        @mutex.synchronize{ @value == :start }
-      end
+      def wait; @mutex.synchronize{ @cond_var.wait(@mutex) }; end
+      def signal; @mutex.synchronize{ @cond_var.signal }; end
+    end

-      def stop?
-        @mutex.synchronize{ @value == :stop }
-      end
-
-      def halt?
-        @mutex.synchronize{ @value == :halt }
-      end
+    class State < DatWorkerPool::LockedObject
+      def run?; self.value == :run; end
+      def stop?; self.value == :stop; end
+      def halt?; self.value == :halt; end
     end

   end

 end
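Finally, the hand-rolled `Signal` class gives way to `State`, built on dat-worker-pool's `LockedObject`. A rough stand-in for the same idea, assuming only the `set`/`value` interface the daemon actually exercises (`@state.set :run`, `self.value == :run`):

    require 'thread'

    # mutex-guarded value holder; equivalent in spirit to the removed Signal
    # class and to what DatWorkerPool::LockedObject is used for here
    class LockedValue
      def initialize(value = nil)
        @mutex = Mutex.new
        @value = value
      end

      def set(new_value); @mutex.synchronize{ @value = new_value }; end
      def value;          @mutex.synchronize{ @value };             end
    end

    class State < LockedValue
      def run?;  self.value == :run;  end
      def stop?; self.value == :stop; end
      def halt?; self.value == :halt; end
    end

    state = State.new(:stop)
    state.run?      # => false
    state.set :run
    state.run?      # => true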