lib/mqtt/client.rb in mqtt-0.5.0 vs lib/mqtt/client.rb in mqtt-0.6.0
- old
+ new
@@ -1,483 +1,473 @@
autoload :OpenSSL, 'openssl'
autoload :URI, 'uri'
+autoload :CGI, 'cgi'
-
# Client class for talking to an MQTT server
-class MQTT::Client
- # Hostname of the remote server
- attr_accessor :host
+module MQTT
+ class Client
+ # Hostname of the remote server
+ attr_accessor :host
- # Port number of the remote server
- attr_accessor :port
+ # Port number of the remote server
+ attr_accessor :port
- # The version number of the MQTT protocol to use (default 3.1.1)
- attr_accessor :version
+ # The version number of the MQTT protocol to use (default 3.1.1)
+ attr_accessor :version
- # Set to true to enable SSL/TLS encrypted communication
- #
- # Set to a symbol to use a specific variant of SSL/TLS.
- # Allowed values include:
- #
- # @example Using TLS 1.0
- # client = Client.new('mqtt.example.com', :ssl => :TLSv1)
- # @see OpenSSL::SSL::SSLContext::METHODS
- attr_accessor :ssl
+ # Set to true to enable SSL/TLS encrypted communication
+ #
+ # Set to a symbol to use a specific variant of SSL/TLS.
+ # Allowed values include:
+ #
+ # @example Using TLS 1.0
+ # client = Client.new('mqtt.example.com', :ssl => :TLSv1)
+ # @see OpenSSL::SSL::SSLContext::METHODS
+ attr_accessor :ssl
- # Time (in seconds) between pings to remote server (default is 15 seconds)
- attr_accessor :keep_alive
+ # Time (in seconds) between pings to remote server (default is 15 seconds)
+ attr_accessor :keep_alive
- # Set the 'Clean Session' flag when connecting? (default is true)
- attr_accessor :clean_session
+ # Set the 'Clean Session' flag when connecting? (default is true)
+ attr_accessor :clean_session
- # Client Identifier
- attr_accessor :client_id
+ # Client Identifier
+ attr_accessor :client_id
- # Number of seconds to wait for acknowledgement packets (default is 5 seconds)
- attr_accessor :ack_timeout
+ # Number of seconds to wait for acknowledgement packets (default is 5 seconds)
+ attr_accessor :ack_timeout
- # Username to authenticate to the server with
- attr_accessor :username
+ # Username to authenticate to the server with
+ attr_accessor :username
- # Password to authenticate to the server with
- attr_accessor :password
+ # Password to authenticate to the server with
+ attr_accessor :password
- # The topic that the Will message is published to
- attr_accessor :will_topic
+ # The topic that the Will message is published to
+ attr_accessor :will_topic
- # Contents of message that is sent by server when client disconnect
- attr_accessor :will_payload
+ # Contents of message that is sent by server when client disconnect
+ attr_accessor :will_payload
- # The QoS level of the will message sent by the server
- attr_accessor :will_qos
+ # The QoS level of the will message sent by the server
+ attr_accessor :will_qos
- # If the Will message should be retain by the server after it is sent
- attr_accessor :will_retain
+ # If the Will message should be retain by the server after it is sent
+ attr_accessor :will_retain
- # Last ping response time
- attr_reader :last_ping_response
+ # Last ping response time
+ attr_reader :last_ping_response
+ # Timeout between select polls (in seconds)
+ SELECT_TIMEOUT = 0.5
- # Timeout between select polls (in seconds)
- SELECT_TIMEOUT = 0.5
+ # Default attribute values
+ ATTR_DEFAULTS = {
+ :host => nil,
+ :port => nil,
+ :version => '3.1.1',
+ :keep_alive => 15,
+ :clean_session => true,
+ :client_id => nil,
+ :ack_timeout => 5,
+ :username => nil,
+ :password => nil,
+ :will_topic => nil,
+ :will_payload => nil,
+ :will_qos => 0,
+ :will_retain => false,
+ :ssl => false
+ }
- # Default attribute values
- ATTR_DEFAULTS = {
- :host => nil,
- :port => nil,
- :version => '3.1.1',
- :keep_alive => 15,
- :clean_session => true,
- :client_id => nil,
- :ack_timeout => 5,
- :username => nil,
- :password => nil,
- :will_topic => nil,
- :will_payload => nil,
- :will_qos => 0,
- :will_retain => false,
- :ssl => false
- }
+ # Create and connect a new MQTT Client
+ #
+ # Accepts the same arguments as creating a new client.
+ # If a block is given, then it will be executed before disconnecting again.
+ #
+ # Example:
+ # MQTT::Client.connect('myserver.example.com') do |client|
+ # # do stuff here
+ # end
+ #
+ def self.connect(*args, &block)
+ client = MQTT::Client.new(*args)
+ client.connect(&block)
+ client
+ end
- # Create and connect a new MQTT Client
- #
- # Accepts the same arguments as creating a new client.
- # If a block is given, then it will be executed before disconnecting again.
- #
- # Example:
- # MQTT::Client.connect('myserver.example.com') do |client|
- # # do stuff here
- # end
- #
- def self.connect(*args, &block)
- client = MQTT::Client.new(*args)
- client.connect(&block)
- return client
- end
-
- # Generate a random client identifier
- # (using the characters 0-9 and a-z)
- def self.generate_client_id(prefix='ruby', length=16)
- str = prefix.dup
- length.times do
- num = rand(36)
- if (num<10)
- # Number
- num += 48
- else
- # Letter
- num += 87
+ # Generate a random client identifier
+ # (using the characters 0-9 and a-z)
+ def self.generate_client_id(prefix = 'ruby', length = 16)
+ str = prefix.dup
+ length.times do
+ num = rand(36)
+ # Adjust based on number or letter.
+ num += num < 10 ? 48 : 87
+ str += num.chr
end
- str += num.chr
+ str
end
- return str
- end
- # Create a new MQTT Client instance
- #
- # Accepts one of the following:
- # - a URI that uses the MQTT scheme
- # - a hostname and port
- # - a Hash containing attributes to be set on the new instance
- #
- # If no arguments are given then the method will look for a URI
- # in the MQTT_SERVER environment variable.
- #
- # Examples:
- # client = MQTT::Client.new
- # client = MQTT::Client.new('mqtt://myserver.example.com')
- # client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
- # client = MQTT::Client.new('myserver.example.com')
- # client = MQTT::Client.new('myserver.example.com', 18830)
- # client = MQTT::Client.new(:host => 'myserver.example.com')
- # client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
- #
- def initialize(*args)
- if args.last.is_a?(Hash)
- attr = args.pop
- else
- attr = {}
- end
+ # Create a new MQTT Client instance
+ #
+ # Accepts one of the following:
+ # - a URI that uses the MQTT scheme
+ # - a hostname and port
+ # - a Hash containing attributes to be set on the new instance
+ #
+ # If no arguments are given then the method will look for a URI
+ # in the MQTT_SERVER environment variable.
+ #
+ # Examples:
+ # client = MQTT::Client.new
+ # client = MQTT::Client.new('mqtt://myserver.example.com')
+ # client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
+ # client = MQTT::Client.new('myserver.example.com')
+ # client = MQTT::Client.new('myserver.example.com', 18830)
+ # client = MQTT::Client.new(:host => 'myserver.example.com')
+ # client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
+ #
+ def initialize(*args)
+ attributes = args.last.is_a?(Hash) ? args.pop : {}
- if args.length == 0
- if ENV['MQTT_SERVER']
- attr.merge!(parse_uri(ENV['MQTT_SERVER']))
- end
- end
+ # Set server URI from environment if present
+ attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER']
- if args.length >= 1
- case args[0]
+ if args.length >= 1
+ case args[0]
when URI
- attr.merge!(parse_uri(args[0]))
- when %r|^mqtts?://|
- attr.merge!(parse_uri(args[0]))
+ attributes.merge!(parse_uri(args[0]))
+ when %r{^mqtts?://}
+ attributes.merge!(parse_uri(args[0]))
else
- attr.merge!(:host => args[0])
+ attributes[:host] = args[0]
+ end
end
- end
- if args.length >= 2
- attr.merge!(:port => args[1]) unless args[1].nil?
- end
+ if args.length >= 2
+ attributes[:port] = args[1] unless args[1].nil?
+ end
- if args.length >= 3
- raise ArgumentError, "Unsupported number of arguments"
- end
+ raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3
- # Merge arguments with default values for attributes
- ATTR_DEFAULTS.merge(attr).each_pair do |k,v|
- self.send("#{k}=", v)
- end
+ # Merge arguments with default values for attributes
+ ATTR_DEFAULTS.merge(attributes).each_pair do |k, v|
+ send("#{k}=", v)
+ end
- # Set a default port number
- if @port.nil?
- @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
- end
+ # Set a default port number
+ if @port.nil?
+ @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
+ end
- # Initialise private instance variables
- @last_ping_request = Time.now
- @last_ping_response = Time.now
- @socket = nil
- @read_queue = Queue.new
- @pubacks = {}
- @read_thread = nil
- @write_semaphore = Mutex.new
- @pubacks_semaphore = Mutex.new
- end
+ if @ssl
+ require 'openssl'
+ require 'mqtt/openssl_fix'
+ end
- # Get the OpenSSL context, that is used if SSL/TLS is enabled
- def ssl_context
- @ssl_context ||= OpenSSL::SSL::SSLContext.new
- end
+ # Initialise private instance variables
+ @last_ping_request = current_time
+ @last_ping_response = current_time
+ @socket = nil
+ @read_queue = Queue.new
+ @pubacks = {}
+ @read_thread = nil
+ @write_semaphore = Mutex.new
+ @pubacks_semaphore = Mutex.new
+ end
- # Set a path to a file containing a PEM-format client certificate
- def cert_file=(path)
- self.cert = File.read(path)
- end
+ # Get the OpenSSL context, that is used if SSL/TLS is enabled
+ def ssl_context
+ @ssl_context ||= OpenSSL::SSL::SSLContext.new
+ end
- # PEM-format client certificate
- def cert=(cert)
- ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
- end
+ # Set a path to a file containing a PEM-format client certificate
+ def cert_file=(path)
+ self.cert = File.read(path)
+ end
- # Set a path to a file containing a PEM-format client private key
- def key_file=(*args)
- path, passphrase = args.flatten
- ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase)
- end
+ # PEM-format client certificate
+ def cert=(cert)
+ ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
+ end
- # Set to a PEM-format client private key
- def key=(*args)
- cert, passphrase = args.flatten
- ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase)
- end
+ # Set a path to a file containing a PEM-format client private key
+ def key_file=(*args)
+ path, passphrase = args.flatten
+ ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase)
+ end
- # Set a path to a file containing a PEM-format CA certificate and enable peer verification
- def ca_file=(path)
- ssl_context.ca_file = path
- unless path.nil?
- ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
+ # Set to a PEM-format client private key
+ def key=(*args)
+ cert, passphrase = args.flatten
+ ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase)
end
- end
- # Set the Will for the client
- #
- # The will is a message that will be delivered by the server when the client dies.
- # The Will must be set before establishing a connection to the server
- def set_will(topic, payload, retain=false, qos=0)
- self.will_topic = topic
- self.will_payload = payload
- self.will_retain = retain
- self.will_qos = qos
- end
+ # Set a path to a file containing a PEM-format CA certificate and enable peer verification
+ def ca_file=(path)
+ ssl_context.ca_file = path
+ ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil?
+ end
- # Connect to the MQTT server
- # If a block is given, then yield to that block and then disconnect again.
- def connect(clientid=nil)
- unless clientid.nil?
- @client_id = clientid
+ # Set the Will for the client
+ #
+ # The will is a message that will be delivered by the server when the client dies.
+ # The Will must be set before establishing a connection to the server
+ def set_will(topic, payload, retain = false, qos = 0)
+ self.will_topic = topic
+ self.will_payload = payload
+ self.will_retain = retain
+ self.will_qos = qos
end
- if @client_id.nil? or @client_id.empty?
- if @clean_session
- if @version == '3.1.0'
- # Empty client id is not allowed for version 3.1.0
- @client_id = MQTT::Client.generate_client_id
- end
- else
- raise 'Must provide a client_id if clean_session is set to false'
+ # Connect to the MQTT server
+ # If a block is given, then yield to that block and then disconnect again.
+ def connect(clientid = nil)
+ @client_id = clientid unless clientid.nil?
+
+ if @client_id.nil? || @client_id.empty?
+ raise 'Must provide a client_id if clean_session is set to false' unless @clean_session
+
+ # Empty client id is not allowed for version 3.1.0
+ @client_id = MQTT::Client.generate_client_id if @version == '3.1.0'
end
- end
- if @host.nil?
- raise 'No MQTT server host set when attempting to connect'
- end
+ raise 'No MQTT server host set when attempting to connect' if @host.nil?
- if not connected?
- # Create network socket
- tcp_socket = TCPSocket.new(@host, @port)
+ unless connected?
+ # Create network socket
+ tcp_socket = TCPSocket.new(@host, @port)
- if @ssl
- # Set the protocol version
- if @ssl.is_a?(Symbol)
- ssl_context.ssl_version = @ssl
- end
+ if @ssl
+ # Set the protocol version
+ ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol)
- @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
- @socket.sync_close = true
+ @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
+ @socket.sync_close = true
- # Set hostname on secure socket for Server Name Indication (SNI)
- if @socket.respond_to?(:hostname=)
- @socket.hostname = @host
+ # Set hostname on secure socket for Server Name Indication (SNI)
+ @socket.hostname = @host if @socket.respond_to?(:hostname=)
+
+ @socket.connect
+ else
+ @socket = tcp_socket
end
- @socket.connect
- else
- @socket = tcp_socket
- end
+ # Construct a connect packet
+ packet = MQTT::Packet::Connect.new(
+ :version => @version,
+ :clean_session => @clean_session,
+ :keep_alive => @keep_alive,
+ :client_id => @client_id,
+ :username => @username,
+ :password => @password,
+ :will_topic => @will_topic,
+ :will_payload => @will_payload,
+ :will_qos => @will_qos,
+ :will_retain => @will_retain
+ )
- # Construct a connect packet
- packet = MQTT::Packet::Connect.new(
- :version => @version,
- :clean_session => @clean_session,
- :keep_alive => @keep_alive,
- :client_id => @client_id,
- :username => @username,
- :password => @password,
- :will_topic => @will_topic,
- :will_payload => @will_payload,
- :will_qos => @will_qos,
- :will_retain => @will_retain
- )
+ # Send packet
+ send_packet(packet)
- # Send packet
- send_packet(packet)
+ # Receive response
+ receive_connack
- # Receive response
- receive_connack
-
- # Start packet reading thread
- @read_thread = Thread.new(Thread.current) do |parent|
- Thread.current[:parent] = parent
- while connected? do
- receive_packet
+ # Start packet reading thread
+ @read_thread = Thread.new(Thread.current) do |parent|
+ Thread.current[:parent] = parent
+ receive_packet while connected?
end
end
- end
- # If a block is given, then yield and disconnect
- if block_given?
+ return unless block_given?
+
+ # If a block is given, then yield and disconnect
begin
yield(self)
ensure
disconnect
end
end
- end
- # Disconnect from the MQTT server.
- # If you don't want to say goodbye to the server, set send_msg to false.
- def disconnect(send_msg=true)
- # Stop reading packets from the socket first
- @read_thread.kill if @read_thread and @read_thread.alive?
- @read_thread = nil
+ # Disconnect from the MQTT server.
+ # If you don't want to say goodbye to the server, set send_msg to false.
+ def disconnect(send_msg = true)
+ # Stop reading packets from the socket first
+ @read_thread.kill if @read_thread && @read_thread.alive?
+ @read_thread = nil
- # Close the socket if it is open
- if connected?
+ return unless connected?
+
+ # Close the socket if it is open
if send_msg
packet = MQTT::Packet::Disconnect.new
send_packet(packet)
end
@socket.close unless @socket.nil?
+ handle_close
@socket = nil
end
- end
- # Checks whether the client is connected to the server.
- def connected?
- (not @socket.nil?) and (not @socket.closed?)
- end
+ # Checks whether the client is connected to the server.
+ def connected?
+ !@socket.nil? && !@socket.closed?
+ end
- # Publish a message on a particular topic to the MQTT server.
- def publish(topic, payload='', retain=false, qos=0)
- raise ArgumentError.new("Topic name cannot be nil") if topic.nil?
- raise ArgumentError.new("Topic name cannot be empty") if topic.empty?
+ # Publish a message on a particular topic to the MQTT server.
+ def publish(topic, payload = '', retain = false, qos = 0)
+ raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
+ raise ArgumentError, 'Topic name cannot be empty' if topic.empty?
- packet = MQTT::Packet::Publish.new(
- :id => next_packet_id,
- :qos => qos,
- :retain => retain,
- :topic => topic,
- :payload => payload
- )
+ packet = MQTT::Packet::Publish.new(
+ :id => next_packet_id,
+ :qos => qos,
+ :retain => retain,
+ :topic => topic,
+ :payload => payload
+ )
- # Send the packet
- res = send_packet(packet)
+ # Send the packet
+ res = send_packet(packet)
- if packet.qos > 0
- Timeout.timeout(@ack_timeout) do
- while connected? do
+ return if qos.zero?
+
+ queue = Queue.new
+
+ wait_for_puback packet.id, queue
+
+ deadline = current_time + @ack_timeout
+
+ loop do
+ response = queue.pop
+ case response
+ when :read_timeout
+ return -1 if current_time > deadline
+ when :close
+ return -1
+ else
@pubacks_semaphore.synchronize do
- return res if @pubacks.delete(packet.id)
+ @pubacks.delete packet.id
end
- # FIXME: make threads communicate with each other, instead of polling
- # (using a pipe and select ?)
- sleep 0.01
+ break
end
end
- return -1
+
+ res
end
- end
- # Send a subscribe message for one or more topics on the MQTT server.
- # The topics parameter should be one of the following:
- # * String: subscribe to one topic with QoS 0
- # * Array: subscribe to multiple topics with QoS 0
- # * Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
- #
- # For example:
- # client.subscribe( 'a/b' )
- # client.subscribe( 'a/b', 'c/d' )
- # client.subscribe( ['a/b',0], ['c/d',1] )
- # client.subscribe( 'a/b' => 0, 'c/d' => 1 )
- #
- def subscribe(*topics)
- packet = MQTT::Packet::Subscribe.new(
- :id => next_packet_id,
- :topics => topics
- )
- send_packet(packet)
- end
+ # Send a subscribe message for one or more topics on the MQTT server.
+ # The topics parameter should be one of the following:
+ # * String: subscribe to one topic with QoS 0
+ # * Array: subscribe to multiple topics with QoS 0
+ # * Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
+ #
+ # For example:
+ # client.subscribe( 'a/b' )
+ # client.subscribe( 'a/b', 'c/d' )
+ # client.subscribe( ['a/b',0], ['c/d',1] )
+ # client.subscribe( 'a/b' => 0, 'c/d' => 1 )
+ #
+ def subscribe(*topics)
+ packet = MQTT::Packet::Subscribe.new(
+ :id => next_packet_id,
+ :topics => topics
+ )
+ send_packet(packet)
+ end
- # Return the next message received from the MQTT server.
- # An optional topic can be given to subscribe to.
- #
- # The method either returns the topic and message as an array:
- # topic,message = client.get
- #
- # Or can be used with a block to keep processing messages:
- # client.get('test') do |topic,payload|
- # # Do stuff here
- # end
- #
- def get(topic=nil, options={})
- if block_given?
- get_packet(topic) do |packet|
- yield(packet.topic, packet.payload) unless packet.retain && options[:omit_retained]
+ # Return the next message received from the MQTT server.
+ # An optional topic can be given to subscribe to.
+ #
+ # The method either returns the topic and message as an array:
+ # topic,message = client.get
+ #
+ # Or can be used with a block to keep processing messages:
+ # client.get('test') do |topic,payload|
+ # # Do stuff here
+ # end
+ #
+ def get(topic = nil, options = {})
+ if block_given?
+ get_packet(topic) do |packet|
+ yield(packet.topic, packet.payload) unless packet.retain && options[:omit_retained]
+ end
+ else
+ loop do
+ # Wait for one packet to be available
+ packet = get_packet(topic)
+ return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
+ end
end
- else
- loop do
- # Wait for one packet to be available
- packet = get_packet(topic)
- return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
- end
end
- end
- # Return the next packet object received from the MQTT server.
- # An optional topic can be given to subscribe to.
- #
- # The method either returns a single packet:
- # packet = client.get_packet
- # puts packet.topic
- #
- # Or can be used with a block to keep processing messages:
- # client.get_packet('test') do |packet|
- # # Do stuff here
- # puts packet.topic
- # end
- #
- def get_packet(topic=nil)
- # Subscribe to a topic, if an argument is given
- subscribe(topic) unless topic.nil?
+ # Return the next packet object received from the MQTT server.
+ # An optional topic can be given to subscribe to.
+ #
+ # The method either returns a single packet:
+ # packet = client.get_packet
+ # puts packet.topic
+ #
+ # Or can be used with a block to keep processing messages:
+ # client.get_packet('test') do |packet|
+ # # Do stuff here
+ # puts packet.topic
+ # end
+ #
+ def get_packet(topic = nil)
+ # Subscribe to a topic, if an argument is given
+ subscribe(topic) unless topic.nil?
- if block_given?
- # Loop forever!
- loop do
+ if block_given?
+ # Loop forever!
+ loop do
+ packet = @read_queue.pop
+ yield(packet)
+ puback_packet(packet) if packet.qos > 0
+ end
+ else
+ # Wait for one packet to be available
packet = @read_queue.pop
- yield(packet)
puback_packet(packet) if packet.qos > 0
+ return packet
end
- else
- # Wait for one packet to be available
- packet = @read_queue.pop
- puback_packet(packet) if packet.qos > 0
- return packet
end
- end
- # Returns true if the incoming message queue is empty.
- def queue_empty?
- @read_queue.empty?
- end
+ # Returns true if the incoming message queue is empty.
+ def queue_empty?
+ @read_queue.empty?
+ end
- # Returns the length of the incoming message queue.
- def queue_length
- @read_queue.length
- end
+ # Returns the length of the incoming message queue.
+ def queue_length
+ @read_queue.length
+ end
- # Send a unsubscribe message for one or more topics on the MQTT server
- def unsubscribe(*topics)
- if topics.is_a?(Enumerable) and topics.count == 1
- topics = topics.first
+ # Clear the incoming message queue.
+ def clear_queue
+ @read_queue.clear
end
- packet = MQTT::Packet::Unsubscribe.new(
- :topics => topics,
- :id => next_packet_id
- )
- send_packet(packet)
- end
+ # Send a unsubscribe message for one or more topics on the MQTT server
+ def unsubscribe(*topics)
+ topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1
-private
+ packet = MQTT::Packet::Unsubscribe.new(
+ :topics => topics,
+ :id => next_packet_id
+ )
+ send_packet(packet)
+ end
- # Try to read a packet from the server
- # Also sends keep-alive ping packets.
- def receive_packet
- begin
+ private
+
+ # Try to read a packet from the server
+ # Also sends keep-alive ping packets.
+ def receive_packet
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
+ handle_timeouts
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
handle_packet packet
end
@@ -485,123 +475,149 @@
# Pass exceptions up to parent thread
rescue Exception => exp
unless @socket.nil?
@socket.close
@socket = nil
+ handle_close
end
Thread.current[:parent].raise(exp)
end
- end
- def handle_packet(packet)
- if packet.class == MQTT::Packet::Publish
- # Add to queue
- @read_queue.push(packet)
- elsif packet.class == MQTT::Packet::Pingresp
- @last_ping_response = Time.now
- elsif packet.class == MQTT::Packet::Puback
+ def wait_for_puback(id, queue)
@pubacks_semaphore.synchronize do
- @pubacks[packet.id] = packet
+ @pubacks[id] = queue
end
end
- # Ignore all other packets
- # FIXME: implement responses for QoS 2
- end
- def keep_alive!
- if @keep_alive > 0 && connected?
+ def handle_packet(packet)
+ if packet.class == MQTT::Packet::Publish
+ # Add to queue
+ @read_queue.push(packet)
+ elsif packet.class == MQTT::Packet::Pingresp
+ @last_ping_response = current_time
+ elsif packet.class == MQTT::Packet::Puback
+ @pubacks_semaphore.synchronize do
+ @pubacks[packet.id] << packet
+ end
+ end
+ # Ignore all other packets
+ # FIXME: implement responses for QoS 2
+ end
+
+ def handle_timeouts
+ @pubacks_semaphore.synchronize do
+ @pubacks.each_value { |q| q << :read_timeout }
+ end
+ end
+
+ def handle_close
+ @pubacks_semaphore.synchronize do
+ @pubacks.each_value { |q| q << :close }
+ end
+ end
+
+ if Process.const_defined? :CLOCK_MONOTONIC
+ def current_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+ else
+ # Support older Ruby
+ def current_time
+ Time.now.to_f
+ end
+ end
+
+ def keep_alive!
+ return unless @keep_alive > 0 && connected?
+
response_timeout = (@keep_alive * 1.5).ceil
- if Time.now >= @last_ping_request + @keep_alive
+ if current_time >= @last_ping_request + @keep_alive
packet = MQTT::Packet::Pingreq.new
send_packet(packet)
- @last_ping_request = Time.now
- elsif Time.now > @last_ping_response + response_timeout
- raise MQTT::ProtocolException.new(
- "No Ping Response received for #{response_timeout} seconds"
- )
+ @last_ping_request = current_time
+ elsif current_time > @last_ping_response + response_timeout
+ raise MQTT::ProtocolException, "No Ping Response received for #{response_timeout} seconds"
end
end
- end
- def puback_packet(packet)
- send_packet(MQTT::Packet::Puback.new :id => packet.id)
- end
+ def puback_packet(packet)
+ send_packet(MQTT::Packet::Puback.new(:id => packet.id))
+ end
- # Read and check a connection acknowledgement packet
- def receive_connack
- Timeout.timeout(@ack_timeout) do
- packet = MQTT::Packet.read(@socket)
- if packet.class != MQTT::Packet::Connack
- raise MQTT::ProtocolException.new(
- "Response wasn't a connection acknowledgement: #{packet.class}"
- )
+ # Read and check a connection acknowledgement packet
+ def receive_connack
+ Timeout.timeout(@ack_timeout) do
+ packet = MQTT::Packet.read(@socket)
+ if packet.class != MQTT::Packet::Connack
+ raise MQTT::ProtocolException, "Response wasn't a connection acknowledgement: #{packet.class}"
+ end
+
+ # Check the return code
+ if packet.return_code != 0x00
+ # 3.2.2.3 If a server sends a CONNACK packet containing a non-zero
+ # return code it MUST then close the Network Connection
+ @socket.close
+ raise MQTT::ProtocolException, packet.return_msg
+ end
end
+ end
- # Check the return code
- if packet.return_code != 0x00
- # 3.2.2.3 If a server sends a CONNACK packet containing a non-zero
- # return code it MUST then close the Network Connection
- @socket.close
- raise MQTT::ProtocolException.new(packet.return_msg)
+ # Send a packet to server
+ def send_packet(data)
+ # Raise exception if we aren't connected
+ raise MQTT::NotConnectedException unless connected?
+
+ # Only allow one thread to write to socket at a time
+ @write_semaphore.synchronize do
+ @socket.write(data.to_s)
end
end
- end
- # Send a packet to server
- def send_packet(data)
- # Raise exception if we aren't connected
- raise MQTT::NotConnectedException if not connected?
+ def parse_uri(uri)
+ uri = URI.parse(uri) unless uri.is_a?(URI)
+ if uri.scheme == 'mqtt'
+ ssl = false
+ elsif uri.scheme == 'mqtts'
+ ssl = true
+ else
+ raise 'Only the mqtt:// and mqtts:// schemes are supported'
+ end
- # Only allow one thread to write to socket at a time
- @write_semaphore.synchronize do
- @socket.write(data.to_s)
+ {
+ :host => uri.host,
+ :port => uri.port || nil,
+ :username => uri.user ? CGI.unescape(uri.user) : nil,
+ :password => uri.password ? CGI.unescape(uri.password) : nil,
+ :ssl => ssl
+ }
end
- end
- private
- def parse_uri(uri)
- uri = URI.parse(uri) unless uri.is_a?(URI)
- if uri.scheme == 'mqtt'
- ssl = false
- elsif uri.scheme == 'mqtts'
- ssl = true
- else
- raise "Only the mqtt:// and mqtts:// schemes are supported"
+ def next_packet_id
+ @last_packet_id = (@last_packet_id || 0).next
+ @last_packet_id = 1 if @last_packet_id > 0xffff
+ @last_packet_id
end
- {
- :host => uri.host,
- :port => uri.port || nil,
- :username => uri.user ? URI.unescape(uri.user) : nil,
- :password => uri.password ? URI.unescape(uri.password) : nil,
- :ssl => ssl
- }
- end
+ # ---- Deprecated attributes and methods ---- #
+ public
- def next_packet_id
- @last_packet_id = ( @last_packet_id || 0 ).next
- end
+ # @deprecated Please use {#host} instead
+ def remote_host
+ host
+ end
- # ---- Deprecated attributes and methods ---- #
- public
+ # @deprecated Please use {#host=} instead
+ def remote_host=(args)
+ self.host = args
+ end
- # @deprecated Please use {#host} instead
- def remote_host
- host
- end
+ # @deprecated Please use {#port} instead
+ def remote_port
+ port
+ end
- # @deprecated Please use {#host=} instead
- def remote_host=(args)
- self.host = args
+ # @deprecated Please use {#port=} instead
+ def remote_port=(args)
+ self.port = args
+ end
end
-
- # @deprecated Please use {#port} instead
- def remote_port
- port
- end
-
- # @deprecated Please use {#port=} instead
- def remote_port=(args)
- self.port = args
- end
-
end