lib/rabbit/publishing.rb in rabbit_messaging-0.7.1 vs lib/rabbit/publishing.rb in rabbit_messaging-0.8.1

- old
+ new

@@ -3,67 +3,48 @@ require "rabbit/publishing/message" module Rabbit module Publishing autoload :Job, "rabbit/publishing/job" + autoload :ChannelsPool, "rabbit/publishing/channels_pool" + extend self MUTEX = Mutex.new - extend self + def publish(msg) + return if Rabbit.config.environment.in? %i[test development] - def publish(message) - return unless client + pool.with_channel msg.confirm_select? do |ch| + ch.basic_publish *msg.basic_publish_args - channel = channel(message.confirm_select?) - channel.basic_publish(*message.basic_publish_args) + raise MessageNotDelivered, "RabbitMQ message not delivered: #{msg}" \ + if msg.confirm_select? && !ch.wait_for_confirms - if message.confirm_select? && !channel.wait_for_confirms - raise MessageNotDelivered, "RabbitMQ message not delivered: #{message}" - else - log(message) + log msg end rescue Timeout::Error raise MessageNotDelivered, <<~MESSAGE - Timeout while sending message #{message}. Possible reasons: - - #{message.real_exchange_name} exchange is not found + Timeout while sending message #{msg}. Possible reasons: + - #{msg.real_exchange_name} exchange is not found - RabbitMQ is extremely high loaded MESSAGE end - def client - MUTEX.synchronize { @client ||= create_client } + def pool + MUTEX.synchronize { @pool ||= ChannelsPool.new(create_client) } end - def channel(confirm) - Thread.current[:bunny_channels] ||= {} - channel = Thread.current[:bunny_channels][confirm] - Thread.current[:bunny_channels][confirm] = create_channel(confirm) unless channel&.open? - Thread.current[:bunny_channels][confirm] - end - private def create_queue_if_not_exists(channel, message) channel.queue(message.routing_key, durable: true) end def create_client - return if Rabbit.config.environment == :test - config = Rails.application.config_for("sneakers") rescue {} config = config["bunny_options"].to_h.symbolize_keys - begin - Bunny.new(config).start - rescue - raise unless Rabbit.config.environment == :development - end - end - - def create_channel(confirm) - channel = client.create_channel - channel.confirm_select if confirm - channel + Bunny.new(config).start end def log(message) @logger ||= Rabbit.config.publish_logger