lib/mqtt/client.rb in mqtt-0.0.2 vs lib/mqtt/client.rb in mqtt-0.0.3

- old
+ new

@@ -2,51 +2,52 @@ require 'mqtt' require 'mqtt/packet' require 'thread' require 'socket' +require 'timeout' module MQTT # Client class for talking to an MQTT broker class Client attr_reader :remote_host # Hostname of the remote broker attr_reader :remote_port # Port number of the remote broker - attr_accessor :keep_alive # Time between pings to remote broker + attr_accessor :keep_alive # Time (in seconds) between pings to remote broker attr_accessor :clean_start # Set the 'Clean Start' flag when connecting? + attr_accessor :client_id # Client Identifier + attr_accessor :ack_timeout # Number of seconds to wait for acknowledgement packets # Timeout between select polls (in seconds) SELECT_TIMEOUT = 0.5 # Create a new MQTT Client instance def initialize(remote_host='localhost', remote_port=1883) @remote_host = remote_host @remote_port = remote_port - @message_id = 0 @keep_alive = 10 @clean_start = true + @client_id = random_letters(16) + @message_id = 0 + @ack_timeout = 5 @last_pingreq = Time.now @last_pingresp = Time.now @socket = nil @read_queue = Queue.new @read_thread = nil @write_semaphore = Mutex.new end # Connect to the MQTT broker # If a block is given, then yield to that block and then disconnect again. - def connect(clientid) + def connect(clientid=nil) + @client_id = clientid unless clientid.nil? + if not connected? # Create network socket @socket = TCPSocket.new(@remote_host,@remote_port) - - # Start packet reading thread - @read_thread = Thread.new(Thread.current) do |parent| - Thread.current[:parent] = parent - loop { receive_packet } - end # Protocol name and version packet = MQTT::Packet.new(:type => :connect) packet.add_string('MQIsdp') packet.add_bytes(0x03) @@ -59,14 +60,23 @@ # Keep Alive timer: 10 seconds packet.add_short(@keep_alive) # Add the client identifier - packet.add_string(clientid) + packet.add_string(@client_id) # Send packet send_packet(packet) + + # Receive response + receive_connack + + # Start packet reading thread + @read_thread = Thread.new(Thread.current) do |parent| + Thread.current[:parent] = parent + loop { receive_packet } + end end # If a block is given, then yield and disconnect if block_given? yield(self) @@ -80,11 +90,11 @@ if connected? if send_msg packet = MQTT::Packet.new(:type => :disconnect) send_packet(packet) end - @read_thread.kill unless @read_thread.nil? + @read_thread.kill if @read_thread and @read_thread.alive? @read_thread = nil @socket.close unless @socket.nil? @socket = nil end end @@ -220,19 +230,62 @@ end Thread.current[:parent].raise(exp) end end + # Read and check a connection acknowledgement packet + def receive_connack + Timeout.timeout(@ack_timeout) do + packet = MQTT::Packet.read(@socket) + if packet.type != :connack + raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.type}") + end + + # Read in the return code + byte1, return_code = packet.shift_bytes(2) + if return_code == 0x00 + # Success + nil + elsif return_code == 0x01 + raise MQTT::ProtocolException.new("Connection refused: unacceptable protocol version") + elsif return_code == 0x02 + raise MQTT::ProtocolException.new("Connection refused: client identifier rejected") + elsif return_code == 0x03 + raise MQTT::ProtocolException.new("Connection refused: broker unavailable") + else + raise MQTT::ProtocolException.new("Connection refused: #{return_code}") + end + end + end + # Send a packet to broker def send_packet(data) # Throw exception if we aren't connected raise MQTT::NotConnectedException if not connected? # Only allow one thread to write to socket at a time @write_semaphore.synchronize do @socket.write(data) end end + + # Generate a string of random letters (0-9,a-z) + def random_letters(count) + str = '' + count.times do + num = rand(36) + if (num<10) + # Number + num += 48 + else + # Letter + num += 87 + end + str += num.chr + end + return str + end + end end