lib/basquiat/adapters/rabbitmq/connection.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/connection.rb in basquiat-1.3.0.pre.1

- old
+ new

@@ -1,88 +1,65 @@ module Basquiat module Adapters class RabbitMq + # Control the connection to the RabitMQ server. Delegates calls to {Bunny::Connection} class Connection < SimpleDelegator - def initialize(servers:, failover: {}, auth: {}) - @servers = Array(servers) - @failover = { default_timeout: 5, max_retries: 5 }.merge(failover) - @auth = { user: 'guest', password: 'guest' }.merge(auth) + # @param hosts: [Array<String>] IPs or FQDN of the RabbitMQ instances + # @param port: [Fixnum] Port that the RabbitMQ instances run + # @param failover: [Hash] + # @option failover: [Fixnum] :max_retries (5) Maximum number of reconnection retries + # @option failover: [Fixnum] :default_timeout (5) Interval between to reconnect attempts + # @option failover: [Fixnum] :connection_timeout (5) Allowed time before a connection attempt timeouts + # @param auth: [Hash] + # @option auth: [String] :user ('guest') + # @option auth: [String] :password ('guest') + def initialize(hosts:, port: 5672, failover: {}, auth: {}) + @hosts = hosts + @port = port + @failover = failover + @auth = auth end + # Creates a channel + # @return [Bunny::Channel] + def create_channel + connection.start + connection.create_channel + end + + # Starts the connection if needed def start - with_network_failure_handler do - connection.start - current_server[:retries] = 0 - end + connection.start unless connection.connected? end def connected? connection.status == :started end + # Closes the channels and the connection. def disconnect connection.close_all_channels connection.close reset end - def current_server_uri - "amqp://#{current_server[:host]}:#{current_server[:port]}#{current_server[:vhost]}" - end - - def with_network_failure_handler - yield if block_given? - rescue Bunny::ConnectionForced, Bunny::TCPConnectionFailed, Bunny::NetworkFailure => error - if current_server.fetch(:retries, 0) <= @failover.fetch(:max_retries) - handle_network_failures - retry - else - raise(error) - end - end - private def reset @connection = nil end - def handle_network_failures - Basquiat.logger.warn "Failed to connect to #{current_server_uri}" - retries = current_server.fetch(:retries, 0) - current_server[:retries] = retries + 1 - if retries < @failover.fetch(:max_retries) - Basquiat.logger.warn("Retrying connection to #{current_server_uri} in #{@failover.fetch(:default_timeout)} seconds") - sleep(@failover.fetch(:default_timeout)) - else - Basquiat.logger.warn("Total number of retries exceeded for #{current_server_uri}") - rotate - end - reset - end - def connection - Basquiat.logger.info("Connecting to #{current_server_uri}") @connection ||= Bunny.new( - current_server_uri, - user: auth[:user], - password: auth[:password], - automatic_recovery: false, - threaded: @failover.fetch(:threaded, true), - logger: Basquiat.logger) + hosts: @hosts, + port: @port, + username: @auth.fetch(:user, 'guest'), + password: @auth.fetch(:password, 'guest'), + recovery_attempts: @failover.fetch(:max_retries, 5), + network_recovery_interval: @failover.fetch(:default_timeout, 5), + connection_timeout: @failover.fetch(:connection_timeout, 5), + logger: Basquiat.logger) __setobj__(@connection) - end - - def current_server - @servers.first - end - - def auth - current_server[:auth] || @auth - end - - def rotate - @servers.rotate! end end end end end