Sha256: da3ce87b0a191ac782c44319208bb46fe791a760bdc31acdc4805c6757364983
Contents?: true
Size: 1.54 KB
Versions: 8
Compression:
Stored size: 1.54 KB
Contents
# frozen_string_literal: true

require "rabbit/publishing/message"

module Rabbit
  # Module-level publishing helpers. The module extends itself, so every
  # public method below is callable directly as Rabbit::Publishing.<method>.
  module Publishing
    autoload :Job, "rabbit/publishing/job"
    autoload :ChannelsPool, "rabbit/publishing/channels_pool"

    extend self

    # Serializes the lazy construction of the shared channels pool.
    MUTEX = Mutex.new

    # Publishes +msg+ through a channel borrowed from the shared pool.
    #
    # Returns immediately (nil) when Rabbit.config.skip_publish? is truthy.
    #
    # @param msg message object; this method reads #confirm_select?,
    #   #basic_publish_args and #real_exchange_name from it
    # @raise [MessageNotDelivered] when the message requests confirmation and
    #   the channel's #wait_for_confirms returns a falsy value, or when a
    #   Timeout::Error is raised while publishing
    def publish(msg)
      return if Rabbit.config.skip_publish?

      pool.with_channel(msg.confirm_select?) do |channel|
        channel.basic_publish(*msg.basic_publish_args)

        # Only confirm-mode messages wait for the broker's acknowledgement.
        if msg.confirm_select? && !channel.wait_for_confirms
          raise MessageNotDelivered, "RabbitMQ message not delivered: #{msg}"
        end

        log(msg)
      end
    rescue Timeout::Error
      raise MessageNotDelivered, <<~MESSAGE
        Timeout while sending message #{msg}. Possible reasons:
          - #{msg.real_exchange_name} exchange is not found
          - RabbitMQ is extremely high loaded
      MESSAGE
    end

    # Returns the memoized ChannelsPool, building it on first use.
    # Construction happens while holding MUTEX.
    def pool
      MUTEX.synchronize do
        @pool ||= ChannelsPool.new(create_client)
      end
    end

    private

    # Declares a durable queue named after the message's routing key on the
    # given channel.
    # NOTE(review): not called from anywhere in this file; presumably kept for
    # external or legacy use - confirm before removing.
    def create_queue_if_not_exists(channel, message)
      channel.queue(message.routing_key, durable: true)
    end

    # Builds and starts a Bunny session from the "bunny_options" section of
    # the Rails "sneakers" config.
    #
    # Any StandardError raised while loading that config is swallowed and an
    # empty hash is used instead, so Bunny is then started with no options.
    def create_client
      settings =
        begin
          Rails.application.config_for("sneakers")
        rescue StandardError
          {}
        end

      bunny_options = settings["bunny_options"].to_h.symbolize_keys
      Bunny.new(bunny_options).start
    end

    # Writes a debug line describing the published message to the logger
    # returned by Rabbit.config.publish_logger (memoized on first call).
    def log(message)
      @logger ||= Rabbit.config.publish_logger

      metadata = [
        message.real_exchange_name,
        message.routing_key,
        message.headers,
        message.event,
        message.confirm_select? ? "confirm" : "no-confirm",
      ]
      summary = metadata.join(" / ")

      @logger.debug("#{summary}: #{message.data}")
    end
  end
end
Version data entries
8 entries across 8 versions & 1 rubygems