lib/bunny/session.rb in bunny-0.9.0.pre13 vs lib/bunny/session.rb in bunny-0.9.0.rc1
- old
+ new
@@ -52,19 +52,23 @@
:platform => ::RUBY_DESCRIPTION,
:version => Bunny::VERSION,
:information => "http://rubybunny.info",
}
+ # @private
DEFAULT_LOCALE = "en_GB"
+ # Default reconnection interval for TCP connection failures
DEFAULT_NETWORK_RECOVERY_INTERVAL = 5.0
#
# API
#
+ # @return [Bunny::Transport]
+ attr_reader :transport
attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
@@ -136,10 +140,12 @@
# mutex for the channel id => channel hash
@channel_mutex = Mutex.new
@channels = Hash.new
self.reset_continuations
+
+ self.initialize_transport
end
# @return [String] RabbitMQ hostname (or IP address) used
def hostname; self.host; end
# @return [String] Username used
@@ -167,11 +173,24 @@
# @return [Boolean] true if this connection uses a separate thread for I/O activity
def threaded?
@threaded
end
- # Starts connection process
+ def configure_socket(&block)
+ raise ArgumentError, "No block provided!" if block.nil?
+
+ if @transport
+ @transport.configure_socket(&block)
+ else
+ @socket_configurator = block
+ end
+ end
+
+ # Starts the connection process.
+ #
+ # @see http://rubybunny.info/articles/getting_started.html
+ # @see http://rubybunny.info/articles/connecting.html
# @api public
def start
return self if connected?
@status = :connecting
@@ -181,10 +200,17 @@
# close existing transport if we have one,
# to not leak sockets
self.maybe_close_transport
self.initialize_transport
+ @transport.initialize_socket
+ @transport.connect
+
+ if @socket_configurator
+ @transport.configure_socket(&@socket_configurator)
+ end
+
self.init_connection
self.open_connection
@reader_loop = nil
self.start_reader_loop if @threaded
@@ -196,10 +222,13 @@
end
self
end
+ # Socket operation timeout used by this connection
+ # @return [Integer]
+ # @private
def read_write_timeout
@transport.read_write_timeout
end
# Opens a new channel and returns it. This method will block the calling
@@ -395,10 +424,11 @@
# @private
def raise_if_continuation_resulted_in_a_connection_error!
raise @last_connection_error if @last_connection_error
end
+ # @private
def handle_frameset(ch_number, frames)
method = frames.first
case method
when AMQ::Protocol::Basic::GetOk then
@@ -484,15 +514,19 @@
when 320 then
ConnectionForced
when 501 then
FrameError
when 503 then
- InvalidCommand
+ CommandInvalid
when 504 then
ChannelError
when 505 then
UnexpectedFrame
+ when 506 then
+ ResourceError
+ when 541 then
+ InternalError
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
klass.new("Connection-level error: #{frame.reply_text}", self, frame)
@@ -613,11 +647,11 @@
end
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it.
#
- # @api private
+ # @private
def send_frameset(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
@@ -630,11 +664,11 @@
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it. Uses transport implementation that does not perform
# timeout control.
#
- # @api private
+ # @private
def send_frameset_without_timeout(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
@@ -651,11 +685,11 @@
"#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>"
end
protected
- # @api private
+ # @private
def init_connection
self.send_preamble
connection_start = @transport.read_next_frame.decode_payload
@@ -666,11 +700,11 @@
@server_locales = Array(connection_start.locales)
@status = :connected
end
- # @api private
+ # @private
def open_connection
@transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
@logger.debug "Sent connection.start-ok"
frame = begin
@@ -730,91 +764,91 @@
def heartbeat_disabled?(val)
0 == val || val.nil?
end
- # @api private
+ # @private
def negotiate_value(client_value, server_value)
return server_value if client_value == :server
if client_value == 0 || server_value == 0
[client_value, server_value].max
else
[client_value, server_value].min
end
end
- # @api private
+ # @private
def initialize_heartbeat_sender
@logger.debug "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport, @logger)
@heartbeat_sender.start(@heartbeat)
end
- # @api private
+ # @private
def maybe_shutdown_heartbeat_sender
@heartbeat_sender.stop if @heartbeat_sender
end
- # @api private
+ # @private
def initialize_transport
@transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => Thread.current))
end
- # @api private
+ # @private
def maybe_close_transport
@transport.close if @transport
end
# Sends AMQ protocol header (also known as preamble).
- # @api private
+ # @private
def send_preamble
@transport.write(AMQ::Protocol::PREAMBLE)
@logger.debug "Sent protocol preamble"
end
- # @api private
+ # @private
def encode_credentials(username, password)
@credentials_encoder.encode_credentials(username, password)
end # encode_credentials(username, password)
- # @api private
+ # @private
def credentials_encoder_for(mechanism)
Authentication::CredentialsEncoder.for_session(self)
end
if defined?(JRUBY_VERSION)
- # @api private
+ # @private
def reset_continuations
@continuations = Concurrent::LinkedContinuationQueue.new
end
else
- # @api private
+ # @private
def reset_continuations
@continuations = Concurrent::ContinuationQueue.new
end
end
- # @api private
+ # @private
def wait_on_continuations
unless @threaded
reader_loop.run_once until @continuations.length > 0
end
@continuations.poll(@continuation_timeout)
end
- # @api private
+ # @private
def init_logger(level)
@logger = ::Logger.new(@logfile)
@logger.level = normalize_log_level(level)
@logger.progname = self.to_s
@logger
end
- # @api private
+ # @private
def normalize_log_level(level)
case level
when :debug, Logger::DEBUG, "debug" then Logger::DEBUG
when :info, Logger::INFO, "info" then Logger::INFO
when :warn, Logger::WARN, "warn" then Logger::WARN