lib/mqtt/proxy.rb in mqtt-0.0.4 vs lib/mqtt/proxy.rb in mqtt-0.0.5
- old
+ new
@@ -1,121 +1,109 @@
-#!/usr/bin/env ruby
+# Class for implementing a proxy to filter/mangle MQTT packets.
+class MQTT::Proxy
+ attr_reader :local_host
+ attr_reader :local_port
+ attr_reader :broker_host
+ attr_reader :broker_port
+ attr_reader :listen_queue
+ attr_reader :select_timeout
+ attr_reader :logger
-require 'mqtt'
-require 'mqtt/packet'
-require 'thread'
-require 'logger'
-require 'socket'
+ # Create a new MQTT Proxy instance.
+ #
+ # Possible argument keys:
+ #
+ # :local_host Address to bind listening socket to.
+ # :local_port Port to bind listening socket to.
+ # :broker_host Address of upstream broker to send packets upstream to.
+ # :broker_port Port of upstream broker to send packets upstream to.
+ # :select_timeout Time in seconds before disconnecting a connection.
+ # :logger Ruby Logger object to send informational messages to.
+ #
+ # NOTE: be careful not to connect to yourself!
+ def initialize(args={})
+ @local_host = args[:local_host] || '0.0.0.0'
+ @local_port = args[:local_port] || MQTT::DEFAULT_PORT
+ @broker_host = args[:broker_host] || MQTT::DEFAULT_HOST
+ @broker_port = args[:broker_port] || 18830
+ @select_timeout = args[:select_timeout] || 60
+ # Setup a logger
+ @logger = args[:logger]
+ if @logger.nil?
+ @logger = Logger.new(STDOUT)
+ @logger.level = Logger::INFO
+ end
-module MQTT
+ # Default is not to have any filters
+ @client_filter = nil
+ @broker_filter = nil
- # Class for implementing a proxy to filter/mangle MQTT packets.
- class Proxy
- attr_reader :local_host
- attr_reader :local_port
- attr_reader :broker_host
- attr_reader :broker_port
- attr_reader :listen_queue
- attr_reader :select_timeout
- attr_reader :logger
-
- # Create a new MQTT Proxy instance.
- #
- # Possible argument keys:
- #
- # :local_host Address to bind listening socket to.
- # :local_port Port to bind listening socket to.
- # :broker_host Address of upstream broker to send packets upstream to.
- # :broker_port Port of upstream broker to send packets upstream to.
- # :select_timeout Time in seconds before disconnecting a connection.
- # :logger Ruby Logger object to send informational messages to.
- #
- # NOTE: be careful not to connect to yourself!
- def initialize(args={})
- @local_host = args[:local_host] || '0.0.0.0'
- @local_port = args[:local_port] || 1883
- @broker_host = args[:broker_host] || 'localhost'
- @broker_port = args[:broker_port] || 18830
- @select_timeout = args[:select_timeout] || 60
-
- # Setup a logger
- @logger = args[:logger]
- if @logger.nil?
- @logger = Logger.new(STDOUT)
- @logger.level = Logger::INFO
- end
-
- # Default is not to have any filters
- @client_filter = nil
- @broker_filter = nil
-
- # Create TCP server socket
- @server = TCPServer.open(@local_host,@local_port)
- @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}"
- end
-
- # Set a filter Proc for packets coming from the client (to the broker).
- def client_filter=(proc)
- @client_filter = proc
- end
-
- # Set a filter Proc for packets coming from the broker (to the client).
- def broker_filter=(proc)
- @broker_filter = proc
- end
-
- # Start accepting connections and processing packets.
- def run
- loop do
- # Wait for a client to connect and then create a thread for it
- Thread.new(@server.accept) do |client_socket|
- logger.info "Accepted client: #{client_socket.peeraddr.join(':')}"
- broker_socket = TCPSocket.new(@broker_host,@broker_port)
- begin
- process_packets(client_socket,broker_socket)
- rescue Exception => exp
- logger.error exp.to_s
- end
- logger.info "Disconnected: #{client_socket.peeraddr.join(':')}"
- broker_socket.close
- client_socket.close
+ # Create TCP server socket
+ @server = TCPServer.open(@local_host,@local_port)
+ @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}"
+ end
+
+ # Set a filter Proc for packets coming from the client (to the broker).
+ def client_filter=(proc)
+ @client_filter = proc
+ end
+
+ # Set a filter Proc for packets coming from the broker (to the client).
+ def broker_filter=(proc)
+ @broker_filter = proc
+ end
+
+ # Start accepting connections and processing packets.
+ def run
+ loop do
+ # Wait for a client to connect and then create a thread for it
+ Thread.new(@server.accept) do |client_socket|
+ logger.info "Accepted client: #{client_socket.peeraddr.join(':')}"
+ broker_socket = TCPSocket.new(@broker_host,@broker_port)
+ begin
+ process_packets(client_socket,broker_socket)
+ rescue Exception => exp
+ logger.error exp.to_s
end
+ logger.info "Disconnected: #{client_socket.peeraddr.join(':')}"
+ broker_socket.close
+ client_socket.close
end
end
+ end
- private
-
- def process_packets(client_socket,broker_socket)
- loop do
- # Wait for some data on either socket
- selected = IO.select([client_socket,broker_socket], nil, nil, @select_timeout)
- if selected.nil?
- # Timeout
- raise "Timeout in select"
- else
- # Iterate through each of the sockets with data to read
- if selected[0].include?(client_socket)
- packet = MQTT::Packet.read(client_socket)
- logger.debug "client -> <#{packet.type}>"
- packet = @client_filter.call(packet) unless @client_filter.nil?
- unless packet.nil?
- broker_socket.write(packet)
- logger.debug "<#{packet.type}> -> broker"
- end
- elsif selected[0].include?(broker_socket)
- packet = MQTT::Packet.read(broker_socket)
- logger.debug "broker -> <#{packet.type}>"
- packet = @broker_filter.call(packet) unless @broker_filter.nil?
- unless packet.nil?
- client_socket.write(packet)
- logger.debug "<#{packet.type}> -> client"
- end
- else
- logger.error "Problem with select: socket is neither broker or client"
+ private
+
+ def process_packets(client_socket,broker_socket)
+ loop do
+ # Wait for some data on either socket
+ selected = IO.select([client_socket,broker_socket], nil, nil, @select_timeout)
+ if selected.nil?
+ # Timeout
+ raise "Timeout in select"
+ else
+ # Iterate through each of the sockets with data to read
+ if selected[0].include?(client_socket)
+ packet = MQTT::Packet.read(client_socket)
+ logger.debug "client -> <#{packet.type}>"
+ packet = @client_filter.call(packet) unless @client_filter.nil?
+ unless packet.nil?
+ broker_socket.write(packet)
+ logger.debug "<#{packet.type}> -> broker"
end
+ elsif selected[0].include?(broker_socket)
+ packet = MQTT::Packet.read(broker_socket)
+ logger.debug "broker -> <#{packet.type}>"
+ packet = @broker_filter.call(packet) unless @broker_filter.nil?
+ unless packet.nil?
+ client_socket.write(packet)
+ logger.debug "<#{packet.type}> -> client"
+ end
+ else
+ logger.error "Problem with select: socket is neither broker or client"
end
end
end
-
end
-end
\ No newline at end of file
+
+end