lib/mqtt/client.rb in mqtt-0.2.0 vs lib/mqtt/client.rb in mqtt-0.3.0
- old
+ new
@@ -1,65 +1,69 @@
autoload :OpenSSL, 'openssl'
autoload :URI, 'uri'
-# Client class for talking to an MQTT broker
+# Client class for talking to an MQTT server
class MQTT::Client
- # Hostname of the remote broker
- attr_accessor :remote_host
+ # Hostname of the remote server
+ attr_accessor :host
- # Port number of the remote broker
- attr_accessor :remote_port
+ # Port number of the remote server
+ attr_accessor :port
+ # The version number of the MQTT protocol to use (default 3.1.0)
+ attr_accessor :version
+
# Set to true to enable SSL/TLS encrypted communication
#
# Set to a symbol to use a specific variant of SSL/TLS.
- # Allowed values include:
- #
+ # Allowed values include:
+ #
# @example Using TLS 1.0
# client = Client.new('mqtt.example.com', :ssl => :TLSv1)
# @see OpenSSL::SSL::SSLContext::METHODS
attr_accessor :ssl
- # Time (in seconds) between pings to remote broker
+ # Time (in seconds) between pings to remote server (default is 15 seconds)
attr_accessor :keep_alive
- # Set the 'Clean Session' flag when connecting?
+ # Set the 'Clean Session' flag when connecting? (default is true)
attr_accessor :clean_session
# Client Identifier
attr_accessor :client_id
- # Number of seconds to wait for acknowledgement packets
+ # Number of seconds to wait for acknowledgement packets (default is 5 seconds)
attr_accessor :ack_timeout
- # Username to authenticate to the broker with
+ # Username to authenticate to the server with
attr_accessor :username
- # Password to authenticate to the broker with
+ # Password to authenticate to the server with
attr_accessor :password
# The topic that the Will message is published to
attr_accessor :will_topic
- # Contents of message that is sent by broker when client disconnect
+ # Contents of message that is sent by server when client disconnect
attr_accessor :will_payload
- # The QoS level of the will message sent by the broker
+ # The QoS level of the will message sent by the server
attr_accessor :will_qos
- # If the Will message should be retain by the broker after it is sent
+ # If the Will message should be retain by the server after it is sent
attr_accessor :will_retain
# Timeout between select polls (in seconds)
SELECT_TIMEOUT = 0.5
# Default attribute values
ATTR_DEFAULTS = {
- :remote_host => nil,
- :remote_port => nil,
+ :host => nil,
+ :port => nil,
+ :version => '3.1.0',
:keep_alive => 15,
:clean_session => true,
:client_id => nil,
:ack_timeout => 5,
:username => nil,
@@ -87,11 +91,11 @@
return client
end
# Generate a random client identifier
# (using the characters 0-9 and a-z)
- def self.generate_client_id(prefix='ruby_', length=16)
+ def self.generate_client_id(prefix='ruby', length=16)
str = prefix.dup
length.times do
num = rand(36)
if (num<10)
# Number
@@ -111,47 +115,47 @@
# - a URI that uses the MQTT scheme
# - a hostname and port
# - a Hash containing attributes to be set on the new instance
#
# If no arguments are given then the method will look for a URI
- # in the MQTT_BROKER environment variable.
+ # in the MQTT_SERVER environment variable.
#
# Examples:
# client = MQTT::Client.new
# client = MQTT::Client.new('mqtt://myserver.example.com')
# client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
# client = MQTT::Client.new('myserver.example.com')
# client = MQTT::Client.new('myserver.example.com', 18830)
- # client = MQTT::Client.new(:remote_host => 'myserver.example.com')
- # client = MQTT::Client.new(:remote_host => 'myserver.example.com', :keep_alive => 30)
+ # client = MQTT::Client.new(:host => 'myserver.example.com')
+ # client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
#
def initialize(*args)
if args.last.is_a?(Hash)
attr = args.pop
else
attr = {}
end
if args.length == 0
- if ENV['MQTT_BROKER']
- attr.merge!(parse_uri(ENV['MQTT_BROKER']))
+ if ENV['MQTT_SERVER']
+ attr.merge!(parse_uri(ENV['MQTT_SERVER']))
end
end
if args.length >= 1
case args[0]
when URI
attr.merge!(parse_uri(args[0]))
when %r|^mqtts?://|
attr.merge!(parse_uri(args[0]))
else
- attr.merge!(:remote_host => args[0])
+ attr.merge!(:host => args[0])
end
end
if args.length >= 2
- attr.merge!(:remote_port => args[1])
+ attr.merge!(:port => args[1]) unless args[1].nil?
end
if args.length >= 3
raise ArgumentError, "Unsupported number of arguments"
end
@@ -160,16 +164,16 @@
ATTR_DEFAULTS.merge(attr).each_pair do |k,v|
self.send("#{k}=", v)
end
# Set a default port number
- if @remote_port.nil?
- @remote_port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
+ if @port.nil?
+ @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
end
# Initialise private instance variables
- @message_id = 0
+ @packet_id = 0
@last_pingreq = Time.now
@last_pingresp = Time.now
@socket = nil
@read_queue = Queue.new
@read_thread = nil
@@ -199,41 +203,44 @@
end
end
# Set the Will for the client
#
- # The will is a message that will be delivered by the broker when the client dies.
- # The Will must be set before establishing a connection to the broker
+ # The will is a message that will be delivered by the server when the client dies.
+ # The Will must be set before establishing a connection to the server
def set_will(topic, payload, retain=false, qos=0)
self.will_topic = topic
self.will_payload = payload
self.will_retain = retain
self.will_qos = qos
end
- # Connect to the MQTT broker
+ # Connect to the MQTT server
# If a block is given, then yield to that block and then disconnect again.
def connect(clientid=nil)
unless clientid.nil?
@client_id = clientid
end
if @client_id.nil? or @client_id.empty?
if @clean_session
- @client_id = MQTT::Client.generate_client_id
+ if @version == '3.1.0'
+ # Empty client id is not allowed for version 3.1.0
+ @client_id = MQTT::Client.generate_client_id
+ end
else
raise 'Must provide a client_id if clean_session is set to false'
end
end
- if @remote_host.nil?
- raise 'No MQTT broker host set when attempting to connect'
+ if @host.nil?
+ raise 'No MQTT server host set when attempting to connect'
end
if not connected?
# Create network socket
- tcp_socket = TCPSocket.new(@remote_host, @remote_port)
+ tcp_socket = TCPSocket.new(@host, @port)
if @ssl
# Set the protocol version
if @ssl.is_a?(Symbol)
ssl_context.ssl_version = @ssl
@@ -244,12 +251,13 @@
@socket.connect
else
@socket = tcp_socket
end
- # Protocol name and version
+ # Construct a connect packet
packet = MQTT::Packet::Connect.new(
+ :version => @version,
:clean_session => @clean_session,
:keep_alive => @keep_alive,
:client_id => @client_id,
:username => @username,
:password => @password,
@@ -279,12 +287,12 @@
yield(self)
disconnect
end
end
- # Disconnect from the MQTT broker.
- # If you don't want to say goodbye to the broker, set send_msg to false.
+ # Disconnect from the MQTT server.
+ # If you don't want to say goodbye to the server, set send_msg to false.
def disconnect(send_msg=true)
# Stop reading packets from the socket first
@read_thread.kill if @read_thread and @read_thread.alive?
@read_thread = nil
@@ -297,11 +305,11 @@
@socket.close unless @socket.nil?
@socket = nil
end
end
- # Checks whether the client is connected to the broker.
+ # Checks whether the client is connected to the server.
def connected?
(not @socket.nil?) and (not @socket.closed?)
end
# Send a MQTT ping message to indicate that the MQTT client is alive.
@@ -312,25 +320,28 @@
packet = MQTT::Packet::Pingreq.new
send_packet(packet)
@last_pingreq = Time.now
end
- # Publish a message on a particular topic to the MQTT broker.
- def publish(topic, payload, retain=false, qos=0)
+ # Publish a message on a particular topic to the MQTT server.
+ def publish(topic, payload='', retain=false, qos=0)
+ raise ArgumentError.new("Topic name cannot be nil") if topic.nil?
+ raise ArgumentError.new("Topic name cannot be empty") if topic.empty?
+
packet = MQTT::Packet::Publish.new(
+ :id => @packet_id.next,
:qos => qos,
:retain => retain,
:topic => topic,
- :payload => payload,
- :message_id => @message_id.next
+ :payload => payload
)
# Send the packet
send_packet(packet)
end
- # Send a subscribe message for one or more topics on the MQTT broker.
+ # Send a subscribe message for one or more topics on the MQTT server.
# The topics parameter should be one of the following:
# * String: subscribe to one topic with QOS 0
# * Array: subscribe to multiple topics with QOS 0
# * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level
#
@@ -340,17 +351,17 @@
# client.subscribe( ['a/b',0], ['c/d',1] )
# client.subscribe( 'a/b' => 0, 'c/d' => 1 )
#
def subscribe(*topics)
packet = MQTT::Packet::Subscribe.new(
- :topics => topics,
- :message_id => @message_id.next
+ :id => @packet_id.next,
+ :topics => topics
)
send_packet(packet)
end
- # Return the next message received from the MQTT broker.
+ # Return the next message received from the MQTT server.
# An optional topic can be given to subscribe to.
#
# The method either returns the topic and message as an array:
# topic,message = client.get
#
@@ -374,11 +385,11 @@
packet = @read_queue.pop
return packet.topic, packet.payload
end
end
- # Return the next packet object received from the MQTT broker.
+ # Return the next packet object received from the MQTT server.
# An optional topic can be given to subscribe to.
#
# The method either returns a single packet:
# packet = client.get_packet
# puts packet.topic
@@ -412,26 +423,26 @@
# Returns the length of the incoming message queue.
def queue_length
@read_queue.length
end
- # Send a unsubscribe message for one or more topics on the MQTT broker
+ # Send a unsubscribe message for one or more topics on the MQTT server
def unsubscribe(*topics)
if topics.is_a?(Enumerable) and topics.count == 1
topics = topics.first
end
packet = MQTT::Packet::Unsubscribe.new(
:topics => topics,
- :message_id => @message_id.next
+ :id => @packet_id.next
)
send_packet(packet)
end
private
- # Try to read a packet from the broker
+ # Try to read a packet from the server
# Also sends keep-alive ping packets.
def receive_packet
begin
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
@@ -468,21 +479,23 @@
# Read and check a connection acknowledgement packet
def receive_connack
Timeout.timeout(@ack_timeout) do
packet = MQTT::Packet.read(@socket)
if packet.class != MQTT::Packet::Connack
- raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}")
+ raise MQTT::ProtocolException.new(
+ "Response wasn't a connection acknowledgement: #{packet.class}"
+ )
end
# Check the return code
if packet.return_code != 0x00
raise MQTT::ProtocolException.new(packet.return_msg)
end
end
end
- # Send a packet to broker
+ # Send a packet to server
def send_packet(data)
# Throw exception if we aren't connected
raise MQTT::NotConnectedException if not connected?
# Only allow one thread to write to socket at a time
@@ -501,14 +514,38 @@
else
raise "Only the mqtt:// and mqtts:// schemes are supported"
end
{
- :remote_host => uri.host,
- :remote_port => uri.port || nil,
+ :host => uri.host,
+ :port => uri.port || nil,
:username => uri.user,
:password => uri.password,
:ssl => ssl
}
+ end
+
+
+ # ---- Deprecated attributes and methods ---- #
+ public
+
+ # @deprecated Please use {#host} instead
+ def remote_host
+ host
+ end
+
+ # @deprecated Please use {#host=} instead
+ def remote_host=(args)
+ self.host = args
+ end
+
+ # @deprecated Please use {#port} instead
+ def remote_port
+ port
+ end
+
+ # @deprecated Please use {#port=} instead
+ def remote_port=(args)
+ self.port = args
end
end