lib/mqtt/packet.rb in mqtt-0.2.0 vs lib/mqtt/packet.rb in mqtt-0.3.0

- old
+ new

@@ -3,36 +3,36 @@ module MQTT # Class representing a MQTT Packet # Performs binary encoding and decoding of headers class MQTT::Packet - # Duplicate delivery flag - attr_reader :duplicate + # The version number of the MQTT protocol to use (default 3.1.0) + attr_accessor :version - # Retain flag - attr_reader :retain + # Identifier to link related control packets together + attr_accessor :id - # Quality of Service level (0, 1, 2) - attr_reader :qos + # Array of 4 bits in the fixed header + attr_accessor :flags # The length of the parsed packet body attr_reader :body_length # Default attribute values ATTR_DEFAULTS = { - :duplicate => false, - :qos => 0, - :retain => false, + :version => '3.1.0', + :id => 0, :body_length => nil } # Read in a packet from a socket def self.read(socket) # Read in the packet header and create a new packet object packet = create_from_header( read_byte(socket) ) + packet.validate_flags # Read in the packet length multiplier = 1 body_length = 0 pos = 1 @@ -68,10 +68,11 @@ end # Create a new packet object bytes = buffer.unpack("C5") packet = create_from_header(bytes.first) + packet.validate_flags # Parse the packet length body_length = 0 multiplier = 1 pos = 1 @@ -101,27 +102,32 @@ packet_class = MQTT::PACKET_TYPES[type_id] if packet_class.nil? raise ProtocolException.new("Invalid packet type identifier: #{type_id}") end + # Convert the last 4 bits of byte into array of true/false + flags = (0..3).map { |i| byte & (2 ** i) != 0 } + # Create a new packet object - packet_class.new( - :duplicate => ((byte & 0x08) >> 3) == 0x01, - :qos => ((byte & 0x06) >> 1), - :retain => ((byte & 0x01) >> 0) == 0x01 - ) + packet_class.new(:flags => flags) end # Create a new empty packet def initialize(args={}) + # We must set flags before the other values + @flags = [false, false, false, false] update_attributes(ATTR_DEFAULTS.merge(args)) end # Set packet attributes from a hash of attribute names and values def update_attributes(attr={}) attr.each_pair do |k,v| - send("#{k}=", v) + if v.is_a?(Array) or v.is_a?(Hash) + send("#{k}=", v.dup) + else + send("#{k}=", v) + end end end # Get the identifer for this packet type def type_id @@ -130,36 +136,23 @@ raise "Invalid packet type: #{self.class}" end return index end - # Set the dup flag (true/false) - def duplicate=(arg) - if arg.kind_of?(Integer) - @duplicate = (arg != 0) - else - @duplicate = arg - end + # Get the name of the packet type as a string in capitals + # (like the MQTT specification uses) + # + # Example: CONNACK + def type_name + self.class.name.split('::').last.upcase end - # Set the retain flag (true/false) - def retain=(arg) - if arg.kind_of?(Integer) - @retain = (arg != 0) - else - @retain = arg - end + # Set the protocol version number + def version=(arg) + @version = arg.to_s end - # Set the Quality of Service level (0/1/2) - def qos=(arg) - @qos = arg.to_i - if @qos < 0 or @qos > 2 - raise "Invalid QoS value: #{@qos}" - end - end - # Set the length of the packet body def body_length=(arg) @body_length = arg.to_i end @@ -181,13 +174,14 @@ # Serialise the packet def to_s # Encode the fixed header header = [ ((type_id.to_i & 0x0F) << 4) | - ((duplicate ? 0x1 : 0x0) << 3) | - ((qos.to_i & 0x03) << 1) | - (retain ? 0x1 : 0x0) + (flags[3] ? 0x8 : 0x0) | + (flags[2] ? 0x4 : 0x0) | + (flags[1] ? 0x2 : 0x0) | + (flags[0] ? 0x1 : 0x0) ] # Get the packet's variable header and payload body = self.encode_body @@ -208,10 +202,18 @@ # Convert header to binary and add on body header.pack('C*') + body end + # Check that fixed header flags are valid for types that don't use the flags + # @private + def validate_flags + if flags != [false, false, false, false] + raise ProtocolException.new("Invalid flags in #{type_name} packet header") + end + end + # Returns a human readable string def inspect "\#<#{self.class}>" end @@ -220,10 +222,15 @@ # Encode an array of bytes and return them def encode_bytes(*bytes) bytes.pack('C*') end + # Encode an array of bits and return them + def encode_bits(bits) + [bits.map{|b| b ? '1' : '0'}.join].pack('b*') + end + # Encode a 16-bit unsigned integer and return it def encode_short(val) [val.to_i].pack('n') end @@ -246,10 +253,15 @@ # Remove one byte from the front of the string def shift_byte(buffer) buffer.slice!(0...1).unpack('C').first end + # Remove 8 bits from the front of buffer + def shift_bits(buffer) + buffer.slice!(0...1).unpack('b8').first.split('').map {|b| b == '1'} + end + # Remove n bytes from the front of buffer def shift_data(buffer,bytes) buffer.slice!(0...bytes) end @@ -278,113 +290,169 @@ ## PACKET SUBCLASSES ## # Class representing an MQTT Publish message class Publish < MQTT::Packet + + # Duplicate delivery flag + attr_accessor :duplicate + + # Retain flag + attr_accessor :retain + + # Quality of Service level (0, 1, 2) + attr_accessor :qos + # The topic name to publish to attr_accessor :topic - - # Identifier for an individual publishing flow - # Only required in PUBLISH Packets where the QoS level is 1 or 2 - attr_accessor :message_id - + # The data to be published attr_accessor :payload # Default attribute values ATTR_DEFAULTS = { - :topic => nil, - :message_id => 0, - :payload => '' + :topic => nil, + :payload => '' } # Create a new Publish packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end + def duplicate + @flags[3] + end + + # Set the DUP flag (true/false) + def duplicate=(arg) + if arg.kind_of?(Integer) + @flags[3] = (arg == 0x1) + else + @flags[3] = arg + end + end + + def retain + @flags[0] + end + + # Set the retain flag (true/false) + def retain=(arg) + if arg.kind_of?(Integer) + @flags[0] = (arg == 0x1) + else + @flags[0] = arg + end + end + + def qos + (@flags[1] ? 0x01 : 0x00) | (@flags[2] ? 0x02 : 0x00) + end + + # Set the Quality of Service level (0/1/2) + def qos=(arg) + @qos = arg.to_i + if @qos < 0 or @qos > 2 + raise "Invalid QoS value: #{@qos}" + else + @flags[1] = (arg & 0x01 == 0x01) + @flags[2] = (arg & 0x02 == 0x02) + end + end + # Get serialisation of packet's body def encode_body body = '' if @topic.nil? or @topic.to_s.empty? raise "Invalid topic name when serialising packet" end body += encode_string(@topic) - body += encode_short(@message_id) unless qos == 0 + body += encode_short(@id) unless qos == 0 body += payload.to_s.force_encoding('ASCII-8BIT') return body end # Parse the body (variable header and payload) of a Publish packet def parse_body(buffer) super(buffer) @topic = shift_string(buffer) - @message_id = shift_short(buffer) unless qos == 0 + @id = shift_short(buffer) unless qos == 0 @payload = buffer end + # Check that fixed header flags are valid for this packet type + # @private + def validate_flags + if qos == 3 + raise ProtocolException.new("Invalid packet: QoS value of 3 is not allowed") + end + if qos == 0 and duplicate + raise ProtocolException.new("Invalid packet: DUP cannot be set for QoS 0") + end + end + # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: " + "d#{duplicate ? '1' : '0'}, " + "q#{qos}, " + "r#{retain ? '1' : '0'}, " + - "m#{message_id}, " + + "m#{id}, " + "'#{topic}', " + "#{inspect_payload}>" end protected + def inspect_payload str = payload.to_s - if str.bytesize < 16 + if str.bytesize < 16 and str =~ /^[ -~]*$/ "'#{str}'" else "... (#{str.bytesize} bytes)" end end end # Class representing an MQTT Connect Packet class Connect < MQTT::Packet - # The name of the protocol (defaults to MQIsdp) + # The name of the protocol attr_accessor :protocol_name - # The version number of the protocol (defaults to 3) - attr_accessor :protocol_version + # The version number of the protocol + attr_accessor :protocol_level # The client identifier string attr_accessor :client_id - - # Set to false to keep a persistent session with the broker + + # Set to false to keep a persistent session with the server attr_accessor :clean_session - # Period the broker should keep connection open for between pings + # Period the server should keep connection open for between pings attr_accessor :keep_alive - + # The topic name to send the Will message to attr_accessor :will_topic - + # The QoS level to send the Will message as attr_accessor :will_qos - + # Set to true to make the Will message retained attr_accessor :will_retain - + # The payload of the Will message attr_accessor :will_payload - - # The username for authenticating with the broker + + # The username for authenticating with the server attr_accessor :username - - # The password for authenticating with the broker + + # The password for authenticating with the server attr_accessor :password # Default attribute values ATTR_DEFAULTS = { - :protocol_name => 'MQIsdp', - :protocol_version => 0x03, :client_id => nil, :clean_session => true, :keep_alive => 15, :will_topic => nil, :will_qos => 0, @@ -395,20 +463,34 @@ } # Create a new Client Connect packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) + + if version == '3.1.0' or version == '3.1' + self.protocol_name ||= 'MQIsdp' + self.protocol_level ||= 0x03 + elsif version == '3.1.1' + self.protocol_name ||= 'MQTT' + self.protocol_level ||= 0x04 + else + raise ArgumentError.new("Unsupported protocol version: #{version}") + end end # Get serialisation of packet's body def encode_body body = '' - if @client_id.nil? or @client_id.bytesize < 1 or @client_id.bytesize > 23 - raise "Invalid client identifier when serialising packet" + if @version == '3.1.0' + if @client_id.nil? or @client_id.bytesize < 1 + raise "Client identifier too short while serialising packet" + elsif @client_id.bytesize > 23 + raise "Client identifier too long when serialising packet" + end end body += encode_string(@protocol_name) - body += encode_bytes(@protocol_version.to_i) + body += encode_bytes(@protocol_level.to_i) if @keep_alive < 0 raise "Invalid keep-alive value: cannot be less than 0" end @@ -436,24 +518,21 @@ # Parse the body (variable header and payload) of a Connect packet def parse_body(buffer) super(buffer) @protocol_name = shift_string(buffer) - @protocol_version = shift_byte(buffer).to_i - - if @protocol_name != 'MQIsdp' + @protocol_level = shift_byte(buffer).to_i + if @protocol_name == 'MQIsdp' and @protocol_level == 3 + @version = '3.1.0' + elsif @protocol_name == 'MQTT' and @protocol_level == 4 + @version = '3.1.1' + else raise ProtocolException.new( - "Unsupported protocol name: #{@protocol_name}" + "Unsupported protocol: #{@protocol_name}/#{@protocol_level}" ) end - if @protocol_version != 3 - raise ProtocolException.new( - "Unsupported protocol version: #{@protocol_version}" - ) - end - @connect_flags = shift_byte(buffer) @clean_session = ((@connect_flags & 0x02) >> 1) == 0x01 @keep_alive = shift_short(buffer) @client_id = shift_string(buffer) if ((@connect_flags & 0x04) >> 2) == 0x01 @@ -480,36 +559,68 @@ str += ", client_id='#{client_id}'" str += ", username='#{username}'" unless username.nil? str += ", password=..." unless password.nil? str += ">" end + + # ---- Deprecated attributes and methods ---- # + public + + # @deprecated Please use {#protocol_level} instead + def protocol_version + protocol_level + end + + # @deprecated Please use {#protocol_level=} instead + def protocol_version=(args) + self.protocol_level = args + end end # Class representing an MQTT Connect Acknowledgment Packet class Connack < MQTT::Packet + # Session Present flag + attr_accessor :session_present + # The return code (defaults to 0 for connection accepted) attr_accessor :return_code # Default attribute values ATTR_DEFAULTS = {:return_code => 0x00} # Create a new Client Connect packet def initialize(args={}) + # We must set flags before other attributes + @connack_flags = [false, false, false, false, false, false, false, false] super(ATTR_DEFAULTS.merge(args)) end + # Get the Session Present flag + def session_present + @connack_flags[0] + end + + # Set the Session Present flag + def session_present=(arg) + if arg.kind_of?(Integer) + @connack_flags[0] = (arg == 0x1) + else + @connack_flags[0] = arg + end + end + # Get a string message corresponding to a return code def return_msg case return_code when 0x00 "Connection Accepted" when 0x01 "Connection refused: unacceptable protocol version" when 0x02 "Connection refused: client identifier rejected" when 0x03 - "Connection refused: broker unavailable" + "Connection refused: server unavailable" when 0x04 "Connection refused: bad user name or password" when 0x05 "Connection refused: not authorised" else @@ -518,19 +629,22 @@ end # Get serialisation of packet's body def encode_body body = '' - body += encode_bytes(0) # Unused - body += encode_bytes(@return_code.to_i) # Return Code + body += encode_bits(@connack_flags) + body += encode_bytes(@return_code.to_i) return body end # Parse the body (variable header and payload) of a Connect Acknowledgment packet def parse_body(buffer) super(buffer) - _unused = shift_byte(buffer) + @connack_flags = shift_bits(buffer) + unless @connack_flags[1,7] == [false, false, false, false, false, false, false] + raise ProtocolException.new("Invalid flags in Connack variable header") + end @return_code = shift_byte(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Connect Acknowledgment packet") end end @@ -541,159 +655,132 @@ end end # Class representing an MQTT Publish Acknowledgment packet class Puback < MQTT::Packet - # Identifier for an individual publishing flow - attr_accessor :message_id - - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} - - # Create a new Publish Acknowledgment packet - def initialize(args={}) - super(ATTR_DEFAULTS.merge(args)) - end - # Get serialisation of packet's body def encode_body - encode_short(@message_id) + encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Acknowledgment packet") end end # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X>" % message_id + "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Received packet class Pubrec < MQTT::Packet - # Identifier for an individual publishing flow - attr_accessor :message_id - - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} - - # Create a new Publish Recieved packet - def initialize(args={}) - super(ATTR_DEFAULTS.merge(args)) - end - # Get serialisation of packet's body def encode_body - encode_short(@message_id) + encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Received packet") end end # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X>" % message_id + "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Release packet class Pubrel < MQTT::Packet - # Identifier for an individual publishing flow - attr_accessor :message_id # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} + ATTR_DEFAULTS = { + :flags => [false, true, false, false], + } - # Create a new Publish Release packet + # Create a new Pubrel packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end # Get serialisation of packet's body def encode_body - encode_short(@message_id) + encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Release packet") end end + # Check that fixed header flags are valid for this packet type + # @private + def validate_flags + if @flags != [false, true, false, false] + raise ProtocolException.new("Invalid flags in PUBREL packet header") + end + end + # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X>" % message_id + "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Complete packet class Pubcomp < MQTT::Packet - # Identifier for an individual publishing flow - attr_accessor :message_id - - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} - - # Create a new Publish Complete packet - def initialize(args={}) - super(ATTR_DEFAULTS.merge(args)) - end - # Get serialisation of packet's body def encode_body - encode_short(@message_id) + encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Complete packet") end end # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X>" % message_id + "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Client Subscribe packet class Subscribe < MQTT::Packet - # Identifier for an individual publishing flow - attr_accessor :message_id - - # One or more topic names to subscribe to - attr_reader :topics + # One or more topic filters to subscribe to + attr_accessor :topics # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} + ATTR_DEFAULTS = { + :topics => [], + :flags => [false, true, false, false], + } # Create a new Subscribe packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) - @topics ||= [] - @qos = 1 # Force a QOS of 1 end - # Set one or more topics for the Subscrible packet + # Set one or more topic filters for the Subscribe packet # The topics parameter should be one of the following: # * String: subscribe to one topic with QOS 0 # * Array: subscribe to multiple topics with QOS 0 # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level # @@ -736,112 +823,129 @@ # Get serialisation of packet's body def encode_body if @topics.empty? raise "no topics given when serialising packet" end - body = encode_short(@message_id) + body = encode_short(@id) topics.each do |item| body += encode_string(item[0]) body += encode_bytes(item[1]) end return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) @topics = [] while(buffer.bytesize>0) topic_name = shift_string(buffer) topic_qos = shift_byte(buffer) @topics << [topic_name,topic_qos] end end + # Check that fixed header flags are valid for this packet type + # @private + def validate_flags + if @flags != [false, true, false, false] + raise ProtocolException.new("Invalid flags in SUBSCRIBE packet header") + end + end + # Returns a human readable string, summarising the properties of the packet def inspect _str = "\#<#{self.class}: 0x%2.2X, %s>" % [ - message_id, + id, topics.map {|t| "'#{t[0]}':#{t[1]}"}.join(', ') ] end end # Class representing an MQTT Subscribe Acknowledgment packet class Suback < MQTT::Packet - # Identifier to tie the Subscribe request to the Suback response - attr_accessor :message_id + # An array of return codes, ordered by the topics that were subscribed to + attr_accessor :return_codes - # The QoS level that was granted for the subscribe request - attr_reader :granted_qos - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} + ATTR_DEFAULTS = { + :return_codes => [], + } # Create a new Subscribe Acknowledgment packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) - @granted_qos ||= [] end # Set the granted QOS value for each of the topics that were subscribed to # Can either be an integer or an array or integers. - def granted_qos=(value) + def return_codes=(value) if value.is_a?(Array) - @granted_qos = value + @return_codes = value elsif value.is_a?(Integer) - @granted_qos = [value] + @return_codes = [value] else - raise "granted QOS should be an integer or an array of QOS levels" + raise "return_codes should be an integer or an array of return codes" end end # Get serialisation of packet's body def encode_body - if @granted_qos.empty? + if @return_codes.empty? raise "no granted QOS given when serialising packet" end - body = encode_short(@message_id) - granted_qos.each { |qos| body += encode_bytes(qos) } + body = encode_short(@id) + return_codes.each { |qos| body += encode_bytes(qos) } return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) while(buffer.bytesize>0) - @granted_qos << shift_byte(buffer) + @return_codes << shift_byte(buffer) end end # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X, qos=%s>" % [message_id, granted_qos.join(',')] + "\#<#{self.class}: 0x%2.2X, rc=%s>" % [id, return_codes.map{|rc| "0x%2.2X" % rc}.join(',')] end + + # ---- Deprecated attributes and methods ---- # + public + + # @deprecated Please use {#return_codes} instead + def granted_qos + return_codes + end + + # @deprecated Please use {#return_codes=} instead + def granted_qos=(args) + self.return_codes = args + end end # Class representing an MQTT Client Unsubscribe packet class Unsubscribe < MQTT::Packet - # One or more topics to unsubscribe from - attr_reader :topics + # One or more topic paths to unsubscribe from + attr_accessor :topics - # Identifier to tie the Unsubscribe request to the Unsuback response - attr_accessor :message_id - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} + ATTR_DEFAULTS = { + :topics => [], + :flags => [false, true, false, false], + } # Create a new Unsubscribe packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) - @topics ||= [] - @qos = 1 # Force a QOS of 1 end - # Set one or more topics to unsubscribe from + # Set one or more topic paths to unsubscribe from def topics=(value) if value.is_a?(Array) @topics = value else @topics = [value] @@ -851,63 +955,65 @@ # Get serialisation of packet's body def encode_body if @topics.empty? raise "no topics given when serialising packet" end - body = encode_short(@message_id) + body = encode_short(@id) topics.each { |topic| body += encode_string(topic) } return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) while(buffer.bytesize>0) @topics << shift_string(buffer) end end + # Check that fixed header flags are valid for this packet type + # @private + def validate_flags + if @flags != [false, true, false, false] + raise ProtocolException.new("Invalid flags in UNSUBSCRIBE packet header") + end + end + # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X, %s>" % [ - message_id, + id, topics.map {|t| "'#{t}'"}.join(', ') ] end end # Class representing an MQTT Unsubscribe Acknowledgment packet class Unsuback < MQTT::Packet - # Identifier to tie the Unsubscribe request to the Unsuback response - attr_accessor :message_id - - # Default attribute values - ATTR_DEFAULTS = {:message_id => 0} - # Create a new Unsubscribe Acknowledgment packet def initialize(args={}) - super(ATTR_DEFAULTS.merge(args)) + super(args) end # Get serialisation of packet's body def encode_body - encode_short(@message_id) + encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) - @message_id = shift_short(buffer) + @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Unsubscribe Acknowledgment packet") end end # Returns a human readable string, summarising the properties of the packet def inspect - "\#<#{self.class}: 0x%2.2X>" % message_id + "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Ping Request packet class Pingreq < MQTT::Packet @@ -953,9 +1059,23 @@ super(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Disconnect packet") end end + end + + + # ---- Deprecated attributes and methods ---- # + public + + # @deprecated Please use {#id} instead + def message_id + id + end + + # @deprecated Please use {#id=} instead + def message_id=(args) + self.id = args end end # An enumeration of the MQTT packet types