lib/mqtt/packet.rb in mqtt-0.0.4 vs lib/mqtt/packet.rb in mqtt-0.0.5
- old
+ new
@@ -1,128 +1,194 @@
-#!/usr/bin/env ruby
-
-require 'mqtt'
-
module MQTT
# Class representing a MQTT Packet
# Performs binary encoding and decoding of headers
- class Packet
- attr_reader :dup # Duplicate delivery flag
- attr_reader :retain # Retain flag
- attr_reader :qos # Quality of Service level
-
- # Read in a packet from a socket
+ class MQTT::Packet
+ attr_reader :dup # Duplicate delivery flag
+ attr_reader :retain # Retain flag
+ attr_reader :qos # Quality of Service level
+ attr_reader :body_length # The length of the parsed packet body
+
+ # Deprecate this: Read in a packet from a socket
def self.read(socket)
# Read in the packet header and work out the class
header = read_byte(socket)
type_id = ((header & 0xF0) >> 4)
packet_class = MQTT::PACKET_TYPES[type_id]
-
+
# Create a new packet object
packet = packet_class.new(
:dup => ((header & 0x08) >> 3),
:qos => ((header & 0x06) >> 1),
:retain => ((header & 0x01) >> 0)
)
-
+
# Read in the packet length
- multiplier = 1
- body_len = 0
+ multiplier = 1
+ body_length = 0
begin
digit = read_byte(socket)
- body_len += ((digit & 0x7F) * multiplier)
+ body_length += ((digit & 0x7F) * multiplier)
multiplier *= 0x80
end while ((digit & 0x80) != 0x00)
# FIXME: only allow 4 bytes?
+ # Store the expected body length in the packet
+ packet.instance_variable_set('@body_length', body_length)
+
# Read in the packet body
- packet.parse_body( socket.read(body_len) )
+ packet.parse_body( socket.read(body_length) )
return packet
end
+ # Parse buffer into new packet object
+ def self.parse(buffer)
+ packet = parse_header(buffer)
+ packet.parse_body(buffer)
+ return packet
+ end
+
+ # Parse the header and create a new packet object of the correct type
+ # The header is removed from the buffer passed into this function
+ def self.parse_header(buffer)
+ # Work out the class
+ type_id = ((buffer[0] & 0xF0) >> 4)
+ packet_class = MQTT::PACKET_TYPES[type_id]
+ if packet_class.nil?
+ raise ProtocolException.new("Invalid packet type identifier: #{type_id}")
+ end
+
+ # Create a new packet object
+ packet = packet_class.new(
+ :dup => ((buffer[0] & 0x08) >> 3) == 0x01,
+ :qos => ((buffer[0] & 0x06) >> 1),
+ :retain => ((buffer[0] & 0x01) >> 0) == 0x01
+ )
+
+ # Parse the packet length
+ body_length = 0
+ multiplier = 1
+ pos = 1
+ begin
+ if buffer.length <= pos
+ raise ProtocolException.new("The packet length header is incomplete")
+ end
+ digit = buffer[pos]
+ body_length += ((digit & 0x7F) * multiplier)
+ multiplier *= 0x80
+ pos += 1
+ end while ((digit & 0x80) != 0x00) and pos <= 4
+
+ # Store the expected body length in the packet
+ packet.instance_variable_set('@body_length', body_length)
+
+ # Delete the variable length header from the raw packet passed in
+ buffer.slice!(0...pos)
+
+ return packet
+ end
+
+
# Create a new empty packet
def initialize(args={})
- self.dup = args[:dup] || false
- self.qos = args[:qos] || 0
- self.retain = args[:retain] || false
+ update_attributes({
+ :dup => false,
+ :qos => 0,
+ :retain => false,
+ :body_length => nil
+ }.merge(args))
end
-
+
+ def update_attributes(attr={})
+ attr.each_pair do |k,v|
+ send("#{k}=", v)
+ end
+ end
+
# Get the identifer for this packet type
def type_id
index = MQTT::PACKET_TYPES.index(self.class)
- raise "Invalid packet type: #{self.class}" if index.nil?
+ if index.nil?
+ raise "Invalid packet type: #{self.class}"
+ end
return index
end
-
+
# Set the dup flag (true/false)
def dup=(arg)
if arg.kind_of?(Integer)
- @dup = (arg != 0 ? true : false)
+ @dup = (arg != 0)
else
@dup = arg
end
end
-
+
# Set the retain flag (true/false)
def retain=(arg)
if arg.kind_of?(Integer)
- @retain = (arg != 0 ? true : false)
+ @retain = (arg != 0)
else
@retain = arg
end
end
-
+
# Set the Quality of Service level (0/1/2)
def qos=(arg)
@qos = arg.to_i
end
+ # Set the length of the packet body
+ def body_length=(arg)
+ @body_length = arg.to_i
+ end
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
- unless buffer.size == 0
- raise MQTT::ProtocolException.new("Error: parse_body was not sub-classed for a packet with a payload")
+ if buffer.length != body_length
+ raise ProtocolException.new(
+ "Failed to parse packet - input buffer (#{buffer.length}) is not the same as the body length buffer (#{body_length})"
+ )
end
end
-
+
# Get serialisation of packet's body (variable header and payload)
def encode_body
'' # No body by default
end
-
+
# Serialise the packet
def to_s
# Encode the fixed header
header = [
((type_id.to_i & 0x0F) << 4) |
((dup ? 0x1 : 0x0) << 3) |
((qos.to_i & 0x03) << 1) |
(retain ? 0x1 : 0x0)
]
-
+
# Get the packet's variable header and payload
body = self.encode_body
-
+
# Build up the body length field bytes
- body_size = body.size
+ body_length = body.length
begin
- digit = (body_size % 128)
- body_size = (body_size / 128)
+ digit = (body_length % 128)
+ body_length = (body_length / 128)
# if there are more digits to encode, set the top bit of this digit
- digit |= 0x80 if (body_size > 0)
+ digit |= 0x80 if (body_length > 0)
header.push(digit)
- end while (body_size > 0)
+ end while (body_length > 0)
# Convert header to binary and add on body
header.pack('C*') + body
end
protected
-
+
# Encode an array of bytes and return them
def encode_bytes(*bytes)
bytes.pack('C*')
end
@@ -133,76 +199,82 @@
# Encode a string and return it
# (preceded by the length of the string)
def encode_string(str)
str = str.to_s unless str.is_a?(String)
- encode_short(str.size) + str
+ encode_short(str.length) + str
end
-
+
# Remove a 16-bit unsigned integer from the front of buffer
def shift_short(buffer)
bytes = buffer.slice!(0..1)
bytes.unpack('n').first
end
-
+
# Remove one byte from the front of the string
def shift_byte(buffer)
buffer.slice!(0...1).unpack('C').first
end
-
+
# Remove n bytes from the front of buffer
def shift_data(buffer,bytes)
buffer.slice!(0...bytes)
end
-
+
# Remove string from the front of buffer
def shift_string(buffer)
len = shift_short(buffer)
shift_data(buffer,len)
end
-
+
private
-
+
# Read and unpack a single byte from a socket
def self.read_byte(socket)
byte = socket.read(1)
- raise MQTT::ProtocolException if byte.nil?
+ if byte.nil?
+ raise ProtocolException
+ end
byte.unpack('C').first
end
## PACKET SUBCLASSES ##
-
+
# Class representing an MQTT Publish message
class Publish < MQTT::Packet
attr_accessor :topic
attr_accessor :message_id
attr_accessor :payload
- # Create a new Unsubscribe Acknowledgment packet
+ # Create a new Publish packet
def initialize(args={})
- super(args)
- self.topic = args[:topic] || nil
- self.message_id = args[:message_id] || 0
- self.payload = args[:payload] || ''
+ super({
+ :topic => nil,
+ :message_id => 0,
+ :payload => ''
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
body = ''
- raise "Invalid topic name when serialising packet" if @topic.nil?
+ if @topic.nil?
+ raise "Invalid topic name when serialising packet"
+ end
body += encode_string(@topic)
body += encode_short(@message_id) unless qos == 0
body += payload.to_s
return body
end
-
+
# Parse the body (variable header and payload) of a Publish packet
def parse_body(buffer)
+ super(buffer)
@topic = shift_string(buffer)
@message_id = shift_short(buffer) unless qos == 0
@payload = buffer.dup
end
end
@@ -210,69 +282,106 @@
# Class representing an MQTT Connect Packet
class Connect < MQTT::Packet
attr_accessor :protocol_name
attr_accessor :protocol_version
attr_accessor :client_id
- attr_accessor :clean_start
+ attr_accessor :clean_session
attr_accessor :keep_alive
attr_accessor :will_topic
attr_accessor :will_qos
attr_accessor :will_retain
attr_accessor :will_payload
+ attr_accessor :username
+ attr_accessor :password
+ # OLD deprecated clean_start
+ alias :clean_start :clean_session
+ alias :clean_start= :clean_session=
+
# Create a new Client Connect packet
def initialize(args={})
- super(args)
- self.protocol_name = args[:protocol_name] || 'MQIsdp'
- self.protocol_version = args[:protocol_version] || 0x03
- self.client_id = args[:client_id] || nil
- self.clean_start = args[:clean_start] || true
- self.keep_alive = args[:keep_alive] || 10
- self.will_topic = args[:will_topic] || nil
- self.will_qos = args[:will_qos] || 0
- self.will_retain = args[:will_retain] || false
- self.will_payload = args[:will_payload] || ''
+ super({
+ :protocol_name => 'MQIsdp',
+ :protocol_version => 0x03,
+ :client_id => nil,
+ :clean_session => true,
+ :keep_alive => 10,
+ :will_topic => nil,
+ :will_qos => 0,
+ :will_retain => false,
+ :will_payload => '',
+ :username => nil,
+ :password => nil,
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
body = ''
- raise "Invalid client identifier when serialising packet" if @client_id.nil?
+ if @client_id.nil? or @client_id.length < 1 or @client_id.length > 23
+ raise "Invalid client identifier when serialising packet"
+ end
body += encode_string(@protocol_name)
body += encode_bytes(@protocol_version.to_i)
- body += encode_bytes(0) # Connect Flags
- body += encode_short(@keep_alive) # Keep Alive timer
+
+ # Set the Connect flags
+ @connect_flags = 0
+ @connect_flags |= 0x02 if @clean_session
+ @connect_flags |= 0x04 unless @will_topic.nil?
+ @connect_flags |= ((@will_qos & 0x03) << 3)
+ @connect_flags |= 0x20 if @will_retain
+ @connect_flags |= 0x40 unless @password.nil?
+ @connect_flags |= 0x80 unless @username.nil?
+ body += encode_bytes(@connect_flags)
+
+ body += encode_short(@keep_alive)
body += encode_string(@client_id)
- # FIXME: implement Will
- #unless @will_topic.nil?
- # body += encode_string(@will_topic)
- # body += will_payload.to_s
- #end
+ unless will_topic.nil?
+ body += encode_string(@will_topic)
+ body += encode_string(@will_payload)
+ end
+ body += encode_string(@username) unless @username.nil?
+ body += encode_string(@password) unless @password.nil?
return body
end
-
+
# Parse the body (variable header and payload) of a Connect packet
def parse_body(buffer)
+ super(buffer)
@protocol_name = shift_string(buffer)
@protocol_version = shift_byte(buffer)
- flags = shift_byte(buffer)
+ @connect_flags = shift_byte(buffer)
+ @clean_session = ((@connect_flags & 0x02) >> 1) == 0x01
@keep_alive = shift_short(buffer)
@client_id = shift_string(buffer)
- # FIXME: implement Will
+ if ((@connect_flags & 0x04) >> 2) == 0x01
+ # Last Will and Testament
+ @will_qos = ((@connect_flags & 0x18) >> 3)
+ @will_retain = ((@connect_flags & 0x20) >> 5) == 0x01
+ @will_topic = shift_string(buffer)
+ @will_payload = shift_string(buffer)
+ end
+ if ((@connect_flags & 0x80) >> 7) == 0x01 and buffer.length > 0
+ @username = shift_string(buffer)
+ end
+ if ((@connect_flags & 0x40) >> 6) == 0x01 and buffer.length > 0
+ @password = shift_string(buffer)
+ end
end
end
# Class representing an MQTT Connect Acknowledgment Packet
class Connack < MQTT::Packet
attr_accessor :return_code
# Create a new Client Connect packet
def initialize(args={})
- super(args)
- self.return_code = args[:return_code] || 0
+ super({
+ :return_code => 0x00
+ }.merge(args))
end
-
+
# Get a string message corresponding to a return code
def return_msg
case return_code
when 0x00
"Connection Accepted"
@@ -280,127 +389,156 @@
"Connection refused: unacceptable protocol version"
when 0x02
"Connection refused: client identifier rejected"
when 0x03
"Connection refused: broker unavailable"
+ when 0x04
+ "Connection refused: bad user name or password"
+ when 0x05
+ "Connection refused: not authorised"
else
"Connection refused: error code #{return_code}"
end
end
-
+
# Get serialisation of packet's body
def encode_body
body = ''
body += encode_bytes(0) # Unused
body += encode_bytes(@return_code.to_i) # Return Code
return body
end
-
+
# Parse the body (variable header and payload) of a Connect Acknowledgment packet
def parse_body(buffer)
+ super(buffer)
unused = shift_byte(buffer)
@return_code = shift_byte(buffer)
- end
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Connect Acknowledgment packet")
+ end
+ end
end
# Class representing an MQTT Publish Acknowledgment packet
class Puback < MQTT::Packet
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Publish Acknowledgment packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
+ super({
+ :message_id => 0
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
encode_short(@message_id)
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Publish Acknowledgment packet")
+ end
end
end
# Class representing an MQTT Publish Received packet
class Pubrec < MQTT::Packet
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Publish Recieved packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
+ super({
+ :message_id => 0
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
encode_short(@message_id)
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Publish Received packet")
+ end
end
end
# Class representing an MQTT Publish Release packet
class Pubrel < MQTT::Packet
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Publish Release packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
+ super({
+ :message_id => 0
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
encode_short(@message_id)
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Publish Release packet")
+ end
end
end
# Class representing an MQTT Publish Complete packet
class Pubcomp < MQTT::Packet
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Publish Complete packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
+ super({
+ :message_id => 0
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
encode_short(@message_id)
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Publish Complete packet")
+ end
end
end
# Class representing an MQTT Client Subscribe packet
class Subscribe < MQTT::Packet
- attr_reader :topics
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+ attr_reader :topics
+
+ # Create a new Subscribe packet
def initialize(args={})
- super(args)
- self.topics = args[:topics] || []
- self.message_id = args[:message_id] || 0
+ super({
+ :topics => [],
+ :message_id => 0
+ }.merge(args))
self.qos = 1 # Force a QOS of 1
end
-
+
# Set one or more topics for the Subscrible packet
# The topics parameter should be one of the following:
# * String: subscribe to one topic with QOS 0
# * Array: subscribe to multiple topics with QOS 0
# * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level
@@ -416,13 +554,13 @@
if value.is_a?(Array)
input = value.flatten
else
input = [value]
end
-
+
@topics = []
- while(input.size>0)
+ while(input.length>0)
item = input.shift
if item.is_a?(Hash)
# Convert hash into an ordered array of arrays
@topics += item.sort
elsif item.is_a?(String)
@@ -438,27 +576,30 @@
raise "Invalid topics input: #{value.inspect}"
end
end
@topics
end
-
+
# Get serialisation of packet's body
def encode_body
- raise "no topics given when serialising packet" if @topics.empty?
+ if @topics.empty?
+ raise "no topics given when serialising packet"
+ end
body = encode_short(@message_id)
topics.each do |item|
body += encode_string(item[0])
body += encode_bytes(item[1])
end
return body
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
@topics = []
- while(buffer.size>0)
+ while(buffer.length>0)
topic_name = shift_string(buffer)
topic_qos = shift_byte(buffer)
@topics << [topic_name,topic_qos]
end
end
@@ -466,123 +607,162 @@
# Class representing an MQTT Subscribe Acknowledgment packet
class Suback < MQTT::Packet
attr_accessor :message_id
attr_reader :granted_qos
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Subscribe Acknowledgment packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
- self.granted_qos = args[:granted_qos] || []
+ super({
+ :granted_qos => [],
+ :message_id => 0
+ }.merge(args))
end
-
+
def granted_qos=(value)
- raise "granted QOS should be an array of arrays" unless value.is_a?(Array)
+ unless value.is_a?(Array)
+ raise "granted QOS should be an array of arrays"
+ end
@granted_qos = value
end
-
+
# Get serialisation of packet's body
def encode_body
- raise "no granted QOS given when serialising packet" if @granted_qos.empty?
+ if @granted_qos.empty?
+ raise "no granted QOS given when serialising packet"
+ end
body = encode_short(@message_id)
granted_qos.flatten.each { |qos| body += encode_bytes(qos) }
return body
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
- while(buffer.size>0)
+ while(buffer.length>0)
@granted_qos << [shift_byte(buffer),shift_byte(buffer)]
end
end
end
# Class representing an MQTT Client Unsubscribe packet
class Unsubscribe < MQTT::Packet
attr_reader :topics
attr_accessor :message_id
-
- # Create a new Unsubscribe Acknowledgment packet
+
+ # Create a new Unsubscribe packet
def initialize(args={})
- super(args)
- self.topics = args[:topics] || []
- self.message_id = args[:message_id] || 0
+ super({
+ :topics => [],
+ :message_id => 0
+ }.merge(args))
self.qos = 1 # Force a QOS of 1
end
-
+
def topics=(value)
if value.is_a?(Array)
@topics = value
else
@topics = [value]
end
end
-
+
# Get serialisation of packet's body
def encode_body
- raise "no topics given when serialising packet" if @topics.empty?
+ if @topics.empty?
+ raise "no topics given when serialising packet"
+ end
body = encode_short(@message_id)
topics.each { |topic| body += encode_string(topic) }
return body
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
- while(buffer.size>0)
+ while(buffer.length>0)
@topics << shift_string(buffer)
end
end
end
# Class representing an MQTT Unsubscribe Acknowledgment packet
class Unsuback < MQTT::Packet
attr_accessor :message_id
-
+
# Create a new Unsubscribe Acknowledgment packet
def initialize(args={})
- super(args)
- self.message_id = args[:message_id] || 0
+ super({
+ :message_id => 0
+ }.merge(args))
end
-
+
# Get serialisation of packet's body
def encode_body
encode_short(@message_id)
end
-
+
# Parse the body (variable header and payload) of a packet
def parse_body(buffer)
+ super(buffer)
@message_id = shift_short(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Unsubscribe Acknowledgment packet")
+ end
end
end
# Class representing an MQTT Ping Request packet
class Pingreq < MQTT::Packet
# Create a new Ping Request packet
def initialize(args={})
super(args)
end
+
+ # Check the body
+ def parse_body(buffer)
+ super(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Ping Request packet")
+ end
+ end
end
# Class representing an MQTT Ping Response packet
class Pingresp < MQTT::Packet
# Create a new Ping Response packet
def initialize(args={})
super(args)
end
+
+ # Check the body
+ def parse_body(buffer)
+ super(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Ping Response packet")
+ end
+ end
end
# Class representing an MQTT Client Disconnect packet
class Disconnect < MQTT::Packet
# Create a new Client Disconnect packet
def initialize(args={})
super(args)
end
+
+ # Check the body
+ def parse_body(buffer)
+ super(buffer)
+ unless buffer.empty?
+ raise ProtocolException.new("Extra bytes at end of Disconnect packet")
+ end
+ end
end
-
+
end
# An enumeration of the MQTT packet types
PACKET_TYPES = [
@@ -601,7 +781,7 @@
MQTT::Packet::Pingreq,
MQTT::Packet::Pingresp,
MQTT::Packet::Disconnect,
nil
]
-
+
end