lib/mqtt/client.rb in mqtt-0.0.3 vs lib/mqtt/client.rb in mqtt-0.0.4
- old
+ new
@@ -46,26 +46,16 @@
if not connected?
# Create network socket
@socket = TCPSocket.new(@remote_host,@remote_port)
# Protocol name and version
- packet = MQTT::Packet.new(:type => :connect)
- packet.add_string('MQIsdp')
- packet.add_bytes(0x03)
-
- # Connect flags
- connect_flags = 0x00
- connect_flags ||= 0x02 if @clean_start
- # FIXME: implement Will and Testament
- packet.add_bytes(connect_flags)
-
- # Keep Alive timer: 10 seconds
- packet.add_short(@keep_alive)
-
- # Add the client identifier
- packet.add_string(@client_id)
-
+ packet = MQTT::Packet::Connect.new(
+ :clean_start => @clean_start,
+ :keep_alive => @keep_alive,
+ :client_id => @client_id
+ )
+
# Send packet
send_packet(packet)
# Receive response
receive_connack
@@ -87,11 +77,11 @@
# Disconnect from the MQTT broker.
# If you don't want to say goodbye to the broker, set send_msg to false.
def disconnect(send_msg=true)
if connected?
if send_msg
- packet = MQTT::Packet.new(:type => :disconnect)
+ packet = MQTT::Packet::Disconnect.new
send_packet(packet)
end
@read_thread.kill if @read_thread and @read_thread.alive?
@read_thread = nil
@socket.close unless @socket.nil?
@@ -104,34 +94,25 @@
not @socket.nil?
end
# Send a MQTT ping message to indicate that the MQTT client is alive.
def ping
- packet = MQTT::Packet.new(:type => :pingreq)
+ packet = MQTT::Packet::Pingreq.new
send_packet(packet)
@last_pingreq = Time.now
end
# Publish a message on a particular topic to the MQTT broker.
def publish(topic, payload, retain=false, qos=0)
- packet = MQTT::Packet.new(
- :type => :publish,
+ packet = MQTT::Packet::Publish.new(
:qos => qos,
- :retain => retain
+ :retain => retain,
+ :topic => topic,
+ :payload => payload,
+ :message_id => @message_id.next
)
-
- # Add the topic name
- packet.add_string(topic)
-
- # Add Message ID for qos1 and qos2
- unless qos == 0
- packet.add_short(@message_id.next)
- end
-
- # Add the packet payload
- packet.add_data(payload)
-
+
# Send the packet
send_packet(packet)
end
# Send a subscribe message for one or more topics on the MQTT broker.
@@ -145,31 +126,14 @@
# client.subscribe( 'a/b', 'c/d' )
# client.subscribe( ['a/b',0], ['c/d',1] )
# client.subscribe( 'a/b' => 0, 'c/d' => 1 )
#
def subscribe(*topics)
- array = []
- topics.each do |item|
- if item.is_a?(Hash)
- # Convert hash into an ordered array of arrays
- array += item.sort
- elsif item.is_a?(Array)
- # Already in [topic,qos] format
- array.push item
- else
- # Default to QOS 0
- array.push [item.to_s,0]
- end
- end
-
- # Create the packet
- packet = MQTT::Packet.new(:type => :subscribe, :qos => 1)
- packet.add_short(@message_id.next)
- array.each do |item|
- packet.add_string(item[0])
- packet.add_bytes(item[1])
- end
+ packet = MQTT::Packet::Subscribe.new(
+ :topics => topics,
+ :message_id => @message_id.next
+ )
send_packet(packet)
end
# Return the next message recieved from the MQTT broker.
# This method blocks until a message is available.
@@ -178,21 +142,21 @@
# topic,message = client.get
#
def get
# Wait for a packet to be available
packet = @read_queue.pop
-
- # Parse the variable header
- topic = packet.shift_string
- msg_id = packet.shift_short unless (packet.qos == 0)
- return topic,packet.body
+ topic = packet.topic
+ payload = packet.payload
+ return topic,payload
end
# Send a unsubscribe message for one or more topics on the MQTT broker
def unsubscribe(*topics)
- packet = MQTT::Packet.new(:type => :unsubscribe, :qos => 1)
- topics.each { |topic| packet.add_string(topic) }
+ packet = MQTT::Packet::Unsubscribe.new(
+ :topics => topics,
+ :message_id => @message_id.next
+ )
send_packet(packet)
end
private
@@ -203,11 +167,11 @@
# Poll socket - is there data waiting?
result = IO.select([@socket], nil, nil, SELECT_TIMEOUT)
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
- if packet.type == :publish
+ if packet.class == MQTT::Packet::Publish
# Add to queue
@read_queue.push(packet)
else
# Ignore all other packets
nil
@@ -234,26 +198,16 @@
# Read and check a connection acknowledgement packet
def receive_connack
Timeout.timeout(@ack_timeout) do
packet = MQTT::Packet.read(@socket)
- if packet.type != :connack
- raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.type}")
+ if packet.class != MQTT::Packet::Connack
+ raise MQTT::ProtocolException.new("Response wan't a connection acknowledgement: #{packet.class}")
end
- # Read in the return code
- byte1, return_code = packet.shift_bytes(2)
- if return_code == 0x00
- # Success
- nil
- elsif return_code == 0x01
- raise MQTT::ProtocolException.new("Connection refused: unacceptable protocol version")
- elsif return_code == 0x02
- raise MQTT::ProtocolException.new("Connection refused: client identifier rejected")
- elsif return_code == 0x03
- raise MQTT::ProtocolException.new("Connection refused: broker unavailable")
- else
- raise MQTT::ProtocolException.new("Connection refused: #{return_code}")
+ # Check the return code
+ if packet.return_code != 0x00
+ raise MQTT::ProtocolException.new(packet.return_msg)
end
end
end
# Send a packet to broker