lib/bbqueue/consumer.rb in bbqueue-0.0.1 vs lib/bbqueue/consumer.rb in bbqueue-0.0.2
- old
+ new
@@ -1,14 +1,57 @@
module BBQueue
class Consumer
+ attr_accessor :logger
+
+ def before_fork
+ # Nothing
+ end
+
+ def after_fork
+ # Nothing
+ end
+
+ def fork?
+ false
+ end
+
+ def fork_and_wait
+ if fork?
+ before_fork
+
+ Process.fork do
+ after_fork
+
+ yield
+ end
+
+ Process.wait
+ else
+ yield
+ end
+ end
+
+ def work(job, queue_name)
+ fork_and_wait do
+ begin
+ job.work
+ rescue Timeout::Error, StandardError => e
+ logger.error "Job #{job.inspect} on #{queue_name.inspect} failed"
+ logger.error e
+ end
+ end
+ end
+
def initialize(queue_names, options = {})
- logger = options[:logger] || BBQueue::NullLogger.new
+ self.logger = logger = options[:logger] || BBQueue::NullLogger.new
+ consumer = self
+
Stalking::Consumer.new options.merge(:logger => BBQueue::FatalLogger.new(logger)) do
Array(queue_names).each do |queue_name|
job queue_name do |args|
- BBQueue::Serializer.load(args["object"]).work
+ consumer.work BBQueue::Serializer.load(args["object"]), queue_name
end
end
before do |queue_name, args|
logger.info "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} started"