Sha256: 26fc0023be4926ec734b8fa85ceff7d66aabef4ae3e4cc256924c5bb69f756c4

Contents?: true

Size: 1.33 KB

Versions: 1

Compression:

Stored size: 1.33 KB

Contents

# frozen_string_literal: true

require "tainbox"

require "rabbit"
require "rabbit/receiving/queue"
require "rabbit/receiving/job"

class Rabbit::Receiving::Receive
  include Tainbox

  attribute :message
  attribute :delivery_info
  attribute :arguments

  def call
    log!
    call_hooks(before_hooks)
    process_message
    call_hooks(after_hooks)
  end

  def log!
    Rabbit.config.receive_logger.debug(
      [message, delivery_info, arguments].join(" / "),
    )
  end

  def process_message
    job_class
      .set(queue: queue_name, **job_configs)
      .perform_later(message, message_info)
  end

  def call_hooks(hooks)
    hooks.each do |hook_proc|
      hook_proc.call(message, message_info)
    end
  end

  def before_hooks
    Rabbit.config.before_receiving_hooks || []
  end

  def after_hooks
    Rabbit.config.after_receiving_hooks || []
  end

  def message_info
    arguments.merge(
      delivery_info.slice(:exchange, :routing_key),
    )
  end

  def queue
    @queue ||= Rabbit::Receiving::Queue.new(message, arguments)
  end

  def job_configs
    queue.handler.additional_job_configs
  end

  def queue_name
    queue.name
  end

  def job_class
    Rabbit.config.receiving_job_class_callable&.call(
      message: message,
      delivery_info: delivery_info,
      arguments: arguments,
    ) || Rabbit::Receiving::Job
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rabbit_messaging-1.1.0 lib/rabbit/receiving/receive.rb