lib/bunny/session.rb in bunny-0.9.0.pre8 vs lib/bunny/session.rb in bunny-0.9.0.pre9
- old
+ new
@@ -7,11 +7,15 @@
require "bunny/main_loop"
require "bunny/authentication/credentials_encoder"
require "bunny/authentication/plain_mechanism_encoder"
require "bunny/authentication/external_mechanism_encoder"
-require "bunny/concurrent/condition"
+if defined?(JRUBY_VERSION)
+ require "bunny/concurrent/linked_continuation_queue"
+else
+ require "bunny/concurrent/continuation_queue"
+end
require "amq/protocol/client"
require "amq/settings"
module Bunny
@@ -26,11 +30,11 @@
# Default username used for connection
DEFAULT_USER = "guest"
# Default password used for connection
DEFAULT_PASSWORD = "guest"
# Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
- DEFAULT_HEARTBEAT = 600
+ DEFAULT_HEARTBEAT = :server
# @private
DEFAULT_FRAME_MAX = 131072
# backwards compatibility
# @private
@@ -50,11 +54,13 @@
:information => "http://rubybunny.info",
}
DEFAULT_LOCALE = "en_GB"
+ DEFAULT_NETWORK_RECOVERY_INTERVAL = 5.0
+
#
# API
#
attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
@@ -105,10 +111,11 @@
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
+ @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@status = :not_connected
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@@ -122,11 +129,11 @@
# mutex for the channel id => channel hash
@channel_mutex = Mutex.new
@network_mutex = Mutex.new
@channels = Hash.new
- @continuations = ::Queue.new
+ self.reset_continuations
end
# @return [String] RabbitMQ hostname (or IP address) used
def hostname; self.host; end
# @return [String] Username used
@@ -134,10 +141,13 @@
# @return [String] Password used
def password; self.pass; end
# @return [String] Virtual host used
def virtual_host; self.vhost; end
+ # @return [Integer] Heartbeat interval used
+ def heartbeat_interval; self.heartbeat; end
+
# @return [Boolean] true if this connection uses TLS (SSL)
def uses_tls?
@transport.uses_tls?
end
alias tls? uses_tls?
@@ -146,25 +156,34 @@
def uses_ssl?
@transport.uses_ssl?
end
alias ssl? uses_ssl?
+ # @return [Boolean] true if this connection uses a separate thread for I/O activity
+ def threaded?
+ @threaded
+ end
+
# Starts connection process
# @api public
def start
- @continuations = ::Queue.new
+ return self if connected?
+
@status = :connecting
+ self.reset_continuations
self.initialize_transport
self.init_connection
self.open_connection
@event_loop = nil
self.start_main_loop if @threaded
@default_channel = self.create_channel
+
+ self
end
def read_write_timeout
@transport.read_write_timeout
end
@@ -195,32 +214,37 @@
end
end
end
alias stop close
+ # Creates a temporary channel, yields it to the block given to this
+ # method and closes it.
+ #
+ # @return [Bunny::Session] self
def with_channel(n = nil)
ch = create_channel(n)
yield ch
ch.close
self
end
-
+ # @return [Boolean] true if this connection is still not fully open
def connecting?
status == :connecting
end
def closed?
status == :closed
end
def open?
- (status == :open || status == :connected) && @transport.open?
+ (status == :open || status == :connected || status == :connecting) && @transport.open?
end
alias connected? open?
+ # @return [Boolean] true if this connection has automatic recovery from network failure enabled
def automatically_recover?
@automatically_recover
end
#
@@ -316,10 +340,12 @@
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
@last_connection_error = instantiate_connection_level_exception(method)
@continuations.push(method)
+
+ raise @last_connection_error
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
@@ -330,11 +356,11 @@
rescue StandardError => e
puts e.class.name
puts e.message
puts e.backtrace
ensure
- @continuations.push(nil)
+ @continuations.push(:__unblock__)
end
when AMQ::Protocol::Channel::Close then
begin
ch = @channels[ch_number]
ch.handle_method(method)
@@ -374,10 +400,12 @@
# @private
def handle_network_failure(exception)
raise NetworkErrorWrapper.new(exception) unless @threaded
+ @status = :disconnected
+
if !recovering_from_network_failure?
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
# puts "Recovering from a network failure..."
@channels.each do |n, ch|
@@ -404,21 +432,22 @@
end
# @private
def recover_from_network_failure
begin
+ sleep @network_recovery_interval
# puts "About to start recovery..."
start
if open?
@recovering_from_network_failure = false
recover_channels
end
rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
# puts "TCP connection failed, reconnecting in 5 seconds"
- sleep 5.0
+ sleep @network_recovery_interval
retry if recoverable_network_failure?(e)
end
end
# @private
@@ -441,15 +470,17 @@
# @private
def instantiate_connection_level_exception(frame)
case frame
when AMQ::Protocol::Connection::Close then
klass = case frame.reply_code
+ when 320 then
+ ConnectionForced
when 503 then
InvalidCommand
when 504 then
ChannelError
- when 504 then
+ when 505 then
UnexpectedFrame
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
@@ -529,10 +560,15 @@
def event_loop
@event_loop ||= MainLoop.new(@transport, self, Thread.current)
end
# @private
+ def maybe_shutdown_main_loop
+ @event_loop.stop if @event_loop
+ end
+
+ # @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender
end
@@ -631,11 +667,11 @@
connection_tune = frame.decode_payload
@frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max)
@channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max)
# this allows for disabled heartbeats. MK.
- @heartbeat = if 0 == @client_heartbeat || @client_heartbeat.nil?
+ @heartbeat = if heartbeat_disabled?(@client_heartbeat)
0
else
negotiate_value(@client_heartbeat, connection_tune.heartbeat)
end
@@ -663,12 +699,18 @@
end
raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
end
+ def heartbeat_disabled?(val)
+ 0 == val || val.nil?
+ end
+
# @api private
def negotiate_value(client_value, server_value)
+ return server_value if client_value == :server
+
if client_value == 0 || server_value == 0
[client_value, server_value].max
else
[client_value, server_value].min
end
@@ -704,9 +746,18 @@
end # encode_credentials(username, password)
# @api private
def credentials_encoder_for(mechanism)
Authentication::CredentialsEncoder.for_session(self)
+ end
+
+ # @api private
+ def reset_continuations
+ @continuations = if defined?(JRUBY_VERSION)
+ Concurrent::LinkedContinuationQueue.new
+ else
+ Concurrent::ContinuationQueue.new
+ end
end
# @api private
def wait_on_continuations
unless @threaded