lib/backburner/worker.rb in backburner-0.0.2 vs lib/backburner/worker.rb in backburner-0.0.3
- old
+ new
@@ -13,27 +13,28 @@
end
# Enqueues a job to be processed later by a worker
# Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)
#
+ # @raise [Beanstalk::NotConnected] If beanstalk fails to connect.
# @example
# Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000
#
def self.enqueue(job_class, args=[], opts={})
- pri = opts[:pri] || Backburner.configuration.default_priority
+ pri = opts[:pri] || job_class.queue_priority || Backburner.configuration.default_priority
delay = [0, opts[:delay].to_i].max
ttr = opts[:ttr] || Backburner.configuration.respond_timeout
connection.use expand_tube_name(opts[:queue] || job_class)
data = { :class => job_class.name, :args => args }
connection.put data.to_json, pri, delay, ttr
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
end
# Starts processing jobs in the specified tube_names
+ #
# @example
# Backburner::Worker.start(["foo.tube.name"])
+ #
def self.start(tube_names=nil)
self.new(tube_names).start
end
# Returns the worker connection
@@ -57,47 +58,48 @@
end
end
# Starts processing new jobs indefinitely
# Primary way to consume and process jobs in specified tubes
+ #
# @example
# @worker.start
+ #
def start
prepare
loop { work_one_job }
end
# Setup beanstalk tube_names and watch all specified tubes for jobs.
# Used to prepare job queues before processing jobs.
+ #
+ # @raise [Beanstalk::NotConnected] If beanstalk fails to connect.
# @example
# @worker.prepare
+ #
def prepare
self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
self.tube_names = Array(self.tube_names)
self.tube_names.map! { |name| expand_tube_name(name) }
- log "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
+ log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
self.tube_names.uniq.each { |name| self.connection.watch(name) }
self.connection.list_tubes_watched.each do |server, tubes|
tubes.each { |tube| self.connection.ignore(tube) unless self.tube_names.include?(tube) }
end
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
end
# Reserves one job within the specified queues
# Pops the job off and serializes the job to JSON
# Each job is performed by invoking `perform` on the job class.
+ #
# @example
# @worker.work_one_job
+ #
def work_one_job
job = Backburner::Job.new(self.connection.reserve)
self.class.log_job_begin(job.body)
job.process
self.class.log_job_end(job.name)
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
- rescue SystemExit
- raise
rescue => e
job.bury
self.class.log_error self.class.exception_message(e)
self.class.log_job_end(job.name, 'failed') if @job_begun
handle_error(e, job.name, job.args)
\ No newline at end of file