lib/amq/client/adapter.rb in amq-client-0.7.0.alpha27 vs lib/amq/client/adapter.rb in amq-client-0.7.0.alpha28
- old
+ new
@@ -201,11 +201,11 @@
self.on_disconnection(&block)
# ruby-amqp/amqp#66, MK.
if self.open?
closing!
- self.send Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id)
+ self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
elsif self.closing?
# no-op
else
self.disconnection_successful
end
@@ -222,11 +222,11 @@
end
# Sends frame to the peer, checking that connection is open.
#
# @raise [ConnectionClosedError]
- def send(frame)
+ def send_frame(frame)
if closed?
raise ConnectionClosedError.new(frame)
else
self.send_raw(frame.encode)
end
@@ -234,11 +234,11 @@
# Sends multiple frames, one by one.
#
# @api public
def send_frameset(frames)
- frames.each { |frame| self.send(frame) }
+ frames.each { |frame| self.send_frame(frame) }
end # send_frameset(frames)
# Returns heartbeat interval this client uses, in seconds.
@@ -250,10 +250,15 @@
def heartbeat_interval
@settings[:heartbeat] || @settings[:heartbeat_interval] || 0
end # heartbeat_interval
+ # vhost this connection uses. Default is "/", a historically estabilished convention
+ # of RabbitMQ and amqp gem.
+ #
+ # @return [String] vhost this connection uses
+ # @api public
def vhost
@settings.fetch(:vhost, "/")
end # vhost
@@ -283,10 +288,11 @@
def send_raw(data)
raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)")
end
# Sends connection preamble to the broker.
+ # @api plugin
def handshake
@authenticating = true
self.send_preamble
end
@@ -294,11 +300,11 @@
# Sends connection.open to the server.
#
# @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7)
def open(vhost = "/")
- self.send Protocol::Connection::Open.encode(vhost)
+ self.send_frame(Protocol::Connection::Open.encode(vhost))
end
# Resets connection state.
#
# @api plugin
@@ -311,118 +317,124 @@
def encode_credentials(username, password)
"\0#{username}\0#{password}"
end # encode_credentials(username, password)
+ # Processes a single frame.
+ #
+ # @param [AMQ::Protocol::Frame] frame
+ # @api plugin
def receive_frame(frame)
@frames << frame
if frameset_complete?(@frames)
receive_frameset(@frames)
@frames.clear
else
# puts "#{frame.inspect} is NOT final"
end
end
- # When the adapter receives all the frames related to
- # given method frame, it's supposed to call this method.
- # It calls handler for method class of the first (method)
- # frame with all the other frames as arguments. Handlers
- # are defined in amq/client/* by the handle(klass, &block)
- # method.
+ # Processes a frameset by finding and invoking a suitable handler.
+ # Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat
+ # value.
+ #
+ # @param [Array<AMQ::Protocol::Frame>] frames
+ # @api plugin
def receive_frameset(frames)
frame = frames.first
if Protocol::HeartbeatFrame === frame
@last_server_heartbeat = Time.now
else
- callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
- if callable
+ if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
callable.call(self, frames.first, frames[1..-1])
else
raise MissingHandlerError.new(frames.first)
end
end
end
+ # Sends a heartbeat frame if connection is open.
+ # @api plugin
def send_heartbeat
if tcp_connection_established?
if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2))
logger.error "Reconnecting due to missing server heartbeats"
# TODO: reconnect
end
- send(Protocol::HeartbeatFrame)
+ send_frame(Protocol::HeartbeatFrame)
end
end # send_heartbeat
- # Handles Connection.Start.
+ # Handles connection.start.
#
# @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.)
- def start_ok(method)
- @server_properties = method.server_properties
+ def handle_start(connection_start)
+ @server_properties = connection_start.server_properties
username = @settings[:user] || @settings[:username]
password = @settings[:pass] || @settings[:password]
# It's not clear whether we should transition to :opening state here
# or in #open but in case authentication fails, it would be strange to have
# @status undefined. So lets do this. MK.
opening!
- self.send Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)
+ self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
end
+ # Handles Connection.Tune-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
+ def handle_tune(tune_ok)
+ @channel_max = tune_ok.channel_max
+ @frame_max = tune_ok.frame_max
+ @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat
+
+ self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
+ end # handle_tune(method)
+
+
# Handles Connection.Open-Ok.
#
# @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.)
- def handle_open_ok(method)
- @known_hosts = method.known_hosts
+ def handle_open_ok(open_ok)
+ @known_hosts = open_ok.known_hosts
opened!
self.connection_successful if self.respond_to?(:connection_successful)
end
- # Handles Connection.Tune-Ok.
- #
- # @api plugin
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
- def handle_tune(method)
- @channel_max = method.channel_max
- @frame_max = method.frame_max
- @heartbeat_interval = self.heartbeat_interval || method.heartbeat
- self.send Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval)
- end # handle_tune(method)
-
-
# Handles connection.close. When broker detects a connection level exception, this method is called.
#
# @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9)
- def handle_close(method)
+ def handle_close(conn_close)
self.handle_connection_interruption
closed!
- # TODO: use proper exception class, provide protocol class (we know method.class_id and method.method_id) as well!
- error = RuntimeError.new(method.reply_text)
+ # TODO: use proper exception class, provide protocol class (we know conn_close.class_id and conn_close.method_id) as well!
+ error = RuntimeError.new(conn_close.reply_text)
self.error(error)
end
# Handles Connection.Close-Ok.
#
# @api plugin
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10)
- def handle_close_ok(method)
+ def handle_close_ok(close_ok)
closed!
self.disconnection_successful
- end # handle_close_ok(method)
+ end # handle_close_ok(close_ok)
# @api plugin
def handle_connection_interruption
@channels.each { |n, c| c.handle_connection_interruption }
end # handle_connection_interruption
@@ -437,12 +449,12 @@
def get_next_frame
return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
# octet + short
offset = 3 # 1 + 2
# length
- payload_length = @chunk_buffer[offset, 4].unpack('N')[0]
+ payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
# 4 bytes for long payload length, 1 byte final octet
- frame_length = offset + 4 + payload_length + 1
+ frame_length = offset + payload_length + 5
if frame_length <= @chunk_buffer.size
@chunk_buffer.slice!(0, frame_length)
else
nil
end