lib/mqtt/packet.rb in mqtt-0.0.9 vs lib/mqtt/packet.rb in mqtt-0.1.0
- old
+ new
@@ -1,5 +1,7 @@
+# encoding: BINARY
+
module MQTT
# Class representing a MQTT Packet
# Performs binary encoding and decoding of headers
class MQTT::Packet
@@ -15,31 +17,25 @@
:body_length => nil
}
# Read in a packet from a socket
def self.read(socket)
- # Read in the packet header and work out the class
- header = read_byte(socket)
- type_id = ((header & 0xF0) >> 4)
- packet_class = MQTT::PACKET_TYPES[type_id]
-
- # Create a new packet object
- packet = packet_class.new(
- :duplicate => ((header & 0x08) >> 3),
- :qos => ((header & 0x06) >> 1),
- :retain => ((header & 0x01) >> 0)
+ # Read in the packet header and create a new packet object
+ packet = create_from_header(
+ read_byte(socket)
)
# Read in the packet length
multiplier = 1
body_length = 0
+ pos = 1
begin
digit = read_byte(socket)
body_length += ((digit & 0x7F) * multiplier)
multiplier *= 0x80
- end while ((digit & 0x80) != 0x00)
- # FIXME: only allow 4 bytes?
+ pos += 1
+ end while ((digit & 0x80) != 0x00) and pos <= 4
# Store the expected body length in the packet
packet.instance_variable_set('@body_length', body_length)
# Read in the packet body
@@ -56,48 +52,59 @@
end
# Parse the header and create a new packet object of the correct type
# The header is removed from the buffer passed into this function
def self.parse_header(buffer)
- # Work out the class
- type_id = ((buffer.unpack("C*")[0] & 0xF0) >> 4)
- packet_class = MQTT::PACKET_TYPES[type_id]
- if packet_class.nil?
- raise ProtocolException.new("Invalid packet type identifier: #{type_id}")
+ # Check that the packet is a long as the minimum packet size
+ if buffer.bytesize < 2
+ raise ProtocolException.new("Invalid packet: less than 2 bytes long")
end
# Create a new packet object
- packet = packet_class.new(
- :duplicate => ((buffer.unpack("C*")[0] & 0x08) >> 3) == 0x01,
- :qos => ((buffer.unpack("C*")[0] & 0x06) >> 1),
- :retain => ((buffer.unpack("C*")[0] & 0x01) >> 0) == 0x01
- )
+ bytes = buffer.unpack("C5")
+ packet = create_from_header(bytes.first)
# Parse the packet length
body_length = 0
multiplier = 1
pos = 1
begin
- if buffer.length <= pos
+ if buffer.bytesize <= pos
raise ProtocolException.new("The packet length header is incomplete")
end
- digit = buffer.unpack("C*")[pos]
+ digit = bytes[pos]
body_length += ((digit & 0x7F) * multiplier)
multiplier *= 0x80
pos += 1
end while ((digit & 0x80) != 0x00) and pos <= 4
# Store the expected body length in the packet
packet.instance_variable_set('@body_length', body_length)
- # Delete the variable length header from the raw packet passed in
+ # Delete the fixed header from the raw packet passed in
buffer.slice!(0...pos)
return packet
end
+ # Create a new packet object from the first byte of a MQTT packet
+ def self.create_from_header(byte)
+ # Work out the class
+ type_id = ((byte & 0xF0) >> 4)
+ packet_class = MQTT::PACKET_TYPES[type_id]
+ if packet_class.nil?
+ raise ProtocolException.new("Invalid packet type identifier: #{type_id}")
+ end
+ # Create a new packet object
+ packet_class.new(
+ :duplicate => ((byte & 0x08) >> 3) == 0x01,
+ :qos => ((byte & 0x06) >> 1),
+ :retain => ((byte & 0x01) >> 0) == 0x01
+ )
+ end
+
# Create a new empty packet
def initialize(args={})
update_attributes(DEFAULTS.merge(args))
end
@@ -135,22 +142,25 @@
end
# Set the Quality of Service level (0/1/2)
def qos=(arg)
@qos = arg.to_i
+ if @qos < 0 or @qos > 2
+ raise "Invalid QoS value: #{@qos}"
+ end
end
# Set the length of the packet body
def body_length=(arg)
@body_length = arg.to_i
end
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
- if buffer.length != body_length
+ if buffer.bytesize != body_length
raise ProtocolException.new(
- "Failed to parse packet - input buffer (#{buffer.length}) is not the same as the body length buffer (#{body_length})"
+ "Failed to parse packet - input buffer (#{buffer.bytesize}) is not the same as the body length header (#{body_length})"
)
end
end
# Get serialisation of packet's body (variable header and payload)
@@ -170,12 +180,17 @@
]
# Get the packet's variable header and payload
body = self.encode_body
+ # Check that that packet isn't too big
+ body_length = body.bytesize
+ if body_length > 268435455
+ raise "Error serialising packet: body is more than 256MB"
+ end
+
# Build up the body length field bytes
- body_length = body.length
begin
digit = (body_length % 128)
body_length = (body_length / 128)
# if there are more digits to encode, set the top bit of this digit
digit |= 0x80 if (body_length > 0)
@@ -184,10 +199,13 @@
# Convert header to binary and add on body
header.pack('C*') + body
end
+ def inspect
+ "\#<#{self.class}>"
+ end
protected
# Encode an array of bytes and return them
def encode_bytes(*bytes)
@@ -197,15 +215,18 @@
# Encode a 16-bit unsigned integer and return it
def encode_short(val)
[val.to_i].pack('n')
end
- # Encode a string and return it
+ # Encode a UTF-8 string and return it
# (preceded by the length of the string)
def encode_string(str)
- str = str.to_s unless str.is_a?(String)
- encode_short(str.length) + str
+ str = str.to_s.encode('UTF-8')
+
+ # Force to binary, when assembling the packet
+ str.force_encoding('ASCII-8BIT')
+ encode_short(str.bytesize) + str
end
# Remove a 16-bit unsigned integer from the front of buffer
def shift_short(buffer)
bytes = buffer.slice!(0..1)
@@ -223,21 +244,23 @@
end
# Remove string from the front of buffer
def shift_string(buffer)
len = shift_short(buffer)
- shift_data(buffer,len)
+ str = shift_data(buffer,len)
+ # Strings in MQTT v3.1 are all UTF-8
+ str.force_encoding('UTF-8')
end
private
# Read and unpack a single byte from a socket
def self.read_byte(socket)
byte = socket.read(1)
if byte.nil?
- raise ProtocolException
+ raise ProtocolException.new("Failed to read byte from socket")
end
byte.unpack('C').first
end
@@ -263,26 +286,46 @@
end
# Get serialisation of packet's body
def encode_body
body = ''
- if @topic.nil?
+ if @topic.nil? or @topic.to_s.empty?
raise "Invalid topic name when serialising packet"
end
body += encode_string(@topic)
body += encode_short(@message_id) unless qos == 0
- body += payload.to_s
+ body += payload.to_s.force_encoding('ASCII-8BIT')
return body
end
# Parse the body (variable header and payload) of a Publish packet
def parse_body(buffer)
super(buffer)
@topic = shift_string(buffer)
@message_id = shift_short(buffer) unless qos == 0
- @payload = buffer.dup
+ @payload = buffer
end
+
+ def inspect
+ "\#<#{self.class}: " +
+ "d#{duplicate ? '1' : '0'}, " +
+ "q#{qos}, " +
+ "r#{retain ? '1' : '0'}, " +
+ "m#{message_id}, " +
+ "'#{topic}', " +
+ "#{inspect_payload}>"
+ end
+
+ protected
+ def inspect_payload
+ str = payload.to_s
+ if str.bytesize < 16
+ "'#{str}'"
+ else
+ "... (#{str.bytesize} bytes)"
+ end
+ end
end
# Class representing an MQTT Connect Packet
class Connect < MQTT::Packet
attr_accessor :protocol_name
@@ -321,16 +364,20 @@
end
# Get serialisation of packet's body
def encode_body
body = ''
- if @client_id.nil? or @client_id.length < 1 or @client_id.length > 23
+ if @client_id.nil? or @client_id.bytesize < 1 or @client_id.bytesize > 23
raise "Invalid client identifier when serialising packet"
end
body += encode_string(@protocol_name)
body += encode_bytes(@protocol_version.to_i)
+ if @keep_alive < 0
+ raise "Invalid keep-alive value: cannot be less than 0"
+ end
+
# Set the Connect flags
@connect_flags = 0
@connect_flags |= 0x02 if @clean_session
@connect_flags |= 0x04 unless @will_topic.nil?
@connect_flags |= ((@will_qos & 0x03) << 3)
@@ -341,10 +388,11 @@
body += encode_short(@keep_alive)
body += encode_string(@client_id)
unless will_topic.nil?
body += encode_string(@will_topic)
+ # The MQTT v3.1 specification says that the payload is a UTF-8 string
body += encode_string(@will_payload)
end
body += encode_string(@username) unless @username.nil?
body += encode_string(@password) unless @password.nil?
return body
@@ -352,29 +400,53 @@
# Parse the body (variable header and payload) of a Connect packet
def parse_body(buffer)
super(buffer)
@protocol_name = shift_string(buffer)
- @protocol_version = shift_byte(buffer)
+ @protocol_version = shift_byte(buffer).to_i
+
+ if @protocol_name != 'MQIsdp'
+ raise ProtocolException.new(
+ "Unsupported protocol name: #{@protocol_name}"
+ )
+ end
+
+ if @protocol_version != 3
+ raise ProtocolException.new(
+ "Unsupported protocol version: #{@protocol_version}"
+ )
+ end
+
@connect_flags = shift_byte(buffer)
@clean_session = ((@connect_flags & 0x02) >> 1) == 0x01
@keep_alive = shift_short(buffer)
@client_id = shift_string(buffer)
if ((@connect_flags & 0x04) >> 2) == 0x01
# Last Will and Testament
@will_qos = ((@connect_flags & 0x18) >> 3)
@will_retain = ((@connect_flags & 0x20) >> 5) == 0x01
@will_topic = shift_string(buffer)
+ # The MQTT v3.1 specification says that the payload is a UTF-8 string
@will_payload = shift_string(buffer)
end
- if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.length > 0
+ if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.bytesize > 0
@username = shift_string(buffer)
end
- if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.length > 0
+ if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.bytesize > 0
@password = shift_string(buffer)
end
end
+
+ def inspect
+ str = "\#<#{self.class}: "
+ str += "keep_alive=#{keep_alive}"
+ str += ", clean" if clean_session
+ str += ", client_id='#{client_id}'"
+ str += ", username='#{username}'" unless username.nil?
+ str += ", password=..." unless password.nil?
+ str += ">"
+ end
end
# Class representing an MQTT Connect Acknowledgment Packet
class Connack < MQTT::Packet
attr_accessor :return_code
@@ -414,16 +486,20 @@
end
# Parse the body (variable header and payload) of a Connect Acknowledgment packet
def parse_body(buffer)
super(buffer)
- unused = shift_byte(buffer)
+ _unused = shift_byte(buffer)
@return_code = shift_byte(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Connect Acknowledgment packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % return_code
+ end
end
# Class representing an MQTT Publish Acknowledgment packet
class Puback < MQTT::Packet
attr_accessor :message_id
@@ -445,10 +521,14 @@
@message_id = shift_short(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Publish Acknowledgment packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % message_id
+ end
end
# Class representing an MQTT Publish Received packet
class Pubrec < MQTT::Packet
attr_accessor :message_id
@@ -470,10 +550,14 @@
@message_id = shift_short(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Publish Received packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % message_id
+ end
end
# Class representing an MQTT Publish Release packet
class Pubrel < MQTT::Packet
attr_accessor :message_id
@@ -495,10 +579,14 @@
@message_id = shift_short(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Publish Release packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % message_id
+ end
end
# Class representing an MQTT Publish Complete packet
class Pubcomp < MQTT::Packet
attr_accessor :message_id
@@ -520,10 +608,14 @@
@message_id = shift_short(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Publish Complete packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % message_id
+ end
end
# Class representing an MQTT Client Subscribe packet
class Subscribe < MQTT::Packet
attr_accessor :message_id
@@ -595,16 +687,23 @@
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
super(buffer)
@message_id = shift_short(buffer)
@topics = []
- while(buffer.length>0)
+ while(buffer.bytesize>0)
topic_name = shift_string(buffer)
topic_qos = shift_byte(buffer)
@topics << [topic_name,topic_qos]
end
end
+
+ def inspect
+ str = "\#<#{self.class}: 0x%2.2X, %s>" % [
+ message_id,
+ topics.map {|t| "'#{t[0]}':#{t[1]}"}.join(', ')
+ ]
+ end
end
# Class representing an MQTT Subscribe Acknowledgment packet
class Suback < MQTT::Packet
attr_accessor :message_id
@@ -641,14 +740,18 @@
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
super(buffer)
@message_id = shift_short(buffer)
- while(buffer.length>0)
+ while(buffer.bytesize>0)
@granted_qos << shift_byte(buffer)
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X, qos=%s>" % [message_id, granted_qos.join(',')]
+ end
end
# Class representing an MQTT Client Unsubscribe packet
class Unsubscribe < MQTT::Packet
attr_reader :topics
@@ -682,14 +785,21 @@
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
super(buffer)
@message_id = shift_short(buffer)
- while(buffer.length>0)
+ while(buffer.bytesize>0)
@topics << shift_string(buffer)
end
end
+
+ def inspect
+ str = "\#<#{self.class}: 0x%2.2X, %s>" % [
+ message_id,
+ topics.map {|t| "'#{t}'"}.join(', ')
+ ]
+ end
end
# Class representing an MQTT Unsubscribe Acknowledgment packet
class Unsuback < MQTT::Packet
attr_accessor :message_id
@@ -711,10 +821,14 @@
@message_id = shift_short(buffer)
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Unsubscribe Acknowledgment packet")
end
end
+
+ def inspect
+ "\#<#{self.class}: 0x%2.2X>" % message_id
+ end
end
# Class representing an MQTT Ping Request packet
class Pingreq < MQTT::Packet
# Create a new Ping Request packet
@@ -760,10 +874,9 @@
unless buffer.empty?
raise ProtocolException.new("Extra bytes at end of Disconnect packet")
end
end
end
-
end
# An enumeration of the MQTT packet types
PACKET_TYPES = [