lib/backburner/worker.rb in backburner-allq-1.0.16 vs lib/backburner/worker.rb in backburner-allq-1.0.17
- old
+ new
@@ -132,39 +132,42 @@
rescue Exception => e
sleep(rand*3)
return
end
- self.log_job_begin(job.name, job.args)
- job.process
- self.log_job_end(job.name)
+ if job
+ self.log_job_begin(job.name, job.args)
+ job.process
+ self.log_job_end(job.name)
- rescue Backburner::Job::JobFormatInvalid => e
- self.log_error self.exception_message(e)
- rescue => e # Error occurred processing job
- self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)
+ rescue Backburner::Job::JobFormatInvalid => e
+ self.log_error self.exception_message(e)
+ rescue => e # Error occurred processing job
+ self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)
- unless job
- self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!"
- return
- end
+ unless job
+ self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!"
+ return
+ end
- # NB: There's a slight chance here that the connection to allq has
- # gone down between the time we reserved / processed the job and here.
- num_retries = job.releases
- max_job_retries = resolve_max_job_retries(job.job_class)
- retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"
- retry_delay = resolve_retry_delay(job.job_class)
- delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
- if num_retries + 1 > max_job_retries
- job.bury
- else
- job.release(delay)
- end
- self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
+ # NB: There's a slight chance here that the connection to allq has
+ # gone down between the time we reserved / processed the job and here.
+ num_retries = job.releases
+ max_job_retries = resolve_max_job_retries(job.job_class)
+ retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"
+ retry_delay = resolve_retry_delay(job.job_class)
+ delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
+ if num_retries + 1 > max_job_retries
+ job.bury
+ else
+ job.release(delay)
+ end
+ self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
- handle_error(e, job.name, job.args, job)
+ handle_error(e, job.name, job.args, job)
+ end
+ job
end
protected
@@ -173,10 +176,12 @@
Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) }
end
# Reserve a job from the watched queues
def reserve_job(conn, reserve_timeout = Backburner.configuration.reserve_timeout)
- Backburner::Job.new(conn.get(@tube_names.sample))
+ job = conn.get(@tube_names.sample)
+ return nil if job.body == nil?
+ Backburner::Job.new(job)
end
# Returns a list of all tubes known within the system
# Filtered for tubes that match the known prefix
def all_existing_queues