lib/mqtt/client.rb in mqtt-0.1.0 vs lib/mqtt/client.rb in mqtt-0.2.0

- old
+ new

@@ -1,44 +1,80 @@ +autoload :OpenSSL, 'openssl' +autoload :URI, 'uri' + + # Client class for talking to an MQTT broker class MQTT::Client - attr_reader :remote_host # Hostname of the remote broker - attr_reader :remote_port # Port number of the remote broker - attr_accessor :keep_alive # Time (in seconds) between pings to remote broker - attr_accessor :clean_session # Set the 'Clean Session' flag when connecting? - attr_accessor :client_id # Client Identifier - attr_accessor :ack_timeout # Number of seconds to wait for acknowledgement packets - attr_accessor :username # Username to authenticate to the broker with - attr_accessor :password # Password to authenticate to the broker with - attr_accessor :will_topic # The topic that the Will message is published to - attr_accessor :will_payload # Contents of message that is sent by broker when client disconnect - attr_accessor :will_qos # The QoS level of the will message sent by the broker - attr_accessor :will_retain # If the Will message should be retain by the broker after it is sent + # Hostname of the remote broker + attr_accessor :remote_host - # OLD deprecated clean_start - alias :clean_start :clean_session - alias :clean_start= :clean_session= + # Port number of the remote broker + attr_accessor :remote_port + # Set to true to enable SSL/TLS encrypted communication + # + # Set to a symbol to use a specific variant of SSL/TLS. + # Allowed values include: + # + # @example Using TLS 1.0 + # client = Client.new('mqtt.example.com', :ssl => :TLSv1) + # @see OpenSSL::SSL::SSLContext::METHODS + attr_accessor :ssl + + # Time (in seconds) between pings to remote broker + attr_accessor :keep_alive + + # Set the 'Clean Session' flag when connecting? + attr_accessor :clean_session + + # Client Identifier + attr_accessor :client_id + + # Number of seconds to wait for acknowledgement packets + attr_accessor :ack_timeout + + # Username to authenticate to the broker with + attr_accessor :username + + # Password to authenticate to the broker with + attr_accessor :password + + # The topic that the Will message is published to + attr_accessor :will_topic + + # Contents of message that is sent by broker when client disconnect + attr_accessor :will_payload + + # The QoS level of the will message sent by the broker + attr_accessor :will_qos + + # If the Will message should be retain by the broker after it is sent + attr_accessor :will_retain + + # Timeout between select polls (in seconds) SELECT_TIMEOUT = 0.5 # Default attribute values ATTR_DEFAULTS = { - :remote_host => MQTT::DEFAULT_HOST, - :remote_port => MQTT::DEFAULT_PORT, + :remote_host => nil, + :remote_port => nil, :keep_alive => 15, :clean_session => true, :client_id => nil, :ack_timeout => 5, :username => nil, :password => nil, :will_topic => nil, :will_payload => nil, :will_qos => 0, - :will_retain => false + :will_retain => false, + :ssl => false } # Create and connect a new MQTT Client + # # Accepts the same arguments as creating a new client. # If a block is given, then it will be executed before disconnecting again. # # Example: # MQTT::Client.connect('myserver.example.com') do |client| @@ -69,65 +105,149 @@ return str end # Create a new MQTT Client instance # + # Accepts one of the following: + # - a URI that uses the MQTT scheme + # - a hostname and port + # - a Hash containing attributes to be set on the new instance + # + # If no arguments are given then the method will look for a URI + # in the MQTT_BROKER environment variable. + # # Examples: + # client = MQTT::Client.new + # client = MQTT::Client.new('mqtt://myserver.example.com') + # client = MQTT::Client.new('mqtt://user:pass@myserver.example.com') # client = MQTT::Client.new('myserver.example.com') # client = MQTT::Client.new('myserver.example.com', 18830) # client = MQTT::Client.new(:remote_host => 'myserver.example.com') # client = MQTT::Client.new(:remote_host => 'myserver.example.com', :keep_alive => 30) # def initialize(*args) - if args.count == 0 - args = {} - elsif args.count == 1 and args[0].is_a?(Hash) - args = args[0] - elsif args.count == 1 - args = {:remote_host => args[0]} - elsif args.count == 2 - args = {:remote_host => args[0], :remote_port => args[1]} + if args.last.is_a?(Hash) + attr = args.pop else + attr = {} + end + + if args.length == 0 + if ENV['MQTT_BROKER'] + attr.merge!(parse_uri(ENV['MQTT_BROKER'])) + end + end + + if args.length >= 1 + case args[0] + when URI + attr.merge!(parse_uri(args[0])) + when %r|^mqtts?://| + attr.merge!(parse_uri(args[0])) + else + attr.merge!(:remote_host => args[0]) + end + end + + if args.length >= 2 + attr.merge!(:remote_port => args[1]) + end + + if args.length >= 3 raise ArgumentError, "Unsupported number of arguments" end # Merge arguments with default values for attributes - ATTR_DEFAULTS.merge(args).each_pair do |k,v| - instance_variable_set("@#{k}", v) + ATTR_DEFAULTS.merge(attr).each_pair do |k,v| + self.send("#{k}=", v) end + # Set a default port number + if @remote_port.nil? + @remote_port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT + end + # Initialise private instance variables @message_id = 0 @last_pingreq = Time.now @last_pingresp = Time.now @socket = nil @read_queue = Queue.new @read_thread = nil @write_semaphore = Mutex.new end + # Get the OpenSSL context, that is used if SSL/TLS is enabled + def ssl_context + @ssl_context ||= OpenSSL::SSL::SSLContext.new + end + + # Set a path to a file containing a PEM-format client certificate + def cert_file=(path) + ssl_context.cert = OpenSSL::X509::Certificate.new(File.open(path)) + end + + # Set a path to a file containing a PEM-format client private key + def key_file=(path) + ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path)) + end + + # Set a path to a file containing a PEM-format CA certificate and enable peer verification + def ca_file=(path) + ssl_context.ca_file = path + unless path.nil? + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER + end + end + + # Set the Will for the client + # + # The will is a message that will be delivered by the broker when the client dies. + # The Will must be set before establishing a connection to the broker def set_will(topic, payload, retain=false, qos=0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end # Connect to the MQTT broker # If a block is given, then yield to that block and then disconnect again. def connect(clientid=nil) - if !clientid.nil? + unless clientid.nil? @client_id = clientid - elsif @client_id.nil? - @client_id = MQTT::Client.generate_client_id - @clean_session = true end + if @client_id.nil? or @client_id.empty? + if @clean_session + @client_id = MQTT::Client.generate_client_id + else + raise 'Must provide a client_id if clean_session is set to false' + end + end + + if @remote_host.nil? + raise 'No MQTT broker host set when attempting to connect' + end + if not connected? # Create network socket - @socket = TCPSocket.new(@remote_host,@remote_port) + tcp_socket = TCPSocket.new(@remote_host, @remote_port) + if @ssl + # Set the protocol version + if @ssl.is_a?(Symbol) + ssl_context.ssl_version = @ssl + end + + @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) + @socket.sync_close = true + @socket.connect + else + @socket = tcp_socket + end + # Protocol name and version packet = MQTT::Packet::Connect.new( :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, @@ -146,11 +266,13 @@ receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent - loop { receive_packet } + while connected? do + receive_packet + end end end # If a block is given, then yield and disconnect if block_given? @@ -160,28 +282,34 @@ end # Disconnect from the MQTT broker. # If you don't want to say goodbye to the broker, set send_msg to false. def disconnect(send_msg=true) + # Stop reading packets from the socket first + @read_thread.kill if @read_thread and @read_thread.alive? + @read_thread = nil + + # Close the socket if it is open if connected? if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? @socket = nil end - @read_thread.kill if @read_thread and @read_thread.alive? - @read_thread = nil end # Checks whether the client is connected to the broker. def connected? - not @socket.nil? + (not @socket.nil?) and (not @socket.closed?) end # Send a MQTT ping message to indicate that the MQTT client is alive. + # + # Note that you will not normally need to call this method + # as it is called automatically def ping packet = MQTT::Packet::Pingreq.new send_packet(packet) @last_pingreq = Time.now end @@ -286,10 +414,14 @@ @read_queue.length end # Send a unsubscribe message for one or more topics on the MQTT broker def unsubscribe(*topics) + if topics.is_a?(Enumerable) and topics.count == 1 + topics = topics.first + end + packet = MQTT::Packet::Unsubscribe.new( :topics => topics, :message_id => @message_id.next ) send_packet(packet) @@ -300,11 +432,11 @@ # Try to read a packet from the broker # Also sends keep-alive ping packets. def receive_packet begin # Poll socket - is there data waiting? - result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) + result = IO.select([@socket], [], [], SELECT_TIMEOUT) unless result.nil? # Yes - read in the packet packet = MQTT::Packet.read(@socket) if packet.class == MQTT::Packet::Publish # Add to queue @@ -353,10 +485,30 @@ # Throw exception if we aren't connected raise MQTT::NotConnectedException if not connected? # Only allow one thread to write to socket at a time @write_semaphore.synchronize do - @socket.write(data) + @socket.write(data.to_s) end + end + + private + def parse_uri(uri) + uri = URI.parse(uri) unless uri.is_a?(URI) + if uri.scheme == 'mqtt' + ssl = false + elsif uri.scheme == 'mqtts' + ssl = true + else + raise "Only the mqtt:// and mqtts:// schemes are supported" + end + + { + :remote_host => uri.host, + :remote_port => uri.port || nil, + :username => uri.user, + :password => uri.password, + :ssl => ssl + } end end