lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.3.6 vs lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.4.0

- old
+ new

@@ -41,11 +41,11 @@ # Publishes the event to the exchange configured. # @param event [String] routing key to be used # @param message [Hash] the message to be publish # @param props [Hash] other properties you wish to publish with the message, such as custom headers etc. def publish(event, message, props: {}) - session.publish(event, message, props) + session_pool.with { |session| session.publish(event, message, props) } disconnect unless options[:publisher][:persistent] end # Binds the queues and start the event lopp. # @param block [Boolean] block the thread @@ -65,13 +65,14 @@ end # Reset the connection to RabbitMQ. def reset_connection connection.disconnect - @connection = nil - @session = nil - @strategy = nil + @connection = nil + @session = nil + @session_pool = nil + @strategy = nil end alias disconnect reset_connection # Lazy initializes the requeue strategy configured for the adapter @@ -82,9 +83,18 @@ # Lazy initializes and return the session # @return [Session] def session @session ||= Session.new(connection.create_channel, @configuration.session_options) + end + + # Lazy initializes and return the session pool + # @return [ConnectionPool<Session>] + def session_pool + @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool][:size], + timeout: options[:publisher][:session_pool][:timeout]) do + Session.new(connection.create_channel, @configuration.session_options) + end end private # Lazy initializes the connection