Sha256: 4f3c6778b5f06261d6c4d0ddc1f687a8e9162fdb97e6f5afd6a76a25d525dbd1

Contents?: true

Size: 1.54 KB

Versions: 2

Compression:

Stored size: 1.54 KB

Contents

# frozen_string_literal: true

require "sneakers"

require "rabbit"
require "rabbit/receiving/message"
require "rabbit/receiving/malformed_message"
require "rabbit/receiving/handler_resolver"

module Rabbit::Receiving
  autoload :Job, "rabbit/receiving/job"

  # Worker that receives a raw message and enqueues a background job for it.
  #
  # Mixes in Sneakers::Worker; `ack!` and `requeue!` are not defined in this
  # class (presumably provided by that mixin — confirm against Sneakers).
  class Worker
    include Sneakers::Worker

    class << self
      # Logger used for incoming messages.
      #
      # Read from Rabbit.config.receive_logger on first use and memoized in a
      # class-level instance variable afterwards.
      #
      # @return [Object] whatever Rabbit.config.receive_logger returns
      def logger
        @logger ||= Rabbit.config.receive_logger
      end
    end

    # Handles one delivered message: logs it, enqueues the processing job on
    # the resolved queue and acknowledges the delivery.
    #
    # On any StandardError the error is re-raised when
    # Rabbit.config.environment is :test; otherwise it is passed to
    # Rabbit.config.exception_notifier and the delivery is requeued.
    #
    # @param message [String] raw payload; a copy is re-tagged as UTF-8, the
    #   caller's object is left untouched
    # @param delivery_info [Object] only used for the debug log line here
    # @param arguments [#to_h] forwarded to queue resolution and, converted
    #   with #to_h, to the job
    # @return [Object] result of `ack!` on success, of `requeue!` on failure
    def work_with_params(message, delivery_info, arguments)
      # force_encoding only changes the encoding tag of the copy; bytes are kept.
      payload = message.dup.force_encoding("UTF-8")
      self.class.logger.debug([payload, delivery_info, arguments].join(" / "))

      configured_job = job_class.set(queue: queue(payload, arguments))
      configured_job.perform_later(payload, arguments.to_h)

      ack!
    rescue => error
      raise if Rabbit.config.environment == :test

      Rabbit.config.exception_notifier.call(error)
      requeue!
    end

    private

    # Resolves the name of the queue the job should be enqueued on.
    #
    # Builds a Rabbit::Receiving::Message, looks up its handler and uses the
    # handler's `queue` setting: when it is a Proc it is called with the built
    # message and the arguments, any other truthy value is used directly, and
    # a falsy value selects the default queue name.
    #
    # NOTE(review): every StandardError raised here (message building, handler
    # lookup, the Proc itself) is swallowed and the plain default queue name is
    # returned without any trace — looks like a deliberate best-effort
    # fallback; confirm that silence is intended.
    #
    # @param message [String] raw payload
    # @param arguments [Object] passed to Message.build and to a Proc queue
    # @return [Object] whatever Rabbit.queue_name / Rabbit.default_queue_name return
    def queue(message, arguments)
      built_message   = Rabbit::Receiving::Message.build(message, arguments)
      handler         = Rabbit::Receiving::HandlerResolver.handler_for(built_message)
      target          = handler.queue
      skip_conversion = handler.ignore_queue_conversion

      if target
        resolved = target.is_a?(Proc) ? target.call(built_message, arguments) : target
        Rabbit.queue_name(resolved, ignore_conversion: skip_conversion)
      else
        Rabbit.default_queue_name(ignore_conversion: skip_conversion)
      end
    rescue
      Rabbit.default_queue_name
    end

    # Job class used to process messages.
    #
    # Uses the result of Rabbit.config.receiving_job_class_callable when that
    # callable is set and returns a truthy value; otherwise falls back to Job.
    #
    # @return [Object] the configured job class, or Job
    def job_class
      custom_job = Rabbit.config.receiving_job_class_callable&.call
      custom_job || Job
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rabbit_messaging-0.7.1 lib/rabbit/receiving/worker.rb
rabbit_messaging-0.7.0 lib/rabbit/receiving/worker.rb