lib/bunny/session.rb in bunny-0.9.0.pre10 vs lib/bunny/session.rb in bunny-0.9.0.pre11
- old
+ new
@@ -2,11 +2,11 @@
require "thread"
require "bunny/transport"
require "bunny/channel_id_allocator"
require "bunny/heartbeat_sender"
-require "bunny/main_loop"
+require "bunny/reader_loop"
require "bunny/authentication/credentials_encoder"
require "bunny/authentication/plain_mechanism_encoder"
require "bunny/authentication/external_mechanism_encoder"
if defined?(JRUBY_VERSION)
@@ -68,10 +68,14 @@
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
attr_reader :mechanism
+ # @return [Logger]
+ attr_reader :logger
+ # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 4000.
+ attr_reader :continuation_timeout
# @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options
# @param [Hash] optz Extra options not related to connection
#
@@ -101,21 +105,24 @@
@host = self.hostname_from(opts)
@port = self.port_from(opts)
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
- @logfile = opts[:logfile]
- @logging = opts[:logging] || false
+ @logfile = opts[:log_file] || opts[:logfile] || STDOUT
@threaded = opts.fetch(:threaded, true)
+ self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN)
+
# should automatic recovery from network failures be used?
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
+ # in ms
+ @continuation_timeout = opts.fetch(:continuation_timeout, 4000)
@status = :not_connected
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@@ -126,11 +133,10 @@
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@locale = @opts.fetch(:locale, DEFAULT_LOCALE)
# mutex for the channel id => channel hash
@channel_mutex = Mutex.new
- @network_mutex = Mutex.new
@channels = Hash.new
self.reset_continuations
end
@@ -178,12 +184,12 @@
self.initialize_transport
self.init_connection
self.open_connection
- @event_loop = nil
- self.start_main_loop if @threaded
+ @reader_loop = nil
+ self.start_reader_loop if @threaded
@default_channel = self.create_channel
rescue Exception => e
@status = :not_connected
raise e
@@ -199,15 +205,15 @@
# Opens a new channel and returns it. This method will block the calling
# thread until the response is received and the channel is guaranteed to be
# opened (this operation is very fast and inexpensive).
#
# @return [Bunny::Channel] Newly opened channel
- def create_channel(n = nil)
+ def create_channel(n = nil, consumer_pool_size = 1)
if n && (ch = @channels[n])
ch
else
- ch = Bunny::Channel.new(self, n)
+ ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
ch.open
ch
end
end
alias channel create_channel
@@ -339,11 +345,11 @@
end
end
# @private
def handle_frame(ch_number, method)
- # puts "Session#handle_frame on #{ch_number}: #{method.inspect}"
+ @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}"
case method
when AMQ::Protocol::Channel::OpenOk then
@continuations.push(method)
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
@@ -355,18 +361,18 @@
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
- event_loop.stop
- @event_loop = nil
+ reader_loop.stop
+ @reader_loop = nil
@transport.close
rescue StandardError => e
- puts e.class.name
- puts e.message
- puts e.backtrace
+ @logger.error e.class.name
+ @logger.error e.message
+ @logger.error e.backtrace
ensure
@continuations.push(:__unblock__)
end
when AMQ::Protocol::Channel::Close then
begin
@@ -379,11 +385,11 @@
@channels[ch_number].handle_basic_get_empty(method)
else
if ch = @channels[ch_number]
ch.handle_method(method)
else
- # TODO: log a warning
+ @logger.warn "Channel #{ch_number} is not open on this connection!"
end
end
end
# @private
@@ -413,11 +419,11 @@
@status = :disconnected
if !recovering_from_network_failure?
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
- # puts "Recovering from a network failure..."
+ @logger.warn "Recovering from a network failure..."
@channels.each do |n, ch|
ch.maybe_kill_consumer_work_pool!
end
maybe_shutdown_heartbeat_sender
@@ -441,20 +447,20 @@
# @private
def recover_from_network_failure
begin
sleep @network_recovery_interval
- # puts "About to start recovery..."
+ @logger.debug "About to start connection recovery..."
start
if open?
@recovering_from_network_failure = false
recover_channels
end
rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
- # puts "TCP connection failed, reconnecting in 5 seconds"
+ @logger.warn "TCP connection failed, reconnecting in 5 seconds"
sleep @network_recovery_interval
retry if recoverable_network_failure?(e)
end
end
@@ -469,21 +475,18 @@
ch.recover_from_network_failure
end
end
# @private
- def send_raw(data)
- @transport.write(data)
- end
-
- # @private
def instantiate_connection_level_exception(frame)
case frame
when AMQ::Protocol::Connection::Close then
klass = case frame.reply_code
when 320 then
ConnectionForced
+ when 501 then
+ FrameError
when 503 then
InvalidCommand
when 504 then
ChannelError
when 505 then
@@ -558,22 +561,22 @@
@channels.delete(ch.number)
end
end
# @private
- def start_main_loop
- event_loop.start
+ def start_reader_loop
+ reader_loop.start
end
# @private
- def event_loop
- @event_loop ||= MainLoop.new(@transport, self, Thread.current)
+ def reader_loop
+ @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current)
end
# @private
- def maybe_shutdown_main_loop
- @event_loop.stop if @event_loop
+ def maybe_shutdown_reader_loop
+ @reader_loop.stop if @reader_loop
end
# @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender
@@ -583,29 +586,31 @@
# Sends frame to the peer, checking that connection is open.
# Exposed primarily for Bunny::Channel
#
# @raise [ConnectionClosedError]
# @private
- def send_frame(frame)
+ def send_frame(frame, signal_activity = true)
if closed?
raise ConnectionClosedError.new(frame)
else
- @network_mutex.synchronize { @transport.write(frame.encode) }
+ @transport.write(frame.encode)
+ signal_activity! if signal_activity
end
end
# Sends frame to the peer, checking that connection is open.
# Uses transport implementation that does not perform
# timeout control. Exposed primarily for Bunny::Channel.
#
# @raise [ConnectionClosedError]
# @private
- def send_frame_without_timeout(frame)
+ def send_frame_without_timeout(frame, signal_activity = true)
if closed?
raise ConnectionClosedError.new(frame)
else
- @network_mutex.synchronize { @transport.write_without_timeout(frame.encode) }
+ @transport.write_without_timeout(frame.encode)
+ signal_activity! if signal_activity
end
end
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it.
@@ -616,12 +621,12 @@
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
channel.synchronize do
- frames.each { |frame| self.send_frame(frame) }
- @transport.flush
+ frames.each { |frame| self.send_frame(frame, false) }
+ signal_activity!
end
end # send_frameset(frames)
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it. Uses transport implementation that does not perform
@@ -633,14 +638,21 @@
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
channel.synchronize do
- frames.each { |frame| self.send_frame_without_timeout(frame) }
+ frames.each { |frame| self.send_frame_without_timeout(frame, false) }
+ signal_activity!
end
end # send_frameset_without_timeout(frames)
+ # @return [String]
+ # @api public
+ def to_s
+ "Bunny::Session #{@user}@#{@host}:#{@port}, vhost=#{@vhost}"
+ end
+
protected
# @api private
def init_connection
self.send_preamble
@@ -657,20 +669,22 @@
end
# @api private
def open_connection
@transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
+ @logger.debug "Sent connection.start-ok"
frame = begin
@transport.read_next_frame
# frame timeout means the broker has closed the TCP connection, which it
# does per 0.9.1 spec.
rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e
nil
end
if frame.nil?
@state = :closed
+ @logger.error "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure."
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
end
connection_tune = frame.decode_payload
@@ -680,25 +694,30 @@
@heartbeat = if heartbeat_disabled?(@client_heartbeat)
0
else
negotiate_value(@client_heartbeat, connection_tune.heartbeat)
end
+ @logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}"
+ @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}"
@channel_id_allocator = ChannelIdAllocator.new(@channel_max)
@transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat))
+ @logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}"
@transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost))
+ @logger.debug "Sent connection.open with vhost = #{self.vhost}"
frame2 = begin
@transport.read_next_frame
# frame timeout means the broker has closed the TCP connection, which it
# does per 0.9.1 spec.
rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e
nil
end
if frame2.nil?
@state = :closed
+ @logger.warn "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure."
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
end
connection_open_ok = frame2.decode_payload
@status = :open
@@ -724,12 +743,12 @@
end
end
# @api private
def initialize_heartbeat_sender
- # puts "Initializing heartbeat sender..."
- @heartbeat_sender = HeartbeatSender.new(@transport)
+ @logger.debug "Initializing heartbeat sender..."
+ @heartbeat_sender = HeartbeatSender.new(@transport, @logger)
@heartbeat_sender.start(@heartbeat)
end
# @api private
def maybe_shutdown_heartbeat_sender
@@ -747,11 +766,12 @@
end
# Sends AMQ protocol header (also known as preamble).
# @api private
def send_preamble
- @transport.send_raw(AMQ::Protocol::PREAMBLE)
+ @transport.write(AMQ::Protocol::PREAMBLE)
+ @logger.debug "Sent protocol preamble"
end
# @api private
def encode_credentials(username, password)
@@ -761,25 +781,50 @@
# @api private
def credentials_encoder_for(mechanism)
Authentication::CredentialsEncoder.for_session(self)
end
- # @api private
- def reset_continuations
- @continuations = if defined?(JRUBY_VERSION)
- Concurrent::LinkedContinuationQueue.new
- else
- Concurrent::ContinuationQueue.new
- end
+ if defined?(JRUBY_VERSION)
+ # @api private
+ def reset_continuations
+ @continuations = Concurrent::LinkedContinuationQueue.new
+ end
+ else
+ # @api private
+ def reset_continuations
+ @continuations = Concurrent::ContinuationQueue.new
+ end
end
# @api private
def wait_on_continuations
unless @threaded
- event_loop.run_once until @continuations.length > 0
+ reader_loop.run_once until @continuations.length > 0
end
- @continuations.pop
+ @continuations.poll(@continuation_timeout)
+ end
+
+ # @api private
+ def init_logger(level)
+ @logger = ::Logger.new(@logfile)
+ @logger.level = normalize_log_level(level)
+ @logger.progname = self.to_s
+
+ @logger
+ end
+
+ # @api private
+ def normalize_log_level(level)
+ case level
+ when :debug, Logger::DEBUG, "debug" then Logger::DEBUG
+ when :info, Logger::INFO, "info" then Logger::INFO
+ when :warn, Logger::WARN, "warn" then Logger::WARN
+ when :error, Logger::ERROR, "error" then Logger::ERROR
+ when :fatal, Logger::FATAL, "fatal" then Logger::FATAL
+ else
+ Logger::WARN
+ end
end
end # Session
# backwards compatibility
Client = Session