lib/backburner/worker.rb in backburner-allq-1.0.5 vs lib/backburner/worker.rb in backburner-allq-1.0.7

- old
+ new

@@ -38,11 +38,12 @@ response = nil connection = Backburner::Connection.new(Backburner.configuration.allq_url) connection.retryable do tube_name = expand_tube_name(queue || job_class) serialized_data = Backburner.configuration.job_serializer_proc.call(data) - response = connection.put(serialized_data, :pri => pri, :delay => delay, :ttr => ttr) + puts "Tube name #{tube_name}" + response = connection.put(tube_name, serialized_data, :pri => pri, :delay => delay, :ttr => ttr) end return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args) ensure connection.close if connection end @@ -127,11 +128,12 @@ # @worker.work_one_job # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times. def work_one_job(conn = connection) begin job = reserve_job(conn) - rescue Beaneater::TimedOutError => e + rescue Exception => e + sleep(rand*3) return end self.log_job_begin(job.name, job.args) job.process @@ -147,31 +149,26 @@ return end # NB: There's a slight chance here that the connection to beanstalkd has # gone down between the time we reserved / processed the job and here. - num_retries = job.stats.releases + num_retries = job.releases max_job_retries = resolve_max_job_retries(job.job_class) retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}" - if num_retries < max_job_retries # retry again - retry_delay = resolve_retry_delay(job.job_class) - delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay - job.retry(num_retries + 1, delay) - self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at - else # retries failed, bury - job.bury - self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at - end + retry_delay = resolve_retry_delay(job.job_class) + delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay + job.release(delay) + self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at handle_error(e, job.name, job.args, job) end protected # Return a new connection instance def new_connection - Connection.new(Backburner.configuration.beanstalk_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) } + Connection.new(Backburner.configuration.allq_url) { |conn| Backburner::Hooks.invoke_hook_events(self, :on_reconnect, conn) } end # Reserve a job from the watched queues def reserve_job(conn, reserve_timeout = Backburner.configuration.reserve_timeout) Backburner::Job.new(conn.get(@tube_names.sample))