Sha256: 7723b415f4cdb78020b0a0e33346d77ccab04576067c0cfc8fc652df713ee74a

Contents?: true

Size: 1.04 KB

Versions: 1

Compression:

Stored size: 1.04 KB

Contents

module ActiveBeaneater
  class Worker

    def self.work(only: nil, exclude: [])
      client = Rails.application.config.beaneater.client


      tube_names = []

      ObjectSpace.each_object(Class) do |k|
        if k.ancestors.include?(ActiveJob::Base)
          tube_names << ActiveBeaneater.resolve_queue_name(k.queue_name)
        end
      end

      tube_names = only if only
      tube_names.uniq!
      tube_names -= exclude

      tube_names.each do |name|
        client.jobs.register(name) do |job|
          Rails.logger.debug("Processing job on tube #{name}")
          begin
            perform(job)
          rescue Exception => e
            Rails.logger.error(e)
            raise e
          end
        end
      end

      Rails.logger.info("Watching tubes [#{tube_names.join(', ')}]")
      client.jobs.process!
    end

    def self.perform(job)
      active_job = ActiveJob::Base.deserialize(MultiJson.load(job.body))
      active_job.native_job = job if active_job.respond_to?(:native_job=)
      active_job.perform_now
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
active_beaneater-0.1.0 lib/active_beaneater/worker.rb