lib/backburner/worker.rb in backburner-0.2.0 vs lib/backburner/worker.rb in backburner-0.2.5
- old
+ new
@@ -1,8 +1,12 @@
require 'backburner/job'
module Backburner
+ #
+ # @abstract Subclass and override {#process_tube_names}, {#prepare} and {#start} to implement
+ # a custom Worker class.
+ #
class Worker
include Backburner::Helpers
include Backburner::Logger
# Backburner::Worker.known_queue_classes
@@ -10,11 +14,11 @@
class << self
attr_writer :known_queue_classes
def known_queue_classes; @known_queue_classes ||= []; end
end
- # Enqueues a job to be processed later by a worker
+ # Enqueues a job to be processed later by a worker.
# Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)
#
# @raise [Beaneater::NotConnected] If beanstalk fails to connect.
# @example
# Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000
@@ -30,68 +34,76 @@
tube.put data.to_json, :pri => pri, :delay => delay, :ttr => ttr
job_class.invoke_hook_events(:after_enqueue, *args)
return true
end
- # Starts processing jobs in the specified tube_names
+ # Starts processing jobs with the specified tube_names.
#
# @example
# Backburner::Worker.start(["foo.tube.name"])
#
def self.start(tube_names=nil)
self.new(tube_names).start
end
- # Returns the worker connection
+ # Returns the worker connection.
# @example
# Backburner::Worker.connection # => <Beaneater::Pool>
def self.connection
@connection ||= Connection.new(Backburner.configuration.beanstalk_url)
end
# List of tube names to be watched and processed
attr_accessor :tube_names
+ # Constructs a new worker for processing jobs within specified tubes.
+ #
# @example
# Worker.new(['test.job'])
def initialize(tube_names=nil)
- @tube_names = begin
- tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array)
- tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0
- tube_names = nil if tube_names && tube_names.compact.empty?
- tube_names
- end
+ @tube_names = self.process_tube_names(tube_names)
end
- # Starts processing new jobs indefinitely
- # Primary way to consume and process jobs in specified tubes
+ # Starts processing ready jobs indefinitely.
+ # Primary way to consume and process jobs in specified tubes.
#
# @example
# @worker.start
#
def start
- prepare
- loop { work_one_job }
+ raise NotImplementedError
end
- # Setup beanstalk tube_names and watch all specified tubes for jobs.
- # Used to prepare job queues before processing jobs.
+ # Used to prepare the job queues before job processing is initiated.
#
# @raise [Beaneater::NotConnected] If beanstalk fails to connect.
# @example
# @worker.prepare
#
+ # @abstract Define this in your worker subclass
+ # to be run once before processing. Recommended to watch tubes
+ # or print a message to the logs with 'log_info'
+ #
def prepare
- self.tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
- self.tube_names = Array(self.tube_names)
- self.tube_names.map! { |name| expand_tube_name(name) }
- log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
- self.connection.tubes.watch!(*self.tube_names)
+ raise NotImplementedError
end
- # Reserves one job within the specified queues
- # Pops the job off and serializes the job to JSON
+ # Processes tube_names given tube_names array.
+ # Should return normalized tube_names as an array of strings.
+ #
+ # @example
+ # process_tube_names([['foo'], ['bar']])
+ # => ['foo', 'bar', 'baz']
+ #
+ # @note This method can be overridden in inherited workers
+ # to add more complex tube name processing.
+ def process_tube_names(tube_names)
+ compact_tube_names(tube_names)
+ end
+
+ # Reserves one job within the specified queues.
+ # Pops the job off and serializes the job to JSON.
# Each job is performed by invoking `perform` on the job class.
#
# @example
# @worker.work_one_job
#
@@ -140,8 +152,19 @@
error_handler.call(e)
else
error_handler.call(e, name, args)
end
end
+ end
+
+ # Normalizes tube names given array of tube_names
+ # Compacts nil items, flattens arrays, sets tubes to nil if no valid names
+ # Loads default tubes when no tubes given.
+ def compact_tube_names(tube_names)
+ tube_names = tube_names.first if tube_names && tube_names.size == 1 && tube_names.first.is_a?(Array)
+ tube_names = Array(tube_names).compact if tube_names && Array(tube_names).compact.size > 0
+ tube_names = nil if tube_names && tube_names.compact.empty?
+ tube_names ||= Backburner.default_queues.any? ? Backburner.default_queues : all_existing_queues
+ Array(tube_names)
end
end # Worker
end # Backburner
\ No newline at end of file