lib/amq/client/adapter.rb in amq-client-0.7.0.alpha25 vs lib/amq/client/adapter.rb in amq-client-0.7.0.alpha26
- old
+ new
@@ -1,16 +1,17 @@
# encoding: utf-8
require "amq/client/logging"
require "amq/client/settings"
require "amq/client/entity"
-require "amq/client/connection"
require "amq/client/channel"
module AMQ
# For overview of AMQP client adapters API, see {AMQ::Client::Adapter}
module Client
+
+
# Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or
# sockets-based) subclass it and must implement Adapter API methods:
#
# * #send_raw(data)
# * #estabilish_connection(settings)
@@ -18,26 +19,73 @@
#
# @abstract
module Adapter
def self.included(host)
- host.extend(ClassMethods)
+ host.extend ClassMethods
+ host.extend ProtocolMethodHandlers
host.class_eval do
- attr_accessor :logger, :settings, :connection
- # Authentication mechanism
- attr_accessor :mechanism
+ #
+ # Behaviors
+ #
- # Security response data
- attr_accessor :response
+ include Entity
+
+
+ #
+ # API
+ #
+
+ attr_accessor :logger
+ attr_accessor :settings
+ attr_accessor :connection
+
# The locale defines the language in which the server will send reply texts.
#
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
attr_accessor :locale
+ # Client capabilities
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2.1)
+ attr_accessor :client_properties
+
+ # Server capabilities
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
+ attr_reader :server_properties
+
+ # Authentication mechanism used.
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
+ attr_reader :mechanism
+
+ # Channels within this connection.
+ #
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
+ attr_reader :channels
+
+ # Maximum channel number that the server permits this connection to use.
+ # Usable channel numbers are in the range 1..channel_max.
+ # Zero indicates no specified limit.
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.1 and 1.4.2.6.1)
+ attr_accessor :channel_max
+
+ # Maximum frame size that the server permits this connection to use.
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.2 and 1.4.2.6.2)
+ attr_accessor :frame_max
+
+
+ attr_reader :known_hosts
+
+
+
# @api plugin
# @see #disconnect
# @note Adapters must implement this method but it is NOT supposed to be used directly.
# AMQ protocol defines two-step process of closing connection (send Connection.Close
# to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
@@ -90,13 +138,11 @@
# @example Specifying using custom adapter class
# AMQ::Client::SocketClient.connect
# @param [Hash] Connection parameters, including :adapter to use.
# @api public
def connect(settings = nil, &block)
- # TODO: this doesn't look very nice, do we need it?
- # Let's make it an instance thing by instance = self.new(settings)
- @settings = settings = Settings.configure(settings)
+ @settings = Settings.configure(settings)
instance = self.new
instance.establish_connection(settings)
instance.register_connection_callback(&block)
@@ -130,87 +176,145 @@
extend RegisterEntityMixin
register_entity :channel, AMQ::Client::Channel
+
#
# API
#
- def initialize(*args)
- super(*args)
- self.logger = self.class.logger
- self.settings = self.class.settings
-
- @frames = Array.new
- end
-
-
-
# Establish socket connection to the server.
#
# @api plugin
def establish_connection(settings)
raise MissingInterfaceMethodError.new("AMQ::Client#establish_connection(settings)")
end
- def handshake(mechanism = "PLAIN", response = "\0guest\0guest", locale = "en_GB")
- self.send_preamble
- self.connection = AMQ::Client::Connection.new(self, mechanism, response, locale)
- end
-
# Properly close connection with AMQ broker, as described in
# section 2.2.4 of the {http://bit.ly/hw2ELX AMQP 0.9.1 specification}.
#
# @api plugin
# @see #close_connection
- def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
+ def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
@intentionally_closing_connection = true
-
self.on_disconnection(&block)
- closing!
# ruby-amqp/amqp#66, MK.
- if self.connection
- self.connection.close(reply_code, reply_text)
+ if self.open?
+ closing!
+ self.send Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id)
+ elsif self.closing?
+ # no-op
else
self.disconnection_successful
end
end
- alias close disconnect
+
# Sends AMQ protocol header (also known as preamble).
#
# @note This must be implemented by all AMQP clients.
# @api plugin
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2)
def send_preamble
self.send_raw(AMQ::Protocol::PREAMBLE)
end
+ # Sends frame to the peer, checking that connection is open.
+ #
+ # @raise [ConnectionClosedError]
def send(frame)
- if self.connection.closed?
+ if closed?
raise ConnectionClosedError.new(frame)
else
self.send_raw(frame.encode)
end
end
+ # Sends multiple frames, one by one.
+ #
+ # @api public
def send_frameset(frames)
frames.each { |frame| self.send(frame) }
end # send_frameset(frames)
+
+ # Returns heartbeat interval this client uses, in seconds.
+ # This value may or may not be used depending on broker capabilities.
+ # Zero means the server does not want a heartbeat.
+ #
+ # @return [Fixnum] Heartbeat interval this client uses, in seconds.
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
+ def heartbeat_interval
+ @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
+ end # heartbeat_interval
+
+
+ def vhost
+ @settings.fetch(:vhost, "/")
+ end # vhost
+
+
+ # Called when previously established TCP connection fails.
+ # @api public
+ def tcp_connection_lost
+ @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
+ end
+
+ # Called when initial TCP connection fails.
+ # @api public
+ def tcp_connection_failed
+ @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
+ end
+
+
+
+ #
+ # Implementation
+ #
+
+
# Sends opaque data to AMQ broker over active connection.
#
# @note This must be implemented by all AMQP clients.
# @api plugin
def send_raw(data)
raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)")
end
+ # Sends connection preamble to the broker.
+ def handshake
+ @authenticating = true
+ self.send_preamble
+ end
+
+
+ # Sends connection.open to the server.
+ #
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7)
+ def open(vhost = "/")
+ self.send Protocol::Connection::Open.encode(vhost)
+ end
+
+ # Resets connection state.
+ #
+ # @api plugin
+ def reset_state!
+ # no-op by default
+ end # reset_state!
+
+ # @api plugin
+ # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595
+ def encode_credentials(username, password)
+ "\0#{username}\0#{password}"
+ end # encode_credentials(username, password)
+
+
def receive_frame(frame)
@frames << frame
if frameset_complete?(@frames)
receive_frameset(@frames)
@frames.clear
@@ -249,19 +353,102 @@
send(Protocol::HeartbeatFrame)
end
end # send_heartbeat
- # Returns heartbeat interval this client uses, in seconds.
- # This value may or may not be used depending on broker capabilities.
+
+ # Handles Connection.Start.
#
- # @return [Fixnum] Heartbeat interval this client uses, in seconds.
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.)
+ def start_ok(method)
+ @server_properties = method.server_properties
+
+ username = @settings[:user] || @settings[:username]
+ password = @settings[:pass] || @settings[:password]
+
+ # It's not clear whether we should transition to :opening state here
+ # or in #open but in case authentication fails, it would be strange to have
+ # @status undefined. So lets do this. MK.
+ opening!
+
+ self.send Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)
+ end
+
+
+ # Handles Connection.Open-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.)
+ def handle_open_ok(method)
+ @known_hosts = method.known_hosts
+
+ opened!
+ self.connection_successful if self.respond_to?(:connection_successful)
+ end
+
+ # Handles Connection.Tune-Ok.
+ #
+ # @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
- def heartbeat_interval
- @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
- end # heartbeat_interval
+ def handle_tune(method)
+ @channel_max = method.channel_max
+ @frame_max = method.frame_max
+ @heartbeat_interval = self.heartbeat_interval || method.heartbeat
+ self.send Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval)
+ end # handle_tune(method)
+
+
+ # Handles connection.close. When broker detects a connection level exception, this method is called.
+ #
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9)
+ def handle_close(method)
+ self.handle_connection_interruption
+
+ closed!
+ # TODO: use proper exception class, provide protocol class (we know method.class_id and method.method_id) as well!
+ error = RuntimeError.new(method.reply_text)
+ self.error(error)
+ end
+
+
+ # Handles Connection.Close-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10)
+ def handle_close_ok(method)
+ closed!
+ self.disconnection_successful
+ end # handle_close_ok(method)
+
+ # @api plugin
+ def handle_connection_interruption
+ @channels.each { |n, c| c.handle_connection_interruption }
+ end # handle_connection_interruption
+
+
+
protected
+
+ # Returns next frame from buffer whenever possible
+ #
+ # @api private
+ def get_next_frame
+ return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
+ # octet + short
+ offset = 3 # 1 + 2
+ # length
+ payload_length = @chunk_buffer[offset, 4].unpack('N')[0]
+ # 4 bytes for long payload length, 1 byte final octet
+ frame_length = offset + 4 + payload_length + 1
+ if frame_length <= @chunk_buffer.size
+ @chunk_buffer.slice!(0, frame_length)
+ else
+ nil
+ end
+ end # def get_next_frame
# Utility methods
# Determines, whether the received frameset is ready to be further processed
def frameset_complete?(frames)