# encoding: BINARY module MQTT # Class representing a MQTT Packet # Performs binary encoding and decoding of headers class MQTT::Packet # The version number of the MQTT protocol to use (default 3.1.0) attr_accessor :version # Identifier to link related control packets together attr_accessor :id # Array of 4 bits in the fixed header attr_accessor :flags # The length of the parsed packet body attr_reader :body_length # Default attribute values ATTR_DEFAULTS = { :version => '3.1.0', :id => 0, :body_length => nil } # Read in a packet from a socket def self.read(socket) # Read in the packet header and create a new packet object packet = create_from_header( read_byte(socket) ) packet.validate_flags # Read in the packet length multiplier = 1 body_length = 0 pos = 1 begin digit = read_byte(socket) body_length += ((digit & 0x7F) * multiplier) multiplier *= 0x80 pos += 1 end while ((digit & 0x80) != 0x00) and pos <= 4 # Store the expected body length in the packet packet.instance_variable_set('@body_length', body_length) # Read in the packet body packet.parse_body( socket.read(body_length) ) return packet end # Parse buffer into new packet object def self.parse(buffer) packet = parse_header(buffer) packet.parse_body(buffer) return packet end # Parse the header and create a new packet object of the correct type # The header is removed from the buffer passed into this function def self.parse_header(buffer) # Check that the packet is a long as the minimum packet size if buffer.bytesize < 2 raise ProtocolException.new("Invalid packet: less than 2 bytes long") end # Create a new packet object bytes = buffer.unpack("C5") packet = create_from_header(bytes.first) packet.validate_flags # Parse the packet length body_length = 0 multiplier = 1 pos = 1 begin if buffer.bytesize <= pos raise ProtocolException.new("The packet length header is incomplete") end digit = bytes[pos] body_length += ((digit & 0x7F) * multiplier) multiplier *= 0x80 pos += 1 end while ((digit & 0x80) != 0x00) and pos <= 4 # Store the expected body length in the packet packet.instance_variable_set('@body_length', body_length) # Delete the fixed header from the raw packet passed in buffer.slice!(0...pos) return packet end # Create a new packet object from the first byte of a MQTT packet def self.create_from_header(byte) # Work out the class type_id = ((byte & 0xF0) >> 4) packet_class = MQTT::PACKET_TYPES[type_id] if packet_class.nil? raise ProtocolException.new("Invalid packet type identifier: #{type_id}") end # Convert the last 4 bits of byte into array of true/false flags = (0..3).map { |i| byte & (2 ** i) != 0 } # Create a new packet object packet_class.new(:flags => flags) end # Create a new empty packet def initialize(args={}) # We must set flags before the other values @flags = [false, false, false, false] update_attributes(ATTR_DEFAULTS.merge(args)) end # Set packet attributes from a hash of attribute names and values def update_attributes(attr={}) attr.each_pair do |k,v| if v.is_a?(Array) or v.is_a?(Hash) send("#{k}=", v.dup) else send("#{k}=", v) end end end # Get the identifer for this packet type def type_id index = MQTT::PACKET_TYPES.index(self.class) if index.nil? raise "Invalid packet type: #{self.class}" end return index end # Get the name of the packet type as a string in capitals # (like the MQTT specification uses) # # Example: CONNACK def type_name self.class.name.split('::').last.upcase end # Set the protocol version number def version=(arg) @version = arg.to_s end # Set the length of the packet body def body_length=(arg) @body_length = arg.to_i end # Parse the body (variable header and payload) of a packet def parse_body(buffer) if buffer.bytesize != body_length raise ProtocolException.new( "Failed to parse packet - input buffer (#{buffer.bytesize}) is not the same as the body length header (#{body_length})" ) end end # Get serialisation of packet's body (variable header and payload) def encode_body '' # No body by default end # Serialise the packet def to_s # Encode the fixed header header = [ ((type_id.to_i & 0x0F) << 4) | (flags[3] ? 0x8 : 0x0) | (flags[2] ? 0x4 : 0x0) | (flags[1] ? 0x2 : 0x0) | (flags[0] ? 0x1 : 0x0) ] # Get the packet's variable header and payload body = self.encode_body # Check that that packet isn't too big body_length = body.bytesize if body_length > 268435455 raise "Error serialising packet: body is more than 256MB" end # Build up the body length field bytes begin digit = (body_length % 128) body_length = (body_length / 128) # if there are more digits to encode, set the top bit of this digit digit |= 0x80 if (body_length > 0) header.push(digit) end while (body_length > 0) # Convert header to binary and add on body header.pack('C*') + body end # Check that fixed header flags are valid for types that don't use the flags # @private def validate_flags if flags != [false, false, false, false] raise ProtocolException.new("Invalid flags in #{type_name} packet header") end end # Returns a human readable string def inspect "\#<#{self.class}>" end protected # Encode an array of bytes and return them def encode_bytes(*bytes) bytes.pack('C*') end # Encode an array of bits and return them def encode_bits(bits) [bits.map{|b| b ? '1' : '0'}.join].pack('b*') end # Encode a 16-bit unsigned integer and return it def encode_short(val) [val.to_i].pack('n') end # Encode a UTF-8 string and return it # (preceded by the length of the string) def encode_string(str) str = str.to_s.encode('UTF-8') # Force to binary, when assembling the packet str.force_encoding('ASCII-8BIT') encode_short(str.bytesize) + str end # Remove a 16-bit unsigned integer from the front of buffer def shift_short(buffer) bytes = buffer.slice!(0..1) bytes.unpack('n').first end # Remove one byte from the front of the string def shift_byte(buffer) buffer.slice!(0...1).unpack('C').first end # Remove 8 bits from the front of buffer def shift_bits(buffer) buffer.slice!(0...1).unpack('b8').first.split('').map {|b| b == '1'} end # Remove n bytes from the front of buffer def shift_data(buffer,bytes) buffer.slice!(0...bytes) end # Remove string from the front of buffer def shift_string(buffer) len = shift_short(buffer) str = shift_data(buffer,len) # Strings in MQTT v3.1 are all UTF-8 str.force_encoding('UTF-8') end private # Read and unpack a single byte from a socket def self.read_byte(socket) byte = socket.read(1) if byte.nil? raise ProtocolException.new("Failed to read byte from socket") end byte.unpack('C').first end ## PACKET SUBCLASSES ## # Class representing an MQTT Publish message class Publish < MQTT::Packet # Duplicate delivery flag attr_accessor :duplicate # Retain flag attr_accessor :retain # Quality of Service level (0, 1, 2) attr_accessor :qos # The topic name to publish to attr_accessor :topic # The data to be published attr_accessor :payload # Default attribute values ATTR_DEFAULTS = { :topic => nil, :payload => '' } # Create a new Publish packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end def duplicate @flags[3] end # Set the DUP flag (true/false) def duplicate=(arg) if arg.kind_of?(Integer) @flags[3] = (arg == 0x1) else @flags[3] = arg end end def retain @flags[0] end # Set the retain flag (true/false) def retain=(arg) if arg.kind_of?(Integer) @flags[0] = (arg == 0x1) else @flags[0] = arg end end def qos (@flags[1] ? 0x01 : 0x00) | (@flags[2] ? 0x02 : 0x00) end # Set the Quality of Service level (0/1/2) def qos=(arg) @qos = arg.to_i if @qos < 0 or @qos > 2 raise "Invalid QoS value: #{@qos}" else @flags[1] = (arg & 0x01 == 0x01) @flags[2] = (arg & 0x02 == 0x02) end end # Get serialisation of packet's body def encode_body body = '' if @topic.nil? or @topic.to_s.empty? raise "Invalid topic name when serialising packet" end body += encode_string(@topic) body += encode_short(@id) unless qos == 0 body += payload.to_s.dup.force_encoding('ASCII-8BIT') return body end # Parse the body (variable header and payload) of a Publish packet def parse_body(buffer) super(buffer) @topic = shift_string(buffer) @id = shift_short(buffer) unless qos == 0 @payload = buffer end # Check that fixed header flags are valid for this packet type # @private def validate_flags if qos == 3 raise ProtocolException.new("Invalid packet: QoS value of 3 is not allowed") end if qos == 0 and duplicate raise ProtocolException.new("Invalid packet: DUP cannot be set for QoS 0") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: " + "d#{duplicate ? '1' : '0'}, " + "q#{qos}, " + "r#{retain ? '1' : '0'}, " + "m#{id}, " + "'#{topic}', " + "#{inspect_payload}>" end protected def inspect_payload str = payload.to_s if str.bytesize < 16 and str =~ /^[ -~]*$/ "'#{str}'" else "... (#{str.bytesize} bytes)" end end end # Class representing an MQTT Connect Packet class Connect < MQTT::Packet # The name of the protocol attr_accessor :protocol_name # The version number of the protocol attr_accessor :protocol_level # The client identifier string attr_accessor :client_id # Set to false to keep a persistent session with the server attr_accessor :clean_session # Period the server should keep connection open for between pings attr_accessor :keep_alive # The topic name to send the Will message to attr_accessor :will_topic # The QoS level to send the Will message as attr_accessor :will_qos # Set to true to make the Will message retained attr_accessor :will_retain # The payload of the Will message attr_accessor :will_payload # The username for authenticating with the server attr_accessor :username # The password for authenticating with the server attr_accessor :password # Default attribute values ATTR_DEFAULTS = { :client_id => nil, :clean_session => true, :keep_alive => 15, :will_topic => nil, :will_qos => 0, :will_retain => false, :will_payload => '', :username => nil, :password => nil, } # Create a new Client Connect packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) if version == '3.1.0' or version == '3.1' self.protocol_name ||= 'MQIsdp' self.protocol_level ||= 0x03 elsif version == '3.1.1' self.protocol_name ||= 'MQTT' self.protocol_level ||= 0x04 else raise ArgumentError.new("Unsupported protocol version: #{version}") end end # Get serialisation of packet's body def encode_body body = '' if @version == '3.1.0' if @client_id.nil? or @client_id.bytesize < 1 raise "Client identifier too short while serialising packet" elsif @client_id.bytesize > 23 raise "Client identifier too long when serialising packet" end end body += encode_string(@protocol_name) body += encode_bytes(@protocol_level.to_i) if @keep_alive < 0 raise "Invalid keep-alive value: cannot be less than 0" end # Set the Connect flags @connect_flags = 0 @connect_flags |= 0x02 if @clean_session @connect_flags |= 0x04 unless @will_topic.nil? @connect_flags |= ((@will_qos & 0x03) << 3) @connect_flags |= 0x20 if @will_retain @connect_flags |= 0x40 unless @password.nil? @connect_flags |= 0x80 unless @username.nil? body += encode_bytes(@connect_flags) body += encode_short(@keep_alive) body += encode_string(@client_id) unless will_topic.nil? body += encode_string(@will_topic) # The MQTT v3.1 specification says that the payload is a UTF-8 string body += encode_string(@will_payload) end body += encode_string(@username) unless @username.nil? body += encode_string(@password) unless @password.nil? return body end # Parse the body (variable header and payload) of a Connect packet def parse_body(buffer) super(buffer) @protocol_name = shift_string(buffer) @protocol_level = shift_byte(buffer).to_i if @protocol_name == 'MQIsdp' and @protocol_level == 3 @version = '3.1.0' elsif @protocol_name == 'MQTT' and @protocol_level == 4 @version = '3.1.1' else raise ProtocolException.new( "Unsupported protocol: #{@protocol_name}/#{@protocol_level}" ) end @connect_flags = shift_byte(buffer) @clean_session = ((@connect_flags & 0x02) >> 1) == 0x01 @keep_alive = shift_short(buffer) @client_id = shift_string(buffer) if ((@connect_flags & 0x04) >> 2) == 0x01 # Last Will and Testament @will_qos = ((@connect_flags & 0x18) >> 3) @will_retain = ((@connect_flags & 0x20) >> 5) == 0x01 @will_topic = shift_string(buffer) # The MQTT v3.1 specification says that the payload is a UTF-8 string @will_payload = shift_string(buffer) end if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.bytesize > 0 @username = shift_string(buffer) end if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.bytesize > 0 @password = shift_string(buffer) end end # Returns a human readable string, summarising the properties of the packet def inspect str = "\#<#{self.class}: " str += "keep_alive=#{keep_alive}" str += ", clean" if clean_session str += ", client_id='#{client_id}'" str += ", username='#{username}'" unless username.nil? str += ", password=..." unless password.nil? str += ">" end # ---- Deprecated attributes and methods ---- # public # @deprecated Please use {#protocol_level} instead def protocol_version protocol_level end # @deprecated Please use {#protocol_level=} instead def protocol_version=(args) self.protocol_level = args end end # Class representing an MQTT Connect Acknowledgment Packet class Connack < MQTT::Packet # Session Present flag attr_accessor :session_present # The return code (defaults to 0 for connection accepted) attr_accessor :return_code # Default attribute values ATTR_DEFAULTS = {:return_code => 0x00} # Create a new Client Connect packet def initialize(args={}) # We must set flags before other attributes @connack_flags = [false, false, false, false, false, false, false, false] super(ATTR_DEFAULTS.merge(args)) end # Get the Session Present flag def session_present @connack_flags[0] end # Set the Session Present flag def session_present=(arg) if arg.kind_of?(Integer) @connack_flags[0] = (arg == 0x1) else @connack_flags[0] = arg end end # Get a string message corresponding to a return code def return_msg case return_code when 0x00 "Connection Accepted" when 0x01 "Connection refused: unacceptable protocol version" when 0x02 "Connection refused: client identifier rejected" when 0x03 "Connection refused: server unavailable" when 0x04 "Connection refused: bad user name or password" when 0x05 "Connection refused: not authorised" else "Connection refused: error code #{return_code}" end end # Get serialisation of packet's body def encode_body body = '' body += encode_bits(@connack_flags) body += encode_bytes(@return_code.to_i) return body end # Parse the body (variable header and payload) of a Connect Acknowledgment packet def parse_body(buffer) super(buffer) @connack_flags = shift_bits(buffer) unless @connack_flags[1,7] == [false, false, false, false, false, false, false] raise ProtocolException.new("Invalid flags in Connack variable header") end @return_code = shift_byte(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Connect Acknowledgment packet") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % return_code end end # Class representing an MQTT Publish Acknowledgment packet class Puback < MQTT::Packet # Get serialisation of packet's body def encode_body encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Acknowledgment packet") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Received packet class Pubrec < MQTT::Packet # Get serialisation of packet's body def encode_body encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Received packet") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Release packet class Pubrel < MQTT::Packet # Default attribute values ATTR_DEFAULTS = { :flags => [false, true, false, false], } # Create a new Pubrel packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end # Get serialisation of packet's body def encode_body encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Release packet") end end # Check that fixed header flags are valid for this packet type # @private def validate_flags if @flags != [false, true, false, false] raise ProtocolException.new("Invalid flags in PUBREL packet header") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Publish Complete packet class Pubcomp < MQTT::Packet # Get serialisation of packet's body def encode_body encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Publish Complete packet") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Client Subscribe packet class Subscribe < MQTT::Packet # One or more topic filters to subscribe to attr_accessor :topics # Default attribute values ATTR_DEFAULTS = { :topics => [], :flags => [false, true, false, false], } # Create a new Subscribe packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end # Set one or more topic filters for the Subscribe packet # The topics parameter should be one of the following: # * String: subscribe to one topic with QoS 0 # * Array: subscribe to multiple topics with QoS 0 # * Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level # # For example: # packet.topics = 'a/b' # packet.topics = ['a/b', 'c/d'] # packet.topics = [['a/b',0], ['c/d',1]] # packet.topics = {'a/b' => 0, 'c/d' => 1} # def topics=(value) # Get input into a consistent state if value.is_a?(Array) input = value.flatten else input = [value] end @topics = [] while(input.length>0) item = input.shift if item.is_a?(Hash) # Convert hash into an ordered array of arrays @topics += item.sort elsif item.is_a?(String) # Peek at the next item in the array, and remove it if it is an integer if input.first.is_a?(Integer) qos = input.shift @topics << [item,qos] else @topics << [item,0] end else # Meh? raise "Invalid topics input: #{value.inspect}" end end @topics end # Get serialisation of packet's body def encode_body if @topics.empty? raise "no topics given when serialising packet" end body = encode_short(@id) topics.each do |item| body += encode_string(item[0]) body += encode_bytes(item[1]) end return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) @topics = [] while(buffer.bytesize>0) topic_name = shift_string(buffer) topic_qos = shift_byte(buffer) @topics << [topic_name,topic_qos] end end # Check that fixed header flags are valid for this packet type # @private def validate_flags if @flags != [false, true, false, false] raise ProtocolException.new("Invalid flags in SUBSCRIBE packet header") end end # Returns a human readable string, summarising the properties of the packet def inspect _str = "\#<#{self.class}: 0x%2.2X, %s>" % [ id, topics.map {|t| "'#{t[0]}':#{t[1]}"}.join(', ') ] end end # Class representing an MQTT Subscribe Acknowledgment packet class Suback < MQTT::Packet # An array of return codes, ordered by the topics that were subscribed to attr_accessor :return_codes # Default attribute values ATTR_DEFAULTS = { :return_codes => [], } # Create a new Subscribe Acknowledgment packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end # Set the granted QoS value for each of the topics that were subscribed to # Can either be an integer or an array or integers. def return_codes=(value) if value.is_a?(Array) @return_codes = value elsif value.is_a?(Integer) @return_codes = [value] else raise "return_codes should be an integer or an array of return codes" end end # Get serialisation of packet's body def encode_body if @return_codes.empty? raise "no granted QoS given when serialising packet" end body = encode_short(@id) return_codes.each { |qos| body += encode_bytes(qos) } return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) while(buffer.bytesize>0) @return_codes << shift_byte(buffer) end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X, rc=%s>" % [id, return_codes.map{|rc| "0x%2.2X" % rc}.join(',')] end # ---- Deprecated attributes and methods ---- # public # @deprecated Please use {#return_codes} instead def granted_qos return_codes end # @deprecated Please use {#return_codes=} instead def granted_qos=(args) self.return_codes = args end end # Class representing an MQTT Client Unsubscribe packet class Unsubscribe < MQTT::Packet # One or more topic paths to unsubscribe from attr_accessor :topics # Default attribute values ATTR_DEFAULTS = { :topics => [], :flags => [false, true, false, false], } # Create a new Unsubscribe packet def initialize(args={}) super(ATTR_DEFAULTS.merge(args)) end # Set one or more topic paths to unsubscribe from def topics=(value) if value.is_a?(Array) @topics = value else @topics = [value] end end # Get serialisation of packet's body def encode_body if @topics.empty? raise "no topics given when serialising packet" end body = encode_short(@id) topics.each { |topic| body += encode_string(topic) } return body end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) while(buffer.bytesize>0) @topics << shift_string(buffer) end end # Check that fixed header flags are valid for this packet type # @private def validate_flags if @flags != [false, true, false, false] raise ProtocolException.new("Invalid flags in UNSUBSCRIBE packet header") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X, %s>" % [ id, topics.map {|t| "'#{t}'"}.join(', ') ] end end # Class representing an MQTT Unsubscribe Acknowledgment packet class Unsuback < MQTT::Packet # Create a new Unsubscribe Acknowledgment packet def initialize(args={}) super(args) end # Get serialisation of packet's body def encode_body encode_short(@id) end # Parse the body (variable header and payload) of a packet def parse_body(buffer) super(buffer) @id = shift_short(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Unsubscribe Acknowledgment packet") end end # Returns a human readable string, summarising the properties of the packet def inspect "\#<#{self.class}: 0x%2.2X>" % id end end # Class representing an MQTT Ping Request packet class Pingreq < MQTT::Packet # Create a new Ping Request packet def initialize(args={}) super(args) end # Check the body def parse_body(buffer) super(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Ping Request packet") end end end # Class representing an MQTT Ping Response packet class Pingresp < MQTT::Packet # Create a new Ping Response packet def initialize(args={}) super(args) end # Check the body def parse_body(buffer) super(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Ping Response packet") end end end # Class representing an MQTT Client Disconnect packet class Disconnect < MQTT::Packet # Create a new Client Disconnect packet def initialize(args={}) super(args) end # Check the body def parse_body(buffer) super(buffer) unless buffer.empty? raise ProtocolException.new("Extra bytes at end of Disconnect packet") end end end # ---- Deprecated attributes and methods ---- # public # @deprecated Please use {#id} instead def message_id id end # @deprecated Please use {#id=} instead def message_id=(args) self.id = args end end # An enumeration of the MQTT packet types PACKET_TYPES = [ nil, MQTT::Packet::Connect, MQTT::Packet::Connack, MQTT::Packet::Publish, MQTT::Packet::Puback, MQTT::Packet::Pubrec, MQTT::Packet::Pubrel, MQTT::Packet::Pubcomp, MQTT::Packet::Subscribe, MQTT::Packet::Suback, MQTT::Packet::Unsubscribe, MQTT::Packet::Unsuback, MQTT::Packet::Pingreq, MQTT::Packet::Pingresp, MQTT::Packet::Disconnect, nil ] end