lib/amq/client/adapter.rb in amq-client-0.7.0.alpha27 vs lib/amq/client/adapter.rb in amq-client-0.7.0.alpha28

- old
+ new

@@ -201,11 +201,11 @@ self.on_disconnection(&block) # ruby-amqp/amqp#66, MK. if self.open? closing! - self.send Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id) + self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id)) elsif self.closing? # no-op else self.disconnection_successful end @@ -222,11 +222,11 @@ end # Sends frame to the peer, checking that connection is open. # # @raise [ConnectionClosedError] - def send(frame) + def send_frame(frame) if closed? raise ConnectionClosedError.new(frame) else self.send_raw(frame.encode) end @@ -234,11 +234,11 @@ # Sends multiple frames, one by one. # # @api public def send_frameset(frames) - frames.each { |frame| self.send(frame) } + frames.each { |frame| self.send_frame(frame) } end # send_frameset(frames) # Returns heartbeat interval this client uses, in seconds. @@ -250,10 +250,15 @@ def heartbeat_interval @settings[:heartbeat] || @settings[:heartbeat_interval] || 0 end # heartbeat_interval + # vhost this connection uses. Default is "/", a historically estabilished convention + # of RabbitMQ and amqp gem. + # + # @return [String] vhost this connection uses + # @api public def vhost @settings.fetch(:vhost, "/") end # vhost @@ -283,10 +288,11 @@ def send_raw(data) raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)") end # Sends connection preamble to the broker. + # @api plugin def handshake @authenticating = true self.send_preamble end @@ -294,11 +300,11 @@ # Sends connection.open to the server. # # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7) def open(vhost = "/") - self.send Protocol::Connection::Open.encode(vhost) + self.send_frame(Protocol::Connection::Open.encode(vhost)) end # Resets connection state. # # @api plugin @@ -311,118 +317,124 @@ def encode_credentials(username, password) "\0#{username}\0#{password}" end # encode_credentials(username, password) + # Processes a single frame. + # + # @param [AMQ::Protocol::Frame] frame + # @api plugin def receive_frame(frame) @frames << frame if frameset_complete?(@frames) receive_frameset(@frames) @frames.clear else # puts "#{frame.inspect} is NOT final" end end - # When the adapter receives all the frames related to - # given method frame, it's supposed to call this method. - # It calls handler for method class of the first (method) - # frame with all the other frames as arguments. Handlers - # are defined in amq/client/* by the handle(klass, &block) - # method. + # Processes a frameset by finding and invoking a suitable handler. + # Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat + # value. + # + # @param [Array<AMQ::Protocol::Frame>] frames + # @api plugin def receive_frameset(frames) frame = frames.first if Protocol::HeartbeatFrame === frame @last_server_heartbeat = Time.now else - callable = AMQ::Client::HandlersRegistry.find(frame.method_class) - if callable + if callable = AMQ::Client::HandlersRegistry.find(frame.method_class) callable.call(self, frames.first, frames[1..-1]) else raise MissingHandlerError.new(frames.first) end end end + # Sends a heartbeat frame if connection is open. + # @api plugin def send_heartbeat if tcp_connection_established? if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) logger.error "Reconnecting due to missing server heartbeats" # TODO: reconnect end - send(Protocol::HeartbeatFrame) + send_frame(Protocol::HeartbeatFrame) end end # send_heartbeat - # Handles Connection.Start. + # Handles connection.start. # # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.) - def start_ok(method) - @server_properties = method.server_properties + def handle_start(connection_start) + @server_properties = connection_start.server_properties username = @settings[:user] || @settings[:username] password = @settings[:pass] || @settings[:password] # It's not clear whether we should transition to :opening state here # or in #open but in case authentication fails, it would be strange to have # @status undefined. So lets do this. MK. opening! - self.send Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale) + self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) end + # Handles Connection.Tune-Ok. + # + # @api plugin + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6) + def handle_tune(tune_ok) + @channel_max = tune_ok.channel_max + @frame_max = tune_ok.frame_max + @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat + + self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval)) + end # handle_tune(method) + + # Handles Connection.Open-Ok. # # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.) - def handle_open_ok(method) - @known_hosts = method.known_hosts + def handle_open_ok(open_ok) + @known_hosts = open_ok.known_hosts opened! self.connection_successful if self.respond_to?(:connection_successful) end - # Handles Connection.Tune-Ok. - # - # @api plugin - # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6) - def handle_tune(method) - @channel_max = method.channel_max - @frame_max = method.frame_max - @heartbeat_interval = self.heartbeat_interval || method.heartbeat - self.send Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval) - end # handle_tune(method) - - # Handles connection.close. When broker detects a connection level exception, this method is called. # # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9) - def handle_close(method) + def handle_close(conn_close) self.handle_connection_interruption closed! - # TODO: use proper exception class, provide protocol class (we know method.class_id and method.method_id) as well! - error = RuntimeError.new(method.reply_text) + # TODO: use proper exception class, provide protocol class (we know conn_close.class_id and conn_close.method_id) as well! + error = RuntimeError.new(conn_close.reply_text) self.error(error) end # Handles Connection.Close-Ok. # # @api plugin # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10) - def handle_close_ok(method) + def handle_close_ok(close_ok) closed! self.disconnection_successful - end # handle_close_ok(method) + end # handle_close_ok(close_ok) # @api plugin def handle_connection_interruption @channels.each { |n, c| c.handle_connection_interruption } end # handle_connection_interruption @@ -437,12 +449,12 @@ def get_next_frame return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length # octet + short offset = 3 # 1 + 2 # length - payload_length = @chunk_buffer[offset, 4].unpack('N')[0] + payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first # 4 bytes for long payload length, 1 byte final octet - frame_length = offset + 4 + payload_length + 1 + frame_length = offset + payload_length + 5 if frame_length <= @chunk_buffer.size @chunk_buffer.slice!(0, frame_length) else nil end