lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.4.0 vs lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.5.0
- old
+ new
@@ -41,11 +41,15 @@
# Publishes the event to the exchange configured.
# @param event [String] routing key to be used
# @param message [Hash] the message to be publish
# @param props [Hash] other properties you wish to publish with the message, such as custom headers etc.
def publish(event, message, props: {})
- session_pool.with { |session| session.publish(event, message, props) }
+ if options[:publisher][:session_pool]
+ session_pool.with { |session| session.publish(event, message, props) }
+ else
+ session.publish(event, message, props)
+ end
disconnect unless options[:publisher][:persistent]
end
# Binds the queues and start the event lopp.
# @param block [Boolean] block the thread
@@ -88,11 +92,11 @@
end
# Lazy initializes and return the session pool
# @return [ConnectionPool<Session>]
def session_pool
- @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool][:size],
- timeout: options[:publisher][:session_pool][:timeout]) do
+ @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool].fetch(:size, 1),
+ timeout: options[:publisher][:session_pool].fetch(:timeout, 5)) do
Session.new(connection.create_channel, @configuration.session_options)
end
end
private