lib/bunny/session.rb in bunny-1.0.0.rc2 vs lib/bunny/session.rb in bunny-1.0.0.rc3

- old
+ new

@@ -18,11 +18,11 @@ require "amq/protocol/client" require "amq/settings" module Bunny - # Represents AMQP 0.9.1 connection (connection to RabbitMQ). + # Represents AMQP 0.9.1 connection (to a RabbitMQ node). # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide class Session # Default host used for connection DEFAULT_HOST = "127.0.0.1" @@ -49,15 +49,17 @@ end # RabbitMQ client metadata DEFAULT_CLIENT_PROPERTIES = { :capabilities => { - :publisher_confirms => true, - :consumer_cancel_notify => true, - :exchange_exchange_bindings => true, - :"basic.nack" => true, - :"connection.blocked" => true + :publisher_confirms => true, + :consumer_cancel_notify => true, + :exchange_exchange_bindings => true, + :"basic.nack" => true, + :"connection.blocked" => true, + # See http://www.rabbitmq.com/auth-notification.html + :authentication_failure_close => true }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, :information => "http://rubybunny.info", @@ -96,14 +98,16 @@ # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. + # @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure. # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration + # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -294,24 +298,26 @@ # # @return [Bunny::Session] self def with_channel(n = nil) ch = create_channel(n) yield ch - ch.close + ch.close if ch.open? self end # @return [Boolean] true if this connection is still not fully open def connecting? status == :connecting end + # @return [Boolean] true if this AMQP 0.9.1 connection is closed def closed? status == :closed end + # @return [Boolean] true if this AMQP 0.9.1 connection is open def open? (status == :open || status == :connected || status == :connecting) && @transport.open? end alias connected? open? @@ -388,12 +394,52 @@ # @return [Hash] Parsed URI as a hash def self.parse_uri(uri) AMQ::Settings.parse_amqp_url(uri) end + # Checks if a queue with given name exists. + # + # Implemented using queue.declare + # with passive set to true and a one-off (short lived) channel + # under the hood. + # + # @param [String] name Queue name + # @return [Boolean] true if queue exists + def queue_exists?(name) + ch = create_channel + begin + ch.queue(name, :passive => true) + true + rescue Bunny::NotFound => _ + false + ensure + ch.close if ch.open? + end + end + # Checks if a exchange with given name exists. # + # Implemented using exchange.declare + # with passive set to true and a one-off (short lived) channel + # under the hood. + # + # @param [String] name Exchange name + # @return [Boolean] true if exchange exists + def exchange_exists?(name) + ch = create_channel + begin + ch.exchange(name, :passive => true) + true + rescue Bunny::NotFound => _ + false + ensure + ch.close if ch.open? + end + end + + + # # Implementation # # @private def open_channel(ch) @@ -440,10 +486,17 @@ @last_connection_close_ok = wait_on_continuations end end end + # Handles incoming frames and dispatches them. + # + # Channel methods (`channel.open-ok`, `channel.close-ok`) are + # handled by the session itself. + # Connection level errors result in exceptions being raised. + # Deliveries and other methods are passed on to channels to dispatch. + # # @private def handle_frame(ch_number, method) @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}" case method when AMQ::Protocol::Channel::OpenOk then @@ -780,10 +833,23 @@ frames.each { |frame| self.send_frame_without_timeout(frame, false) } signal_activity! end end # send_frameset_without_timeout(frames) + # @private + def send_raw_without_timeout(data, channel) + # some developers end up sharing channels between threads and when multiple + # threads publish on the same channel aggressively, at some point frames will be + # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. + # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained + # locking. Note that "single frame" methods do not need this kind of synchronization. MK. + channel.synchronize do + @transport.write(data) + signal_activity! + end + end # send_frameset_without_timeout(frames) + # @return [String] # @api public def to_s "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>" end @@ -821,10 +887,19 @@ @state = :closed @logger.error "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure." raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) end - connection_tune = frame.decode_payload + response = frame.decode_payload + if response.is_a?(AMQ::Protocol::Connection::Close) + @state = :closed + @logger.error "Authentication with RabbitMQ failed: #{response.reply_code} #{response.reply_text}" + raise Bunny::AuthenticationFailureError.new(self.user, self.vhost, self.password.size) + end + + + + connection_tune = response @frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max) @channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max) # this allows for disabled heartbeats. MK. @heartbeat = if heartbeat_disabled?(@client_heartbeat)