lib/mqtt/proxy.rb in mqtt-0.0.4 vs lib/mqtt/proxy.rb in mqtt-0.0.5

- old
+ new

@@ -1,121 +1,109 @@ -#!/usr/bin/env ruby +# Class for implementing a proxy to filter/mangle MQTT packets. +class MQTT::Proxy + attr_reader :local_host + attr_reader :local_port + attr_reader :broker_host + attr_reader :broker_port + attr_reader :listen_queue + attr_reader :select_timeout + attr_reader :logger -require 'mqtt' -require 'mqtt/packet' -require 'thread' -require 'logger' -require 'socket' + # Create a new MQTT Proxy instance. + # + # Possible argument keys: + # + # :local_host Address to bind listening socket to. + # :local_port Port to bind listening socket to. + # :broker_host Address of upstream broker to send packets upstream to. + # :broker_port Port of upstream broker to send packets upstream to. + # :select_timeout Time in seconds before disconnecting a connection. + # :logger Ruby Logger object to send informational messages to. + # + # NOTE: be careful not to connect to yourself! + def initialize(args={}) + @local_host = args[:local_host] || '0.0.0.0' + @local_port = args[:local_port] || MQTT::DEFAULT_PORT + @broker_host = args[:broker_host] || MQTT::DEFAULT_HOST + @broker_port = args[:broker_port] || 18830 + @select_timeout = args[:select_timeout] || 60 + # Setup a logger + @logger = args[:logger] + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::INFO + end -module MQTT + # Default is not to have any filters + @client_filter = nil + @broker_filter = nil - # Class for implementing a proxy to filter/mangle MQTT packets. - class Proxy - attr_reader :local_host - attr_reader :local_port - attr_reader :broker_host - attr_reader :broker_port - attr_reader :listen_queue - attr_reader :select_timeout - attr_reader :logger - - # Create a new MQTT Proxy instance. - # - # Possible argument keys: - # - # :local_host Address to bind listening socket to. - # :local_port Port to bind listening socket to. - # :broker_host Address of upstream broker to send packets upstream to. - # :broker_port Port of upstream broker to send packets upstream to. - # :select_timeout Time in seconds before disconnecting a connection. - # :logger Ruby Logger object to send informational messages to. - # - # NOTE: be careful not to connect to yourself! - def initialize(args={}) - @local_host = args[:local_host] || '0.0.0.0' - @local_port = args[:local_port] || 1883 - @broker_host = args[:broker_host] || 'localhost' - @broker_port = args[:broker_port] || 18830 - @select_timeout = args[:select_timeout] || 60 - - # Setup a logger - @logger = args[:logger] - if @logger.nil? - @logger = Logger.new(STDOUT) - @logger.level = Logger::INFO - end - - # Default is not to have any filters - @client_filter = nil - @broker_filter = nil - - # Create TCP server socket - @server = TCPServer.open(@local_host,@local_port) - @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}" - end - - # Set a filter Proc for packets coming from the client (to the broker). - def client_filter=(proc) - @client_filter = proc - end - - # Set a filter Proc for packets coming from the broker (to the client). - def broker_filter=(proc) - @broker_filter = proc - end - - # Start accepting connections and processing packets. - def run - loop do - # Wait for a client to connect and then create a thread for it - Thread.new(@server.accept) do |client_socket| - logger.info "Accepted client: #{client_socket.peeraddr.join(':')}" - broker_socket = TCPSocket.new(@broker_host,@broker_port) - begin - process_packets(client_socket,broker_socket) - rescue Exception => exp - logger.error exp.to_s - end - logger.info "Disconnected: #{client_socket.peeraddr.join(':')}" - broker_socket.close - client_socket.close + # Create TCP server socket + @server = TCPServer.open(@local_host,@local_port) + @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}" + end + + # Set a filter Proc for packets coming from the client (to the broker). + def client_filter=(proc) + @client_filter = proc + end + + # Set a filter Proc for packets coming from the broker (to the client). + def broker_filter=(proc) + @broker_filter = proc + end + + # Start accepting connections and processing packets. + def run + loop do + # Wait for a client to connect and then create a thread for it + Thread.new(@server.accept) do |client_socket| + logger.info "Accepted client: #{client_socket.peeraddr.join(':')}" + broker_socket = TCPSocket.new(@broker_host,@broker_port) + begin + process_packets(client_socket,broker_socket) + rescue Exception => exp + logger.error exp.to_s end + logger.info "Disconnected: #{client_socket.peeraddr.join(':')}" + broker_socket.close + client_socket.close end end + end - private - - def process_packets(client_socket,broker_socket) - loop do - # Wait for some data on either socket - selected = IO.select([client_socket,broker_socket], nil, nil, @select_timeout) - if selected.nil? - # Timeout - raise "Timeout in select" - else - # Iterate through each of the sockets with data to read - if selected[0].include?(client_socket) - packet = MQTT::Packet.read(client_socket) - logger.debug "client -> <#{packet.type}>" - packet = @client_filter.call(packet) unless @client_filter.nil? - unless packet.nil? - broker_socket.write(packet) - logger.debug "<#{packet.type}> -> broker" - end - elsif selected[0].include?(broker_socket) - packet = MQTT::Packet.read(broker_socket) - logger.debug "broker -> <#{packet.type}>" - packet = @broker_filter.call(packet) unless @broker_filter.nil? - unless packet.nil? - client_socket.write(packet) - logger.debug "<#{packet.type}> -> client" - end - else - logger.error "Problem with select: socket is neither broker or client" + private + + def process_packets(client_socket,broker_socket) + loop do + # Wait for some data on either socket + selected = IO.select([client_socket,broker_socket], nil, nil, @select_timeout) + if selected.nil? + # Timeout + raise "Timeout in select" + else + # Iterate through each of the sockets with data to read + if selected[0].include?(client_socket) + packet = MQTT::Packet.read(client_socket) + logger.debug "client -> <#{packet.type}>" + packet = @client_filter.call(packet) unless @client_filter.nil? + unless packet.nil? + broker_socket.write(packet) + logger.debug "<#{packet.type}> -> broker" end + elsif selected[0].include?(broker_socket) + packet = MQTT::Packet.read(broker_socket) + logger.debug "broker -> <#{packet.type}>" + packet = @broker_filter.call(packet) unless @broker_filter.nil? + unless packet.nil? + client_socket.write(packet) + logger.debug "<#{packet.type}> -> client" + end + else + logger.error "Problem with select: socket is neither broker or client" end end end - end -end \ No newline at end of file + +end