lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.3.6 vs lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.4.0
- old
+ new
@@ -41,11 +41,11 @@
# Publishes the event to the exchange configured.
# @param event [String] routing key to be used
# @param message [Hash] the message to be publish
# @param props [Hash] other properties you wish to publish with the message, such as custom headers etc.
def publish(event, message, props: {})
- session.publish(event, message, props)
+ session_pool.with { |session| session.publish(event, message, props) }
disconnect unless options[:publisher][:persistent]
end
# Binds the queues and start the event lopp.
# @param block [Boolean] block the thread
@@ -65,13 +65,14 @@
end
# Reset the connection to RabbitMQ.
def reset_connection
connection.disconnect
- @connection = nil
- @session = nil
- @strategy = nil
+ @connection = nil
+ @session = nil
+ @session_pool = nil
+ @strategy = nil
end
alias disconnect reset_connection
# Lazy initializes the requeue strategy configured for the adapter
@@ -82,9 +83,18 @@
# Lazy initializes and return the session
# @return [Session]
def session
@session ||= Session.new(connection.create_channel, @configuration.session_options)
+ end
+
+ # Lazy initializes and return the session pool
+ # @return [ConnectionPool<Session>]
+ def session_pool
+ @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool][:size],
+ timeout: options[:publisher][:session_pool][:timeout]) do
+ Session.new(connection.create_channel, @configuration.session_options)
+ end
end
private
# Lazy initializes the connection