lib/mqtt/packet.rb in mqtt-0.0.9 vs lib/mqtt/packet.rb in mqtt-0.1.0

- old
+ new

@@ -1,5 +1,7 @@ +# encoding: BINARY + module MQTT # Class representing a MQTT Packet # Performs binary encoding and decoding of headers class MQTT::Packet @@ -15,31 +17,25 @@ :body_length => nil } # Read in a packet from a socket def self.read(socket) - # Read in the packet header and work out the class - header = read_byte(socket) - type_id = ((header & 0xF0) >> 4) - packet_class = MQTT::PACKET_TYPES[type_id] - - # Create a new packet object - packet = packet_class.new( - :duplicate => ((header & 0x08) >> 3), - :qos => ((header & 0x06) >> 1), - :retain => ((header & 0x01) >> 0) + # Read in the packet header and create a new packet object + packet = create_from_header( + read_byte(socket) ) # Read in the packet length multiplier = 1 body_length = 0 + pos = 1 begin digit = read_byte(socket) body_length += ((digit & 0x7F) * multiplier) multiplier *= 0x80 - end while ((digit & 0x80) != 0x00) - # FIXME: only allow 4 bytes? + pos += 1 + end while ((digit & 0x80) != 0x00) and pos <= 4 # Store the expected body length in the packet packet.instance_variable_set('@body_length', body_length) # Read in the packet body @@ -56,48 +52,59 @@ end # Parse the header and create a new packet object of the correct type # The header is removed from the buffer passed into this function def self.parse_header(buffer) - # Work out the class - type_id = ((buffer.unpack("C*")[0] & 0xF0) >> 4) - packet_class = MQTT::PACKET_TYPES[type_id] - if packet_class.nil? - raise ProtocolException.new("Invalid packet type identifier: #{type_id}") + # Check that the packet is a long as the minimum packet size + if buffer.bytesize < 2 + raise ProtocolException.new("Invalid packet: less than 2 bytes long") end # Create a new packet object - packet = packet_class.new( - :duplicate => ((buffer.unpack("C*")[0] & 0x08) >> 3) == 0x01, - :qos => ((buffer.unpack("C*")[0] & 0x06) >> 1), - :retain => ((buffer.unpack("C*")[0] & 0x01) >> 0) == 0x01 - ) + bytes = buffer.unpack("C5") + packet = create_from_header(bytes.first) # Parse the packet length body_length = 0 multiplier = 1 pos = 1 begin - if buffer.length <= pos + if buffer.bytesize <= pos raise ProtocolException.new("The packet length header is incomplete") end - digit = buffer.unpack("C*")[pos] + digit = bytes[pos] body_length += ((digit & 0x7F) * multiplier) multiplier *= 0x80 pos += 1 end while ((digit & 0x80) != 0x00) and pos <= 4 # Store the expected body length in the packet packet.instance_variable_set('@body_length', body_length) - # Delete the variable length header from the raw packet passed in + # Delete the fixed header from the raw packet passed in buffer.slice!(0...pos) return packet end + # Create a new packet object from the first byte of a MQTT packet + def self.create_from_header(byte) + # Work out the class + type_id = ((byte & 0xF0) >> 4) + packet_class = MQTT::PACKET_TYPES[type_id] + if packet_class.nil? + raise ProtocolException.new("Invalid packet type identifier: #{type_id}") + end + # Create a new packet object + packet_class.new( + :duplicate => ((byte & 0x08) >> 3) == 0x01, + :qos => ((byte & 0x06) >> 1), + :retain => ((byte & 0x01) >> 0) == 0x01 + ) + end + # Create a new empty packet def initialize(args={}) update_attributes(DEFAULTS.merge(args)) end @@ -135,22 +142,25 @@ end # Set the Quality of Service level (0/1/2) def qos=(arg) @qos = arg.to_i + if @qos < 0 or @qos > 2 + raise "Invalid QoS value: #{@qos}" + end end # Set the length of the packet body def body_length=(arg) @body_length = arg.to_i end # Parse the body (variable header and payload) of a packet def parse_body(buffer) - if buffer.length != body_length + if buffer.bytesize != body_length raise ProtocolException.new( - "Failed to parse packet - input buffer (#{buffer.length}) is not the same as the body length buffer (#{body_length})" + "Failed to parse packet - input buffer (#{buffer.bytesize}) is not the same as the body length header (#{body_length})" ) end end # Get serialisation of packet's body (variable header and payload) @@ -170,12 +180,17 @@ ] # Get the packet's variable header and payload body = self.encode_body + # Check that that packet isn't too big + body_length = body.bytesize + if body_length > 268435455 + raise "Error serialising packet: body is more than 256MB" + end + # Build up the body length field bytes - body_length = body.length begin digit = (body_length % 128) body_length = (body_length / 128) # if there are more digits to encode, set the top bit of this digit digit |= 0x80 if (body_length > 0) @@ -184,10 +199,13 @@ # Convert header to binary and add on body header.pack('C*') + body end + def inspect + "\#<#{self.class}>" + end protected # Encode an array of bytes and return them def encode_bytes(*bytes) @@ -197,15 +215,18 @@ # Encode a 16-bit unsigned integer and return it def encode_short(val) [val.to_i].pack('n') end - # Encode a string and return it + # Encode a UTF-8 string and return it # (preceded by the length of the string) def encode_string(str) - str = str.to_s unless str.is_a?(String) - encode_short(str.length) + str + str = str.to_s.encode('UTF-8') + + # Force to binary, when assembling the packet + str.force_encoding('ASCII-8BIT') + encode_short(str.bytesize) + str end # Remove a 16-bit unsigned integer from the front of buffer def shift_short(buffer) bytes = buffer.slice!(0..1) @@ -223,21 +244,23 @@ end # Remove string from the front of buffer def shift_string(buffer) len = shift_short(buffer) - shift_data(buffer,len) + str = shift_data(buffer,len) + # Strings in MQTT v3.1 are all UTF-8 + str.force_encoding('UTF-8') end private # Read and unpack a single byte from a socket def self.read_byte(socket) byte = socket.read(1) if byte.nil? - raise ProtocolException + raise ProtocolException.new("Failed to read byte from socket") end byte.unpack('C').first end @@ -263,26 +286,46 @@ end # Get serialisation of packet's body def encode_body body = '' - if @topic.nil? + if @topic.nil? or @topic.to_s.empty? raise "Invalid topic name when serialising packet" end body += encode_string(@topic) body += encode_short(@message_id) unless qos == 0 - body += payload.to_s + body += payload.to_s.force_encoding('ASCII-8BIT') return body end # Parse the body (variable header and payload) of a Publish packet def parse_body(buffer) super(buffer) @topic = shift_string(buffer) @message_id = shift_short(buffer) unless qos == 0 - @payload = buffer.dup + @payload = buffer end + + def inspect + "\#<#{self.class}: " + + "d#{duplicate ? '1' : '0'}, " + + "q#{qos}, " + + "r#{retain ? '1' : '0'}, " + + "m#{message_id}, " + + "'#{topic}', " + + "#{inspect_payload}>" + end + + protected + def inspect_payload + str = payload.to_s + if str.bytesize < 16 + "'#{str}'" + else + "... (#{str.bytesize} bytes)" + end + end end # Class representing an MQTT Connect Packet class Connect < MQTT::Packet attr_accessor :protocol_name @@ -321,16 +364,20 @@ end # Get serialisation of packet's body def encode_body body = '' - if @client_id.nil? or @client_id.length < 1 or @client_id.length > 23 + if @client_id.nil? or @client_id.bytesize < 1 or @client_id.bytesize > 23 raise "Invalid client identifier when serialising packet" end body += encode_string(@protocol_name) body += encode_bytes(@protocol_version.to_i) + if @keep_alive < 0 + raise "Invalid keep-alive value: cannot be less than 0" + end + # Set the Connect flags @connect_flags = 0 @connect_flags |= 0x02 if @clean_session @connect_flags |= 0x04 unless @will_topic.nil? @connect_flags |= ((@will_qos & 0x03) << 3) @@ -341,10 +388,11 @@ body += encode_short(@keep_alive) body += encode_string(@client_id) unless will_topic.nil? body += encode_string(@will_topic) + # The MQTT v3.1 specification says that the payload is a UTF-8 string body += encode_string(@will_payload) end body += encode_string(@username) unless @username.nil? body += encode_string(@password) unless @password.nil? return body @@ -352,29 +400,53 @@ # Parse the body (variable header and payload) of a Connect packet def parse_body(buffer) super(buffer) @protocol_name = shift_string(buffer) - @protocol_version = shift_byte(buffer) + @protocol_version = shift_byte(buffer).to_i + + if @protocol_name != 'MQIsdp' + raise ProtocolException.new( + "Unsupported protocol name: #{@protocol_name}" + ) + end + + if @protocol_version != 3 + raise ProtocolException.new( + "Unsupported protocol version: #{@protocol_version}" + ) + end + @connect_flags = shift_byte(buffer) @clean_session = ((@connect_flags & 0x02) >> 1) == 0x01 @keep_alive = shift_short(buffer) @client_id = shift_string(buffer) if ((@connect_flags & 0x04) >> 2) == 0x01 # Last Will and Testament @will_qos = ((@connect_flags & 0x18) >> 3) @will_retain = ((@connect_flags & 0x20) >> 5) == 0x01 @will_topic = shift_string(buffer) + # The MQTT v3.1 specification says that the payload is a UTF-8 string @will_payload = shift_string(buffer) end - if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.length > 0 + if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.bytesize > 0 @username = shift_string(buffer) end - if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.length > 0 + if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.bytesize > 0 @password = shift_string(buffer) end end + + def inspect + str = "\#<#{self.class}: " + str += "keep_alive=#{keep_alive}" + str += ", clean" if clean_session + str += ", client_id='#{client_id}'" + str += ", username='#{username}'" unless username.nil? + str += ", password=..." unless password.nil? + str += ">" + end end # Class representing an MQTT Connect Acknowledgment Packet class Connack < MQTT::Packet attr_accessor :return_code @@ -414,16 +486,20 @@ end # Parse the body (variable header and payload) of a Connect Acknowledgment packet def parse_body(buffer) super(buffer) - unused = shift_byte(buffer) + _unused = shift_byte(buffer) @return_code = shift_byte(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Connect Acknowledgment packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % return_code + end end # Class representing an MQTT Publish Acknowledgment packet class Puback < MQTT::Packet attr_accessor :message_id @@ -445,10 +521,14 @@ @message_id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Acknowledgment packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % message_id + end end # Class representing an MQTT Publish Received packet class Pubrec < MQTT::Packet attr_accessor :message_id @@ -470,10 +550,14 @@ @message_id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Received packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % message_id + end end # Class representing an MQTT Publish Release packet class Pubrel < MQTT::Packet attr_accessor :message_id @@ -495,10 +579,14 @@ @message_id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Release packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % message_id + end end # Class representing an MQTT Publish Complete packet class Pubcomp < MQTT::Packet attr_accessor :message_id @@ -520,10 +608,14 @@ @message_id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Complete packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % message_id + end end # Class representing an MQTT Client Subscribe packet class Subscribe < MQTT::Packet attr_accessor :message_id @@ -595,16 +687,23 @@ # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @message_id = shift_short(buffer) @topics = [] - while(buffer.length>0) + while(buffer.bytesize>0) topic_name = shift_string(buffer) topic_qos = shift_byte(buffer) @topics << [topic_name,topic_qos] end end + + def inspect + str = "\#<#{self.class}: 0x%2.2X, %s>" % [ + message_id, + topics.map {|t| "'#{t[0]}':#{t[1]}"}.join(', ') + ] + end end # Class representing an MQTT Subscribe Acknowledgment packet class Suback < MQTT::Packet attr_accessor :message_id @@ -641,14 +740,18 @@ # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @message_id = shift_short(buffer) - while(buffer.length>0) + while(buffer.bytesize>0) @granted_qos << shift_byte(buffer) end end + + def inspect + "\#<#{self.class}: 0x%2.2X, qos=%s>" % [message_id, granted_qos.join(',')] + end end # Class representing an MQTT Client Unsubscribe packet class Unsubscribe < MQTT::Packet attr_reader :topics @@ -682,14 +785,21 @@ # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @message_id = shift_short(buffer) - while(buffer.length>0) + while(buffer.bytesize>0) @topics << shift_string(buffer) end end + + def inspect + str = "\#<#{self.class}: 0x%2.2X, %s>" % [ + message_id, + topics.map {|t| "'#{t}'"}.join(', ') + ] + end end # Class representing an MQTT Unsubscribe Acknowledgment packet class Unsuback < MQTT::Packet attr_accessor :message_id @@ -711,10 +821,14 @@ @message_id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Unsubscribe Acknowledgment packet") end end + + def inspect + "\#<#{self.class}: 0x%2.2X>" % message_id + end end # Class representing an MQTT Ping Request packet class Pingreq < MQTT::Packet # Create a new Ping Request packet @@ -760,10 +874,9 @@ unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Disconnect packet") end end end - end # An enumeration of the MQTT packet types PACKET_TYPES = [