lib/mqtt/client.rb in mqtt-0.3.1 vs lib/mqtt/client.rb in mqtt-0.4.0
- old
+ new
@@ -51,11 +51,11 @@
attr_accessor :will_qos
# If the Will message should be retain by the server after it is sent
attr_accessor :will_retain
- #Last ping response time
+ # Last ping response time
attr_reader :last_ping_response
# Timeout between select polls (in seconds)
SELECT_TIMEOUT = 0.5
@@ -172,34 +172,47 @@
if @port.nil?
@port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
end
# Initialise private instance variables
- @packet_id = 0
- @last_pingreq = Time.now
+ @last_ping_request = Time.now
@last_ping_response = Time.now
@socket = nil
@read_queue = Queue.new
+ @pubacks = {}
@read_thread = nil
@write_semaphore = Mutex.new
+ @pubacks_semaphore = Mutex.new
end
# Get the OpenSSL context, that is used if SSL/TLS is enabled
def ssl_context
@ssl_context ||= OpenSSL::SSL::SSLContext.new
end
# Set a path to a file containing a PEM-format client certificate
def cert_file=(path)
- ssl_context.cert = OpenSSL::X509::Certificate.new(File.open(path))
+ self.cert = File.read(path)
end
+ # PEM-format client certificate
+ def cert=(cert)
+ ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
+ end
+
# Set a path to a file containing a PEM-format client private key
- def key_file=(path)
- ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path))
+ def key_file=(*args)
+ path, passphrase = args.flatten
+ ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase)
end
+ # Set to a PEM-format client private key
+ def key=(*args)
+ cert, passphrase = args.flatten
+ ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase)
+ end
+
# Set a path to a file containing a PEM-format CA certificate and enable peer verification
def ca_file=(path)
ssl_context.ca_file = path
unless path.nil?
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
@@ -285,12 +298,15 @@
end
end
# If a block is given, then yield and disconnect
if block_given?
- yield(self)
- disconnect
+ begin
+ yield(self)
+ ensure
+ disconnect
+ end
end
end
# Disconnect from the MQTT server.
# If you don't want to say goodbye to the server, set send_msg to false.
@@ -313,52 +329,56 @@
# Checks whether the client is connected to the server.
def connected?
(not @socket.nil?) and (not @socket.closed?)
end
- # Send a MQTT ping message to indicate that the MQTT client is alive.
- #
- # Note that you will not normally need to call this method
- # as it is called automatically
- def ping
- packet = MQTT::Packet::Pingreq.new
- send_packet(packet)
- @last_pingreq = Time.now
- end
-
# Publish a message on a particular topic to the MQTT server.
def publish(topic, payload='', retain=false, qos=0)
raise ArgumentError.new("Topic name cannot be nil") if topic.nil?
raise ArgumentError.new("Topic name cannot be empty") if topic.empty?
packet = MQTT::Packet::Publish.new(
- :id => @packet_id.next,
+ :id => next_packet_id,
:qos => qos,
:retain => retain,
:topic => topic,
:payload => payload
)
# Send the packet
- send_packet(packet)
+ res = send_packet(packet)
+
+ if packet.qos > 0
+ Timeout.timeout(@ack_timeout) do
+ while connected? do
+ @pubacks_semaphore.synchronize do
+ return res if @pubacks.delete(packet.id)
+ end
+ # FIXME: make threads communicate with each other, instead of polling
+ # (using a pipe and select ?)
+ sleep 0.01
+ end
+ end
+ return -1
+ end
end
# Send a subscribe message for one or more topics on the MQTT server.
# The topics parameter should be one of the following:
- # * String: subscribe to one topic with QOS 0
- # * Array: subscribe to multiple topics with QOS 0
- # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level
+ # * String: subscribe to one topic with QoS 0
+ # * Array: subscribe to multiple topics with QoS 0
+ # * Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
#
# For example:
# client.subscribe( 'a/b' )
# client.subscribe( 'a/b', 'c/d' )
# client.subscribe( ['a/b',0], ['c/d',1] )
# client.subscribe( 'a/b' => 0, 'c/d' => 1 )
#
def subscribe(*topics)
packet = MQTT::Packet::Subscribe.new(
- :id => @packet_id.next,
+ :id => next_packet_id,
:topics => topics
)
send_packet(packet)
end
@@ -372,22 +392,17 @@
# client.get('test') do |topic,payload|
# # Do stuff here
# end
#
def get(topic=nil)
- # Subscribe to a topic, if an argument is given
- subscribe(topic) unless topic.nil?
-
if block_given?
- # Loop forever!
- loop do
- packet = @read_queue.pop
+ get_packet(topic) do |packet|
yield(packet.topic, packet.payload)
end
else
# Wait for one packet to be available
- packet = @read_queue.pop
+ packet = get_packet(topic)
return packet.topic, packet.payload
end
end
# Return the next packet object received from the MQTT server.
@@ -408,15 +423,19 @@
subscribe(topic) unless topic.nil?
if block_given?
# Loop forever!
loop do
- yield(@read_queue.pop)
+ packet = @read_queue.pop
+ yield(packet)
+ puback_packet(packet) if packet.qos > 0
end
else
# Wait for one packet to be available
- return @read_queue.pop
+ packet = @read_queue.pop
+ puback_packet(packet) if packet.qos > 0
+ return packet
end
end
# Returns true if the incoming message queue is empty.
def queue_empty?
@@ -434,11 +453,11 @@
topics = topics.first
end
packet = MQTT::Packet::Unsubscribe.new(
:topics => topics,
- :id => @packet_id.next
+ :id => next_packet_id
)
send_packet(packet)
end
private
@@ -450,39 +469,57 @@
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
- if packet.class == MQTT::Packet::Publish
- # Add to queue
- @read_queue.push(packet)
- elsif packet.class == MQTT::Packet::Pingresp
- @last_ping_response = Time.now
- else
- # Ignore all other packets
- nil
- # FIXME: implement responses for QOS 1 and 2
- end
+ handle_packet packet
end
-
- # Time to send a keep-alive ping request?
- if @keep_alive > 0 and Time.now > @last_pingreq + @keep_alive
- ping
- end
-
- # FIXME: check we received a ping response recently?
-
+ keep_alive!
# Pass exceptions up to parent thread
rescue Exception => exp
unless @socket.nil?
@socket.close
@socket = nil
end
Thread.current[:parent].raise(exp)
end
end
+ def handle_packet(packet)
+ if packet.class == MQTT::Packet::Publish
+ # Add to queue
+ @read_queue.push(packet)
+ elsif packet.class == MQTT::Packet::Pingresp
+ @last_ping_response = Time.now
+ elsif packet.class == MQTT::Packet::Puback
+ @pubacks_semaphore.synchronize do
+ @pubacks[packet.id] = packet
+ end
+ end
+ # Ignore all other packets
+ # FIXME: implement responses for QoS 2
+ end
+
+ def keep_alive!
+ if @keep_alive > 0
+ response_timeout = (@keep_alive * 1.5).ceil
+ if Time.now >= @last_ping_request + @keep_alive
+ packet = MQTT::Packet::Pingreq.new
+ send_packet(packet)
+ @last_ping_request = Time.now
+ elsif Time.now > @last_ping_response + response_timeout
+ raise MQTT::ProtocolException.new(
+ "No Ping Response received for #{response_timeout} seconds"
+ )
+ end
+ end
+ end
+
+ def puback_packet(packet)
+ send_packet(MQTT::Packet::Puback.new :id => packet.id)
+ end
+
# Read and check a connection acknowledgement packet
def receive_connack
Timeout.timeout(@ack_timeout) do
packet = MQTT::Packet.read(@socket)
if packet.class != MQTT::Packet::Connack
@@ -498,11 +535,11 @@
end
end
# Send a packet to server
def send_packet(data)
- # Throw exception if we aren't connected
+ # Raise exception if we aren't connected
raise MQTT::NotConnectedException if not connected?
# Only allow one thread to write to socket at a time
@write_semaphore.synchronize do
@socket.write(data.to_s)
@@ -527,9 +564,12 @@
:password => uri.password,
:ssl => ssl
}
end
+ def next_packet_id
+ @last_packet_id = ( @last_packet_id || 0 ).next
+ end
# ---- Deprecated attributes and methods ---- #
public
# @deprecated Please use {#host} instead