lib/basquiat/adapters/rabbitmq/connection.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/connection.rb in basquiat-1.3.0.pre.1
- old
+ new
@@ -1,88 +1,65 @@
module Basquiat
module Adapters
class RabbitMq
+ # Control the connection to the RabitMQ server. Delegates calls to {Bunny::Connection}
class Connection < SimpleDelegator
- def initialize(servers:, failover: {}, auth: {})
- @servers = Array(servers)
- @failover = { default_timeout: 5, max_retries: 5 }.merge(failover)
- @auth = { user: 'guest', password: 'guest' }.merge(auth)
+ # @param hosts: [Array<String>] IPs or FQDN of the RabbitMQ instances
+ # @param port: [Fixnum] Port that the RabbitMQ instances run
+ # @param failover: [Hash]
+ # @option failover: [Fixnum] :max_retries (5) Maximum number of reconnection retries
+ # @option failover: [Fixnum] :default_timeout (5) Interval between to reconnect attempts
+ # @option failover: [Fixnum] :connection_timeout (5) Allowed time before a connection attempt timeouts
+ # @param auth: [Hash]
+ # @option auth: [String] :user ('guest')
+ # @option auth: [String] :password ('guest')
+ def initialize(hosts:, port: 5672, failover: {}, auth: {})
+ @hosts = hosts
+ @port = port
+ @failover = failover
+ @auth = auth
end
+ # Creates a channel
+ # @return [Bunny::Channel]
+ def create_channel
+ connection.start
+ connection.create_channel
+ end
+
+ # Starts the connection if needed
def start
- with_network_failure_handler do
- connection.start
- current_server[:retries] = 0
- end
+ connection.start unless connection.connected?
end
def connected?
connection.status == :started
end
+ # Closes the channels and the connection.
def disconnect
connection.close_all_channels
connection.close
reset
end
- def current_server_uri
- "amqp://#{current_server[:host]}:#{current_server[:port]}#{current_server[:vhost]}"
- end
-
- def with_network_failure_handler
- yield if block_given?
- rescue Bunny::ConnectionForced, Bunny::TCPConnectionFailed, Bunny::NetworkFailure => error
- if current_server.fetch(:retries, 0) <= @failover.fetch(:max_retries)
- handle_network_failures
- retry
- else
- raise(error)
- end
- end
-
private
def reset
@connection = nil
end
- def handle_network_failures
- Basquiat.logger.warn "Failed to connect to #{current_server_uri}"
- retries = current_server.fetch(:retries, 0)
- current_server[:retries] = retries + 1
- if retries < @failover.fetch(:max_retries)
- Basquiat.logger.warn("Retrying connection to #{current_server_uri} in #{@failover.fetch(:default_timeout)} seconds")
- sleep(@failover.fetch(:default_timeout))
- else
- Basquiat.logger.warn("Total number of retries exceeded for #{current_server_uri}")
- rotate
- end
- reset
- end
-
def connection
- Basquiat.logger.info("Connecting to #{current_server_uri}")
@connection ||= Bunny.new(
- current_server_uri,
- user: auth[:user],
- password: auth[:password],
- automatic_recovery: false,
- threaded: @failover.fetch(:threaded, true),
- logger: Basquiat.logger)
+ hosts: @hosts,
+ port: @port,
+ username: @auth.fetch(:user, 'guest'),
+ password: @auth.fetch(:password, 'guest'),
+ recovery_attempts: @failover.fetch(:max_retries, 5),
+ network_recovery_interval: @failover.fetch(:default_timeout, 5),
+ connection_timeout: @failover.fetch(:connection_timeout, 5),
+ logger: Basquiat.logger)
__setobj__(@connection)
- end
-
- def current_server
- @servers.first
- end
-
- def auth
- current_server[:auth] || @auth
- end
-
- def rotate
- @servers.rotate!
end
end
end
end
end