Sha256: 529ab5703701aaa190048626abddd0d202b17131bd5ad32bb7f3f3b28da6425e
Contents?: true
Size: 1.58 KB
Versions: 1
Compression:
Stored size: 1.58 KB
Contents
# frozen_string_literal: true

require "logger"
require "sneakers"
require "rabbit"
require "rabbit/receiving/message"
require "rabbit/receiving/malformed_message"
require "rabbit/receiving/handler_resolver"

module Rabbit::Receiving
  autoload :Job, "rabbit/receiving/job"

  # Sneakers worker that takes a raw incoming message, logs it, and hands it
  # over to a background job on a queue chosen by the message's handler.
  class Worker
    include Sneakers::Worker

    # Memoized, class-level logger for incoming messages.
    #
    # @return [Logger] logger writing to log/incoming_rabbit_messages.log
    #   under Rails.root
    def self.logger
      @logger ||= Logger.new(Rails.root.join("log", "incoming_rabbit_messages.log"))
    end

    # Logs the incoming message and enqueues it for processing by the
    # receiving job.
    #
    # @param message [String] raw payload; a UTF-8-tagged copy is used, the
    #   caller's string is not mutated
    # @param delivery_info [Object] delivery metadata; only written to the log here
    # @param arguments [#to_h] message properties; passed to the job as a Hash
    # @return [Object] result of `ack!` on success, of `requeue!` on failure
    #   (both presumably provided by Sneakers::Worker - confirm)
    # @raise [StandardError] re-raises the original error when
    #   Rabbit.config.environment is :test
    def work_with_params(message, delivery_info, arguments)
      message = message.dup.force_encoding("UTF-8")
      self.class.logger.debug([message, delivery_info, arguments].join(" / "))
      job_class.set(queue: queue(message, arguments)).perform_later(message, arguments.to_h)
      ack!
    rescue => error
      # In the test environment failures must surface instead of being requeued.
      raise if Rabbit.config.environment == :test

      Rabbit.config.exception_notifier.call(error)
      requeue!
    end

    private

    # Resolves the name of the job queue for the given message.
    #
    # The handler's `queue` may be nil (default queue is used), a Proc (called
    # with the built message and the arguments), or a ready-to-use value.
    #
    # @param message [String] raw payload
    # @param arguments [Object] message properties
    # @return [Object] whatever Rabbit.queue_name / Rabbit.default_queue_name return
    def queue(message, arguments)
      message = Rabbit::Receiving::Message.build(message, arguments)
      handler = Rabbit::Receiving::HandlerResolver.handler_for(message)
      queue_name = handler.queue
      ignore_conversion = handler.ignore_queue_conversion

      return Rabbit.default_queue_name(ignore_conversion: ignore_conversion) unless queue_name

      calculated_queue = queue_name.is_a?(Proc) ? queue_name.call(message, arguments) : queue_name

      Rabbit.queue_name(calculated_queue, ignore_conversion: ignore_conversion)
    rescue
      # Deliberate best effort: any StandardError raised above (building the
      # message, resolving the handler, calling the Proc) falls back to the
      # default queue, so the job is still enqueued.
      Rabbit.default_queue_name
    end

    # Job class used to process received messages.
    #
    # @return [Class] result of the configured `receiving_job_class_callable`,
    #   or Job when the callable is not set or returns a falsy value
    def job_class
      Rabbit.config.receiving_job_class_callable&.call || Job
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
rabbit_messaging-0.6.0 | lib/rabbit/receiving/worker.rb |