lib/backburner/worker.rb in backburner-0.3.4 vs lib/backburner/worker.rb in backburner-0.4.0
- old
+ new
@@ -22,19 +22,19 @@
# @raise [Beaneater::NotConnected] If beanstalk fails to connect.
# @example
# Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000
#
def self.enqueue(job_class, args=[], opts={})
- pri = opts[:pri] || job_class.queue_priority || Backburner.configuration.default_priority
+ pri = resolve_priority(opts[:pri] || job_class)
delay = [0, opts[:delay].to_i].max
ttr = opts[:ttr] || Backburner.configuration.respond_timeout
tube = connection.tubes[expand_tube_name(opts[:queue] || job_class)]
- res = job_class.invoke_hook_events(:before_enqueue, *args)
+ res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)
return false unless res # stop if hook is false
data = { :class => job_class.name, :args => args }
tube.put data.to_json, :pri => pri, :delay => delay, :ttr => ttr
- job_class.invoke_hook_events(:after_enqueue, *args)
+ Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
return true
end
# Starts processing jobs with the specified tube_names.
#
@@ -141,11 +141,11 @@
# Returns a list of all tubes known within the system
# Filtered for tubes that match the known prefix
def all_existing_queues
known_queues = Backburner::Worker.known_queue_classes.map(&:queue)
existing_tubes = self.connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ }
- known_queues + existing_tubes
+ known_queues + existing_tubes + [queue_config.primary_queue]
end
# Returns a reference to the beanstalk connection
def connection
self.class.connection
@@ -169,10 +169,10 @@
def compact_tube_names(tube_names)
tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array)
tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0
tube_names = nil if tube_names && tube_names.compact.empty?
tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
- Array(tube_names)
+ Array(tube_names).uniq
end
# Registers signal handlers TERM and INT to trigger
def register_signal_handlers!
trap('TERM') { shutdown }
\ No newline at end of file