lib/mqtt/packet.rb in mqtt-0.0.1 vs lib/mqtt/packet.rb in mqtt-0.0.2
- old
+ new
@@ -4,33 +4,42 @@
module MQTT
# Class representing a MQTT Packet
# Performs binary encoding and decoding of headers
- class Packet #:nodoc: all
+ class Packet
attr_reader :type # The packet type
attr_reader :dup # Duplicate delivery flag
attr_reader :retain # Retain flag
attr_reader :qos # Quality of Service level
attr_reader :body # Packet's body (everything after fixed header)
# Read in a packet from a socket
- def self.read(sock)
- header = sock.read(2)
- raise MQTT::ProtocolException if header.nil?
- byte1,byte2 = header.unpack('C*')
+ def self.read(socket)
- # FIXME: support decoding of multi-byte length header
-
+ # Create a packet object
+ header = read_byte(socket)
packet = MQTT::Packet.new(
- :type => ((byte1 & 0xF0) >> 4),
- :dup => ((byte1 & 0x08) >> 3),
- :qos => ((byte1 & 0x06) >> 1),
- :retain => ((byte1 & 0x01) >> 0)
+ :type => ((header & 0xF0) >> 4),
+ :dup => ((header & 0x08) >> 3),
+ :qos => ((header & 0x06) >> 1),
+ :retain => ((header & 0x01) >> 0)
)
- packet.body = sock.read(byte2)
+
+ # Read in the packet length
+ multiplier = 1
+ body_len = 0
+ begin
+ digit = read_byte(socket)
+ body_len += ((digit & 0x7F) * multiplier)
+ multiplier *= 0x80
+ end while ((digit & 0x80) != 0x00)
+ # FIXME: only allow 4 bytes?
+ # Read in the packet body
+ packet.body = socket.read(body_len)
+
return packet
end
# Create a new empty packet
def initialize(args={})
@@ -39,10 +48,14 @@
self.qos = args[:qos] || 0
self.retain = args[:retain] || false
self.body = args[:body] || ''
end
+ # Set the packet type
+ # Can either by the packet type id (integer)
+ # Or the packet type as a symbol/string
+ # See the MQTT module for an enumeration of packet types.
def type=(arg)
if arg.kind_of?(Integer)
# Convert type identifier to symbol
@type = MQTT::PACKET_TYPES[arg]
else
@@ -57,31 +70,36 @@
index = MQTT::PACKET_TYPES.index(@type)
raise "Invalid packet type: #{@type}" if index.nil?
return index
end
+ # Set the dup flag (true/false)
def dup=(arg)
if arg.kind_of?(Integer)
@dup = (arg != 0 ? true : false)
else
@dup = arg
end
end
+ # Set the retain flag (true/false)
def retain=(arg)
if arg.kind_of?(Integer)
@retain = (arg != 0 ? true : false)
else
@retain = arg
end
end
+ # Set the Quality of Service level (0/1/2)
def qos=(arg)
@qos = arg.to_i
end
+ # Set (replace) the packet body
def body=(arg)
+ # FIXME: only allow 268435455 bytes?
@body = arg.to_s
end
@@ -135,25 +153,44 @@
end
# Serialise the packet
def to_s
- # Encode the 2-byte fixed header
+ # Encode the fixed header
header = [
((type_id.to_i & 0x0F) << 4) |
((dup ? 0x1 : 0x0) << 3) |
((qos.to_i & 0x03) << 1) |
- (retain ? 0x1 : 0x0),
- (@body.length & 0x7F)
+ (retain ? 0x1 : 0x0)
]
- # FIXME: support multi-byte length header
+
+ # Build up the body length field bytes
+ body_size = @body.size
+ begin
+ digit = (body_size % 128)
+ body_size = (body_size / 128)
+ # if there are more digits to encode, set the top bit of this digit
+ digit |= 0x80 if (body_size > 0)
+ header.push(digit)
+ end while (body_size > 0)
+
+ # Convert header to binary and add on body
header.pack('C*') + @body
end
def inspect
format("#<MQTT::Packet:0x%01x ", object_id)+
"type=#{@type}, dup=#{@dup}, retain=#{@retain}, "+
"qos=#{@qos}, body.size=#{@body.size}>"
+ end
+
+ private
+
+ # Read and unpack a single byte from socket
+ def self.read_byte(socket)
+ byte = socket.read(1)
+ raise MQTT::ProtocolException if byte.nil?
+ byte.unpack('C').first
end
end
end