# encoding: utf-8 require "amq/client" require "amq/client/channel" require "amq/client/exchange" require "amq/client/framing/string/frame" require "eventmachine" module AMQ module Client class EventMachineClient < EM::Connection class Deferrable include EventMachine::Deferrable end # # Behaviors # include AMQ::Client::Adapter self.sync = false register_entity :channel, AMQ::Client::Channel register_entity :exchange, AMQ::Client::Exchange # # API # def self.connect(settings = nil, &block) settings = AMQ::Client::Settings.configure(settings) instance = EM.connect(settings[:host], settings[:port], self, settings) unless block.nil? # delay calling block we were given till after we receive # connection.open-ok. Connection will notify us when # that happens. instance.on_connection do block.call(instance) end end instance end attr_reader :connections def initialize(*args) super(*args) # EventMachine::Connection's and Adapter's constructors arity # make it easier to use *args. MK. @settings = args.first @connections = Array.new @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] @chunk_buffer = "" @connection_deferrable = Deferrable.new @disconnection_deferrable = Deferrable.new @authenticating = false # succeeds when connection is open, that is, vhost is selected # and client is given green light to proceed. @connection_opened_deferrable = Deferrable.new @tcp_connection_established = false if self.heartbeat_interval > 0 @last_server_heartbeat = Time.now EM.add_periodic_timer(self.heartbeat_interval, &method(:send_heartbeat)) end end # initialize(*args) def establish_connection(settings) # an intentional no-op end alias send_raw send_data def authenticating? @authenticating end # authenticating? def tcp_connection_established? @tcp_connection_established end # tcp_connection_established? # # Implementation # def post_init reset @tcp_connection_established = true self.handshake rescue Exception => error raise error end # post_init # # EventMachine receives data in chunks, sometimes those chunks are smaller # than the size of AMQP frame. That's why you need to add some kind of buffer. # def receive_data(chunk) @chunk_buffer << chunk while frame = get_next_frame self.receive_frame(AMQ::Client::Framing::String::Frame.decode(frame)) end end def unbind closing! @tcp_connection_established = false @connections.each { |c| c.on_connection_interruption } @disconnection_deferrable.succeed closed! # since AMQP spec dictates that authentication failure is a protocol exception # and protocol exceptions result in connection closure, check whether we are # in the authentication stage. If so, it is likely to signal an authentication # issue. Java client behaves the same way. MK. if authenticating? if sync? raise PossibleAuthenticationFailureError.new(@settings) else @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure end end end # unbind def on_connection(&block) @connection_deferrable.callback(&block) end # on_connection(&block) # called by AMQ::Client::Connection after we receive connection.open-ok. def connection_successful @connection_deferrable.succeed end # connection_successful def on_open(&block) @connection_opened_deferrable.callback(&block) end # on_open(&block) def open_successful @authenticating = false @connection_opened_deferrable.succeed opened! end # open_successful def on_disconnection(&block) @disconnection_deferrable.callback(&block) end # on_disconnection(&block) # called by AMQ::Client::Connection after we receive connection.close-ok. def disconnection_successful @disconnection_deferrable.succeed self.close_connection closed! end # disconnection_successful def on_possible_authentication_failure(&block) @on_possible_authentication_failure = block end protected def handshake(mechanism = "PLAIN", response = nil, locale = "en_GB") username = @settings[:user] || @settings[:username] password = @settings[:pass] || @settings[:password] self.logger.info "[authentication] Credentials are #{username}/#{'*' * password.bytesize}" self.connection = AMQ::Client::Connection.new(self, mechanism, self.encode_credentials(username, password), locale) @authenticating = true self.send_preamble end def reset @size = 0 @payload = "" @frames = Array.new end # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595 def encode_credentials(username, password) "\0#{username}\0#{password}" end # encode_credentials(username, password) def get_next_frame return unless @chunk_buffer.size > 7 # otherwise, cannot read the length # octet + short offset = 3 # 1 + 2 # length payload_length = @chunk_buffer[offset, 4].unpack('N')[0] # 5: 4 bytes for long payload length, 1 byte final octet frame_length = offset + 5 + payload_length if frame_length <= @chunk_buffer.size @chunk_buffer.slice!(0, frame_length) else nil end end end # EventMachineClient end # Client end # AMQ