lib/bunny/session.rb in bunny-0.9.0.pre8 vs lib/bunny/session.rb in bunny-0.9.0.pre9

- old
+ new

@@ -7,11 +7,15 @@ require "bunny/main_loop" require "bunny/authentication/credentials_encoder" require "bunny/authentication/plain_mechanism_encoder" require "bunny/authentication/external_mechanism_encoder" -require "bunny/concurrent/condition" +if defined?(JRUBY_VERSION) + require "bunny/concurrent/linked_continuation_queue" +else + require "bunny/concurrent/continuation_queue" +end require "amq/protocol/client" require "amq/settings" module Bunny @@ -26,11 +30,11 @@ # Default username used for connection DEFAULT_USER = "guest" # Default password used for connection DEFAULT_PASSWORD = "guest" # Default heartbeat interval, the same value as RabbitMQ 3.0 uses. - DEFAULT_HEARTBEAT = 600 + DEFAULT_HEARTBEAT = :server # @private DEFAULT_FRAME_MAX = 131072 # backwards compatibility # @private @@ -50,11 +54,13 @@ :information => "http://rubybunny.info", } DEFAULT_LOCALE = "en_GB" + DEFAULT_NETWORK_RECOVERY_INTERVAL = 5.0 + # # API # attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded @@ -105,10 +111,11 @@ @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end + @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @status = :not_connected # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @@ -122,11 +129,11 @@ # mutex for the channel id => channel hash @channel_mutex = Mutex.new @network_mutex = Mutex.new @channels = Hash.new - @continuations = ::Queue.new + self.reset_continuations end # @return [String] RabbitMQ hostname (or IP address) used def hostname; self.host; end # @return [String] Username used @@ -134,10 +141,13 @@ # @return [String] Password used def password; self.pass; end # @return [String] Virtual host used def virtual_host; self.vhost; end + # @return [Integer] Heartbeat interval used + def heartbeat_interval; self.heartbeat; end + # @return [Boolean] true if this connection uses TLS (SSL) def uses_tls? @transport.uses_tls? end alias tls? uses_tls? @@ -146,25 +156,34 @@ def uses_ssl? @transport.uses_ssl? end alias ssl? uses_ssl? + # @return [Boolean] true if this connection uses a separate thread for I/O activity + def threaded? + @threaded + end + # Starts connection process # @api public def start - @continuations = ::Queue.new + return self if connected? + @status = :connecting + self.reset_continuations self.initialize_transport self.init_connection self.open_connection @event_loop = nil self.start_main_loop if @threaded @default_channel = self.create_channel + + self end def read_write_timeout @transport.read_write_timeout end @@ -195,32 +214,37 @@ end end end alias stop close + # Creates a temporary channel, yields it to the block given to this + # method and closes it. + # + # @return [Bunny::Session] self def with_channel(n = nil) ch = create_channel(n) yield ch ch.close self end - + # @return [Boolean] true if this connection is still not fully open def connecting? status == :connecting end def closed? status == :closed end def open? - (status == :open || status == :connected) && @transport.open? + (status == :open || status == :connected || status == :connecting) && @transport.open? end alias connected? open? + # @return [Boolean] true if this connection has automatic recovery from network failure enabled def automatically_recover? @automatically_recover end # @@ -316,10 +340,12 @@ when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) when AMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) + + raise @last_connection_error when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear @@ -330,11 +356,11 @@ rescue StandardError => e puts e.class.name puts e.message puts e.backtrace ensure - @continuations.push(nil) + @continuations.push(:__unblock__) end when AMQ::Protocol::Channel::Close then begin ch = @channels[ch_number] ch.handle_method(method) @@ -374,10 +400,12 @@ # @private def handle_network_failure(exception) raise NetworkErrorWrapper.new(exception) unless @threaded + @status = :disconnected + if !recovering_from_network_failure? @recovering_from_network_failure = true if recoverable_network_failure?(exception) # puts "Recovering from a network failure..." @channels.each do |n, ch| @@ -404,21 +432,22 @@ end # @private def recover_from_network_failure begin + sleep @network_recovery_interval # puts "About to start recovery..." start if open? @recovering_from_network_failure = false recover_channels end rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e # puts "TCP connection failed, reconnecting in 5 seconds" - sleep 5.0 + sleep @network_recovery_interval retry if recoverable_network_failure?(e) end end # @private @@ -441,15 +470,17 @@ # @private def instantiate_connection_level_exception(frame) case frame when AMQ::Protocol::Connection::Close then klass = case frame.reply_code + when 320 then + ConnectionForced when 503 then InvalidCommand when 504 then ChannelError - when 504 then + when 505 then UnexpectedFrame else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end @@ -529,10 +560,15 @@ def event_loop @event_loop ||= MainLoop.new(@transport, self, Thread.current) end # @private + def maybe_shutdown_main_loop + @event_loop.stop if @event_loop + end + + # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender end @@ -631,11 +667,11 @@ connection_tune = frame.decode_payload @frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max) @channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max) # this allows for disabled heartbeats. MK. - @heartbeat = if 0 == @client_heartbeat || @client_heartbeat.nil? + @heartbeat = if heartbeat_disabled?(@client_heartbeat) 0 else negotiate_value(@client_heartbeat, connection_tune.heartbeat) end @@ -663,12 +699,18 @@ end raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) end + def heartbeat_disabled?(val) + 0 == val || val.nil? + end + # @api private def negotiate_value(client_value, server_value) + return server_value if client_value == :server + if client_value == 0 || server_value == 0 [client_value, server_value].max else [client_value, server_value].min end @@ -704,9 +746,18 @@ end # encode_credentials(username, password) # @api private def credentials_encoder_for(mechanism) Authentication::CredentialsEncoder.for_session(self) + end + + # @api private + def reset_continuations + @continuations = if defined?(JRUBY_VERSION) + Concurrent::LinkedContinuationQueue.new + else + Concurrent::ContinuationQueue.new + end end # @api private def wait_on_continuations unless @threaded