lib/backburner/worker.rb in backburner-allq-1.0.37 vs lib/backburner/worker.rb in backburner-allq-1.0.38

- old
+ new

@@ -11,11 +11,14 @@ # Backburner::Worker.known_queue_classes # List of known_queue_classes class << self attr_writer :known_queue_classes - def known_queue_classes; @known_queue_classes ||= []; end + + def known_queue_classes + @known_queue_classes ||= [] + end end # Enqueues a job to be processed later by a worker. # Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name) # @@ -35,12 +38,12 @@ ttr = resolve_respond_timeout(opts[:ttr] || job_class) res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args) return nil unless res # stop if hook is false - data = { :class => job_class.name, :args => args, :ttr => ttr } - queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue]) + data = { class: job_class.name, args: args, ttr: ttr } + queue = opts[:queue] && (opts[:queue].is_a?(Proc) ? opts[:queue].call(job_class) : opts[:queue]) begin response = nil connection = Backburner::Connection.new(Backburner.configuration.allq_url) connection.retryable do @@ -65,16 +68,14 @@ # Starts processing jobs with the specified tube_names. # # @example # Backburner::Worker.start(["foo.tube.name"]) # - def self.start(tube_names=nil) - begin - self.new(tube_names).start - rescue SystemExit - # do nothing - end + def self.start(tube_names = nil) + new(tube_names).start + rescue SystemExit + # do nothing end # List of tube names to be watched and processed attr_accessor :tube_names, :connection @@ -82,11 +83,11 @@ # # @example # Worker.new(['test.job']) def initialize(tube_names = nil) @connection = new_connection - @tube_names = self.process_tube_names(tube_names) + @tube_names = process_tube_names(tube_names) register_signal_handlers! end # Starts processing ready jobs indefinitely. # Primary way to consume and process jobs in specified tubes. @@ -138,83 +139,85 @@ # @example # @worker.work_one_job # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times. def work_one_job(conn = connection, tube_name = nil) if tube_name.nil? - self.log_error "Sampling tube, this is bad practice for Allq" - tube_name = @tube_names.sample + log_error 'Sampling tube, this is bad practice for Allq' + tube_name = @tube_names.sample end - + begin job = reserve_job(conn, tube_name) rescue Exception => e - self.log_error "Sleeping" - self.log_error "Exception: #{e.full_message}" - sleep(rand*3) + log_error "Exception: #{e.full_message}" + sleep(rand * 3) return end if job && job.body begin - self.log_job_begin(job.name, job.args) + log_job_begin(job.name, job.args) job.process - self.log_job_end(job.name) + log_job_end(job.name) rescue Backburner::Job::JobFormatInvalid => e - self.log_error self.exception_message(e) - rescue => e # Error occurred processing job - self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob) + log_error exception_message(e) + rescue StandardError => e # Error occurred processing job + log_error exception_message(e) unless e.is_a?(Backburner::Job::RetryJob) unless job - self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!" + log_error 'Error occurred before we were able to assign a job. Giving up without retrying!' return end # NB: There's a slight chance here that the connection to allq has # gone down between the time we reserved / processed the job and here. num_retries = job.releases max_job_retries = resolve_max_job_retries(job.job_class) - retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}" + retry_status = "failed: attempt #{num_retries + 1} of #{max_job_retries + 1}" retry_delay = resolve_retry_delay(job.job_class) - delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay - + delay = begin + resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) + rescue StandardError + retry_delay + end + if num_retries + 1 > max_job_retries job.bury else job.release(delay) end - self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at + log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at handle_error(e, job.name, job.args, job) end else - sleep(rand*3) + sleep(rand * 3) end job end - protected # Return a new connection instance def new_connection Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) } end # Reserve a job from the watched queues - def reserve_job(conn, tube_name, reserve_timeout = Backburner.configuration.reserve_timeout) + def reserve_job(conn, tube_name, _reserve_timeout = Backburner.configuration.reserve_timeout) job = conn.get(tube_name) return nil if job.nil? || job.body == nil? + Backburner::Job.new(job) end # Returns a list of all tubes known within the system # Filtered for tubes that match the known prefix def all_existing_queues known_queues = Backburner::Worker.known_queue_classes.map(&:queue) - existing_tubes = self.connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ } + existing_tubes = connection.tubes.all.map(&:name).select { |tube| tube =~ /^#{queue_config.tube_namespace}/ } existing_tubes + known_queues + [queue_config.primary_queue] end - # Handles an error according to custom definition # Used when processing a job that errors out def handle_error(e, name, args, job) if error_handler = Backburner.configuration.on_error