lib/amq/client/adapter.rb in amq-client-0.7.0.alpha34 vs lib/amq/client/adapter.rb in amq-client-0.7.0.alpha35
- old
+ new
@@ -1,515 +1,19 @@
# encoding: utf-8
require "amq/client/logging"
require "amq/client/settings"
-require "amq/client/entity"
-require "amq/client/channel"
+require "amq/client/async/queue"
+require "amq/client/async/exchange"
+require "amq/client/async/channel"
+require "amq/client/async/adapter"
+
module AMQ
# For overview of AMQP client adapters API, see {AMQ::Client::Adapter}
module Client
-
- # Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or
- # sockets-based) subclass it and must implement Adapter API methods:
- #
- # * #send_raw(data)
- # * #estabilish_connection(settings)
- # * #close_connection
- #
- # @abstract
- module Adapter
-
- def self.included(host)
- host.extend ClassMethods
- host.extend ProtocolMethodHandlers
-
- host.class_eval do
-
- #
- # API
- #
-
- attr_accessor :logger
- attr_accessor :settings
-
- # @return [Array<#call>]
- attr_reader :callbacks
-
-
- # The locale defines the language in which the server will send reply texts.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
- attr_accessor :locale
-
- # Client capabilities
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2.1)
- attr_accessor :client_properties
-
- # Server properties
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
- attr_reader :server_properties
-
- # Server capabilities
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
- attr_reader :server_capabilities
-
- # Locales server supports
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
- attr_reader :server_locales
-
- # Authentication mechanism used.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
- attr_reader :mechanism
-
- # Authentication mechanisms broker supports.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
- attr_reader :server_authentication_mechanisms
-
- # Channels within this connection.
- #
- # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
- attr_reader :channels
-
- # Maximum channel number that the server permits this connection to use.
- # Usable channel numbers are in the range 1..channel_max.
- # Zero indicates no specified limit.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.1 and 1.4.2.6.1)
- attr_accessor :channel_max
-
- # Maximum frame size that the server permits this connection to use.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.2 and 1.4.2.6.2)
- attr_accessor :frame_max
-
-
- attr_reader :known_hosts
-
-
-
- # @api plugin
- # @see #disconnect
- # @note Adapters must implement this method but it is NOT supposed to be used directly.
- # AMQ protocol defines two-step process of closing connection (send Connection.Close
- # to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
- def close_connection
- raise MissingInterfaceMethodError.new("AMQ::Client.close_connection")
- end unless defined?(:close_connection) # since it is a module, this method may already be defined
- end
- end # self.included(host)
-
-
-
- module ClassMethods
- # Settings
- def settings
- @settings ||= AMQ::Client::Settings.default
- end
-
- def logger
- @logger ||= begin
- require "logger"
- Logger.new(STDERR)
- end
- end
-
- def logger=(logger)
- methods = AMQ::Client::Logging::REQUIRED_METHODS
- unless methods.all? { |method| logger.respond_to?(method) }
- raise AMQ::Client::Logging::IncompatibleLoggerError.new(methods)
- end
-
- @logger = logger
- end
-
- # @return [Boolean] Current value of logging flag.
- def logging
- settings[:logging]
- end
-
- # Turns loggin on or off.
- def logging=(boolean)
- settings[:logging] = boolean
- end
-
-
- # Establishes connection to AMQ broker and returns it. New connection object is yielded to
- # the block if it is given.
- #
- # @example Specifying adapter via the :adapter option
- # AMQ::Client::Adapter.connect(:adapter => "socket")
- # @example Specifying using custom adapter class
- # AMQ::Client::SocketClient.connect
- # @param [Hash] Connection parameters, including :adapter to use.
- # @api public
- def connect(settings = nil, &block)
- @settings = Settings.configure(settings)
-
- instance = self.new
- instance.establish_connection(settings)
- instance.register_connection_callback(&block)
-
- instance
- end
-
-
- # Can be overriden by higher-level libraries like amqp gem or bunny.
- # Defaults to AMQ::Client::TCPConnectionFailed.
- #
- # @return [Class]
- def tcp_connection_failure_exception_class
- @tcp_connection_failure_exception_class ||= AMQ::Client::TCPConnectionFailed
- end # tcp_connection_failure_exception_class
-
- # Can be overriden by higher-level libraries like amqp gem or bunny.
- # Defaults to AMQ::Client::PossibleAuthenticationFailure.
- #
- # @return [Class]
- def authentication_failure_exception_class
- @authentication_failure_exception_class ||= AMQ::Client::PossibleAuthenticationFailureError
- end # authentication_failure_exception_class
- end # ClassMethods
-
-
- #
- # Behaviors
- #
-
- include Openable
- include Callbacks
-
-
- extend RegisterEntityMixin
-
- register_entity :channel, AMQ::Client::Channel
-
-
- #
- # API
- #
-
-
- # Establish socket connection to the server.
- #
- # @api plugin
- def establish_connection(settings)
- raise MissingInterfaceMethodError.new("AMQ::Client#establish_connection(settings)")
- end
-
- # Properly close connection with AMQ broker, as described in
- # section 2.2.4 of the {http://bit.ly/hw2ELX AMQP 0.9.1 specification}.
- #
- # @api plugin
- # @see #close_connection
- def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
- @intentionally_closing_connection = true
- self.on_disconnection(&block)
-
- # ruby-amqp/amqp#66, MK.
- if self.open?
- closing!
- self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
- elsif self.closing?
- # no-op
- else
- self.disconnection_successful
- end
- end
-
-
- # Sends AMQ protocol header (also known as preamble).
- #
- # @note This must be implemented by all AMQP clients.
- # @api plugin
- # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2)
- def send_preamble
- self.send_raw(AMQ::Protocol::PREAMBLE)
- end
-
- # Sends frame to the peer, checking that connection is open.
- #
- # @raise [ConnectionClosedError]
- def send_frame(frame)
- if closed?
- raise ConnectionClosedError.new(frame)
- else
- self.send_raw(frame.encode)
- end
- end
-
- # Sends multiple frames, one by one.
- #
- # @api public
- def send_frameset(frames)
- frames.each { |frame| self.send_frame(frame) }
- end # send_frameset(frames)
-
-
-
- # Returns heartbeat interval this client uses, in seconds.
- # This value may or may not be used depending on broker capabilities.
- # Zero means the server does not want a heartbeat.
- #
- # @return [Fixnum] Heartbeat interval this client uses, in seconds.
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
- def heartbeat_interval
- @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
- end # heartbeat_interval
-
-
- # vhost this connection uses. Default is "/", a historically estabilished convention
- # of RabbitMQ and amqp gem.
- #
- # @return [String] vhost this connection uses
- # @api public
- def vhost
- @settings.fetch(:vhost, "/")
- end # vhost
-
-
- # Called when previously established TCP connection fails.
- # @api public
- def tcp_connection_lost
- @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
- end
-
- # Called when initial TCP connection fails.
- # @api public
- def tcp_connection_failed
- @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
- end
-
-
-
- #
- # Implementation
- #
-
-
- # Sends opaque data to AMQ broker over active connection.
- #
- # @note This must be implemented by all AMQP clients.
- # @api plugin
- def send_raw(data)
- raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)")
- end
-
- # Sends connection preamble to the broker.
- # @api plugin
- def handshake
- @authenticating = true
- self.send_preamble
- end
-
-
- # Sends connection.open to the server.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7)
- def open(vhost = "/")
- self.send_frame(Protocol::Connection::Open.encode(vhost))
- end
-
- # Resets connection state.
- #
- # @api plugin
- def reset_state!
- # no-op by default
- end # reset_state!
-
- # @api plugin
- # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595
- def encode_credentials(username, password)
- "\0#{username}\0#{password}"
- end # encode_credentials(username, password)
-
-
- # Processes a single frame.
- #
- # @param [AMQ::Protocol::Frame] frame
- # @api plugin
- def receive_frame(frame)
- @frames << frame
- if frameset_complete?(@frames)
- receive_frameset(@frames)
- @frames.clear
- else
- # puts "#{frame.inspect} is NOT final"
- end
- end
-
- # Processes a frameset by finding and invoking a suitable handler.
- # Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat
- # value.
- #
- # @param [Array<AMQ::Protocol::Frame>] frames
- # @api plugin
- def receive_frameset(frames)
- frame = frames.first
-
- if Protocol::HeartbeatFrame === frame
- @last_server_heartbeat = Time.now
- else
- if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
- callable.call(self, frames.first, frames[1..-1])
- else
- raise MissingHandlerError.new(frames.first)
- end
- end
- end
-
- # Sends a heartbeat frame if connection is open.
- # @api plugin
- def send_heartbeat
- if tcp_connection_established?
- if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2))
- logger.error "Reconnecting due to missing server heartbeats"
- # TODO: reconnect
- end
- send_frame(Protocol::HeartbeatFrame)
- end
- end # send_heartbeat
-
-
-
-
- # @group Error handling
-
- # Defines a callback that will be executed when channel is closed after
- # channel-level exception. Only one callback can be added (the one added last
- # replaces previous added ones).
- #
- # @api public
- def on_error(&block)
- self.redefine_callback(:error, &block)
- end
-
- # @endgroup
-
-
-
-
- # Handles connection.start.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.)
- def handle_start(connection_start)
- @server_properties = connection_start.server_properties
- @server_capabilities = @server_properties["capabilities"]
-
- @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
- @server_locales = Array(connection_start.locales)
-
- username = @settings[:user] || @settings[:username]
- password = @settings[:pass] || @settings[:password]
-
- # It's not clear whether we should transition to :opening state here
- # or in #open but in case authentication fails, it would be strange to have
- # @status undefined. So lets do this. MK.
- opening!
-
- self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
- end
-
-
- # Handles Connection.Tune-Ok.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
- def handle_tune(tune_ok)
- @channel_max = tune_ok.channel_max.freeze
- @frame_max = tune_ok.frame_max.freeze
- @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat
-
- self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
- end # handle_tune(method)
-
-
- # Handles Connection.Open-Ok.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.)
- def handle_open_ok(open_ok)
- @known_hosts = open_ok.known_hosts.dup.freeze
-
- opened!
- self.connection_successful if self.respond_to?(:connection_successful)
- end
-
-
- # Handles connection.close. When broker detects a connection level exception, this method is called.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9)
- def handle_close(conn_close)
- self.handle_connection_interruption
-
- closed!
- # TODO: use proper exception class, provide protocol class (we know conn_close.class_id and conn_close.method_id) as well!
- self.exec_callback_yielding_self(:error, conn_close)
- end
-
-
- # Handles Connection.Close-Ok.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10)
- def handle_close_ok(close_ok)
- closed!
- self.disconnection_successful
- end # handle_close_ok(close_ok)
-
- # @api plugin
- def handle_connection_interruption
- @channels.each { |n, c| c.handle_connection_interruption }
- end # handle_connection_interruption
-
-
-
- protected
-
- # Returns next frame from buffer whenever possible
- #
- # @api private
- def get_next_frame
- return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
- # octet + short
- offset = 3 # 1 + 2
- # length
- payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
- # 4 bytes for long payload length, 1 byte final octet
- frame_length = offset + payload_length + 5
- if frame_length <= @chunk_buffer.size
- @chunk_buffer.slice!(0, frame_length)
- else
- nil
- end
- end # def get_next_frame
-
- # Utility methods
-
- # Determines, whether the received frameset is ready to be further processed
- def frameset_complete?(frames)
- return false if frames.empty?
- first_frame = frames[0]
- first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
- end
-
- # Determines, whether given frame array contains full content body
- def content_complete?(frames)
- return false if frames.empty?
- header = frames[0]
- raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
- header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
- end
-
- end # Adapter
+ # backwards compatibility
+ # @private
+ Adapter = Async::Adapter
end # Client
end # AMQ