lib/backburner/worker.rb in backburner-allq-1.0.16 vs lib/backburner/worker.rb in backburner-allq-1.0.17

- old
+ new

@@ -132,39 +132,42 @@ rescue Exception => e sleep(rand*3) return end - self.log_job_begin(job.name, job.args) - job.process - self.log_job_end(job.name) + if job + self.log_job_begin(job.name, job.args) + job.process + self.log_job_end(job.name) - rescue Backburner::Job::JobFormatInvalid => e - self.log_error self.exception_message(e) - rescue => e # Error occurred processing job - self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob) + rescue Backburner::Job::JobFormatInvalid => e + self.log_error self.exception_message(e) + rescue => e # Error occurred processing job + self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob) - unless job - self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!" - return - end + unless job + self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!" + return + end - # NB: There's a slight chance here that the connection to allq has - # gone down between the time we reserved / processed the job and here. - num_retries = job.releases - max_job_retries = resolve_max_job_retries(job.job_class) - retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}" - retry_delay = resolve_retry_delay(job.job_class) - delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay - if num_retries + 1 > max_job_retries - job.bury - else - job.release(delay) - end - self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at + # NB: There's a slight chance here that the connection to allq has + # gone down between the time we reserved / processed the job and here. + num_retries = job.releases + max_job_retries = resolve_max_job_retries(job.job_class) + retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}" + retry_delay = resolve_retry_delay(job.job_class) + delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay + if num_retries + 1 > max_job_retries + job.bury + else + job.release(delay) + end + self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at - handle_error(e, job.name, job.args, job) + handle_error(e, job.name, job.args, job) + end + job end protected @@ -173,10 +176,12 @@ Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) } end # Reserve a job from the watched queues def reserve_job(conn, reserve_timeout = Backburner.configuration.reserve_timeout) - Backburner::Job.new(conn.get(@tube_names.sample)) + job = conn.get(@tube_names.sample) + return nil if job.body == nil? + Backburner::Job.new(job) end # Returns a list of all tubes known within the system # Filtered for tubes that match the known prefix def all_existing_queues