lib/bunny/session.rb in bunny-0.9.0.pre13 vs lib/bunny/session.rb in bunny-0.9.0.rc1

- old
+ new

@@ -52,19 +52,23 @@ :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, :information => "http://rubybunny.info", } + # @private DEFAULT_LOCALE = "en_GB" + # Default reconnection interval for TCP connection failures DEFAULT_NETWORK_RECOVERY_INTERVAL = 5.0 # # API # + # @return [Bunny::Transport] + attr_reader :transport attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" @@ -136,10 +140,12 @@ # mutex for the channel id => channel hash @channel_mutex = Mutex.new @channels = Hash.new self.reset_continuations + + self.initialize_transport end # @return [String] RabbitMQ hostname (or IP address) used def hostname; self.host; end # @return [String] Username used @@ -167,11 +173,24 @@ # @return [Boolean] true if this connection uses a separate thread for I/O activity def threaded? @threaded end - # Starts connection process + def configure_socket(&block) + raise ArgumentError, "No block provided!" if block.nil? + + if @transport + @transport.configure_socket(&block) + else + @socket_configurator = block + end + end + + # Starts the connection process. + # + # @see http://rubybunny.info/articles/getting_started.html + # @see http://rubybunny.info/articles/connecting.html # @api public def start return self if connected? @status = :connecting @@ -181,10 +200,17 @@ # close existing transport if we have one, # to not leak sockets self.maybe_close_transport self.initialize_transport + @transport.initialize_socket + @transport.connect + + if @socket_configurator + @transport.configure_socket(&@socket_configurator) + end + self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if @threaded @@ -196,10 +222,13 @@ end self end + # Socket operation timeout used by this connection + # @return [Integer] + # @private def read_write_timeout @transport.read_write_timeout end # Opens a new channel and returns it. This method will block the calling @@ -395,10 +424,11 @@ # @private def raise_if_continuation_resulted_in_a_connection_error! raise @last_connection_error if @last_connection_error end + # @private def handle_frameset(ch_number, frames) method = frames.first case method when AMQ::Protocol::Basic::GetOk then @@ -484,15 +514,19 @@ when 320 then ConnectionForced when 501 then FrameError when 503 then - InvalidCommand + CommandInvalid when 504 then ChannelError when 505 then UnexpectedFrame + when 506 then + ResourceError + when 541 then + InternalError else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end klass.new("Connection-level error: #{frame.reply_text}", self, frame) @@ -613,11 +647,11 @@ end # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. # - # @api private + # @private def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained @@ -630,11 +664,11 @@ # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. Uses transport implementation that does not perform # timeout control. # - # @api private + # @private def send_frameset_without_timeout(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained @@ -651,11 +685,11 @@ "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>" end protected - # @api private + # @private def init_connection self.send_preamble connection_start = @transport.read_next_frame.decode_payload @@ -666,11 +700,11 @@ @server_locales = Array(connection_start.locales) @status = :connected end - # @api private + # @private def open_connection @transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) @logger.debug "Sent connection.start-ok" frame = begin @@ -730,91 +764,91 @@ def heartbeat_disabled?(val) 0 == val || val.nil? end - # @api private + # @private def negotiate_value(client_value, server_value) return server_value if client_value == :server if client_value == 0 || server_value == 0 [client_value, server_value].max else [client_value, server_value].min end end - # @api private + # @private def initialize_heartbeat_sender @logger.debug "Initializing heartbeat sender..." @heartbeat_sender = HeartbeatSender.new(@transport, @logger) @heartbeat_sender.start(@heartbeat) end - # @api private + # @private def maybe_shutdown_heartbeat_sender @heartbeat_sender.stop if @heartbeat_sender end - # @api private + # @private def initialize_transport @transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => Thread.current)) end - # @api private + # @private def maybe_close_transport @transport.close if @transport end # Sends AMQ protocol header (also known as preamble). - # @api private + # @private def send_preamble @transport.write(AMQ::Protocol::PREAMBLE) @logger.debug "Sent protocol preamble" end - # @api private + # @private def encode_credentials(username, password) @credentials_encoder.encode_credentials(username, password) end # encode_credentials(username, password) - # @api private + # @private def credentials_encoder_for(mechanism) Authentication::CredentialsEncoder.for_session(self) end if defined?(JRUBY_VERSION) - # @api private + # @private def reset_continuations @continuations = Concurrent::LinkedContinuationQueue.new end else - # @api private + # @private def reset_continuations @continuations = Concurrent::ContinuationQueue.new end end - # @api private + # @private def wait_on_continuations unless @threaded reader_loop.run_once until @continuations.length > 0 end @continuations.poll(@continuation_timeout) end - # @api private + # @private def init_logger(level) @logger = ::Logger.new(@logfile) @logger.level = normalize_log_level(level) @logger.progname = self.to_s @logger end - # @api private + # @private def normalize_log_level(level) case level when :debug, Logger::DEBUG, "debug" then Logger::DEBUG when :info, Logger::INFO, "info" then Logger::INFO when :warn, Logger::WARN, "warn" then Logger::WARN