Sha256: 529ab5703701aaa190048626abddd0d202b17131bd5ad32bb7f3f3b28da6425e

Contents?: true

Size: 1.58 KB

Versions: 1

Compression:

Stored size: 1.58 KB

Contents

# frozen_string_literal: true

require "sneakers"

require "rabbit"
require "rabbit/receiving/message"
require "rabbit/receiving/malformed_message"
require "rabbit/receiving/handler_resolver"

module Rabbit::Receiving
  autoload :Job, "rabbit/receiving/job"

  class Worker
    include Sneakers::Worker

    def self.logger
      @logger ||= Logger.new(Rails.root.join("log", "incoming_rabbit_messages.log"))
    end

    def work_with_params(message, delivery_info, arguments)
      message = message.dup.force_encoding("UTF-8")
      self.class.logger.debug([message, delivery_info, arguments].join(" / "))
      job_class.set(queue: queue(message, arguments)).perform_later(message, arguments.to_h)
      ack!
    rescue => error
      raise if Rabbit.config.environment == :test
      Rabbit.config.exception_notifier.call(error)
      requeue!
    end

    private

    def queue(message, arguments)
      message           = Rabbit::Receiving::Message.build(message, arguments)
      handler           = Rabbit::Receiving::HandlerResolver.handler_for(message)
      queue_name        = handler.queue
      ignore_conversion = handler.ignore_queue_conversion

      return Rabbit.default_queue_name(ignore_conversion: ignore_conversion) unless queue_name

      calculated_queue = begin
        queue_name.is_a?(Proc) ? queue_name.call(message, arguments) : queue_name
      end

      Rabbit.queue_name(calculated_queue, ignore_conversion: ignore_conversion)
    rescue
      Rabbit.default_queue_name
    end

    def job_class
      Rabbit.config.receiving_job_class_callable&.call || Job
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rabbit_messaging-0.6.0 lib/rabbit/receiving/worker.rb