lib/mqtt/client.rb in mqtt-0.0.4 vs lib/mqtt/client.rb in mqtt-0.0.5

- old
+ new

@@ -1,245 +1,246 @@ -#!/usr/bin/env ruby +# Client class for talking to an MQTT broker +class MQTT::Client + attr_reader :remote_host # Hostname of the remote broker + attr_reader :remote_port # Port number of the remote broker + attr_accessor :keep_alive # Time (in seconds) between pings to remote broker + attr_accessor :clean_session # Set the 'Clean Session' flag when connecting? + attr_accessor :client_id # Client Identifier + attr_accessor :ack_timeout # Number of seconds to wait for acknowledgement packets + attr_accessor :username # Username to authenticate to the broker with + attr_accessor :password # Password to authenticate to the broker with -require 'mqtt' -require 'mqtt/packet' -require 'thread' -require 'socket' -require 'timeout' + # OLD deprecated clean_start + alias :clean_start :clean_session + alias :clean_start= :clean_session= + # Timeout between select polls (in seconds) + SELECT_TIMEOUT = 0.5 -module MQTT + # Create a new MQTT Client instance + def initialize(remote_host=MQTT::DEFAULT_HOST, remote_port=MQTT::DEFAULT_PORT) + @remote_host = remote_host + @remote_port = remote_port + @keep_alive = 10 + @clean_session = true + @client_id = nil + @message_id = 0 + @ack_timeout = 5 + @last_pingreq = Time.now + @last_pingresp = Time.now + @socket = nil + @read_queue = Queue.new + @read_thread = nil + @write_semaphore = Mutex.new + @username = nil + @password = nil + end - # Client class for talking to an MQTT broker - class Client - attr_reader :remote_host # Hostname of the remote broker - attr_reader :remote_port # Port number of the remote broker - attr_accessor :keep_alive # Time (in seconds) between pings to remote broker - attr_accessor :clean_start # Set the 'Clean Start' flag when connecting? - attr_accessor :client_id # Client Identifier - attr_accessor :ack_timeout # Number of seconds to wait for acknowledgement packets - - # Timeout between select polls (in seconds) - SELECT_TIMEOUT = 0.5 - - # Create a new MQTT Client instance - def initialize(remote_host='localhost', remote_port=1883) - @remote_host = remote_host - @remote_port = remote_port - @keep_alive = 10 - @clean_start = true + # Connect to the MQTT broker + # If a block is given, then yield to that block and then disconnect again. + def connect(clientid=nil) + if !clientid.nil? + @client_id = clientid + elsif clientid.nil? @client_id = random_letters(16) - @message_id = 0 - @ack_timeout = 5 - @last_pingreq = Time.now - @last_pingresp = Time.now - @socket = nil - @read_queue = Queue.new - @read_thread = nil - @write_semaphore = Mutex.new + @clean_session = true end - - # Connect to the MQTT broker - # If a block is given, then yield to that block and then disconnect again. - def connect(clientid=nil) - @client_id = clientid unless clientid.nil? - - if not connected? - # Create network socket - @socket = TCPSocket.new(@remote_host,@remote_port) - # Protocol name and version - packet = MQTT::Packet::Connect.new( - :clean_start => @clean_start, - :keep_alive => @keep_alive, - :client_id => @client_id - ) + if not connected? + # Create network socket + @socket = TCPSocket.new(@remote_host,@remote_port) - # Send packet - send_packet(packet) + # Protocol name and version + packet = MQTT::Packet::Connect.new( + :clean_session => @clean_session, + :keep_alive => @keep_alive, + :client_id => @client_id, + :username => @username, + :password => @password + ) - # Receive response - receive_connack - - # Start packet reading thread - @read_thread = Thread.new(Thread.current) do |parent| - Thread.current[:parent] = parent - loop { receive_packet } - end - end - - # If a block is given, then yield and disconnect - if block_given? - yield(self) - disconnect - end - end - - # Disconnect from the MQTT broker. - # If you don't want to say goodbye to the broker, set send_msg to false. - def disconnect(send_msg=true) - if connected? - if send_msg - packet = MQTT::Packet::Disconnect.new - send_packet(packet) - end - @read_thread.kill if @read_thread and @read_thread.alive? - @read_thread = nil - @socket.close unless @socket.nil? - @socket = nil - end - end - - # Checks whether the client is connected to the broker. - def connected? - not @socket.nil? - end - - # Send a MQTT ping message to indicate that the MQTT client is alive. - def ping - packet = MQTT::Packet::Pingreq.new + # Send packet send_packet(packet) - @last_pingreq = Time.now - end - # Publish a message on a particular topic to the MQTT broker. - def publish(topic, payload, retain=false, qos=0) - packet = MQTT::Packet::Publish.new( - :qos => qos, - :retain => retain, - :topic => topic, - :payload => payload, - :message_id => @message_id.next - ) + # Receive response + receive_connack - # Send the packet - send_packet(packet) + # Start packet reading thread + @read_thread = Thread.new(Thread.current) do |parent| + Thread.current[:parent] = parent + loop { receive_packet } + end end - - # Send a subscribe message for one or more topics on the MQTT broker. - # The topics parameter should be one of the following: - # * String: subscribe to one topic with QOS 0 - # * Array: subscribe to multiple topics with QOS 0 - # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level - # - # For example: - # client.subscribe( 'a/b' ) - # client.subscribe( 'a/b', 'c/d' ) - # client.subscribe( ['a/b',0], ['c/d',1] ) - # client.subscribe( 'a/b' => 0, 'c/d' => 1 ) - # - def subscribe(*topics) - packet = MQTT::Packet::Subscribe.new( - :topics => topics, - :message_id => @message_id.next - ) - send_packet(packet) + + # If a block is given, then yield and disconnect + if block_given? + yield(self) + disconnect end - - # Return the next message recieved from the MQTT broker. - # This method blocks until a message is available. - # - # The method returns the topic and message as an array: - # topic,message = client.get - # - def get - # Wait for a packet to be available - packet = @read_queue.pop - topic = packet.topic - payload = packet.payload - return topic,payload - end - - # Send a unsubscribe message for one or more topics on the MQTT broker - def unsubscribe(*topics) - packet = MQTT::Packet::Unsubscribe.new( - :topics => topics, - :message_id => @message_id.next - ) - send_packet(packet) - end - - private - - # Try to read a packet from the broker - # Also sends keep-alive ping packets. - def receive_packet - begin - # Poll socket - is there data waiting? - result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) - unless result.nil? - # Yes - read in the packet - packet = MQTT::Packet.read(@socket) - if packet.class == MQTT::Packet::Publish - # Add to queue - @read_queue.push(packet) - else - # Ignore all other packets - nil - # FIXME: implement responses for QOS 1 and 2 - end - end - - # Time to send a keep-alive ping request? - if Time.now > @last_pingreq + @keep_alive - ping - end - - # FIXME: check we received a ping response recently? + end - # Pass exceptions up to parent thread - rescue Exception => exp - unless @socket.nil? - @socket.close - @socket = nil - end - Thread.current[:parent].raise(exp) + # Disconnect from the MQTT broker. + # If you don't want to say goodbye to the broker, set send_msg to false. + def disconnect(send_msg=true) + if connected? + if send_msg + packet = MQTT::Packet::Disconnect.new + send_packet(packet) end + @read_thread.kill if @read_thread and @read_thread.alive? + @read_thread = nil + @socket.close unless @socket.nil? + @socket = nil end - - # Read and check a connection acknowledgement packet - def receive_connack - Timeout.timeout(@ack_timeout) do + end + + # Checks whether the client is connected to the broker. + def connected? + not @socket.nil? + end + + # Send a MQTT ping message to indicate that the MQTT client is alive. + def ping + packet = MQTT::Packet::Pingreq.new + send_packet(packet) + @last_pingreq = Time.now + end + + # Publish a message on a particular topic to the MQTT broker. + def publish(topic, payload, retain=false, qos=0) + packet = MQTT::Packet::Publish.new( + :qos => qos, + :retain => retain, + :topic => topic, + :payload => payload, + :message_id => @message_id.next + ) + + # Send the packet + send_packet(packet) + end + + # Send a subscribe message for one or more topics on the MQTT broker. + # The topics parameter should be one of the following: + # * String: subscribe to one topic with QOS 0 + # * Array: subscribe to multiple topics with QOS 0 + # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level + # + # For example: + # client.subscribe( 'a/b' ) + # client.subscribe( 'a/b', 'c/d' ) + # client.subscribe( ['a/b',0], ['c/d',1] ) + # client.subscribe( 'a/b' => 0, 'c/d' => 1 ) + # + def subscribe(*topics) + packet = MQTT::Packet::Subscribe.new( + :topics => topics, + :message_id => @message_id.next + ) + send_packet(packet) + end + + # Return the next message recieved from the MQTT broker. + # This method blocks until a message is available. + # + # The method returns the topic and message as an array: + # topic,message = client.get + # + def get + # Wait for a packet to be available + packet = @read_queue.pop + topic = packet.topic + payload = packet.payload + return topic,payload + end + + # Send a unsubscribe message for one or more topics on the MQTT broker + def unsubscribe(*topics) + packet = MQTT::Packet::Unsubscribe.new( + :topics => topics, + :message_id => @message_id.next + ) + send_packet(packet) + end + +private + + # Try to read a packet from the broker + # Also sends keep-alive ping packets. + def receive_packet + begin + # Poll socket - is there data waiting? + result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) + unless result.nil? + # Yes - read in the packet packet = MQTT::Packet.read(@socket) - if packet.class != MQTT::Packet::Connack - raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}") + if packet.class == MQTT::Packet::Publish + # Add to queue + @read_queue.push(packet) + else + # Ignore all other packets + nil + # FIXME: implement responses for QOS 1 and 2 end - - # Check the return code - if packet.return_code != 0x00 - raise MQTT::ProtocolException.new(packet.return_msg) - end end + + # Time to send a keep-alive ping request? + if Time.now > @last_pingreq + @keep_alive + ping + end + + # FIXME: check we received a ping response recently? + + # Pass exceptions up to parent thread + rescue Exception => exp + unless @socket.nil? + @socket.close + @socket = nil + end + Thread.current[:parent].raise(exp) end - - # Send a packet to broker - def send_packet(data) - # Throw exception if we aren't connected - raise MQTT::NotConnectedException if not connected? - - # Only allow one thread to write to socket at a time - @write_semaphore.synchronize do - @socket.write(data) + end + + # Read and check a connection acknowledgement packet + def receive_connack + Timeout.timeout(@ack_timeout) do + packet = MQTT::Packet.read(@socket) + if packet.class != MQTT::Packet::Connack + raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}") end + + # Check the return code + if packet.return_code != 0x00 + raise MQTT::ProtocolException.new(packet.return_msg) + end end + end - # Generate a string of random letters (0-9,a-z) - def random_letters(count) - str = '' - count.times do - num = rand(36) - if (num<10) - # Number - num += 48 - else - # Letter - num += 87 - end - str += num.chr + # Send a packet to broker + def send_packet(data) + # Throw exception if we aren't connected + raise MQTT::NotConnectedException if not connected? + + # Only allow one thread to write to socket at a time + @write_semaphore.synchronize do + @socket.write(data) + end + end + + # Generate a string of random letters (0-9,a-z) + def random_letters(count) + str = '' + count.times do + num = rand(36) + if (num<10) + # Number + num += 48 + else + # Letter + num += 87 end - return str + str += num.chr end - - + return str end end