lib/mqtt/proxy.rb in mqtt-0.5.0 vs lib/mqtt/proxy.rb in mqtt-0.6.0

- old
+ new

@@ -1,99 +1,96 @@ -# Class for implementing a proxy to filter/mangle MQTT packets. -class MQTT::Proxy - # Address to bind listening socket to - attr_reader :local_host +module MQTT + # Class for implementing a proxy to filter/mangle MQTT packets. + class Proxy + # Address to bind listening socket to + attr_reader :local_host - # Port to bind listening socket to - attr_reader :local_port + # Port to bind listening socket to + attr_reader :local_port - # Address of upstream server to send packets upstream to - attr_reader :server_host + # Address of upstream server to send packets upstream to + attr_reader :server_host - # Port of upstream server to send packets upstream to. - attr_reader :server_port + # Port of upstream server to send packets upstream to. + attr_reader :server_port - # Time in seconds before disconnecting an idle connection - attr_reader :select_timeout + # Time in seconds before disconnecting an idle connection + attr_reader :select_timeout - # Ruby Logger object to send informational messages to - attr_reader :logger + # Ruby Logger object to send informational messages to + attr_reader :logger - # Create a new MQTT Proxy instance. - # - # Possible argument keys: - # - # :local_host Address to bind listening socket to. - # :local_port Port to bind listening socket to. - # :server_host Address of upstream server to send packets upstream to. - # :server_port Port of upstream server to send packets upstream to. - # :select_timeout Time in seconds before disconnecting a connection. - # :logger Ruby Logger object to send informational messages to. - # - # NOTE: be careful not to connect to yourself! - def initialize(args={}) - @local_host = args[:local_host] || '0.0.0.0' - @local_port = args[:local_port] || MQTT::DEFAULT_PORT - @server_host = args[:server_host] - @server_port = args[:server_port] || 18830 - @select_timeout = args[:select_timeout] || 60 + # A filter Proc for packets coming from the client (to the server). + attr_writer :client_filter - # Setup a logger - @logger = args[:logger] - if @logger.nil? - @logger = Logger.new(STDOUT) - @logger.level = Logger::INFO - end + # A filter Proc for packets coming from the server (to the client). + attr_writer :server_filter - # Default is not to have any filters - @client_filter = nil - @server_filter = nil + # Create a new MQTT Proxy instance. + # + # Possible argument keys: + # + # :local_host Address to bind listening socket to. + # :local_port Port to bind listening socket to. + # :server_host Address of upstream server to send packets upstream to. + # :server_port Port of upstream server to send packets upstream to. + # :select_timeout Time in seconds before disconnecting a connection. + # :logger Ruby Logger object to send informational messages to. + # + # NOTE: be careful not to connect to yourself! + def initialize(args = {}) + @local_host = args[:local_host] || '0.0.0.0' + @local_port = args[:local_port] || MQTT::DEFAULT_PORT + @server_host = args[:server_host] + @server_port = args[:server_port] || 18_830 + @select_timeout = args[:select_timeout] || 60 - # Create TCP server socket - @server = TCPServer.open(@local_host,@local_port) - @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}" - end + # Setup a logger + @logger = args[:logger] + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::INFO + end - # Set a filter Proc for packets coming from the client (to the server). - def client_filter=(proc) - @client_filter = proc - end + # Default is not to have any filters + @client_filter = nil + @server_filter = nil - # Set a filter Proc for packets coming from the server (to the client). - def server_filter=(proc) - @server_filter = proc - end + # Create TCP server socket + @server = TCPServer.open(@local_host, @local_port) + @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}" + end - # Start accepting connections and processing packets. - def run - loop do - # Wait for a client to connect and then create a thread for it - Thread.new(@server.accept) do |client_socket| - logger.info "Accepted client: #{client_socket.peeraddr.join(':')}" - server_socket = TCPSocket.new(@server_host,@server_port) - begin - process_packets(client_socket,server_socket) - rescue Exception => exp - logger.error exp.to_s + # Start accepting connections and processing packets. + def run + loop do + # Wait for a client to connect and then create a thread for it + Thread.new(@server.accept) do |client_socket| + logger.info "Accepted client: #{client_socket.peeraddr.join(':')}" + server_socket = TCPSocket.new(@server_host, @server_port) + begin + process_packets(client_socket, server_socket) + rescue Exception => exp + logger.error exp.to_s + end + logger.info "Disconnected: #{client_socket.peeraddr.join(':')}" + server_socket.close + client_socket.close end - logger.info "Disconnected: #{client_socket.peeraddr.join(':')}" - server_socket.close - client_socket.close end end - end - private + private - def process_packets(client_socket,server_socket) - loop do - # Wait for some data on either socket - selected = IO.select([client_socket,server_socket], nil, nil, @select_timeout) - if selected.nil? + def process_packets(client_socket, server_socket) + loop do + # Wait for some data on either socket + selected = IO.select([client_socket, server_socket], nil, nil, @select_timeout) + # Timeout - raise "Timeout in select" - else + raise 'Timeout in select' if selected.nil? + # Iterate through each of the sockets with data to read if selected[0].include?(client_socket) packet = MQTT::Packet.read(client_socket) logger.debug "client -> <#{packet.type_name}>" packet = @client_filter.call(packet) unless @client_filter.nil? @@ -108,12 +105,11 @@ unless packet.nil? client_socket.write(packet) logger.debug "<#{packet.type_name}> -> client" end else - logger.error "Problem with select: socket is neither server or client" + logger.error 'Problem with select: socket is neither server or client' end end end end - end