lib/backburner/worker.rb in backburner-0.0.1 vs lib/backburner/worker.rb in backburner-0.0.2

- old
+ new

@@ -1,24 +1,25 @@ +require 'backburner/job' + module Backburner class Worker include Backburner::Helpers include Backburner::Logger - class JobNotFound < RuntimeError; end - class JobTimeout < RuntimeError; end - class JobQueueNotSet < RuntimeError; end - # Backburner::Worker.known_queue_classes # List of known_queue_classes class << self attr_writer :known_queue_classes def known_queue_classes; @known_queue_classes ||= []; end end # Enqueues a job to be processed later by a worker # Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name) - # Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000 + # + # @example + # Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000 + # def self.enqueue(job_class, args=[], opts={}) pri = opts[:pri] || Backburner.configuration.default_priority delay = [0, opts[:delay].to_i].max ttr = opts[:ttr] || Backburner.configuration.respond_timeout connection.use expand_tube_name(opts[:queue] || job_class) @@ -27,25 +28,28 @@ rescue Beanstalk::NotConnected => e failed_connection(e) end # Starts processing jobs in the specified tube_names - # Backburner::Worker.start(["foo.tube.name"]) + # @example + # Backburner::Worker.start(["foo.tube.name"]) def self.start(tube_names=nil) self.new(tube_names).start end # Returns the worker connection - # Backburner::Worker.connection => <Beanstalk::Pool> + # @example + # Backburner::Worker.connection # => <Beanstalk::Pool> def self.connection @connection ||= Connection.new(Backburner.configuration.beanstalk_url) end # List of tube names to be watched and processed attr_accessor :tube_names - # Worker.new(['test.job']) + # @example + # Worker.new(['test.job']) def initialize(tube_names=nil) @tube_names = begin tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array) tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0 tube_names = nil if tube_names && tube_names.compact.empty? @@ -53,19 +57,21 @@ end end # Starts processing new jobs indefinitely # Primary way to consume and process jobs in specified tubes - # @worker.start + # @example + # @worker.start def start prepare loop { work_one_job } end # Setup beanstalk tube_names and watch all specified tubes for jobs. # Used to prepare job queues before processing jobs. - # @worker.prepare + # @example + # @worker.prepare def prepare self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues self.tube_names = Array(self.tube_names) self.tube_names.map! { |name| expand_tube_name(name) } log "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]" @@ -78,37 +84,25 @@ end # Reserves one job within the specified queues # Pops the job off and serializes the job to JSON # Each job is performed by invoking `perform` on the job class. - # @worker.work_one_job + # @example + # @worker.work_one_job def work_one_job - job = self.connection.reserve - body = JSON.parse job.body - name, args = body["class"], body["args"] - self.class.log_job_begin(body) - handler = constantize(name) - raise(JobNotFound, name) unless handler - - begin - Timeout::timeout(job.ttr - 1) do - handler.perform(*args) - end - rescue Timeout::Error - raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout" - end - - job.delete - self.class.log_job_end(name) + job = Backburner::Job.new(self.connection.reserve) + self.class.log_job_begin(job.body) + job.process + self.class.log_job_end(job.name) rescue Beanstalk::NotConnected => e failed_connection(e) rescue SystemExit raise rescue => e job.bury self.class.log_error self.class.exception_message(e) - self.class.log_job_end(name, 'failed') if @job_begun - handle_error(e, name, args) + self.class.log_job_end(job.name, 'failed') if @job_begun + handle_error(e, job.name, job.args) end protected # Returns a list of all tubes known within the system \ No newline at end of file