lib/qs/daemon.rb in qs-0.6.1 vs lib/qs/daemon.rb in qs-0.7.0
- old
+ new
@@ -1,10 +1,7 @@
require 'dat-worker-pool'
require 'much-plugin'
-require 'ns-options'
-require 'pathname'
-require 'system_timer'
require 'thread'
require 'qs'
require 'qs/client'
require 'qs/daemon_data'
require 'qs/logger'
@@ -14,60 +11,71 @@
module Qs
module Daemon
include MuchPlugin
- InvalidError = Class.new(ArgumentError)
+ SIGNAL = '.'.freeze
+ FETCH_ERR_SLEEP_TIME = 1.0.freeze
- SIGNAL = '.'.freeze
-
plugin_included do
extend ClassMethods
include InstanceMethods
end
module InstanceMethods
- attr_reader :daemon_data, :logger
- attr_reader :signals_redis_key, :queue_redis_keys
+ attr_reader :daemon_data, :signals_redis_key
- # set the size of the client to the num workers + 1, this ensures we have
- # 1 connection for fetching work from redis and at least 1 connection for
- # each worker to requeue its message when hard-shutdown
def initialize
- self.class.configuration.validate!
+ config = self.class.config
+ begin
+ config.validate!
+ rescue InvalidError => exception
+ exception.set_backtrace(caller)
+ raise exception
+ end
Qs.init
- @daemon_data = DaemonData.new(self.class.configuration.to_hash)
- @logger = @daemon_data.logger
- @client = QsClient.new(Qs.redis_config.merge({
- :timeout => 1,
- :size => self.daemon_data.num_workers + 1
- }))
- @queue_redis_keys = self.daemon_data.queue_redis_keys
+ @daemon_data = DaemonData.new({
+ :name => config.name,
+ :pid_file => config.pid_file,
+ :shutdown_timeout => config.shutdown_timeout,
+ :worker_class => config.worker_class,
+ :worker_params => config.worker_params,
+ :num_workers => config.num_workers,
+ :error_procs => config.error_procs,
+ :logger => config.logger,
+ :queues => config.queues,
+ :verbose_logging => config.verbose_logging,
+ :routes => config.routes
+ })
- @signals_redis_key = "signals:#{@daemon_data.name}-" \
+ @signals_redis_key = "signals:#{self.daemon_data.name}-" \
"#{Socket.gethostname}-#{::Process.pid}"
+ @thread = nil
@worker_available = WorkerAvailable.new
+ @state = State.new(:stop)
+ # set the size of the client to the num workers + 1, this ensures we
+ # have 1 connection for fetching work from redis and at least 1
+ # connection for each worker to requeue its message when hard-shutdown
+ @client = QsClient.new(Qs.redis_connect_hash.merge({
+ :timeout => 1,
+ :size => self.daemon_data.num_workers + 1
+ }))
+
@worker_pool = DatWorkerPool.new(self.daemon_data.worker_class, {
:num_workers => self.daemon_data.num_workers,
:logger => self.daemon_data.dwp_logger,
:worker_params => self.daemon_data.worker_params.merge({
:qs_daemon_data => self.daemon_data,
:qs_client => @client,
:qs_worker_available => @worker_available,
- :qs_logger => @logger
+ :qs_logger => self.logger
})
})
-
- @thread = nil
- @state = State.new(:stop)
- rescue InvalidError => exception
- exception.set_backtrace(caller)
- raise exception
end
def name
@daemon_data.name
end
@@ -78,18 +86,26 @@
def pid_file
@daemon_data.pid_file
end
+ def logger
+ @daemon_data.logger
+ end
+
+ def queue_redis_keys
+ @daemon_data.queue_redis_keys
+ end
+
def running?
!!(@thread && @thread.alive?)
end
- # ping to check that it can communicate with redis before running, this is
- # friendlier than starting and continously erroring because it can't
- # dequeue
def start
+ # ping to check that it can communicate with redis before running,
+ # this is friendlier than starting and continously erroring because
+ # it can't dequeue
@client.ping
@state.set :run
@thread ||= Thread.new{ work_loop }
end
@@ -119,40 +135,40 @@
(exception.backtrace || []).each{ |l| log(l, :error) }
ensure
teardown
end
- # clear any signals that are already on the signals list in redis
def setup
- @client.clear(self.signals_redis_key)
+ # clear any signals that are already on the signals list in redis
+ @client.clear(self.signals_redis_key)
@worker_pool.start
end
- # shuffle the queue redis keys to avoid queue starvation, redis will pull
- # messages off queues in the order they are passed to the command, by
- # shuffling we ensure they are randomly ordered so every queue should get
- # a chance; use 0 for the brpop timeout which means block indefinitely;
- # rescue runtime errors so the daemon thread doesn't fail if redis is
- # temporarily down, sleep for a second to keep the thread from thrashing
- # by repeatedly erroring if redis is down
def fetch_messages
if !@worker_pool.worker_available? && @state.run?
@worker_available.wait
end
return unless @worker_pool.worker_available? && @state.run?
+ # shuffle the queue redis keys to avoid queue starvation, redis will
+ # pull messages off queues in the order they are passed to the command,
+ # by shuffling we ensure they are randomly ordered so every queue
+ # should get a chance; use 0 for the brpop timeout which means block
+ # indefinitely; rescue runtime errors so the daemon thread doesn't fail
+ # if redis is temporarily down, sleep for a second to keep the thread
+ # from thrashing by repeatedly erroring if redis is down
begin
args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten
redis_key, encoded_payload = @client.block_dequeue(*args)
if redis_key != @signals_redis_key
@worker_pool.push(QueueItem.new(redis_key, encoded_payload))
end
rescue RuntimeError => exception
log "Error occurred while dequeueing", :error
log "#{exception.class}: #{exception.message}", :error
(exception.backtrace || []).each{ |l| log(l, :error) }
- sleep 1
+ sleep FETCH_ERR_SLEEP_TIME
end
end
def teardown
timeout = @state.halt? ? 0 : self.daemon_data.shutdown_timeout
@@ -181,125 +197,143 @@
end
module ClassMethods
- def configuration
- @configuration ||= Configuration.new
+ def config
+ @config ||= Config.new
end
- def name(*args)
- self.configuration.name(*args)
+ def name(value = nil)
+ self.config.name = value if !value.nil?
+ self.config.name
end
- def pid_file(*args)
- self.configuration.pid_file(*args)
+ def pid_file(value = nil)
+ self.config.pid_file = value if !value.nil?
+ self.config.pid_file
end
- def worker_class(new_worker_class = nil)
- self.configuration.worker_class = new_worker_class if !new_worker_class.nil?
- self.configuration.worker_class
+ def shutdown_timeout(value = nil)
+ self.config.shutdown_timeout = value if !value.nil?
+ self.config.shutdown_timeout
end
- def worker_params(new_worker_params = nil)
- self.configuration.worker_params = new_worker_params if !new_worker_params.nil?
- self.configuration.worker_params
+ def worker_class(value = nil)
+ self.config.worker_class = value if !value.nil?
+ self.config.worker_class
end
- def num_workers(*args)
- self.configuration.num_workers(*args)
+ def worker_params(value = nil)
+ self.config.worker_params = value if !value.nil?
+ self.config.worker_params
end
+
+ def num_workers(new_num_workers = nil)
+ self.config.num_workers = new_num_workers if new_num_workers
+ self.config.num_workers
+ end
alias :workers :num_workers
- def verbose_logging(*args)
- self.configuration.verbose_logging(*args)
+ def init(&block)
+ self.config.init_procs << block
end
- def logger(*args)
- self.configuration.logger(*args)
+ def init_procs
+ self.config.init_procs
end
- def shutdown_timeout(*args)
- self.configuration.shutdown_timeout(*args)
+ def error(&block)
+ self.config.error_procs << block
end
- def init(&block)
- self.configuration.init_procs << block
+ def error_procs
+ self.config.error_procs
end
- def error(&block)
- self.configuration.error_procs << block
+ def logger(value = nil)
+ self.config.logger = value if !value.nil?
+ self.config.logger
end
- def queue(queue)
- self.configuration.queues << queue
+ def queue(value)
+ self.config.queues << value
end
def queues
- self.configuration.queues
+ self.config.queues
end
- end
+ # flags
- class Configuration
- include NsOptions::Proxy
+ def verbose_logging(value = nil)
+ self.config.verbose_logging = value if !value.nil?
+ self.config.verbose_logging
+ end
- option :name, String, :required => true
- option :pid_file, Pathname
+ end
- option :num_workers, Integer, :default => 4
+ class Config
- option :verbose_logging, :default => true
- option :logger, :default => proc{ Qs::NullLogger.new }
+ DEFAULT_NUM_WORKERS = 4.freeze
- option :shutdown_timeout
+ attr_accessor :name, :pid_file, :shutdown_timeout
+ attr_accessor :worker_class, :worker_params, :num_workers
+ attr_accessor :init_procs, :error_procs, :logger, :queues
+ attr_accessor :verbose_logging
- attr_accessor :init_procs, :error_procs
- attr_accessor :worker_class, :worker_params
- attr_accessor :queues
+ def initialize
+ @name = nil
+ @pid_file = nil
+ @shutdown_timeout = nil
+ @worker_class = DefaultWorker
+ @worker_params = nil
+ @num_workers = DEFAULT_NUM_WORKERS
+ @init_procs = []
+ @error_procs = []
+ @logger = Qs::NullLogger.new
+ @queues = []
- def initialize(values = nil)
- super(values)
- @init_procs, @error_procs = [], []
- @worker_class = DefaultWorker
- @worker_params = nil
- @queues = []
+ @verbose_logging = true
+
@valid = nil
end
def routes
@queues.map(&:routes).flatten
end
- def to_hash
- super.merge({
- :error_procs => self.error_procs,
- :worker_class => self.worker_class,
- :worker_params => self.worker_params,
- :routes => self.routes,
- :queue_redis_keys => self.queues.map(&:redis_key)
- })
- end
-
def valid?
!!@valid
end
+ # for the config to be considered "valid", a few things need to happen.
+ # The key here is that this only needs to be done _once_ for each config.
+
def validate!
- return @valid if !@valid.nil?
+ return @valid if !@valid.nil? # only need to run this once per config
+
+ # ensure all user and plugin configs/settings are applied
self.init_procs.each(&:call)
- if self.queues.empty? || !self.required_set?
- raise InvalidError, "a name and queue must be configured"
+ if self.queues.empty? || self.name.nil?
+ raise InvalidError, "a name and at least 1 queue must be configured"
end
+
+ # validate the worker class
if !self.worker_class.kind_of?(Class) || !self.worker_class.include?(Qs::Worker)
raise InvalidError, "worker class must include `#{Qs::Worker}`"
end
+
+ # validate the routes
self.routes.each(&:validate!)
- @valid = true
+
+ @valid = true # if it made it this far, it's valid!
end
+
end
DefaultWorker = Class.new{ include Qs::Worker }
+ InvalidError = Class.new(ArgumentError)
class WorkerAvailable
def initialize
@mutex = Mutex.new
@cond_var = ConditionVariable.new