module BBQueue class Consumer attr_accessor :logger def before_fork # Nothing end def after_fork # Nothing end def fork? false end def fork_and_wait if fork? before_fork Process.fork do after_fork yield end Process.wait else yield end end def work(job, queue_name) fork_and_wait do begin job.work rescue Timeout::Error, StandardError => e logger.error "Job #{job.inspect} on #{queue_name.inspect} failed" logger.error e end end end def initialize(queue_names, options = {}) self.logger = logger = options[:logger] || BBQueue::NullLogger.new consumer = self Stalking::Consumer.new options.merge(:logger => BBQueue::FatalLogger.new(logger)) do Array(queue_names).each do |queue_name| job queue_name do |args| consumer.work BBQueue::Serializer.load(args["object"]), queue_name end end before do |queue_name, args| logger.info "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} started" end after do |queue_name, args| logger.info "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} finished" end error do |e, queue_name, args| logger.error "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} failed" logger.error e end end end end end