lib/mqtt/client.rb in mqtt-0.3.1 vs lib/mqtt/client.rb in mqtt-0.4.0

- old
+ new

@@ -51,11 +51,11 @@ attr_accessor :will_qos # If the Will message should be retain by the server after it is sent attr_accessor :will_retain - #Last ping response time + # Last ping response time attr_reader :last_ping_response # Timeout between select polls (in seconds) SELECT_TIMEOUT = 0.5 @@ -172,34 +172,47 @@ if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables - @packet_id = 0 - @last_pingreq = Time.now + @last_ping_request = Time.now @last_ping_response = Time.now @socket = nil @read_queue = Queue.new + @pubacks = {} @read_thread = nil @write_semaphore = Mutex.new + @pubacks_semaphore = Mutex.new end # Get the OpenSSL context, that is used if SSL/TLS is enabled def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end # Set a path to a file containing a PEM-format client certificate def cert_file=(path) - ssl_context.cert = OpenSSL::X509::Certificate.new(File.open(path)) + self.cert = File.read(path) end + # PEM-format client certificate + def cert=(cert) + ssl_context.cert = OpenSSL::X509::Certificate.new(cert) + end + # Set a path to a file containing a PEM-format client private key - def key_file=(path) - ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path)) + def key_file=(*args) + path, passphrase = args.flatten + ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase) end + # Set to a PEM-format client private key + def key=(*args) + cert, passphrase = args.flatten + ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase) + end + # Set a path to a file containing a PEM-format CA certificate and enable peer verification def ca_file=(path) ssl_context.ca_file = path unless path.nil? ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER @@ -285,12 +298,15 @@ end end # If a block is given, then yield and disconnect if block_given? - yield(self) - disconnect + begin + yield(self) + ensure + disconnect + end end end # Disconnect from the MQTT server. # If you don't want to say goodbye to the server, set send_msg to false. @@ -313,52 +329,56 @@ # Checks whether the client is connected to the server. def connected? (not @socket.nil?) and (not @socket.closed?) end - # Send a MQTT ping message to indicate that the MQTT client is alive. - # - # Note that you will not normally need to call this method - # as it is called automatically - def ping - packet = MQTT::Packet::Pingreq.new - send_packet(packet) - @last_pingreq = Time.now - end - # Publish a message on a particular topic to the MQTT server. def publish(topic, payload='', retain=false, qos=0) raise ArgumentError.new("Topic name cannot be nil") if topic.nil? raise ArgumentError.new("Topic name cannot be empty") if topic.empty? packet = MQTT::Packet::Publish.new( - :id => @packet_id.next, + :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) # Send the packet - send_packet(packet) + res = send_packet(packet) + + if packet.qos > 0 + Timeout.timeout(@ack_timeout) do + while connected? do + @pubacks_semaphore.synchronize do + return res if @pubacks.delete(packet.id) + end + # FIXME: make threads communicate with each other, instead of polling + # (using a pipe and select ?) + sleep 0.01 + end + end + return -1 + end end # Send a subscribe message for one or more topics on the MQTT server. # The topics parameter should be one of the following: - # * String: subscribe to one topic with QOS 0 - # * Array: subscribe to multiple topics with QOS 0 - # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level + # * String: subscribe to one topic with QoS 0 + # * Array: subscribe to multiple topics with QoS 0 + # * Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level # # For example: # client.subscribe( 'a/b' ) # client.subscribe( 'a/b', 'c/d' ) # client.subscribe( ['a/b',0], ['c/d',1] ) # client.subscribe( 'a/b' => 0, 'c/d' => 1 ) # def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( - :id => @packet_id.next, + :id => next_packet_id, :topics => topics ) send_packet(packet) end @@ -372,22 +392,17 @@ # client.get('test') do |topic,payload| # # Do stuff here # end # def get(topic=nil) - # Subscribe to a topic, if an argument is given - subscribe(topic) unless topic.nil? - if block_given? - # Loop forever! - loop do - packet = @read_queue.pop + get_packet(topic) do |packet| yield(packet.topic, packet.payload) end else # Wait for one packet to be available - packet = @read_queue.pop + packet = get_packet(topic) return packet.topic, packet.payload end end # Return the next packet object received from the MQTT server. @@ -408,15 +423,19 @@ subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do - yield(@read_queue.pop) + packet = @read_queue.pop + yield(packet) + puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available - return @read_queue.pop + packet = @read_queue.pop + puback_packet(packet) if packet.qos > 0 + return packet end end # Returns true if the incoming message queue is empty. def queue_empty? @@ -434,11 +453,11 @@ topics = topics.first end packet = MQTT::Packet::Unsubscribe.new( :topics => topics, - :id => @packet_id.next + :id => next_packet_id ) send_packet(packet) end private @@ -450,39 +469,57 @@ # Poll socket - is there data waiting? result = IO.select([@socket], [], [], SELECT_TIMEOUT) unless result.nil? # Yes - read in the packet packet = MQTT::Packet.read(@socket) - if packet.class == MQTT::Packet::Publish - # Add to queue - @read_queue.push(packet) - elsif packet.class == MQTT::Packet::Pingresp - @last_ping_response = Time.now - else - # Ignore all other packets - nil - # FIXME: implement responses for QOS 1 and 2 - end + handle_packet packet end - - # Time to send a keep-alive ping request? - if @keep_alive > 0 and Time.now > @last_pingreq + @keep_alive - ping - end - - # FIXME: check we received a ping response recently? - + keep_alive! # Pass exceptions up to parent thread rescue Exception => exp unless @socket.nil? @socket.close @socket = nil end Thread.current[:parent].raise(exp) end end + def handle_packet(packet) + if packet.class == MQTT::Packet::Publish + # Add to queue + @read_queue.push(packet) + elsif packet.class == MQTT::Packet::Pingresp + @last_ping_response = Time.now + elsif packet.class == MQTT::Packet::Puback + @pubacks_semaphore.synchronize do + @pubacks[packet.id] = packet + end + end + # Ignore all other packets + # FIXME: implement responses for QoS 2 + end + + def keep_alive! + if @keep_alive > 0 + response_timeout = (@keep_alive * 1.5).ceil + if Time.now >= @last_ping_request + @keep_alive + packet = MQTT::Packet::Pingreq.new + send_packet(packet) + @last_ping_request = Time.now + elsif Time.now > @last_ping_response + response_timeout + raise MQTT::ProtocolException.new( + "No Ping Response received for #{response_timeout} seconds" + ) + end + end + end + + def puback_packet(packet) + send_packet(MQTT::Packet::Puback.new :id => packet.id) + end + # Read and check a connection acknowledgement packet def receive_connack Timeout.timeout(@ack_timeout) do packet = MQTT::Packet.read(@socket) if packet.class != MQTT::Packet::Connack @@ -498,11 +535,11 @@ end end # Send a packet to server def send_packet(data) - # Throw exception if we aren't connected + # Raise exception if we aren't connected raise MQTT::NotConnectedException if not connected? # Only allow one thread to write to socket at a time @write_semaphore.synchronize do @socket.write(data.to_s) @@ -527,9 +564,12 @@ :password => uri.password, :ssl => ssl } end + def next_packet_id + @last_packet_id = ( @last_packet_id || 0 ).next + end # ---- Deprecated attributes and methods ---- # public # @deprecated Please use {#host} instead