lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.3.0.pre.1

- old
+ new

@@ -5,68 +5,87 @@ module Adapters # The RabbitMQ adapter for Basquiat class RabbitMq < Basquiat::Adapters::Base using Basquiat::HashRefinements - # Avoid superclass mismatch errors require 'basquiat/adapters/rabbitmq/events' require 'basquiat/adapters/rabbitmq/message' require 'basquiat/adapters/rabbitmq/configuration' require 'basquiat/adapters/rabbitmq/connection' require 'basquiat/adapters/rabbitmq/session' require 'basquiat/adapters/rabbitmq/requeue_strategies' + # Initializes the superclass using a {Events} object as the procs instance variable def initialize - super - @procs = Events.new + super(procs: Events.new) end + # Since the RabbitMQ configuration options are quite vast and it's interations with the requeue strategies a bit + # convoluted it uses a {Configuration} object to handle it all def base_options @configuration ||= Configuration.new @configuration.merge_user_options(Basquiat.configuration.adapter_options) end + # Adds the subscription and register the proc to the event. + # @param event_name [String] routing key to be matched (and bound to) when listening + # @param proc [#call] callable object to be run when a message with the said routing_key is received def subscribe_to(event_name, proc) procs[event_name] = proc end - def publish(event, message, persistent: options[:publisher][:persistent], props: {}) - connection.with_network_failure_handler do - session.publish(event, message, props) - disconnect unless persistent - end + # Publishes the event to the exchange configured. + # @param event [String] routing key to be used + # @param message [Hash] the message to be publish + # @param props [Hash] other properties you wish to publish with the message, such as custom headers etc. + def publish(event, message, props: {}) + session.publish(event, message, props) + disconnect unless options[:publisher][:persistent] end - def listen(block: true) - connection.with_network_failure_handler do - procs.keys.each { |key| session.bind_queue(key) } - session.subscribe(block) do |message| - strategy.run(message) do - procs[message.routing_key].call(message) - end + # Binds the queues and start the event lopp. + # @param block [Boolean] block the thread + def listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc) + procs.keys.each { |key| session.bind_queue(key) } + session.subscribe(block: block) do |message| + strategy.run(message) do + process_message(message, rescue_proc) end end end + def process_message(message, rescue_proc) + procs[message.routing_key].call(message) + rescue => ex + rescue_proc.call(ex, message) + end + + # Reset the connection to RabbitMQ. def reset_connection connection.disconnect @connection = nil @session = nil + @strategy = nil end alias_method :disconnect, :reset_connection + # Lazy initializes the requeue strategy configured for the adapter + # @return [BaseStrategy] def strategy @strategy ||= @configuration.strategy.new(session) end + # Lazy initializes and return the session + # @return [Session] def session - @session ||= Session.new(connection, @configuration.session_options) + @session ||= Session.new(connection.create_channel, @configuration.session_options) end private + # Lazy initializes the connection def connection @connection ||= Connection.new(@configuration.connection_options) end end end