lib/backburner/worker.rb in backburner-allq-1.0.5 vs lib/backburner/worker.rb in backburner-allq-1.0.7
- old
+ new
@@ -38,11 +38,12 @@
response = nil
connection = Backburner::Connection.new(Backburner.configuration.allq_url)
connection.retryable do
tube_name = expand_tube_name(queue || job_class)
serialized_data = Backburner.configuration.job_serializer_proc.call(data)
- response = connection.put(serialized_data, :pri => pri, :delay => delay, :ttr => ttr)
+ puts "Tube name #{tube_name}"
+ response = connection.put(tube_name, serialized_data, :pri => pri, :delay => delay, :ttr => ttr)
end
return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
ensure
connection.close if connection
end
@@ -127,11 +128,12 @@
# @worker.work_one_job
# @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
def work_one_job(conn = connection)
begin
job = reserve_job(conn)
- rescue Beaneater::TimedOutError => e
+ rescue Exception => e
+ sleep(rand*3)
return
end
self.log_job_begin(job.name, job.args)
job.process
@@ -147,31 +149,26 @@
return
end
# NB: There's a slight chance here that the connection to beanstalkd has
# gone down between the time we reserved / processed the job and here.
- num_retries = job.stats.releases
+ num_retries = job.releases
max_job_retries = resolve_max_job_retries(job.job_class)
retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"
- if num_retries < max_job_retries # retry again
- retry_delay = resolve_retry_delay(job.job_class)
- delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
- job.retry(num_retries + 1, delay)
- self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
- else # retries failed, bury
- job.bury
- self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at
- end
+ retry_delay = resolve_retry_delay(job.job_class)
+ delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
+ job.release(delay)
+ self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
handle_error(e, job.name, job.args, job)
end
protected
# Return a new connection instance
def new_connection
- Connection.new(Backburner.configuration.beanstalk_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) }
+ Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) }
end
# Reserve a job from the watched queues
def reserve_job(conn, reserve_timeout = Backburner.configuration.reserve_timeout)
Backburner::Job.new(conn.get(@tube_names.sample))