lib/em-proxy/connection.rb in em-proxy-0.1.5 vs lib/em-proxy/connection.rb in em-proxy-0.1.6

- old
+ new

@@ -1,114 +1,114 @@ -module EventMachine - module ProxyServer - class Connection < EventMachine::Connection - attr_accessor :debug - - ##### Proxy Methods - def on_data(&blk); @on_data = blk; end - def on_response(&blk); @on_response = blk; end - def on_finish(&blk); @on_finish = blk; end - def on_connect(&blk); @on_connect = blk; end - - ##### EventMachine - def initialize(options) - @debug = options[:debug] || false - @servers = {} - end - - def receive_data(data) - debug [:connection, data] - processed = @on_data.call(data) if @on_data - - return if processed == :async or processed.nil? - relay_to_servers(processed) - end - - def relay_to_servers(processed) - if processed.is_a? Array - data, servers = *processed - - # guard for "unbound" servers - servers = servers.collect {|s| @servers[s]}.compact - else - data = processed - servers ||= @servers.values.compact - end - - servers.each do |s| - s.send_data data unless data.nil? - end - end - - # - # initialize connections to backend servers - # - def server(name, opts) - srv = EventMachine::connect(opts[:host], opts[:port], EventMachine::ProxyServer::Backend, @debug) do |c| - c.name = name - c.plexer = self - c.proxy_incoming_to(self, 10240) if opts[:relay_server] - end - self.proxy_incoming_to(srv, 10240) if opts[:relay_client] - - @servers[name] = srv - end - - # - # [ip, port] of the connected client - # - def peer - peername = get_peername - @peer ||= peername ? Socket.unpack_sockaddr_in(peername).reverse : nil - end - - # - # relay data from backend server to client - # - def relay_from_backend(name, data) - debug [:relay_from_backend, name, data] - - data = @on_response.call(name, data) if @on_response - send_data data unless data.nil? - end - - def connected(name) - debug [:connected] - @on_connect.call(name) if @on_connect - end - - def unbind - debug [:unbind, :connection] - - # terminate any unfinished connections - @servers.values.compact.each do |s| - s.close_connection - end - end - - def unbind_backend(name) - debug [:unbind_backend, name] - @servers[name] = nil - - # if all connections are terminated downstream, then notify client - close_connection_after_writing if @servers.values.compact.size.zero? - - if @on_finish - @on_finish.call(name) - - # not sure if this is required - # @on_finish.call(:done) if @servers.values.compact.size.zero? - end - end - - private - - def debug(*data) - if @debug - require 'pp' - pp data - puts - end - end - end - end -end +module EventMachine + module ProxyServer + class Connection < EventMachine::Connection + attr_accessor :debug + + ##### Proxy Methods + def on_data(&blk); @on_data = blk; end + def on_response(&blk); @on_response = blk; end + def on_finish(&blk); @on_finish = blk; end + def on_connect(&blk); @on_connect = blk; end + + ##### EventMachine + def initialize(options) + @debug = options[:debug] || false + @servers = {} + end + + def receive_data(data) + debug [:connection, data] + processed = @on_data.call(data) if @on_data + + return if processed == :async or processed.nil? + relay_to_servers(processed) + end + + def relay_to_servers(processed) + if processed.is_a? Array + data, servers = *processed + + # guard for "unbound" servers + servers = servers.collect {|s| @servers[s]}.compact + else + data = processed + servers ||= @servers.values.compact + end + + servers.each do |s| + s.send_data data unless data.nil? + end + end + + # + # initialize connections to backend servers + # + def server(name, opts) + srv = EventMachine::connect(opts[:host], opts[:port], EventMachine::ProxyServer::Backend, @debug) do |c| + c.name = name + c.plexer = self + c.proxy_incoming_to(self, 10240) if opts[:relay_server] + end + self.proxy_incoming_to(srv, 10240) if opts[:relay_client] + + @servers[name] = srv + end + + # + # [ip, port] of the connected client + # + def peer + peername = get_peername + @peer ||= peername ? Socket.unpack_sockaddr_in(peername).reverse : nil + end + + # + # relay data from backend server to client + # + def relay_from_backend(name, data) + debug [:relay_from_backend, name, data] + + data = @on_response.call(name, data) if @on_response + send_data data unless data.nil? + end + + def connected(name) + debug [:connected] + @on_connect.call(name) if @on_connect + end + + def unbind + debug [:unbind, :connection] + + # terminate any unfinished connections + @servers.values.compact.each do |s| + s.close_connection + end + end + + def unbind_backend(name) + debug [:unbind_backend, name] + @servers[name] = nil + close = :close + + if @on_finish + close = @on_finish.call(name) + end + + # if all connections are terminated downstream, then notify client + if @servers.values.compact.size.zero? and close != :keep + close_connection_after_writing + end + end + + private + + def debug(*data) + if @debug + require 'pp' + pp data + puts + end + end + end + end +end