lib/qs/daemon.rb in qs-0.4.0 vs lib/qs/daemon.rb in qs-0.5.0

- old
+ new

@@ -61,10 +61,14 @@ def name @daemon_data.name end + def process_label + @daemon_data.process_label + end + def pid_file @daemon_data.pid_file end def running? @@ -127,14 +131,23 @@ def build_worker_pool wp = DatWorkerPool.new( self.daemon_data.min_workers, self.daemon_data.max_workers ){ |queue_item| process(queue_item) } + + # add internal callbacks wp.on_worker_error do |worker, exception, queue_item| handle_worker_exception(exception, queue_item) end wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) } + + # add any configured callbacks + self.daemon_data.worker_start_procs.each{ |cb| wp.on_worker_start(&cb) } + self.daemon_data.worker_shutdown_procs.each{ |cb| wp.on_worker_shutdown(&cb) } + self.daemon_data.worker_sleep_procs.each{ |cb| wp.on_worker_sleep(&cb) } + self.daemon_data.worker_wakeup_procs.each{ |cb| wp.on_worker_wakeup(&cb) } + wp.start wp end # * Shuffle the queue redis keys to avoid queue starvation. Redis will @@ -244,10 +257,26 @@ def workers(*args) self.min_workers(*args) self.max_workers(*args) end + def on_worker_start(&block) + self.configuration.worker_start_procs << block + end + + def on_worker_shutdown(&block) + self.configuration.worker_shutdown_procs << block + end + + def on_worker_sleep(&block) + self.configuration.worker_sleep_procs << block + end + + def on_worker_wakeup(&block) + self.configuration.worker_wakeup_procs << block + end + def verbose_logging(*args) self.configuration.verbose_logging(*args) end def logger(*args) @@ -284,28 +313,39 @@ option :verbose_logging, :default => true option :logger, :default => proc{ Qs::NullLogger.new } option :shutdown_timeout + attr_accessor :process_label attr_accessor :init_procs, :error_procs attr_accessor :queues + attr_reader :worker_start_procs, :worker_shutdown_procs + attr_reader :worker_sleep_procs, :worker_wakeup_procs def initialize(values = nil) super(values) + @process_label = !(v = ENV['QS_PROCESS_LABEL'].to_s).empty? ? v : self.name @init_procs, @error_procs = [], [] + @worker_start_procs, @worker_shutdown_procs = [], [] + @worker_sleep_procs, @worker_wakeup_procs = [], [] @queues = [] - @valid = nil + @valid = nil end def routes @queues.map(&:routes).flatten end def to_hash super.merge({ - :error_procs => self.error_procs, - :queue_redis_keys => self.queues.map(&:redis_key), - :routes => self.routes + :process_label => self.process_label, + :error_procs => self.error_procs, + :worker_start_procs => self.worker_start_procs, + :worker_shutdown_procs => self.worker_shutdown_procs, + :worker_sleep_procs => self.worker_sleep_procs, + :worker_wakeup_procs => self.worker_wakeup_procs, + :routes => self.routes, + :queue_redis_keys => self.queues.map(&:redis_key) }) end def valid? !!@valid