lib/backburner/workers/threads_on_fork.rb in backburner-0.4.5 vs lib/backburner/workers/threads_on_fork.rb in backburner-0.4.6
- old
+ new
@@ -68,10 +68,11 @@
# Custom initializer just to set @tubes_data
def initialize(*args)
@tubes_data = {}
super
+ self.process_tube_options
end
# Process the special tube_names of ThreadsOnFork worker
# The idea is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries
# Any custom can be ignore. So if you want to set just the custom_retries
@@ -99,14 +100,32 @@
tube_name
end
end
end
+ # Process the tube settings
+ # This overrides @tubes_data set by process_tube_names method. So a tube has name 'super_job:5:20:10'
+ # and the tube class has setting queue_jobs_limit 10, the result limit will be 10
+ # If the tube is known by existing beanstalkd queue, but not by class - skip it
+ #
+ def process_tube_options
+ Backburner::Worker.known_queue_classes.each do |queue|
+ next if @tubes_data[expand_tube_name(queue)].nil?
+ queue_settings = {
+ :threads => queue.queue_jobs_limit,
+ :garbage => queue.queue_garbage_limit,
+ :retries => queue.queue_retry_limit
+ }
+ @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 }
+ end
+ end
+
def prepare
self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
self.tube_names = Array(self.tube_names)
tube_names.map! { |name| expand_tube_name(name) }
- log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
+ tube_display_names = tube_names.map{|name| "#{name}:#{@tubes_data[name].values}"}
+ log_info "Working #{tube_names.size} queues: [ #{tube_display_names.join(', ')} ]"
end
# For each tube we will call fork_and_watch to create the fork
# The lock argument define if this method should block or no
def start(lock=true)