lib/amq/client/adapter.rb in amq-client-0.7.0.alpha25 vs lib/amq/client/adapter.rb in amq-client-0.7.0.alpha26

- old
+ new

@@ -1,16 +1,17 @@ # encoding: utf-8 require "amq/client/logging" require "amq/client/settings" require "amq/client/entity" -require "amq/client/connection" require "amq/client/channel" module AMQ # For overview of AMQP client adapters API, see {AMQ::Client::Adapter} module Client + + # Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or # sockets-based) subclass it and must implement Adapter API methods: # # * #send_raw(data) # * #estabilish_connection(settings) @@ -18,26 +19,73 @@ # # @abstract module Adapter def self.included(host) - host.extend(ClassMethods) + host.extend ClassMethods + host.extend ProtocolMethodHandlers host.class_eval do - attr_accessor :logger, :settings, :connection - # Authentication mechanism - attr_accessor :mechanism + # + # Behaviors + # - # Security response data - attr_accessor :response + include Entity + + + # + # API + # + + attr_accessor :logger + attr_accessor :settings + attr_accessor :connection + # The locale defines the language in which the server will send reply texts. # # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2) attr_accessor :locale + # Client capabilities + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2.1) + attr_accessor :client_properties + + # Server capabilities + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) + attr_reader :server_properties + + # Authentication mechanism used. + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2) + attr_reader :mechanism + + # Channels within this connection. + # + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5) + attr_reader :channels + + # Maximum channel number that the server permits this connection to use. + # Usable channel numbers are in the range 1..channel_max. + # Zero indicates no specified limit. + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.1 and 1.4.2.6.1) + attr_accessor :channel_max + + # Maximum frame size that the server permits this connection to use. + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.2 and 1.4.2.6.2) + attr_accessor :frame_max + + + attr_reader :known_hosts + + + # @api plugin # @see #disconnect # @note Adapters must implement this method but it is NOT supposed to be used directly. # AMQ protocol defines two-step process of closing connection (send Connection.Close # to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect} @@ -90,13 +138,11 @@ # @example Specifying using custom adapter class # AMQ::Client::SocketClient.connect # @param [Hash] Connection parameters, including :adapter to use. # @api public def connect(settings = nil, &block) - # TODO: this doesn't look very nice, do we need it? - # Let's make it an instance thing by instance = self.new(settings) - @settings = settings = Settings.configure(settings) + @settings = Settings.configure(settings) instance = self.new instance.establish_connection(settings) instance.register_connection_callback(&block) @@ -130,87 +176,145 @@ extend RegisterEntityMixin register_entity :channel, AMQ::Client::Channel + # # API # - def initialize(*args) - super(*args) - self.logger = self.class.logger - self.settings = self.class.settings - - @frames = Array.new - end - - - # Establish socket connection to the server. # # @api plugin def establish_connection(settings) raise MissingInterfaceMethodError.new("AMQ::Client#establish_connection(settings)") end - def handshake(mechanism = "PLAIN", response = "\0guest\0guest", locale = "en_GB") - self.send_preamble - self.connection = AMQ::Client::Connection.new(self, mechanism, response, locale) - end - # Properly close connection with AMQ broker, as described in # section 2.2.4 of the {http://bit.ly/hw2ELX AMQP 0.9.1 specification}. # # @api plugin # @see #close_connection - def disconnect(reply_code = 200, reply_text = "Goodbye", &block) + def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block) @intentionally_closing_connection = true - self.on_disconnection(&block) - closing! # ruby-amqp/amqp#66, MK. - if self.connection - self.connection.close(reply_code, reply_text) + if self.open? + closing! + self.send Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id) + elsif self.closing? + # no-op else self.disconnection_successful end end - alias close disconnect + # Sends AMQ protocol header (also known as preamble). # # @note This must be implemented by all AMQP clients. # @api plugin # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2) def send_preamble self.send_raw(AMQ::Protocol::PREAMBLE) end + # Sends frame to the peer, checking that connection is open. + # + # @raise [ConnectionClosedError] def send(frame) - if self.connection.closed? + if closed? raise ConnectionClosedError.new(frame) else self.send_raw(frame.encode) end end + # Sends multiple frames, one by one. + # + # @api public def send_frameset(frames) frames.each { |frame| self.send(frame) } end # send_frameset(frames) + + # Returns heartbeat interval this client uses, in seconds. + # This value may or may not be used depending on broker capabilities. + # Zero means the server does not want a heartbeat. + # + # @return [Fixnum] Heartbeat interval this client uses, in seconds. + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6) + def heartbeat_interval + @settings[:heartbeat] || @settings[:heartbeat_interval] || 0 + end # heartbeat_interval + + + def vhost + @settings.fetch(:vhost, "/") + end # vhost + + + # Called when previously established TCP connection fails. + # @api public + def tcp_connection_lost + @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss + end + + # Called when initial TCP connection fails. + # @api public + def tcp_connection_failed + @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure + end + + + + # + # Implementation + # + + # Sends opaque data to AMQ broker over active connection. # # @note This must be implemented by all AMQP clients. # @api plugin def send_raw(data) raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)") end + # Sends connection preamble to the broker. + def handshake + @authenticating = true + self.send_preamble + end + + + # Sends connection.open to the server. + # + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7) + def open(vhost = "/") + self.send Protocol::Connection::Open.encode(vhost) + end + + # Resets connection state. + # + # @api plugin + def reset_state! + # no-op by default + end # reset_state! + + # @api plugin + # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595 + def encode_credentials(username, password) + "\0#{username}\0#{password}" + end # encode_credentials(username, password) + + def receive_frame(frame) @frames << frame if frameset_complete?(@frames) receive_frameset(@frames) @frames.clear @@ -249,19 +353,102 @@ send(Protocol::HeartbeatFrame) end end # send_heartbeat - # Returns heartbeat interval this client uses, in seconds. - # This value may or may not be used depending on broker capabilities. + + # Handles Connection.Start. # - # @return [Fixnum] Heartbeat interval this client uses, in seconds. + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.) + def start_ok(method) + @server_properties = method.server_properties + + username = @settings[:user] || @settings[:username] + password = @settings[:pass] || @settings[:password] + + # It's not clear whether we should transition to :opening state here + # or in #open but in case authentication fails, it would be strange to have + # @status undefined. So lets do this. MK. + opening! + + self.send Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale) + end + + + # Handles Connection.Open-Ok. + # + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.) + def handle_open_ok(method) + @known_hosts = method.known_hosts + + opened! + self.connection_successful if self.respond_to?(:connection_successful) + end + + # Handles Connection.Tune-Ok. + # + # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6) - def heartbeat_interval - @settings[:heartbeat] || @settings[:heartbeat_interval] || 0 - end # heartbeat_interval + def handle_tune(method) + @channel_max = method.channel_max + @frame_max = method.frame_max + @heartbeat_interval = self.heartbeat_interval || method.heartbeat + self.send Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval) + end # handle_tune(method) + + + # Handles connection.close. When broker detects a connection level exception, this method is called. + # + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9) + def handle_close(method) + self.handle_connection_interruption + + closed! + # TODO: use proper exception class, provide protocol class (we know method.class_id and method.method_id) as well! + error = RuntimeError.new(method.reply_text) + self.error(error) + end + + + # Handles Connection.Close-Ok. + # + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10) + def handle_close_ok(method) + closed! + self.disconnection_successful + end # handle_close_ok(method) + + # @api plugin + def handle_connection_interruption + @channels.each { |n, c| c.handle_connection_interruption } + end # handle_connection_interruption + + + protected + + # Returns next frame from buffer whenever possible + # + # @api private + def get_next_frame + return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length + # octet + short + offset = 3 # 1 + 2 + # length + payload_length = @chunk_buffer[offset, 4].unpack('N')[0] + # 4 bytes for long payload length, 1 byte final octet + frame_length = offset + 4 + payload_length + 1 + if frame_length <= @chunk_buffer.size + @chunk_buffer.slice!(0, frame_length) + else + nil + end + end # def get_next_frame # Utility methods # Determines, whether the received frameset is ready to be further processed def frameset_complete?(frames)