lib/backburner/workers/threads_on_fork.rb in backburner-0.4.5 vs lib/backburner/workers/threads_on_fork.rb in backburner-0.4.6

- old
+ new

@@ -68,10 +68,11 @@ # Custom initializer just to set @tubes_data def initialize(*args) @tubes_data = {} super + self.process_tube_options end # Process the special tube_names of ThreadsOnFork worker # The idea is tube_name:custom_threads_limit:custom_garbage_limit:custom_retries # Any custom can be ignore. So if you want to set just the custom_retries @@ -99,14 +100,32 @@ tube_name end end end + # Process the tube settings + # This overrides @tubes_data set by process_tube_names method. So a tube has name 'super_job:5:20:10' + # and the tube class has setting queue_jobs_limit 10, the result limit will be 10 + # If the tube is known by existing beanstalkd queue, but not by class - skip it + # + def process_tube_options + Backburner::Worker.known_queue_classes.each do |queue| + next if @tubes_data[expand_tube_name(queue)].nil? + queue_settings = { + :threads => queue.queue_jobs_limit, + :garbage => queue.queue_garbage_limit, + :retries => queue.queue_retry_limit + } + @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 } + end + end + def prepare self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues self.tube_names = Array(self.tube_names) tube_names.map! { |name| expand_tube_name(name) } - log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]" + tube_display_names = tube_names.map{|name| "#{name}:#{@tubes_data[name].values}"} + log_info "Working #{tube_names.size} queues: [ #{tube_display_names.join(', ')} ]" end # For each tube we will call fork_and_watch to create the fork # The lock argument define if this method should block or no def start(lock=true)