lib/backburner/worker.rb in backburner-0.2.0 vs lib/backburner/worker.rb in backburner-0.2.5

- old
+ new

@@ -1,8 +1,12 @@ require 'backburner/job' module Backburner + # + # @abstract Subclass and override {#process_tube_names}, {#prepare} and {#start} to implement + # a custom Worker class. + # class Worker include Backburner::Helpers include Backburner::Logger # Backburner::Worker.known_queue_classes @@ -10,11 +14,11 @@ class << self attr_writer :known_queue_classes def known_queue_classes; @known_queue_classes ||= []; end end - # Enqueues a job to be processed later by a worker + # Enqueues a job to be processed later by a worker. # Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name) # # @raise [Beaneater::NotConnected] If beanstalk fails to connect. # @example # Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000 @@ -30,68 +34,76 @@ tube.put data.to_json, :pri => pri, :delay => delay, :ttr => ttr job_class.invoke_hook_events(:after_enqueue, *args) return true end - # Starts processing jobs in the specified tube_names + # Starts processing jobs with the specified tube_names. # # @example # Backburner::Worker.start(["foo.tube.name"]) # def self.start(tube_names=nil) self.new(tube_names).start end - # Returns the worker connection + # Returns the worker connection. # @example # Backburner::Worker.connection # => <Beaneater::Pool> def self.connection @connection ||= Connection.new(Backburner.configuration.beanstalk_url) end # List of tube names to be watched and processed attr_accessor :tube_names + # Constructs a new worker for processing jobs within specified tubes. + # # @example # Worker.new(['test.job']) def initialize(tube_names=nil) - @tube_names = begin - tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array) - tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0 - tube_names = nil if tube_names && tube_names.compact.empty? - tube_names - end + @tube_names = self.process_tube_names(tube_names) end - # Starts processing new jobs indefinitely - # Primary way to consume and process jobs in specified tubes + # Starts processing ready jobs indefinitely. + # Primary way to consume and process jobs in specified tubes. # # @example # @worker.start # def start - prepare - loop { work_one_job } + raise NotImplementedError end - # Setup beanstalk tube_names and watch all specified tubes for jobs. - # Used to prepare job queues before processing jobs. + # Used to prepare the job queues before job processing is initiated. # # @raise [Beaneater::NotConnected] If beanstalk fails to connect. # @example # @worker.prepare # + # @abstract Define this in your worker subclass + # to be run once before processing. Recommended to watch tubes + # or print a message to the logs with 'log_info' + # def prepare - self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues - self.tube_names = Array(self.tube_names) - self.tube_names.map! { |name| expand_tube_name(name) } - log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]" - self.connection.tubes.watch!(*self.tube_names) + raise NotImplementedError end - # Reserves one job within the specified queues - # Pops the job off and serializes the job to JSON + # Processes tube_names given tube_names array. + # Should return normalized tube_names as an array of strings. + # + # @example + # process_tube_names([['foo'], ['bar']]) + # => ['foo', 'bar', 'baz'] + # + # @note This method can be overridden in inherited workers + # to add more complex tube name processing. + def process_tube_names(tube_names) + compact_tube_names(tube_names) + end + + # Reserves one job within the specified queues. + # Pops the job off and serializes the job to JSON. # Each job is performed by invoking `perform` on the job class. # # @example # @worker.work_one_job # @@ -140,8 +152,19 @@ error_handler.call(e) else error_handler.call(e, name, args) end end + end + + # Normalizes tube names given array of tube_names + # Compacts nil items, flattens arrays, sets tubes to nil if no valid names + # Loads default tubes when no tubes given. + def compact_tube_names(tube_names) + tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array) + tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0 + tube_names = nil if tube_names && tube_names.compact.empty? + tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues + Array(tube_names) end end # Worker end # Backburner \ No newline at end of file