lib/mqtt/packet.rb in mqtt-0.0.3 vs lib/mqtt/packet.rb in mqtt-0.0.4

- old
+ new

@@ -5,23 +5,23 @@ module MQTT # Class representing a MQTT Packet # Performs binary encoding and decoding of headers class Packet - attr_reader :type # The packet type attr_reader :dup # Duplicate delivery flag attr_reader :retain # Retain flag attr_reader :qos # Quality of Service level - attr_reader :body # Packet's body (everything after fixed header) # Read in a packet from a socket def self.read(socket) - - # Create a packet object + # Read in the packet header and work out the class header = read_byte(socket) - packet = MQTT::Packet.new( - :type => ((header & 0xF0) >> 4), + type_id = ((header & 0xF0) >> 4) + packet_class = MQTT::PACKET_TYPES[type_id] + + # Create a new packet object + packet = packet_class.new( :dup => ((header & 0x08) >> 3), :qos => ((header & 0x06) >> 1), :retain => ((header & 0x01) >> 0) ) @@ -34,43 +34,26 @@ multiplier *= 0x80 end while ((digit & 0x80) != 0x00) # FIXME: only allow 4 bytes? # Read in the packet body - packet.body = socket.read(body_len) + packet.parse_body( socket.read(body_len) ) return packet end # Create a new empty packet def initialize(args={}) - self.type = args[:type] || :invalid self.dup = args[:dup] || false self.qos = args[:qos] || 0 self.retain = args[:retain] || false - self.body = args[:body] || '' end - # Set the packet type - # Can either by the packet type id (integer) - # Or the packet type as a symbol/string - # See the MQTT module for an enumeration of packet types. - def type=(arg) - if arg.kind_of?(Integer) - # Convert type identifier to symbol - @type = MQTT::PACKET_TYPES[arg] - else - @type = arg.to_sym - # FIXME: raise exception if packet type is invalid? - end - end - - # Return the identifer for this packet type + # Get the identifer for this packet type def type_id - raise "No packet type set for this packet" if @type.nil? - index = MQTT::PACKET_TYPES.index(@type) - raise "Invalid packet type: #{@type}" if index.nil? + index = MQTT::PACKET_TYPES.index(self.class) + raise "Invalid packet type: #{self.class}" if index.nil? return index end # Set the dup flag (true/false) def dup=(arg) @@ -92,106 +75,533 @@ # Set the Quality of Service level (0/1/2) def qos=(arg) @qos = arg.to_i end - - # Set (replace) the packet body - def body=(arg) - # FIXME: only allow 268435455 bytes? - @body = arg.to_s - end - - - - - # Add an array of bytes to the end of the packet's body - def add_bytes(*bytes) - @body += bytes.pack('C*') + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + unless buffer.size == 0 + raise MQTT::ProtocolException.new("Error: parse_body was not sub-classed for a packet with a payload") + end end - - # Add a 16-bit unsigned integer to the end of the packet's body - def add_short(val) - @body += [val.to_i].pack('n') - end - # Add some raw data to the end of the packet's body - def add_data(data) - data = data.to_s unless data.is_a?(String) - @body += data + # Get serialisation of packet's body (variable header and payload) + def encode_body + '' # No body by default end - # Add a string to the end of the packet's body - # (preceded by the length of the string) - def add_string(str) - str = str.to_s unless str.is_a?(String) - add_short(str.size) - add_data(str) - end - - - # Remove a 16-bit unsigned integer from the front on the body - def shift_short - bytes = @body.slice!(0..1) - bytes.unpack('n').first - end - # Remove n bytes from the front on the body - def shift_bytes(bytes) - @body.slice!(0...bytes).unpack('C*') - end - - # Remove n bytes from the front on the body - def shift_data(bytes) - @body.slice!(0...bytes) - end - - # Remove string from the front on the body - def shift_string - len = shift_short - shift_data(len) - end - - # Serialise the packet def to_s # Encode the fixed header header = [ ((type_id.to_i & 0x0F) << 4) | ((dup ? 0x1 : 0x0) << 3) | ((qos.to_i & 0x03) << 1) | (retain ? 0x1 : 0x0) ] + # Get the packet's variable header and payload + body = self.encode_body + # Build up the body length field bytes - body_size = @body.size + body_size = body.size begin digit = (body_size % 128) body_size = (body_size / 128) # if there are more digits to encode, set the top bit of this digit digit |= 0x80 if (body_size > 0) header.push(digit) end while (body_size > 0) # Convert header to binary and add on body - header.pack('C*') + @body + header.pack('C*') + body end + + + protected - def inspect - format("#<MQTT::Packet:0x%01x ", object_id)+ - "type=#{@type}, dup=#{@dup}, retain=#{@retain}, "+ - "qos=#{@qos}, body.size=#{@body.size}>" + # Encode an array of bytes and return them + def encode_bytes(*bytes) + bytes.pack('C*') end + + # Encode a 16-bit unsigned integer and return it + def encode_short(val) + [val.to_i].pack('n') + end + + # Encode a string and return it + # (preceded by the length of the string) + def encode_string(str) + str = str.to_s unless str.is_a?(String) + encode_short(str.size) + str + end + # Remove a 16-bit unsigned integer from the front of buffer + def shift_short(buffer) + bytes = buffer.slice!(0..1) + bytes.unpack('n').first + end + + # Remove one byte from the front of the string + def shift_byte(buffer) + buffer.slice!(0...1).unpack('C').first + end + + # Remove n bytes from the front of buffer + def shift_data(buffer,bytes) + buffer.slice!(0...bytes) + end + + # Remove string from the front of buffer + def shift_string(buffer) + len = shift_short(buffer) + shift_data(buffer,len) + end + + private - # Read and unpack a single byte from socket + # Read and unpack a single byte from a socket def self.read_byte(socket) byte = socket.read(1) raise MQTT::ProtocolException if byte.nil? byte.unpack('C').first end + + + + ## PACKET SUBCLASSES ## + + # Class representing an MQTT Publish message + class Publish < MQTT::Packet + attr_accessor :topic + attr_accessor :message_id + attr_accessor :payload + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.topic = args[:topic] || nil + self.message_id = args[:message_id] || 0 + self.payload = args[:payload] || '' + end + + # Get serialisation of packet's body + def encode_body + body = '' + raise "Invalid topic name when serialising packet" if @topic.nil? + body += encode_string(@topic) + body += encode_short(@message_id) unless qos == 0 + body += payload.to_s + return body + end + + # Parse the body (variable header and payload) of a Publish packet + def parse_body(buffer) + @topic = shift_string(buffer) + @message_id = shift_short(buffer) unless qos == 0 + @payload = buffer.dup + end + end + + # Class representing an MQTT Connect Packet + class Connect < MQTT::Packet + attr_accessor :protocol_name + attr_accessor :protocol_version + attr_accessor :client_id + attr_accessor :clean_start + attr_accessor :keep_alive + attr_accessor :will_topic + attr_accessor :will_qos + attr_accessor :will_retain + attr_accessor :will_payload + + # Create a new Client Connect packet + def initialize(args={}) + super(args) + self.protocol_name = args[:protocol_name] || 'MQIsdp' + self.protocol_version = args[:protocol_version] || 0x03 + self.client_id = args[:client_id] || nil + self.clean_start = args[:clean_start] || true + self.keep_alive = args[:keep_alive] || 10 + self.will_topic = args[:will_topic] || nil + self.will_qos = args[:will_qos] || 0 + self.will_retain = args[:will_retain] || false + self.will_payload = args[:will_payload] || '' + end + + # Get serialisation of packet's body + def encode_body + body = '' + raise "Invalid client identifier when serialising packet" if @client_id.nil? + body += encode_string(@protocol_name) + body += encode_bytes(@protocol_version.to_i) + body += encode_bytes(0) # Connect Flags + body += encode_short(@keep_alive) # Keep Alive timer + body += encode_string(@client_id) + # FIXME: implement Will + #unless @will_topic.nil? + # body += encode_string(@will_topic) + # body += will_payload.to_s + #end + return body + end + + # Parse the body (variable header and payload) of a Connect packet + def parse_body(buffer) + @protocol_name = shift_string(buffer) + @protocol_version = shift_byte(buffer) + flags = shift_byte(buffer) + @keep_alive = shift_short(buffer) + @client_id = shift_string(buffer) + # FIXME: implement Will + end + end + + # Class representing an MQTT Connect Acknowledgment Packet + class Connack < MQTT::Packet + attr_accessor :return_code + + # Create a new Client Connect packet + def initialize(args={}) + super(args) + self.return_code = args[:return_code] || 0 + end + + # Get a string message corresponding to a return code + def return_msg + case return_code + when 0x00 + "Connection Accepted" + when 0x01 + "Connection refused: unacceptable protocol version" + when 0x02 + "Connection refused: client identifier rejected" + when 0x03 + "Connection refused: broker unavailable" + else + "Connection refused: error code #{return_code}" + end + end + + # Get serialisation of packet's body + def encode_body + body = '' + body += encode_bytes(0) # Unused + body += encode_bytes(@return_code.to_i) # Return Code + return body + end + + # Parse the body (variable header and payload) of a Connect Acknowledgment packet + def parse_body(buffer) + unused = shift_byte(buffer) + @return_code = shift_byte(buffer) + end + end + + # Class representing an MQTT Publish Acknowledgment packet + class Puback < MQTT::Packet + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + end + + # Get serialisation of packet's body + def encode_body + encode_short(@message_id) + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + end + end + + # Class representing an MQTT Publish Received packet + class Pubrec < MQTT::Packet + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + end + + # Get serialisation of packet's body + def encode_body + encode_short(@message_id) + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + end + end + + # Class representing an MQTT Publish Release packet + class Pubrel < MQTT::Packet + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + end + + # Get serialisation of packet's body + def encode_body + encode_short(@message_id) + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + end + end + + # Class representing an MQTT Publish Complete packet + class Pubcomp < MQTT::Packet + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + end + + # Get serialisation of packet's body + def encode_body + encode_short(@message_id) + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + end + end + + # Class representing an MQTT Client Subscribe packet + class Subscribe < MQTT::Packet + attr_reader :topics + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.topics = args[:topics] || [] + self.message_id = args[:message_id] || 0 + self.qos = 1 # Force a QOS of 1 + end + + # Set one or more topics for the Subscrible packet + # The topics parameter should be one of the following: + # * String: subscribe to one topic with QOS 0 + # * Array: subscribe to multiple topics with QOS 0 + # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level + # + # For example: + # packet.topics = 'a/b' + # packet.topics = ['a/b', 'c/d'] + # packet.topics = [['a/b',0], ['c/d',1]] + # packet.topics = {'a/b' => 0, 'c/d' => 1} + # + def topics=(value) + # Get input into a consistent state + if value.is_a?(Array) + input = value.flatten + else + input = [value] + end + + @topics = [] + while(input.size>0) + item = input.shift + if item.is_a?(Hash) + # Convert hash into an ordered array of arrays + @topics += item.sort + elsif item.is_a?(String) + # Peek at the next item in the array, and remove it if it is an integer + if input.first.is_a?(Integer) + qos = input.shift + @topics << [item,qos] + else + @topics << [item,0] + end + else + # Meh? + raise "Invalid topics input: #{value.inspect}" + end + end + @topics + end + + # Get serialisation of packet's body + def encode_body + raise "no topics given when serialising packet" if @topics.empty? + body = encode_short(@message_id) + topics.each do |item| + body += encode_string(item[0]) + body += encode_bytes(item[1]) + end + return body + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + @topics = [] + while(buffer.size>0) + topic_name = shift_string(buffer) + topic_qos = shift_byte(buffer) + @topics << [topic_name,topic_qos] + end + end + end + + # Class representing an MQTT Subscribe Acknowledgment packet + class Suback < MQTT::Packet + attr_accessor :message_id + attr_reader :granted_qos + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + self.granted_qos = args[:granted_qos] || [] + end + + def granted_qos=(value) + raise "granted QOS should be an array of arrays" unless value.is_a?(Array) + @granted_qos = value + end + + # Get serialisation of packet's body + def encode_body + raise "no granted QOS given when serialising packet" if @granted_qos.empty? + body = encode_short(@message_id) + granted_qos.flatten.each { |qos| body += encode_bytes(qos) } + return body + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + while(buffer.size>0) + @granted_qos << [shift_byte(buffer),shift_byte(buffer)] + end + end + end + + # Class representing an MQTT Client Unsubscribe packet + class Unsubscribe < MQTT::Packet + attr_reader :topics + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.topics = args[:topics] || [] + self.message_id = args[:message_id] || 0 + self.qos = 1 # Force a QOS of 1 + end + + def topics=(value) + if value.is_a?(Array) + @topics = value + else + @topics = [value] + end + end + + # Get serialisation of packet's body + def encode_body + raise "no topics given when serialising packet" if @topics.empty? + body = encode_short(@message_id) + topics.each { |topic| body += encode_string(topic) } + return body + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + while(buffer.size>0) + @topics << shift_string(buffer) + end + end + end + + # Class representing an MQTT Unsubscribe Acknowledgment packet + class Unsuback < MQTT::Packet + attr_accessor :message_id + + # Create a new Unsubscribe Acknowledgment packet + def initialize(args={}) + super(args) + self.message_id = args[:message_id] || 0 + end + + # Get serialisation of packet's body + def encode_body + encode_short(@message_id) + end + + # Parse the body (variable header and payload) of a packet + def parse_body(buffer) + @message_id = shift_short(buffer) + end + end + + # Class representing an MQTT Ping Request packet + class Pingreq < MQTT::Packet + # Create a new Ping Request packet + def initialize(args={}) + super(args) + end + end + + # Class representing an MQTT Ping Response packet + class Pingresp < MQTT::Packet + # Create a new Ping Response packet + def initialize(args={}) + super(args) + end + end + + # Class representing an MQTT Client Disconnect packet + class Disconnect < MQTT::Packet + # Create a new Client Disconnect packet + def initialize(args={}) + super(args) + end + end + end + + + # An enumeration of the MQTT packet types + PACKET_TYPES = [ + nil, + MQTT::Packet::Connect, + MQTT::Packet::Connack, + MQTT::Packet::Publish, + MQTT::Packet::Puback, + MQTT::Packet::Pubrec, + MQTT::Packet::Pubrel, + MQTT::Packet::Pubcomp, + MQTT::Packet::Subscribe, + MQTT::Packet::Suback, + MQTT::Packet::Unsubscribe, + MQTT::Packet::Unsuback, + MQTT::Packet::Pingreq, + MQTT::Packet::Pingresp, + MQTT::Packet::Disconnect, + nil + ] end