lib/mqtt/packet.rb in mqtt-0.0.1 vs lib/mqtt/packet.rb in mqtt-0.0.2

- old
+ new

@@ -4,33 +4,42 @@ module MQTT # Class representing a MQTT Packet # Performs binary encoding and decoding of headers - class Packet #:nodoc: all + class Packet attr_reader :type # The packet type attr_reader :dup # Duplicate delivery flag attr_reader :retain # Retain flag attr_reader :qos # Quality of Service level attr_reader :body # Packet's body (everything after fixed header) # Read in a packet from a socket - def self.read(sock) - header = sock.read(2) - raise MQTT::ProtocolException if header.nil? - byte1,byte2 = header.unpack('C*') + def self.read(socket) - # FIXME: support decoding of multi-byte length header - + # Create a packet object + header = read_byte(socket) packet = MQTT::Packet.new( - :type => ((byte1 & 0xF0) >> 4), - :dup => ((byte1 & 0x08) >> 3), - :qos => ((byte1 & 0x06) >> 1), - :retain => ((byte1 & 0x01) >> 0) + :type => ((header & 0xF0) >> 4), + :dup => ((header & 0x08) >> 3), + :qos => ((header & 0x06) >> 1), + :retain => ((header & 0x01) >> 0) ) - packet.body = sock.read(byte2) + + # Read in the packet length + multiplier = 1 + body_len = 0 + begin + digit = read_byte(socket) + body_len += ((digit & 0x7F) * multiplier) + multiplier *= 0x80 + end while ((digit & 0x80) != 0x00) + # FIXME: only allow 4 bytes? + # Read in the packet body + packet.body = socket.read(body_len) + return packet end # Create a new empty packet def initialize(args={}) @@ -39,10 +48,14 @@ self.qos = args[:qos] || 0 self.retain = args[:retain] || false self.body = args[:body] || '' end + # Set the packet type + # Can either by the packet type id (integer) + # Or the packet type as a symbol/string + # See the MQTT module for an enumeration of packet types. def type=(arg) if arg.kind_of?(Integer) # Convert type identifier to symbol @type = MQTT::PACKET_TYPES[arg] else @@ -57,31 +70,36 @@ index = MQTT::PACKET_TYPES.index(@type) raise "Invalid packet type: #{@type}" if index.nil? return index end + # Set the dup flag (true/false) def dup=(arg) if arg.kind_of?(Integer) @dup = (arg != 0 ? true : false) else @dup = arg end end + # Set the retain flag (true/false) def retain=(arg) if arg.kind_of?(Integer) @retain = (arg != 0 ? true : false) else @retain = arg end end + # Set the Quality of Service level (0/1/2) def qos=(arg) @qos = arg.to_i end + # Set (replace) the packet body def body=(arg) + # FIXME: only allow 268435455 bytes? @body = arg.to_s end @@ -135,25 +153,44 @@ end # Serialise the packet def to_s - # Encode the 2-byte fixed header + # Encode the fixed header header = [ ((type_id.to_i & 0x0F) << 4) | ((dup ? 0x1 : 0x0) << 3) | ((qos.to_i & 0x03) << 1) | - (retain ? 0x1 : 0x0), - (@body.length & 0x7F) + (retain ? 0x1 : 0x0) ] - # FIXME: support multi-byte length header + + # Build up the body length field bytes + body_size = @body.size + begin + digit = (body_size % 128) + body_size = (body_size / 128) + # if there are more digits to encode, set the top bit of this digit + digit |= 0x80 if (body_size > 0) + header.push(digit) + end while (body_size > 0) + + # Convert header to binary and add on body header.pack('C*') + @body end def inspect format("#<MQTT::Packet:0x%01x ", object_id)+ "type=#{@type}, dup=#{@dup}, retain=#{@retain}, "+ "qos=#{@qos}, body.size=#{@body.size}>" + end + + private + + # Read and unpack a single byte from socket + def self.read_byte(socket) + byte = socket.read(1) + raise MQTT::ProtocolException if byte.nil? + byte.unpack('C').first end end end