lib/rabbit/publishing.rb in rabbit_messaging-0.7.1 vs lib/rabbit/publishing.rb in rabbit_messaging-0.8.1
- old
+ new
@@ -3,67 +3,48 @@
require "rabbit/publishing/message"
module Rabbit
module Publishing
autoload :Job, "rabbit/publishing/job"
+ autoload :ChannelsPool, "rabbit/publishing/channels_pool"
+ extend self
MUTEX = Mutex.new
- extend self
+ def publish(msg)
+ return if Rabbit.config.environment.in? %i[test development]
- def publish(message)
- return unless client
+ pool.with_channel msg.confirm_select? do |ch|
+ ch.basic_publish *msg.basic_publish_args
- channel = channel(message.confirm_select?)
- channel.basic_publish(*message.basic_publish_args)
+ raise MessageNotDelivered, "RabbitMQ message not delivered: #{msg}" \
+ if msg.confirm_select? && !ch.wait_for_confirms
- if message.confirm_select? && !channel.wait_for_confirms
- raise MessageNotDelivered, "RabbitMQ message not delivered: #{message}"
- else
- log(message)
+ log msg
end
rescue Timeout::Error
raise MessageNotDelivered, <<~MESSAGE
- Timeout while sending message #{message}. Possible reasons:
- - #{message.real_exchange_name} exchange is not found
+ Timeout while sending message #{msg}. Possible reasons:
+ - #{msg.real_exchange_name} exchange is not found
- RabbitMQ is extremely high loaded
MESSAGE
end
- def client
- MUTEX.synchronize { @client ||= create_client }
+ def pool
+ MUTEX.synchronize { @pool ||= ChannelsPool.new(create_client) }
end
- def channel(confirm)
- Thread.current[:bunny_channels] ||= {}
- channel = Thread.current[:bunny_channels][confirm]
- Thread.current[:bunny_channels][confirm] = create_channel(confirm) unless channel&.open?
- Thread.current[:bunny_channels][confirm]
- end
-
private
def create_queue_if_not_exists(channel, message)
channel.queue(message.routing_key, durable: true)
end
def create_client
- return if Rabbit.config.environment == :test
-
config = Rails.application.config_for("sneakers") rescue {}
config = config["bunny_options"].to_h.symbolize_keys
- begin
- Bunny.new(config).start
- rescue
- raise unless Rabbit.config.environment == :development
- end
- end
-
- def create_channel(confirm)
- channel = client.create_channel
- channel.confirm_select if confirm
- channel
+ Bunny.new(config).start
end
def log(message)
@logger ||= Rabbit.config.publish_logger