lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.1.1 vs lib/basquiat/adapters/rabbitmq_adapter.rb in basquiat-1.2.0

- old
+ new

@@ -1,130 +1,74 @@ require 'bunny' +require 'delegate' module Basquiat module Adapters # The RabbitMQ adapter for Basquiat - class RabbitMq - include Basquiat::Adapters::Base + class RabbitMq < Basquiat::Adapters::Base + using Basquiat::HashRefinements - def default_options - { failover: { default_timeout: 5, max_retries: 5 }, - servers: [{ host: 'localhost', port: 5672 }], - queue: { name: Basquiat.configuration.queue_name, options: { durable: true } }, - exchange: { name: Basquiat.configuration.exchange_name, options: { durable: true } }, - publisher: { confirm: true, persistent: false }, - auth: { user: 'guest', password: 'guest' } } + + # Avoid superclass mismatch errors + require 'basquiat/adapters/rabbitmq/events' + require 'basquiat/adapters/rabbitmq/message' + require 'basquiat/adapters/rabbitmq/configuration' + require 'basquiat/adapters/rabbitmq/connection' + require 'basquiat/adapters/rabbitmq/session' + require 'basquiat/adapters/rabbitmq/requeue_strategies' + + def initialize + super + @procs = Events.new end + def base_options + @configuration ||= Configuration.new + @configuration.merge_user_options(Basquiat.configuration.adapter_options) + end + def subscribe_to(event_name, proc) procs[event_name] = proc end - def publish(event, message, persistent: options[:publisher][:persistent]) - with_network_failure_handler do - channel.confirm_select if options[:publisher][:confirm] - exchange.publish(Basquiat::Json.encode(message), routing_key: event) + def publish(event, message, persistent: options[:publisher][:persistent], props: {}) + connection.with_network_failure_handler do + session.publish(event, message, props) disconnect unless persistent end end def listen(block: true) - with_network_failure_handler do - procs.keys.each { |key| bind_queue(key) } - queue.subscribe(block: block) do |di, _, msg| - message = Basquiat::Json.decode(msg) - procs[di.routing_key].call(message) + connection.with_network_failure_handler do + procs.keys.each { |key| session.bind_queue(key) } + session.subscribe(block) do |message| + strategy.run(message) do + procs[message.routing_key].call(message) + end end end end - def connect - with_network_failure_handler do - connection.start - current_server[:retries] = 0 - end + def reset_connection + connection.disconnect + @connection = nil + @session = nil end - def connection_uri - current_server_uri - end + alias_method :disconnect, :reset_connection - def disconnect - connection.close_all_channels - connection.close - reset_connection + def strategy + @strategy ||= @configuration.strategy.new(session) end - def connected? - @connection + def session + @session ||= Session.new(connection, @configuration.session_options) end private - def with_network_failure_handler - yield if block_given? - rescue Bunny::ConnectionForced, Bunny::TCPConnectionFailed, Bunny::NetworkFailure => error - if current_server.fetch(:retries, 0) <= failover_opts[:max_retries] - handle_network_failures - retry - else - raise(error) - end - end - def failover_opts - options[:failover] - end - - def bind_queue(event_name) - queue.bind(exchange, routing_key: event_name) - end - - def reset_connection - @connection, @channel, @exchange, @queue = nil, nil, nil, nil - end - - def rotate_servers - return unless options[:servers].any? { |server| server.fetch(:retries, 0) < failover_opts[:max_retries] } - options[:servers].rotate! - end - - def handle_network_failures - logger.warn "[WARN] Handling connection to #{current_server_uri}" - retries = current_server.fetch(:retries, 0) - current_server[:retries] = retries + 1 - if retries < failover_opts[:max_retries] - logger.warn("[WARN] Connection failed retrying in #{failover_opts[:default_timeout]} seconds") - sleep(failover_opts[:default_timeout]) - else - rotate_servers - end - reset_connection - end - def connection - @connection ||= Bunny.new(current_server_uri) - end - - def channel - connect - @channel ||= connection.create_channel - end - - def queue - @queue ||= channel.queue(options[:queue][:name], options[:queue][:options]) - end - - def exchange - @exchange ||= channel.topic(options[:exchange][:name], options[:exchange][:options]) - end - - def current_server - options[:servers].first - end - - def current_server_uri - auth = current_server[:auth] || options[:auth] - "amqp://#{auth[:user]}:#{auth[:password]}@#{current_server[:host]}:#{current_server[:port]}" + @connection ||= Connection.new(@configuration.connection_options) end end end end