lib/mqtt/client.rb in mqtt-0.0.2 vs lib/mqtt/client.rb in mqtt-0.0.3
- old
+ new
@@ -2,51 +2,52 @@
require 'mqtt'
require 'mqtt/packet'
require 'thread'
require 'socket'
+require 'timeout'
module MQTT
# Client class for talking to an MQTT broker
class Client
attr_reader :remote_host # Hostname of the remote broker
attr_reader :remote_port # Port number of the remote broker
- attr_accessor :keep_alive # Time between pings to remote broker
+ attr_accessor :keep_alive # Time (in seconds) between pings to remote broker
attr_accessor :clean_start # Set the 'Clean Start' flag when connecting?
+ attr_accessor :client_id # Client Identifier
+ attr_accessor :ack_timeout # Number of seconds to wait for acknowledgement packets
# Timeout between select polls (in seconds)
SELECT_TIMEOUT = 0.5
# Create a new MQTT Client instance
def initialize(remote_host='localhost', remote_port=1883)
@remote_host = remote_host
@remote_port = remote_port
- @message_id = 0
@keep_alive = 10
@clean_start = true
+ @client_id = random_letters(16)
+ @message_id = 0
+ @ack_timeout = 5
@last_pingreq = Time.now
@last_pingresp = Time.now
@socket = nil
@read_queue = Queue.new
@read_thread = nil
@write_semaphore = Mutex.new
end
# Connect to the MQTT broker
# If a block is given, then yield to that block and then disconnect again.
- def connect(clientid)
+ def connect(clientid=nil)
+ @client_id = clientid unless clientid.nil?
+
if not connected?
# Create network socket
@socket = TCPSocket.new(@remote_host,@remote_port)
-
- # Start packet reading thread
- @read_thread = Thread.new(Thread.current) do |parent|
- Thread.current[:parent] = parent
- loop { receive_packet }
- end
# Protocol name and version
packet = MQTT::Packet.new(:type => :connect)
packet.add_string('MQIsdp')
packet.add_bytes(0x03)
@@ -59,14 +60,23 @@
# Keep Alive timer: 10 seconds
packet.add_short(@keep_alive)
# Add the client identifier
- packet.add_string(clientid)
+ packet.add_string(@client_id)
# Send packet
send_packet(packet)
+
+ # Receive response
+ receive_connack
+
+ # Start packet reading thread
+ @read_thread = Thread.new(Thread.current) do |parent|
+ Thread.current[:parent] = parent
+ loop { receive_packet }
+ end
end
# If a block is given, then yield and disconnect
if block_given?
yield(self)
@@ -80,11 +90,11 @@
if connected?
if send_msg
packet = MQTT::Packet.new(:type => :disconnect)
send_packet(packet)
end
- @read_thread.kill unless @read_thread.nil?
+ @read_thread.kill if @read_thread and @read_thread.alive?
@read_thread = nil
@socket.close unless @socket.nil?
@socket = nil
end
end
@@ -220,19 +230,62 @@
end
Thread.current[:parent].raise(exp)
end
end
+ # Read and check a connection acknowledgement packet
+ def receive_connack
+ Timeout.timeout(@ack_timeout) do
+ packet = MQTT::Packet.read(@socket)
+ if packet.type != :connack
+ raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.type}")
+ end
+
+ # Read in the return code
+ byte1, return_code = packet.shift_bytes(2)
+ if return_code == 0x00
+ # Success
+ nil
+ elsif return_code == 0x01
+ raise MQTT::ProtocolException.new("Connection refused: unacceptable protocol version")
+ elsif return_code == 0x02
+ raise MQTT::ProtocolException.new("Connection refused: client identifier rejected")
+ elsif return_code == 0x03
+ raise MQTT::ProtocolException.new("Connection refused: broker unavailable")
+ else
+ raise MQTT::ProtocolException.new("Connection refused: #{return_code}")
+ end
+ end
+ end
+
# Send a packet to broker
def send_packet(data)
# Throw exception if we aren't connected
raise MQTT::NotConnectedException if not connected?
# Only allow one thread to write to socket at a time
@write_semaphore.synchronize do
@socket.write(data)
end
end
+
+ # Generate a string of random letters (0-9,a-z)
+ def random_letters(count)
+ str = ''
+ count.times do
+ num = rand(36)
+ if (num<10)
+ # Number
+ num += 48
+ else
+ # Letter
+ num += 87
+ end
+ str += num.chr
+ end
+ return str
+ end
+
end
end