lib/bunny/session.rb in bunny-1.0.0.rc2 vs lib/bunny/session.rb in bunny-1.0.0.rc3
- old
+ new
@@ -18,11 +18,11 @@
require "amq/protocol/client"
require "amq/settings"
module Bunny
- # Represents AMQP 0.9.1 connection (connection to RabbitMQ).
+ # Represents AMQP 0.9.1 connection (to a RabbitMQ node).
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
class Session
# Default host used for connection
DEFAULT_HOST = "127.0.0.1"
@@ -49,15 +49,17 @@
end
# RabbitMQ client metadata
DEFAULT_CLIENT_PROPERTIES = {
:capabilities => {
- :publisher_confirms => true,
- :consumer_cancel_notify => true,
- :exchange_exchange_bindings => true,
- :"basic.nack" => true,
- :"connection.blocked" => true
+ :publisher_confirms => true,
+ :consumer_cancel_notify => true,
+ :exchange_exchange_bindings => true,
+ :"basic.nack" => true,
+ :"connection.blocked" => true,
+ # See http://www.rabbitmq.com/auth-notification.html
+ :authentication_failure_close => true
},
:product => "Bunny",
:platform => ::RUBY_DESCRIPTION,
:version => Bunny::VERSION,
:information => "http://rubybunny.info",
@@ -96,14 +98,16 @@
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
# @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
+ # @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure.
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
+ # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -294,24 +298,26 @@
#
# @return [Bunny::Session] self
def with_channel(n = nil)
ch = create_channel(n)
yield ch
- ch.close
+ ch.close if ch.open?
self
end
# @return [Boolean] true if this connection is still not fully open
def connecting?
status == :connecting
end
+ # @return [Boolean] true if this AMQP 0.9.1 connection is closed
def closed?
status == :closed
end
+ # @return [Boolean] true if this AMQP 0.9.1 connection is open
def open?
(status == :open || status == :connected || status == :connecting) && @transport.open?
end
alias connected? open?
@@ -388,12 +394,52 @@
# @return [Hash] Parsed URI as a hash
def self.parse_uri(uri)
AMQ::Settings.parse_amqp_url(uri)
end
+ # Checks if a queue with given name exists.
+ #
+ # Implemented using queue.declare
+ # with passive set to true and a one-off (short lived) channel
+ # under the hood.
+ #
+ # @param [String] name Queue name
+ # @return [Boolean] true if queue exists
+ def queue_exists?(name)
+ ch = create_channel
+ begin
+ ch.queue(name, :passive => true)
+ true
+ rescue Bunny::NotFound => _
+ false
+ ensure
+ ch.close if ch.open?
+ end
+ end
+ # Checks if a exchange with given name exists.
#
+ # Implemented using exchange.declare
+ # with passive set to true and a one-off (short lived) channel
+ # under the hood.
+ #
+ # @param [String] name Exchange name
+ # @return [Boolean] true if exchange exists
+ def exchange_exists?(name)
+ ch = create_channel
+ begin
+ ch.exchange(name, :passive => true)
+ true
+ rescue Bunny::NotFound => _
+ false
+ ensure
+ ch.close if ch.open?
+ end
+ end
+
+
+ #
# Implementation
#
# @private
def open_channel(ch)
@@ -440,10 +486,17 @@
@last_connection_close_ok = wait_on_continuations
end
end
end
+ # Handles incoming frames and dispatches them.
+ #
+ # Channel methods (`channel.open-ok`, `channel.close-ok`) are
+ # handled by the session itself.
+ # Connection level errors result in exceptions being raised.
+ # Deliveries and other methods are passed on to channels to dispatch.
+ #
# @private
def handle_frame(ch_number, method)
@logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}"
case method
when AMQ::Protocol::Channel::OpenOk then
@@ -780,10 +833,23 @@
frames.each { |frame| self.send_frame_without_timeout(frame, false) }
signal_activity!
end
end # send_frameset_without_timeout(frames)
+ # @private
+ def send_raw_without_timeout(data, channel)
+ # some developers end up sharing channels between threads and when multiple
+ # threads publish on the same channel aggressively, at some point frames will be
+ # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
+ # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
+ # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
+ channel.synchronize do
+ @transport.write(data)
+ signal_activity!
+ end
+ end # send_frameset_without_timeout(frames)
+
# @return [String]
# @api public
def to_s
"#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>"
end
@@ -821,10 +887,19 @@
@state = :closed
@logger.error "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure."
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
end
- connection_tune = frame.decode_payload
+ response = frame.decode_payload
+ if response.is_a?(AMQ::Protocol::Connection::Close)
+ @state = :closed
+ @logger.error "Authentication with RabbitMQ failed: #{response.reply_code} #{response.reply_text}"
+ raise Bunny::AuthenticationFailureError.new(self.user, self.vhost, self.password.size)
+ end
+
+
+
+ connection_tune = response
@frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max)
@channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max)
# this allows for disabled heartbeats. MK.
@heartbeat = if heartbeat_disabled?(@client_heartbeat)