lib/backburner/worker.rb in backburner-allq-1.0.37 vs lib/backburner/worker.rb in backburner-allq-1.0.38
- old
+ new
@@ -11,11 +11,14 @@
# Backburner::Worker.known_queue_classes
# List of known_queue_classes
class << self
attr_writer :known_queue_classes
- def known_queue_classes; @known_queue_classes ||= []; end
+
+ def known_queue_classes
+ @known_queue_classes ||= []
+ end
end
# Enqueues a job to be processed later by a worker.
# Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)
#
@@ -35,12 +38,12 @@
ttr = resolve_respond_timeout(opts[:ttr] || job_class)
res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)
return nil unless res # stop if hook is false
- data = { :class => job_class.name, :args => args, :ttr => ttr }
- queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])
+ data = { class: job_class.name, args: args, ttr: ttr }
+ queue = opts[:queue] && (opts[:queue].is_a?(Proc) ? opts[:queue].call(job_class) : opts[:queue])
begin
response = nil
connection = Backburner::Connection.new(Backburner.configuration.allq_url)
connection.retryable do
@@ -65,16 +68,14 @@
# Starts processing jobs with the specified tube_names.
#
# @example
# Backburner::Worker.start(["foo.tube.name"])
#
- def self.start(tube_names=nil)
- begin
- self.new(tube_names).start
- rescue SystemExit
- # do nothing
- end
+ def self.start(tube_names = nil)
+ new(tube_names).start
+ rescue SystemExit
+ # do nothing
end
# List of tube names to be watched and processed
attr_accessor :tube_names, :connection
@@ -82,11 +83,11 @@
#
# @example
# Worker.new(['test.job'])
def initialize(tube_names = nil)
@connection = new_connection
- @tube_names = self.process_tube_names(tube_names)
+ @tube_names = process_tube_names(tube_names)
register_signal_handlers!
end
# Starts processing ready jobs indefinitely.
# Primary way to consume and process jobs in specified tubes.
@@ -138,83 +139,85 @@
# @example
# @worker.work_one_job
# @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
def work_one_job(conn = connection, tube_name = nil)
if tube_name.nil?
- self.log_error "Sampling tube, this is bad practice for Allq"
- tube_name = @tube_names.sample
+ log_error 'Sampling tube, this is bad practice for Allq'
+ tube_name = @tube_names.sample
end
-
+
begin
job = reserve_job(conn, tube_name)
rescue Exception => e
- self.log_error "Sleeping"
- self.log_error "Exception: #{e.full_message}"
- sleep(rand*3)
+ log_error "Exception: #{e.full_message}"
+ sleep(rand * 3)
return
end
if job && job.body
begin
- self.log_job_begin(job.name, job.args)
+ log_job_begin(job.name, job.args)
job.process
- self.log_job_end(job.name)
+ log_job_end(job.name)
rescue Backburner::Job::JobFormatInvalid => e
- self.log_error self.exception_message(e)
- rescue => e # Error occurred processing job
- self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)
+ log_error exception_message(e)
+ rescue StandardError => e # Error occurred processing job
+ log_error exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)
unless job
- self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!"
+ log_error 'Error occurred before we were able to assign a job. Giving up without retrying!'
return
end
# NB: There's a slight chance here that the connection to allq has
# gone down between the time we reserved / processed the job and here.
num_retries = job.releases
max_job_retries = resolve_max_job_retries(job.job_class)
- retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"
+ retry_status = "failed: attempt #{num_retries + 1} of #{max_job_retries + 1}"
retry_delay = resolve_retry_delay(job.job_class)
- delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
-
+ delay = begin
+ resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries)
+ rescue StandardError
+ retry_delay
+ end
+
if num_retries + 1 > max_job_retries
job.bury
else
job.release(delay)
end
- self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
+ log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
handle_error(e, job.name, job.args, job)
end
else
- sleep(rand*3)
+ sleep(rand * 3)
end
job
end
-
protected
# Return a new connection instance
def new_connection
Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) }
end
# Reserve a job from the watched queues
- def reserve_job(conn, tube_name, reserve_timeout = Backburner.configuration.reserve_timeout)
+ def reserve_job(conn, tube_name, _reserve_timeout = Backburner.configuration.reserve_timeout)
job = conn.get(tube_name)
return nil if job.nil? || job.body == nil?
+
Backburner::Job.new(job)
end
# Returns a list of all tubes known within the system
# Filtered for tubes that match the known prefix
def all_existing_queues
known_queues = Backburner::Worker.known_queue_classes.map(&:queue)
- existing_tubes = self.connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ }
+ existing_tubes = connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ }
existing_tubes + known_queues + [queue_config.primary_queue]
end
-
# Handles an error according to custom definition
# Used when processing a job that errors out
def handle_error(e, name, args, job)
if error_handler = Backburner.configuration.on_error