lib/mqtt/client.rb in mqtt-0.2.0 vs lib/mqtt/client.rb in mqtt-0.3.0

- old
+ new

@@ -1,65 +1,69 @@ autoload :OpenSSL, 'openssl' autoload :URI, 'uri' -# Client class for talking to an MQTT broker +# Client class for talking to an MQTT server class MQTT::Client - # Hostname of the remote broker - attr_accessor :remote_host + # Hostname of the remote server + attr_accessor :host - # Port number of the remote broker - attr_accessor :remote_port + # Port number of the remote server + attr_accessor :port + # The version number of the MQTT protocol to use (default 3.1.0) + attr_accessor :version + # Set to true to enable SSL/TLS encrypted communication # # Set to a symbol to use a specific variant of SSL/TLS. - # Allowed values include: - # + # Allowed values include: + # # @example Using TLS 1.0 # client = Client.new('mqtt.example.com', :ssl => :TLSv1) # @see OpenSSL::SSL::SSLContext::METHODS attr_accessor :ssl - # Time (in seconds) between pings to remote broker + # Time (in seconds) between pings to remote server (default is 15 seconds) attr_accessor :keep_alive - # Set the 'Clean Session' flag when connecting? + # Set the 'Clean Session' flag when connecting? (default is true) attr_accessor :clean_session # Client Identifier attr_accessor :client_id - # Number of seconds to wait for acknowledgement packets + # Number of seconds to wait for acknowledgement packets (default is 5 seconds) attr_accessor :ack_timeout - # Username to authenticate to the broker with + # Username to authenticate to the server with attr_accessor :username - # Password to authenticate to the broker with + # Password to authenticate to the server with attr_accessor :password # The topic that the Will message is published to attr_accessor :will_topic - # Contents of message that is sent by broker when client disconnect + # Contents of message that is sent by server when client disconnect attr_accessor :will_payload - # The QoS level of the will message sent by the broker + # The QoS level of the will message sent by the server attr_accessor :will_qos - # If the Will message should be retain by the broker after it is sent + # If the Will message should be retain by the server after it is sent attr_accessor :will_retain # Timeout between select polls (in seconds) SELECT_TIMEOUT = 0.5 # Default attribute values ATTR_DEFAULTS = { - :remote_host => nil, - :remote_port => nil, + :host => nil, + :port => nil, + :version => '3.1.0', :keep_alive => 15, :clean_session => true, :client_id => nil, :ack_timeout => 5, :username => nil, @@ -87,11 +91,11 @@ return client end # Generate a random client identifier # (using the characters 0-9 and a-z) - def self.generate_client_id(prefix='ruby_', length=16) + def self.generate_client_id(prefix='ruby', length=16) str = prefix.dup length.times do num = rand(36) if (num<10) # Number @@ -111,47 +115,47 @@ # - a URI that uses the MQTT scheme # - a hostname and port # - a Hash containing attributes to be set on the new instance # # If no arguments are given then the method will look for a URI - # in the MQTT_BROKER environment variable. + # in the MQTT_SERVER environment variable. # # Examples: # client = MQTT::Client.new # client = MQTT::Client.new('mqtt://myserver.example.com') # client = MQTT::Client.new('mqtt://user:pass@myserver.example.com') # client = MQTT::Client.new('myserver.example.com') # client = MQTT::Client.new('myserver.example.com', 18830) - # client = MQTT::Client.new(:remote_host => 'myserver.example.com') - # client = MQTT::Client.new(:remote_host => 'myserver.example.com', :keep_alive => 30) + # client = MQTT::Client.new(:host => 'myserver.example.com') + # client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30) # def initialize(*args) if args.last.is_a?(Hash) attr = args.pop else attr = {} end if args.length == 0 - if ENV['MQTT_BROKER'] - attr.merge!(parse_uri(ENV['MQTT_BROKER'])) + if ENV['MQTT_SERVER'] + attr.merge!(parse_uri(ENV['MQTT_SERVER'])) end end if args.length >= 1 case args[0] when URI attr.merge!(parse_uri(args[0])) when %r|^mqtts?://| attr.merge!(parse_uri(args[0])) else - attr.merge!(:remote_host => args[0]) + attr.merge!(:host => args[0]) end end if args.length >= 2 - attr.merge!(:remote_port => args[1]) + attr.merge!(:port => args[1]) unless args[1].nil? end if args.length >= 3 raise ArgumentError, "Unsupported number of arguments" end @@ -160,16 +164,16 @@ ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end # Set a default port number - if @remote_port.nil? - @remote_port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT + if @port.nil? + @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables - @message_id = 0 + @packet_id = 0 @last_pingreq = Time.now @last_pingresp = Time.now @socket = nil @read_queue = Queue.new @read_thread = nil @@ -199,41 +203,44 @@ end end # Set the Will for the client # - # The will is a message that will be delivered by the broker when the client dies. - # The Will must be set before establishing a connection to the broker + # The will is a message that will be delivered by the server when the client dies. + # The Will must be set before establishing a connection to the server def set_will(topic, payload, retain=false, qos=0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end - # Connect to the MQTT broker + # Connect to the MQTT server # If a block is given, then yield to that block and then disconnect again. def connect(clientid=nil) unless clientid.nil? @client_id = clientid end if @client_id.nil? or @client_id.empty? if @clean_session - @client_id = MQTT::Client.generate_client_id + if @version == '3.1.0' + # Empty client id is not allowed for version 3.1.0 + @client_id = MQTT::Client.generate_client_id + end else raise 'Must provide a client_id if clean_session is set to false' end end - if @remote_host.nil? - raise 'No MQTT broker host set when attempting to connect' + if @host.nil? + raise 'No MQTT server host set when attempting to connect' end if not connected? # Create network socket - tcp_socket = TCPSocket.new(@remote_host, @remote_port) + tcp_socket = TCPSocket.new(@host, @port) if @ssl # Set the protocol version if @ssl.is_a?(Symbol) ssl_context.ssl_version = @ssl @@ -244,12 +251,13 @@ @socket.connect else @socket = tcp_socket end - # Protocol name and version + # Construct a connect packet packet = MQTT::Packet::Connect.new( + :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, @@ -279,12 +287,12 @@ yield(self) disconnect end end - # Disconnect from the MQTT broker. - # If you don't want to say goodbye to the broker, set send_msg to false. + # Disconnect from the MQTT server. + # If you don't want to say goodbye to the server, set send_msg to false. def disconnect(send_msg=true) # Stop reading packets from the socket first @read_thread.kill if @read_thread and @read_thread.alive? @read_thread = nil @@ -297,11 +305,11 @@ @socket.close unless @socket.nil? @socket = nil end end - # Checks whether the client is connected to the broker. + # Checks whether the client is connected to the server. def connected? (not @socket.nil?) and (not @socket.closed?) end # Send a MQTT ping message to indicate that the MQTT client is alive. @@ -312,25 +320,28 @@ packet = MQTT::Packet::Pingreq.new send_packet(packet) @last_pingreq = Time.now end - # Publish a message on a particular topic to the MQTT broker. - def publish(topic, payload, retain=false, qos=0) + # Publish a message on a particular topic to the MQTT server. + def publish(topic, payload='', retain=false, qos=0) + raise ArgumentError.new("Topic name cannot be nil") if topic.nil? + raise ArgumentError.new("Topic name cannot be empty") if topic.empty? + packet = MQTT::Packet::Publish.new( + :id => @packet_id.next, :qos => qos, :retain => retain, :topic => topic, - :payload => payload, - :message_id => @message_id.next + :payload => payload ) # Send the packet send_packet(packet) end - # Send a subscribe message for one or more topics on the MQTT broker. + # Send a subscribe message for one or more topics on the MQTT server. # The topics parameter should be one of the following: # * String: subscribe to one topic with QOS 0 # * Array: subscribe to multiple topics with QOS 0 # * Hash: subscribe to multiple topics where the key is the topic and the value is the QOS level # @@ -340,17 +351,17 @@ # client.subscribe( ['a/b',0], ['c/d',1] ) # client.subscribe( 'a/b' => 0, 'c/d' => 1 ) # def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( - :topics => topics, - :message_id => @message_id.next + :id => @packet_id.next, + :topics => topics ) send_packet(packet) end - # Return the next message received from the MQTT broker. + # Return the next message received from the MQTT server. # An optional topic can be given to subscribe to. # # The method either returns the topic and message as an array: # topic,message = client.get # @@ -374,11 +385,11 @@ packet = @read_queue.pop return packet.topic, packet.payload end end - # Return the next packet object received from the MQTT broker. + # Return the next packet object received from the MQTT server. # An optional topic can be given to subscribe to. # # The method either returns a single packet: # packet = client.get_packet # puts packet.topic @@ -412,26 +423,26 @@ # Returns the length of the incoming message queue. def queue_length @read_queue.length end - # Send a unsubscribe message for one or more topics on the MQTT broker + # Send a unsubscribe message for one or more topics on the MQTT server def unsubscribe(*topics) if topics.is_a?(Enumerable) and topics.count == 1 topics = topics.first end packet = MQTT::Packet::Unsubscribe.new( :topics => topics, - :message_id => @message_id.next + :id => @packet_id.next ) send_packet(packet) end private - # Try to read a packet from the broker + # Try to read a packet from the server # Also sends keep-alive ping packets. def receive_packet begin # Poll socket - is there data waiting? result = IO.select([@socket], [], [], SELECT_TIMEOUT) @@ -468,21 +479,23 @@ # Read and check a connection acknowledgement packet def receive_connack Timeout.timeout(@ack_timeout) do packet = MQTT::Packet.read(@socket) if packet.class != MQTT::Packet::Connack - raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}") + raise MQTT::ProtocolException.new( + "Response wasn't a connection acknowledgement: #{packet.class}" + ) end # Check the return code if packet.return_code != 0x00 raise MQTT::ProtocolException.new(packet.return_msg) end end end - # Send a packet to broker + # Send a packet to server def send_packet(data) # Throw exception if we aren't connected raise MQTT::NotConnectedException if not connected? # Only allow one thread to write to socket at a time @@ -501,14 +514,38 @@ else raise "Only the mqtt:// and mqtts:// schemes are supported" end { - :remote_host => uri.host, - :remote_port => uri.port || nil, + :host => uri.host, + :port => uri.port || nil, :username => uri.user, :password => uri.password, :ssl => ssl } + end + + + # ---- Deprecated attributes and methods ---- # + public + + # @deprecated Please use {#host} instead + def remote_host + host + end + + # @deprecated Please use {#host=} instead + def remote_host=(args) + self.host = args + end + + # @deprecated Please use {#port} instead + def remote_port + port + end + + # @deprecated Please use {#port=} instead + def remote_port=(args) + self.port = args end end