lib/mqtt/client.rb in mqtt-0.0.3 vs lib/mqtt/client.rb in mqtt-0.0.4

- old
+ new

@@ -46,26 +46,16 @@ if not connected? # Create network socket @socket = TCPSocket.new(@remote_host,@remote_port) # Protocol name and version - packet = MQTT::Packet.new(:type => :connect) - packet.add_string('MQIsdp') - packet.add_bytes(0x03) - - # Connect flags - connect_flags = 0x00 - connect_flags ||= 0x02 if @clean_start - # FIXME: implement Will and Testament - packet.add_bytes(connect_flags) - - # Keep Alive timer: 10 seconds - packet.add_short(@keep_alive) - - # Add the client identifier - packet.add_string(@client_id) - + packet = MQTT::Packet::Connect.new( + :clean_start => @clean_start, + :keep_alive => @keep_alive, + :client_id => @client_id + ) + # Send packet send_packet(packet) # Receive response receive_connack @@ -87,11 +77,11 @@ # Disconnect from the MQTT broker. # If you don't want to say goodbye to the broker, set send_msg to false. def disconnect(send_msg=true) if connected? if send_msg - packet = MQTT::Packet.new(:type => :disconnect) + packet = MQTT::Packet::Disconnect.new send_packet(packet) end @read_thread.kill if @read_thread and @read_thread.alive? @read_thread = nil @socket.close unless @socket.nil? @@ -104,34 +94,25 @@ not @socket.nil? end # Send a MQTT ping message to indicate that the MQTT client is alive. def ping - packet = MQTT::Packet.new(:type => :pingreq) + packet = MQTT::Packet::Pingreq.new send_packet(packet) @last_pingreq = Time.now end # Publish a message on a particular topic to the MQTT broker. def publish(topic, payload, retain=false, qos=0) - packet = MQTT::Packet.new( - :type => :publish, + packet = MQTT::Packet::Publish.new( :qos => qos, - :retain => retain + :retain => retain, + :topic => topic, + :payload => payload, + :message_id => @message_id.next ) - - # Add the topic name - packet.add_string(topic) - - # Add Message ID for qos1 and qos2 - unless qos == 0 - packet.add_short(@message_id.next) - end - - # Add the packet payload - packet.add_data(payload) - + # Send the packet send_packet(packet) end # Send a subscribe message for one or more topics on the MQTT broker. @@ -145,31 +126,14 @@ # client.subscribe( 'a/b', 'c/d' ) # client.subscribe( ['a/b',0], ['c/d',1] ) # client.subscribe( 'a/b' => 0, 'c/d' => 1 ) # def subscribe(*topics) - array = [] - topics.each do |item| - if item.is_a?(Hash) - # Convert hash into an ordered array of arrays - array += item.sort - elsif item.is_a?(Array) - # Already in [topic,qos] format - array.push item - else - # Default to QOS 0 - array.push [item.to_s,0] - end - end - - # Create the packet - packet = MQTT::Packet.new(:type => :subscribe, :qos => 1) - packet.add_short(@message_id.next) - array.each do |item| - packet.add_string(item[0]) - packet.add_bytes(item[1]) - end + packet = MQTT::Packet::Subscribe.new( + :topics => topics, + :message_id => @message_id.next + ) send_packet(packet) end # Return the next message recieved from the MQTT broker. # This method blocks until a message is available. @@ -178,21 +142,21 @@ # topic,message = client.get # def get # Wait for a packet to be available packet = @read_queue.pop - - # Parse the variable header - topic = packet.shift_string - msg_id = packet.shift_short unless (packet.qos == 0) - return topic,packet.body + topic = packet.topic + payload = packet.payload + return topic,payload end # Send a unsubscribe message for one or more topics on the MQTT broker def unsubscribe(*topics) - packet = MQTT::Packet.new(:type => :unsubscribe, :qos => 1) - topics.each { |topic| packet.add_string(topic) } + packet = MQTT::Packet::Unsubscribe.new( + :topics => topics, + :message_id => @message_id.next + ) send_packet(packet) end private @@ -203,11 +167,11 @@ # Poll socket - is there data waiting? result = IO.select([@socket], nil, nil, SELECT_TIMEOUT) unless result.nil? # Yes - read in the packet packet = MQTT::Packet.read(@socket) - if packet.type == :publish + if packet.class == MQTT::Packet::Publish # Add to queue @read_queue.push(packet) else # Ignore all other packets nil @@ -234,26 +198,16 @@ # Read and check a connection acknowledgement packet def receive_connack Timeout.timeout(@ack_timeout) do packet = MQTT::Packet.read(@socket) - if packet.type != :connack - raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.type}") + if packet.class != MQTT::Packet::Connack + raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}") end - # Read in the return code - byte1, return_code = packet.shift_bytes(2) - if return_code == 0x00 - # Success - nil - elsif return_code == 0x01 - raise MQTT::ProtocolException.new("Connection refused: unacceptable protocol version") - elsif return_code == 0x02 - raise MQTT::ProtocolException.new("Connection refused: client identifier rejected") - elsif return_code == 0x03 - raise MQTT::ProtocolException.new("Connection refused: broker unavailable") - else - raise MQTT::ProtocolException.new("Connection refused: #{return_code}") + # Check the return code + if packet.return_code != 0x00 + raise MQTT::ProtocolException.new(packet.return_msg) end end end # Send a packet to broker