lib/mqtt/proxy.rb in mqtt-0.2.0 vs lib/mqtt/proxy.rb in mqtt-0.3.0
- old
+ new
@@ -3,40 +3,40 @@
# Address to bind listening socket to
attr_reader :local_host
# Port to bind listening socket to
attr_reader :local_port
-
- # Address of upstream broker to send packets upstream to
- attr_reader :broker_host
-
- # Port of upstream broker to send packets upstream to.
- attr_reader :broker_port
+ # Address of upstream server to send packets upstream to
+ attr_reader :server_host
+
+ # Port of upstream server to send packets upstream to.
+ attr_reader :server_port
+
# Time in seconds before disconnecting an idle connection
attr_reader :select_timeout
-
+
# Ruby Logger object to send informational messages to
attr_reader :logger
# Create a new MQTT Proxy instance.
#
# Possible argument keys:
#
# :local_host Address to bind listening socket to.
# :local_port Port to bind listening socket to.
- # :broker_host Address of upstream broker to send packets upstream to.
- # :broker_port Port of upstream broker to send packets upstream to.
+ # :server_host Address of upstream server to send packets upstream to.
+ # :server_port Port of upstream server to send packets upstream to.
# :select_timeout Time in seconds before disconnecting a connection.
# :logger Ruby Logger object to send informational messages to.
#
# NOTE: be careful not to connect to yourself!
def initialize(args={})
@local_host = args[:local_host] || '0.0.0.0'
@local_port = args[:local_port] || MQTT::DEFAULT_PORT
- @broker_host = args[:broker_host]
- @broker_port = args[:broker_port] || 18830
+ @server_host = args[:server_host]
+ @server_port = args[:server_port] || 18830
@select_timeout = args[:select_timeout] || 60
# Setup a logger
@logger = args[:logger]
if @logger.nil?
@@ -44,74 +44,74 @@
@logger.level = Logger::INFO
end
# Default is not to have any filters
@client_filter = nil
- @broker_filter = nil
+ @server_filter = nil
# Create TCP server socket
@server = TCPServer.open(@local_host,@local_port)
@logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}"
end
- # Set a filter Proc for packets coming from the client (to the broker).
+ # Set a filter Proc for packets coming from the client (to the server).
def client_filter=(proc)
@client_filter = proc
end
- # Set a filter Proc for packets coming from the broker (to the client).
- def broker_filter=(proc)
- @broker_filter = proc
+ # Set a filter Proc for packets coming from the server (to the client).
+ def server_filter=(proc)
+ @server_filter = proc
end
# Start accepting connections and processing packets.
def run
loop do
# Wait for a client to connect and then create a thread for it
Thread.new(@server.accept) do |client_socket|
logger.info "Accepted client: #{client_socket.peeraddr.join(':')}"
- broker_socket = TCPSocket.new(@broker_host,@broker_port)
+ server_socket = TCPSocket.new(@server_host,@server_port)
begin
- process_packets(client_socket,broker_socket)
+ process_packets(client_socket,server_socket)
rescue Exception => exp
logger.error exp.to_s
end
logger.info "Disconnected: #{client_socket.peeraddr.join(':')}"
- broker_socket.close
+ server_socket.close
client_socket.close
end
end
end
private
- def process_packets(client_socket,broker_socket)
+ def process_packets(client_socket,server_socket)
loop do
# Wait for some data on either socket
- selected = IO.select([client_socket,broker_socket], nil, nil, @select_timeout)
+ selected = IO.select([client_socket,server_socket], nil, nil, @select_timeout)
if selected.nil?
# Timeout
raise "Timeout in select"
else
# Iterate through each of the sockets with data to read
if selected[0].include?(client_socket)
packet = MQTT::Packet.read(client_socket)
logger.debug "client -> <#{packet.type}>"
packet = @client_filter.call(packet) unless @client_filter.nil?
unless packet.nil?
- broker_socket.write(packet)
- logger.debug "<#{packet.type}> -> broker"
+ server_socket.write(packet)
+ logger.debug "<#{packet.type}> -> server"
end
- elsif selected[0].include?(broker_socket)
- packet = MQTT::Packet.read(broker_socket)
- logger.debug "broker -> <#{packet.type}>"
- packet = @broker_filter.call(packet) unless @broker_filter.nil?
+ elsif selected[0].include?(server_socket)
+ packet = MQTT::Packet.read(server_socket)
+ logger.debug "server -> <#{packet.type}>"
+ packet = @server_filter.call(packet) unless @server_filter.nil?
unless packet.nil?
client_socket.write(packet)
logger.debug "<#{packet.type}> -> client"
end
else
- logger.error "Problem with select: socket is neither broker or client"
+ logger.error "Problem with select: socket is neither server or client"
end
end
end
end