lib/qs/daemon.rb in qs-0.5.0 vs lib/qs/daemon.rb in qs-0.6.0
- old
+ new
@@ -1,61 +1,70 @@
require 'dat-worker-pool'
+require 'much-plugin'
require 'ns-options'
require 'pathname'
require 'system_timer'
require 'thread'
require 'qs'
require 'qs/client'
require 'qs/daemon_data'
-require 'qs/io_pipe'
require 'qs/logger'
-require 'qs/payload_handler'
require 'qs/queue_item'
+require 'qs/worker'
module Qs
module Daemon
+ include MuchPlugin
InvalidError = Class.new(ArgumentError)
SIGNAL = '.'.freeze
- def self.included(klass)
- klass.class_eval do
- extend ClassMethods
- include InstanceMethods
- end
+ plugin_included do
+ extend ClassMethods
+ include InstanceMethods
end
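
MuchPlugin replaces the hand-rolled self.included hook removed above. A
minimal sketch of the pattern, assuming much-plugin's documented behavior of
evaluating plugin_included blocks in the including class, once, even if the
mixin is included more than once:

module MyMixin
  include MuchPlugin

  module ClassMethods
    def where_defined; 'class-level'; end
  end

  module InstanceMethods
    def where_defined; 'instance-level'; end
  end

  plugin_included do
    # runs as if written in the body of the including class
    extend ClassMethods
    include InstanceMethods
  end
end
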
module InstanceMethods
attr_reader :daemon_data, :logger
attr_reader :signals_redis_key, :queue_redis_keys
- # * Set the size of the client to the max workers + 1. This ensures we
- # have 1 connection for fetching work from redis and at least 1
- # connection for each worker to requeue its message during a hard shutdown.
+ # set the size of the client to the num workers + 1; this ensures we have
+ # 1 connection for fetching work from redis and at least 1 connection for
+ # each worker to requeue its message during a hard shutdown
def initialize
self.class.configuration.validate!
Qs.init
@daemon_data = DaemonData.new(self.class.configuration.to_hash)
- @logger = @daemon_data.logger
+ @logger = @daemon_data.logger
@client = QsClient.new(Qs.redis_config.merge({
:timeout => 1,
- :size => self.daemon_data.max_workers + 1
+ :size => self.daemon_data.num_workers + 1
}))
@queue_redis_keys = self.daemon_data.queue_redis_keys
- @work_loop_thread = nil
- @worker_pool = nil
-
@signals_redis_key = "signals:#{@daemon_data.name}-" \
"#{Socket.gethostname}-#{::Process.pid}"
- @worker_available_io = IOPipe.new
- @signal = Signal.new(:stop)
+ @worker_available = WorkerAvailable.new
+
+ @worker_pool = DatWorkerPool.new(self.daemon_data.worker_class, {
+ :num_workers => self.daemon_data.num_workers,
+ :logger => self.daemon_data.dwp_logger,
+ :worker_params => self.daemon_data.worker_params.merge({
+ :qs_daemon_data => self.daemon_data,
+ :qs_client => @client,
+ :qs_worker_available => @worker_available,
+ :qs_logger => @logger
+ })
+ })
+
+ @thread = nil
+ @state = State.new(:stop)
rescue InvalidError => exception
exception.set_backtrace(caller)
raise exception
end
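
A worked example of the sizing comment above, using the num_workers default
from the Configuration below:

num_workers = 4
client_size = num_workers + 1 # => 5
# 1 connection stays dedicated to the blocking dequeue; the other 4 let
# every worker requeue its in-flight message during a hard shutdown
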
@@ -70,162 +79,102 @@
def pid_file
@daemon_data.pid_file
end
def running?
- !!(@work_loop_thread && @work_loop_thread.alive?)
+ !!(@thread && @thread.alive?)
end
- # * Ping redis to check that the daemon can communicate with it before
- # running; this is friendlier than starting and continuously erroring
- # because it can't dequeue.
+ # ping to check that it can communicate with redis before running; this is
+ # friendlier than starting and continuously erroring because it can't
+ # dequeue
def start
@client.ping
- @signal.set :start
- @work_loop_thread ||= Thread.new{ work_loop }
+ @state.set :run
+ @thread ||= Thread.new{ work_loop }
end
def stop(wait = false)
return unless self.running?
- @signal.set :stop
- wakeup_work_loop_thread
+ @state.set :stop
+ wakeup_thread
wait_for_shutdown if wait
end
def halt(wait = false)
return unless self.running?
- @signal.set :halt
- wakeup_work_loop_thread
+ @state.set :halt
+ wakeup_thread
wait_for_shutdown if wait
end
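
The difference between the two shutdown paths only materializes in teardown
below: stop honors the configured shutdown_timeout while halt shuts the pool
down with a 0 timeout. A usage sketch (construction of the daemon is elided):

daemon.start
daemon.stop(true) # wait up to shutdown_timeout, then requeue whatever didn't finish
# ...or...
daemon.halt(true) # skip the wait and requeue in-flight work immediately
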
private
- def process(queue_item)
- Qs::PayloadHandler.new(self.daemon_data, queue_item).run
- end
-
def work_loop
- log "Starting work loop", :debug
- setup_redis_and_ios
- @worker_pool = build_worker_pool
- process_inputs while @signal.start?
- log "Stopping work loop", :debug
+ setup
+ fetch_messages while @state.run?
rescue StandardError => exception
- @signal.set :stop
+ @state.set :stop
log "Error occurred while running the daemon, exiting", :error
log "#{exception.class}: #{exception.message}", :error
- log exception.backtrace.join("\n"), :error
+ (exception.backtrace || []).each{ |l| log(l, :error) }
ensure
- shutdown_worker_pool
- @worker_available_io.teardown
- @work_loop_thread = nil
- log "Stopped work loop", :debug
+ teardown
end
- def setup_redis_and_ios
- # clear any signals that are already on the signals redis list
+ # clear any signals that are already on the signals list in redis
+ def setup
@client.clear(self.signals_redis_key)
- @worker_available_io.setup
+ @worker_pool.start
end
- def build_worker_pool
- wp = DatWorkerPool.new(
- self.daemon_data.min_workers,
- self.daemon_data.max_workers
- ){ |queue_item| process(queue_item) }
-
- # add internal callbacks
- wp.on_worker_error do |worker, exception, queue_item|
- handle_worker_exception(exception, queue_item)
+ # shuffle the queue redis keys to avoid queue starvation; redis pulls
+ # messages off queues in the order they are passed to the command, so by
+ # shuffling we ensure they are randomly ordered and every queue gets a
+ # chance; use 0 for the brpop timeout, which means block indefinitely;
+ # rescue runtime errors so the daemon thread doesn't fail if redis is
+ # temporarily down; sleep for a second to keep the thread from thrashing
+ # by repeatedly erroring while redis is down
+ def fetch_messages
+ if !@worker_pool.worker_available? && @state.run?
+ @worker_available.wait
end
- wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) }
+ return unless @worker_pool.worker_available? && @state.run?
- # add any configured callbacks
- self.daemon_data.worker_start_procs.each{ |cb| wp.on_worker_start(&cb) }
- self.daemon_data.worker_shutdown_procs.each{ |cb| wp.on_worker_shutdown(&cb) }
- self.daemon_data.worker_sleep_procs.each{ |cb| wp.on_worker_sleep(&cb) }
- self.daemon_data.worker_wakeup_procs.each{ |cb| wp.on_worker_wakeup(&cb) }
-
- wp.start
- wp
- end
-
- # * Shuffle the queue redis keys to avoid queue starvation. Redis will
- # pull messages off queues in the order they are passed to the command;
- # by shuffling we ensure they are randomly ordered so every queue should
- # get a chance.
- # * Use 0 for the brpop timeout which means block indefinitely.
- # * Rescue runtime errors so the daemon thread doesn't fail if redis is
- # temporarily down. Sleep for a second to keep the thread from thrashing
- # by repeatedly erroring if redis is down.
- def process_inputs
- wait_for_available_worker
- return unless @worker_pool.worker_available? && @signal.start?
-
begin
args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
redis_key, encoded_payload = @client.block_dequeue(*args)
if redis_key != @signals_redis_key
- @worker_pool.add_work(QueueItem.new(redis_key, encoded_payload))
+ @worker_pool.push(QueueItem.new(redis_key, encoded_payload))
end
rescue RuntimeError => exception
- log "Error dequeueing #{exception.message.inspect}", :error
- log exception.backtrace.join("\n"), :error
+ log "Error occurred while dequeueing", :error
+ log "#{exception.class}: #{exception.message}", :error
+ (exception.backtrace || []).each{ |l| log(l, :error) }
sleep 1
end
end
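
A sketch of the call shape fetch_messages drives, assuming the redis gem's
BRPOP signature (key names are illustrative):

keys = ['signals:my-daemon'] + ['queues:low', 'queues:high'].shuffle
# the signals key stays first so wakeup signals are always seen; BRPOP
# scans keys left to right, so shuffling the queue keys keeps any one
# queue from shadowing the others
redis_key, encoded_payload = redis.brpop(*keys, 0) # 0 = block indefinitely
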
- def wait_for_available_worker
- if !@worker_pool.worker_available? && @signal.start?
- @worker_available_io.wait
- @worker_available_io.read
- end
- end
-
- def shutdown_worker_pool
- return unless @worker_pool
- timeout = @signal.stop? ? self.daemon_data.shutdown_timeout : 0
- if timeout
- log "Shutting down, waiting up to #{timeout} seconds for work to finish"
- else
- log "Shutting down, waiting for work to finish"
- end
+ def teardown
+ timeout = @state.halt? ? 0 : self.daemon_data.shutdown_timeout
@worker_pool.shutdown(timeout)
+
log "Requeueing #{@worker_pool.work_items.size} message(s)"
- @worker_pool.work_items.each do |ri|
- @client.prepend(ri.queue_redis_key, ri.encoded_payload)
+ @worker_pool.work_items.each do |qi|
+ @client.prepend(qi.queue_redis_key, qi.encoded_payload)
end
+ ensure
+ @thread = nil
end
- def wait_for_shutdown
- @work_loop_thread.join if @work_loop_thread
- end
-
- def wakeup_work_loop_thread
+ def wakeup_thread
@client.append(self.signals_redis_key, SIGNAL)
- @worker_available_io.write(SIGNAL)
+ @worker_available.signal
end
- # * This only catches errors that happen outside of running the payload
- # handler. The only known use-case for this is dat-worker-pool's
- # hard-shutdown errors.
- # * If there isn't a queue item (this can happen when an idle worker is
- # being forced to exit) then we don't need to do anything.
- # * If we never started processing the queue item, it's safe to requeue it.
- # Otherwise it happened while processing and the payload handler caught
- # it, or it happened afterwards, which we don't care about.
- def handle_worker_exception(exception, queue_item)
- return if queue_item.nil?
- if !queue_item.started
- log "Worker error, requeueing message because it hasn't started", :error
- @client.prepend(queue_item.queue_redis_key, queue_item.encoded_payload)
- else
- log "Worker error after message was processed, ignoring", :error
- end
- log "#{exception.class}: #{exception.message}", :error
- log exception.backtrace.join("\n"), :error
+ def wait_for_shutdown
+ @thread.join if @thread
end
def log(message, level = :info)
self.logger.send(level, "[Qs] #{message}")
end
@@ -244,39 +193,25 @@
def pid_file(*args)
self.configuration.pid_file(*args)
end
- def min_workers(*args)
- self.configuration.min_workers(*args)
+ def worker_class(new_worker_class = nil)
+ self.configuration.worker_class = new_worker_class if new_worker_class
+ self.configuration.worker_class
end
- def max_workers(*args)
- self.configuration.max_workers(*args)
+ def worker_params(new_worker_params = nil)
+ self.configuration.worker_params = new_worker_params if new_worker_params
+ self.configuration.worker_params
end
- def workers(*args)
- self.min_workers(*args)
- self.max_workers(*args)
+ def num_workers(*args)
+ self.configuration.num_workers(*args)
end
+ alias :workers :num_workers
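
Taken together, the new class methods replace the min/max worker pair; a
hypothetical daemon configuration using them (MyWorker and the required
name/queue setup are illustrative and elided):

class MyDaemon
  include Qs::Daemon
  worker_class  MyWorker                 # must include Qs::Worker
  worker_params :api_key => 'abc123'     # handed to each worker instance
  workers 8                              # alias for num_workers
end
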
- def on_worker_start(&block)
- self.configuration.worker_start_procs << block
- end
-
- def on_worker_shutdown(&block)
- self.configuration.worker_shutdown_procs << block
- end
-
- def on_worker_sleep(&block)
- self.configuration.worker_sleep_procs << block
- end
-
- def on_worker_wakeup(&block)
- self.configuration.worker_wakeup_procs << block
- end
-
def verbose_logging(*args)
self.configuration.verbose_logging(*args)
end
def logger(*args)
@@ -305,48 +240,41 @@
include NsOptions::Proxy
option :name, String, :required => true
option :pid_file, Pathname
- option :min_workers, Integer, :default => 1
- option :max_workers, Integer, :default => 4
+ option :num_workers, Integer, :default => 4
option :verbose_logging, :default => true
option :logger, :default => proc{ Qs::NullLogger.new }
option :shutdown_timeout
- attr_accessor :process_label
attr_accessor :init_procs, :error_procs
+ attr_accessor :worker_class, :worker_params
attr_accessor :queues
- attr_reader :worker_start_procs, :worker_shutdown_procs
- attr_reader :worker_sleep_procs, :worker_wakeup_procs
def initialize(values = nil)
super(values)
- @process_label = !(v = ENV['QS_PROCESS_LABEL'].to_s).empty? ? v : self.name
@init_procs, @error_procs = [], []
- @worker_start_procs, @worker_shutdown_procs = [], []
- @worker_sleep_procs, @worker_wakeup_procs = [], []
+ @worker_class = DefaultWorker
+ @worker_params = nil
@queues = []
- @valid = nil
+ @valid = nil
end
def routes
@queues.map(&:routes).flatten
end
def to_hash
super.merge({
- :process_label => self.process_label,
- :error_procs => self.error_procs,
- :worker_start_procs => self.worker_start_procs,
- :worker_shutdown_procs => self.worker_shutdown_procs,
- :worker_sleep_procs => self.worker_sleep_procs,
- :worker_wakeup_procs => self.worker_wakeup_procs,
- :routes => self.routes,
- :queue_redis_keys => self.queues.map(&:redis_key)
+ :error_procs => self.error_procs,
+ :worker_class => self.worker_class,
+ :worker_params => self.worker_params,
+ :routes => self.routes,
+ :queue_redis_keys => self.queues.map(&:redis_key)
})
end
def valid?
!!@valid
@@ -356,35 +284,33 @@
return @valid if !@valid.nil?
self.init_procs.each(&:call)
if self.queues.empty? || !self.required_set?
raise InvalidError, "a name and queue must be configured"
end
+ if !self.worker_class.kind_of?(Class) || !self.worker_class.include?(Qs::Worker)
+ raise InvalidError, "worker class must include `#{Qs::Worker}`"
+ end
self.routes.each(&:validate!)
@valid = true
end
end
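
The smallest worker class that passes the validate! check above mirrors the
DefaultWorker defined below:

class MyWorker
  include Qs::Worker
  # per-worker behavior and any start/shutdown hooks would layer on here
end
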
- class Signal
- def initialize(value)
- @value = value
- @mutex = Mutex.new
- end
+ DefaultWorker = Class.new{ include Qs::Worker }
- def set(value)
- @mutex.synchronize{ @value = value }
+ class WorkerAvailable
+ def initialize
+ @mutex = Mutex.new
+ @cond_var = ConditionVariable.new
end
- def start?
- @mutex.synchronize{ @value == :start }
- end
+ def wait; @mutex.synchronize{ @cond_var.wait(@mutex) }; end
+ def signal; @mutex.synchronize{ @cond_var.signal }; end
+ end
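
WorkerAvailable replaces the IOPipe-based wakeup with a plain mutex plus
condition variable. Note that ConditionVariable#wait can wake spuriously and
a signal with no waiter is simply dropped, which is why fetch_messages
rechecks worker_available? after waking. A minimal handoff sketch (the sleep
is only there to make the waiter block first):

available = WorkerAvailable.new

waiter = Thread.new do
  available.wait # parks this thread until another thread signals
end

sleep 0.1         # illustrative only; real code re-checks state instead
available.signal  # wakes the parked thread
waiter.join
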
- def stop?
- @mutex.synchronize{ @value == :stop }
- end
-
- def halt?
- @mutex.synchronize{ @value == :halt }
- end
+ class State < DatWorkerPool::LockedObject
+ def run?; self.value == :run; end
+ def stop?; self.value == :stop; end
+ def halt?; self.value == :halt; end
end
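
DatWorkerPool::LockedObject is assumed here to be dat-worker-pool's
mutex-guarded value holder; if so, State is roughly equivalent to this
hand-rolled version:

class State
  def initialize(value = nil)
    @mutex = Mutex.new
    @value = value
  end

  def value; @mutex.synchronize{ @value }; end
  def set(new_value); @mutex.synchronize{ @value = new_value }; end

  def run?;  self.value == :run;  end
  def stop?; self.value == :stop; end
  def halt?; self.value == :halt; end
end
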
end
end