lib/bunny/session.rb in bunny-0.9.0.pre6 vs lib/bunny/session.rb in bunny-0.9.0.pre7
- old
+ new
@@ -13,54 +13,76 @@
require "amq/protocol/client"
require "amq/settings"
module Bunny
+ # Represents AMQP 0.9.1 connection (connection to RabbitMQ).
+ # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
class Session
+ # Default host used for connection
DEFAULT_HOST = "127.0.0.1"
+ # Default virtual host used for connection
DEFAULT_VHOST = "/"
+ # Default username used for connection
DEFAULT_USER = "guest"
+ # Default password used for connection
DEFAULT_PASSWORD = "guest"
- # the same value as RabbitMQ 3.0 uses. MK.
+ # Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
DEFAULT_HEARTBEAT = 600
- # 128K
+ # @private
DEFAULT_FRAME_MAX = 131072
# backwards compatibility
+ # @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
-
+ # RabbitMQ client metadata
DEFAULT_CLIENT_PROPERTIES = {
:capabilities => {
:publisher_confirms => true,
:consumer_cancel_notify => true,
:exchange_exchange_bindings => true,
:"basic.nack" => true
},
:product => "Bunny",
:platform => ::RUBY_DESCRIPTION,
:version => Bunny::VERSION,
- :information => "http://github.com/ruby-amqp/bunny",
+ :information => "http://rubybunny.info",
}
DEFAULT_LOCALE = "en_GB"
#
# API
#
- attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max
+ attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
attr_reader :mechanism
+ # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options
+ # @param [Hash] optz Extra options not related to connection
+ #
+ # @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to
+ # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
+ # @option connection_string_or_opts [String] :username ("guest") Username
+ # @option connection_string_or_opts [String] :password ("guest") Password
+ # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
+ # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
+ #
+ # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
+ # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
+ #
+ # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
+ # @api public
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
when nil then
Hash.new
when String then
@@ -75,10 +97,11 @@
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@logfile = opts[:logfile]
@logging = opts[:logging] || false
+ @threaded = opts.fetch(:threaded, true)
@status = :not_connected
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@@ -93,41 +116,57 @@
@channels = Hash.new
@continuations = ::Queue.new
end
+ # @return [String] RabbitMQ hostname (or IP address) used
def hostname; self.host; end
+ # @return [String] Username used
def username; self.user; end
+ # @return [String] Password used
def password; self.pass; end
+ # @return [String] Virtual host used
def virtual_host; self.vhost; end
+ # @return [Boolean] true if this connection uses TLS (SSL)
def uses_tls?
@transport.uses_tls?
end
alias tls? uses_tls?
+ # @return [Boolean] true if this connection uses TLS (SSL)
def uses_ssl?
@transport.uses_ssl?
end
alias ssl? uses_ssl?
+ # Starts connection process
+ # @api public
def start
@continuations = ::Queue.new
@status = :connecting
self.initialize_transport
self.init_connection
self.open_connection
@event_loop = nil
- self.start_main_loop
+ self.start_main_loop if @threaded
@default_channel = self.create_channel
end
+ def read_write_timeout
+ @transport.read_write_timeout
+ end
+ # Opens a new channel and returns it. This method will block the calling
+ # thread until the response is received and the channel is guaranteed to be
+ # opened (this operation is very fast and inexpensive).
+ #
+ # @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil)
if n && (ch = @channels[n])
ch
else
ch = Bunny::Channel.new(self, n)
@@ -135,15 +174,16 @@
ch
end
end
alias channel create_channel
+ # Closes the connection. This involves closing all of its channels.
def close
if @transport.open?
close_all_channels
- Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) do
+ Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
self.close_connection(false)
end
end
end
alias stop close
@@ -164,107 +204,117 @@
def closed?
status == :closed
end
def open?
- status == :open || status == :connected
+ (status == :open || status == :connected) && @transport.open?
end
alias connected? open?
#
# Backwards compatibility
#
+ # @private
def queue(*args)
@default_channel.queue(*args)
end
+ # @private
def direct(*args)
@default_channel.direct(*args)
end
+ # @private
def fanout(*args)
@default_channel.fanout(*args)
end
+ # @private
def topic(*args)
@default_channel.topic(*args)
end
+ # @private
def headers(*args)
@default_channel.headers(*args)
end
+ # @private
def exchange(*args)
@default_channel.exchange(*args)
end
#
# Implementation
#
-
+ # @private
def open_channel(ch)
n = ch.number
self.register_channel(ch)
@transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
- @last_channel_open_ok = @continuations.pop
+ @last_channel_open_ok = wait_on_continuations
raise_if_continuation_resulted_in_a_connection_error!
@last_channel_open_ok
end
+ # @private
def close_channel(ch)
n = ch.number
@transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0))
- @last_channel_close_ok = @continuations.pop
+ @last_channel_close_ok = wait_on_continuations
raise_if_continuation_resulted_in_a_connection_error!
self.unregister_channel(ch)
@last_channel_close_ok
end
+ # @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
- Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) { ch.close }
+ Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end
+ # @private
def close_connection(sync = true)
@transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
maybe_shutdown_heartbeat_sender
@status = :not_connected
if sync
- @last_connection_close_ok = @continuations.pop
+ @last_connection_close_ok = wait_on_continuations
end
end
+ # @private
def handle_frame(ch_number, method)
# puts "Session#handle_frame on #{ch_number}: #{method.inspect}"
case method
when AMQ::Protocol::Channel::OpenOk then
@continuations.push(method)
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
@last_connection_error = instantiate_connection_level_exception(method)
- @contunuations.push(method)
+ @continuations.push(method)
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
- @event_loop.stop
+ event_loop.stop
@event_loop = nil
@transport.close
- rescue Exception => e
+ rescue StandardError => e
puts e.class.name
puts e.message
puts e.backtrace
ensure
@active_continuation.notify_all if @active_continuation
@@ -278,14 +328,19 @@
self.unregister_channel(ch)
end
when AMQ::Protocol::Basic::GetEmpty then
@channels[ch_number].handle_basic_get_empty(method)
else
- @channels[ch_number].handle_method(method)
+ if ch = @channels[ch_number]
+ ch.handle_method(method)
+ else
+ # TODO: log a warning
+ end
end
end
+ # @private
def raise_if_continuation_resulted_in_a_connection_error!
raise @last_connection_error if @last_connection_error
end
def handle_frameset(ch_number, frames)
@@ -301,11 +356,14 @@
else
@channels[ch_number].handle_frameset(*frames)
end
end
+ # @private
def handle_network_failure(exception)
+ raise NetworkErrorWrapper.new(exception) unless @threaded
+
if !recovering_from_network_failure?
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
# puts "Recovering from a network failure..."
@channels.each do |n, ch|
@@ -318,19 +376,22 @@
# TODO: investigate if we can be a bit smarter here. MK.
end
end
end
+ # @private
def recoverable_network_failure?(exception)
# TODO: investigate if we can be a bit smarter here. MK.
true
end
+ # @private
def recovering_from_network_failure?
@recovering_from_network_failure
end
+ # @private
def recover_from_network_failure
begin
# puts "About to start recovery..."
start
@@ -344,10 +405,11 @@
sleep 5.0
retry if recoverable_network_failure?(e)
end
end
+ # @private
def recover_channels
# default channel is reopened right after connection
# negotiation is completed, so make sure we do not try to open
# it twice. MK.
@channels.reject { |n, ch| ch == @default_channel }.each do |n, ch|
@@ -355,14 +417,16 @@
ch.recover_from_network_failure
end
end
+ # @private
def send_raw(*args)
@transport.write(*args)
end
+ # @private
def instantiate_connection_level_exception(frame)
case frame
when AMQ::Protocol::Connection::Close then
klass = case frame.reply_code
when 503 then
@@ -377,68 +441,84 @@
klass.new("Connection-level error: #{frame.reply_text}", self, frame)
end
end
+ # @private
def hostname_from(options)
options[:host] || options[:hostname] || DEFAULT_HOST
end
+ # @private
def port_from(options)
fallback = if options[:tls] || options[:ssl]
AMQ::Protocol::TLS_PORT
else
AMQ::Protocol::DEFAULT_PORT
end
options.fetch(:port, fallback)
end
+ # @private
def vhost_from(options)
options[:virtual_host] || options[:vhost] || DEFAULT_VHOST
end
+ # @private
def username_from(options)
options[:username] || options[:user] || DEFAULT_USER
end
+ # @private
def password_from(options)
options[:password] || options[:pass] || options [:pwd] || DEFAULT_PASSWORD
end
+ # @private
def heartbeat_from(options)
options[:heartbeat] || options[:heartbeat_interval] || options[:requested_heartbeat] || DEFAULT_HEARTBEAT
end
+ # @private
def next_channel_id
@channel_id_allocator.next_channel_id
end
+ # @private
def release_channel_id(i)
@channel_id_allocator.release_channel_id(i)
end
+ # @private
def register_channel(ch)
@channel_mutex.synchronize do
@channels[ch.number] = ch
end
end
+ # @private
def unregister_channel(ch)
@channel_mutex.synchronize do
n = ch.number
self.release_channel_id(n)
@channels.delete(ch.number)
end
end
+ # @private
def start_main_loop
- @event_loop = MainLoop.new(@transport, self)
- @event_loop.start
+ event_loop.start
end
+ # @private
+ def event_loop
+ @event_loop ||= MainLoop.new(@transport, self)
+ end
+
+ # @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender
end
@@ -456,11 +536,11 @@
end
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it.
#
- # @api public
+ # @api private
def send_frameset(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
@@ -471,10 +551,11 @@
end
end # send_frameset(frames)
protected
+ # @api private
def init_connection
self.send_preamble
connection_start = @transport.read_next_frame.decode_payload
@@ -485,10 +566,11 @@
@server_locales = Array(connection_start.locales)
@status = :connected
end
+ # @api private
def open_connection
@transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
frame = begin
@transport.read_next_frame
@@ -537,47 +619,59 @@
end
raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
end
+ # @api private
def negotiate_value(client_value, server_value)
if client_value == 0 || server_value == 0
[client_value, server_value].max
else
[client_value, server_value].min
end
end
+ # @api private
def initialize_heartbeat_sender
# puts "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport)
@heartbeat_sender.start(@heartbeat)
end
+ # @api private
def maybe_shutdown_heartbeat_sender
@heartbeat_sender.stop if @heartbeat_sender
end
-
+ # @api private
def initialize_transport
@transport = Transport.new(self, @host, @port, @opts)
end
# Sends AMQ protocol header (also known as preamble).
+ # @api private
def send_preamble
@transport.send_raw(AMQ::Protocol::PREAMBLE)
end
-
-
- # @api plugin
+ # @api private
def encode_credentials(username, password)
@credentials_encoder.encode_credentials(username, password)
end # encode_credentials(username, password)
+ # @api private
def credentials_encoder_for(mechanism)
Authentication::CredentialsEncoder.for_session(self)
+ end
+
+ # @api private
+ def wait_on_continuations
+ unless @threaded
+ event_loop.run_once until @continuations.length > 0
+ end
+
+ @continuations.pop
end
end # Session
# backwards compatibility
Client = Session