class MQTT::Client
Constants
- ATTR_DEFAULTS
Default attribute values
- SELECT_TIMEOUT
Timeout between select polls (in seconds)
Attributes
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
Set the 'Clean Session' flag when connecting? (default is true)
Client Identifier
Hostname of the remote server
Time (in seconds) between pings to remote server (default is 15 seconds)
Last ping response time
Password to authenticate to the server with
Port number of the remote server
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
@example Using TLS 1.0
client = Client.new('mqtt.example.com', :ssl => :TLSv1)
@see OpenSSL::SSL::SSLContext::METHODS
Username to authenticate to the server with
The version number of the MQTT protocol to use (default 3.1.1)
Contents of message that is sent by server when client disconnect
The QoS level of the will message sent by the server
If the Will message should be retain by the server after it is sent
The topic that the Will message is published to
Public Class Methods
Create and connect a new MQTT Client
Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.
Example:
MQTT::Client.connect('myserver.example.com') do |client| # do stuff here end
# File lib/mqttbridge/client.rb, line 92 def self.connect(*args, &block) client = MQTT::Client.new(*args) client.connect(&block) return client end
Generate a random client identifier (using the characters 0-9 and a-z)
# File lib/mqttbridge/client.rb, line 100 def self.generate_client_id(prefix='ruby', length=16) str = prefix.dup length.times do num = rand(36) if (num<10) # Number num += 48 else # Letter num += 87 end str += num.chr end return str end
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new client = MQTT::Client.new('mqtt://myserver.example.com') client = MQTT::Client.new('mqtt://user:pass@myserver.example.com') client = MQTT::Client.new('myserver.example.com') client = MQTT::Client.new('myserver.example.com', 18830) client = MQTT::Client.new(:host => 'myserver.example.com') client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
# File lib/mqttbridge/client.rb, line 135 def initialize(*args) if args.last.is_a?(Hash) attr = args.pop else attr = {} end if args.length == 0 if ENV['MQTT_SERVER'] attr.merge!(parse_uri(ENV['MQTT_SERVER'])) end end if args.length >= 1 case args[0] when URI attr.merge!(parse_uri(args[0])) when %r|^mqtts?://| attr.merge!(parse_uri(args[0])) else attr.merge!(:host => args[0]) end end if args.length >= 2 attr.merge!(:port => args[1]) unless args[1].nil? end if args.length >= 3 raise ArgumentError, "Unsupported number of arguments" end # Merge arguments with default values for attributes ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end # Set a default port number if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables @last_ping_request = Time.now @last_ping_response = Time.now @socket = nil @read_queue = Queue.new @pubacks = {} @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end
Public Instance Methods
Set a path to a file containing a PEM-format CA certificate and enable peer verification
# File lib/mqttbridge/client.rb, line 216 def ca_file=(path) ssl_context.ca_file = path unless path.nil? ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER end end
PEM-format client certificate
# File lib/mqttbridge/client.rb, line 199 def cert=(cert) ssl_context.cert = OpenSSL::X509::Certificate.new(cert) end
Set a path to a file containing a PEM-format client certificate
# File lib/mqttbridge/client.rb, line 194 def cert_file=(path) self.cert = File.read(path) end
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
# File lib/mqttbridge/client.rb, line 236 def connect(clientid=nil) unless clientid.nil? @client_id = clientid end if @client_id.nil? or @client_id.empty? if @clean_session if @version == '3.1.0' # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id end else raise 'Must provide a client_id if clean_session is set to false' end end if @host.nil? raise 'No MQTT server host set when attempting to connect' end if not connected? # Create network socket tcp_socket = TCPSocket.new(@host, @port) if @ssl # Set the protocol version if @ssl.is_a?(Symbol) ssl_context.ssl_version = @ssl end @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true @socket.connect else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain ) # Send packet send_packet(packet) # Receive response receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent while connected? do receive_packet end end end # If a block is given, then yield and disconnect if block_given? begin yield(self) ensure disconnect end end end
Checks whether the client is connected to the server.
# File lib/mqttbridge/client.rb, line 331 def connected? (not @socket.nil?) and (not @socket.closed?) end
Disconnect from the MQTT server. If you don't want to say goodbye to the server, set send_msg to false.
# File lib/mqttbridge/client.rb, line 314 def disconnect(send_msg=true) # Stop reading packets from the socket first @read_thread.kill if @read_thread and @read_thread.alive? @read_thread = nil # Close the socket if it is open if connected? if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? @socket = nil end end
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic,message = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload| # Do stuff here end
# File lib/mqttbridge/client.rb, line 396 def get(topic=nil) if block_given? get_packet(topic) do |packet| yield(packet.topic, packet.payload) end else # Wait for one packet to be available packet = get_packet(topic) return packet.topic, packet.payload end end
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet| # Do stuff here puts packet.topic end
# File lib/mqttbridge/client.rb, line 421 def get_packet(topic=nil) # Subscribe to a topic, if an argument is given subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do packet = @read_queue.pop yield(packet) puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available packet = @read_queue.pop puback_packet(packet) if packet.qos > 0 return packet end end
Set to a PEM-format client private key
# File lib/mqttbridge/client.rb, line 210 def key=(*args) cert, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase) end
Set a path to a file containing a PEM-format client private key
# File lib/mqttbridge/client.rb, line 204 def key_file=(*args) path, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase) end
Publish a message on a particular topic to the MQTT server.
# File lib/mqttbridge/client.rb, line 336 def publish(topic, payload='', retain=false, qos=0) raise ArgumentError.new("Topic name cannot be nil") if topic.nil? raise ArgumentError.new("Topic name cannot be empty") if topic.empty? packet = MQTT::Packet::Publish.new( :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) # Send the packet res = send_packet(packet) if packet.qos > 0 Timeout.timeout(@ack_timeout) do while connected? do @pubacks_semaphore.synchronize do return res if @pubacks.delete(packet.id) end # FIXME: make threads communicate with each other, instead of polling # (using a pipe and select ?) sleep 0.01 end end return -1 end end
Returns true if the incoming message queue is empty.
# File lib/mqttbridge/client.rb, line 441 def queue_empty? @read_queue.empty? end
Returns the length of the incoming message queue.
# File lib/mqttbridge/client.rb, line 446 def queue_length @read_queue.length end
@deprecated Please use {#host} instead
# File lib/mqttbridge/client.rb, line 582 def remote_host host end
@deprecated Please use {#host=} instead
# File lib/mqttbridge/client.rb, line 587 def remote_host=(args) self.host = args end
@deprecated Please use {#port} instead
# File lib/mqttbridge/client.rb, line 592 def remote_port port end
@deprecated Please use {#port=} instead
# File lib/mqttbridge/client.rb, line 597 def remote_port=(args) self.port = args end
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
# File lib/mqttbridge/client.rb, line 227 def set_will(topic, payload, retain=false, qos=0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end
Get the OpenSSL context, that is used if SSL/TLS is enabled
# File lib/mqttbridge/client.rb, line 189 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' ) client.subscribe( 'a/b', 'c/d' ) client.subscribe( ['a/b',0], ['c/d',1] ) client.subscribe( 'a/b' => 0, 'c/d' => 1 )
# File lib/mqttbridge/client.rb, line 377 def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( :id => next_packet_id, :topics => topics ) send_packet(packet) end
Send a unsubscribe message for one or more topics on the MQTT server
# File lib/mqttbridge/client.rb, line 451 def unsubscribe(*topics) if topics.is_a?(Enumerable) and topics.count == 1 topics = topics.first end packet = MQTT::Packet::Unsubscribe.new( :topics => topics, :id => next_packet_id ) send_packet(packet) end