lib/qs/daemon.rb in qs-0.4.0 vs lib/qs/daemon.rb in qs-0.5.0
- old
+ new
@@ -61,10 +61,14 @@
def name
@daemon_data.name
end
+ def process_label
+ @daemon_data.process_label
+ end
+
def pid_file
@daemon_data.pid_file
end
def running?
@@ -127,14 +131,23 @@
def build_worker_pool
wp = DatWorkerPool.new(
self.daemon_data.min_workers,
self.daemon_data.max_workers
){ |queue_item| process(queue_item) }
+
+ # add internal callbacks
wp.on_worker_error do |worker, exception, queue_item|
handle_worker_exception(exception, queue_item)
end
wp.on_worker_sleep{ @worker_available_io.write(SIGNAL) }
+
+ # add any configured callbacks
+ self.daemon_data.worker_start_procs.each{ |cb| wp.on_worker_start(&cb) }
+ self.daemon_data.worker_shutdown_procs.each{ |cb| wp.on_worker_shutdown(&cb) }
+ self.daemon_data.worker_sleep_procs.each{ |cb| wp.on_worker_sleep(&cb) }
+ self.daemon_data.worker_wakeup_procs.each{ |cb| wp.on_worker_wakeup(&cb) }
+
wp.start
wp
end
# * Shuffle the queue redis keys to avoid queue starvation. Redis will
@@ -244,10 +257,26 @@
def workers(*args)
self.min_workers(*args)
self.max_workers(*args)
end
+ def on_worker_start(&block)
+ self.configuration.worker_start_procs << block
+ end
+
+ def on_worker_shutdown(&block)
+ self.configuration.worker_shutdown_procs << block
+ end
+
+ def on_worker_sleep(&block)
+ self.configuration.worker_sleep_procs << block
+ end
+
+ def on_worker_wakeup(&block)
+ self.configuration.worker_wakeup_procs << block
+ end
+
def verbose_logging(*args)
self.configuration.verbose_logging(*args)
end
def logger(*args)
@@ -284,28 +313,39 @@
option :verbose_logging, :default => true
option :logger, :default => proc{ Qs::NullLogger.new }
option :shutdown_timeout
+ attr_accessor :process_label
attr_accessor :init_procs, :error_procs
attr_accessor :queues
+ attr_reader :worker_start_procs, :worker_shutdown_procs
+ attr_reader :worker_sleep_procs, :worker_wakeup_procs
def initialize(values = nil)
super(values)
+ @process_label = !(v = ENV['QS_PROCESS_LABEL'].to_s).empty? ? v : self.name
@init_procs, @error_procs = [], []
+ @worker_start_procs, @worker_shutdown_procs = [], []
+ @worker_sleep_procs, @worker_wakeup_procs = [], []
@queues = []
- @valid = nil
+ @valid = nil
end
def routes
@queues.map(&:routes).flatten
end
def to_hash
super.merge({
- :error_procs => self.error_procs,
- :queue_redis_keys => self.queues.map(&:redis_key),
- :routes => self.routes
+ :process_label => self.process_label,
+ :error_procs => self.error_procs,
+ :worker_start_procs => self.worker_start_procs,
+ :worker_shutdown_procs => self.worker_shutdown_procs,
+ :worker_sleep_procs => self.worker_sleep_procs,
+ :worker_wakeup_procs => self.worker_wakeup_procs,
+ :routes => self.routes,
+ :queue_redis_keys => self.queues.map(&:redis_key)
})
end
def valid?
!!@valid