module ActiveBeaneater class Worker def self.work(only: nil, exclude: []) client = Rails.application.config.beaneater.client tube_names = [ActiveBeaneater.resolve_queue_name('mailers')] ObjectSpace.each_object(Class) do |k| if k.ancestors.include?(ActiveJob::Base) tube_names << ActiveBeaneater.resolve_queue_name(k.queue_name) end end tube_names = only if only tube_names.uniq! tube_names -= exclude tube_names.each do |name| client.jobs.register(name) do |job| Rails.logger.debug("Processing job on tube #{name}") begin perform(job) rescue Exception => e Rails.logger.error(e) raise e end end end Rails.logger.info("Watching tubes [#{tube_names.join(', ')}]") client.jobs.process! end def self.perform(job) active_job = ActiveJob::Base.deserialize(MultiJson.load(job.body)) active_job.native_job = job if active_job.respond_to?(:native_job=) active_job.perform_now end end end