lib/proxymachine/client_connection.rb in fizx-proxymachine-1.3.0 vs lib/proxymachine/client_connection.rb in fizx-proxymachine-1.4.0

- old
+ new

@@ -8,11 +8,15 @@ end def post_init $logger.info "Accepted #{peer}" @buffer = [] + @remote = nil @tries = 0 + @connected = false + @connect_timeout = nil + @inactivity_timeout = nil ProxyMachine.incr end def peer @peer ||= @@ -21,79 +25,106 @@ "#{ip}:#{port}" end end def receive_data(data) - if !@server_side + if !@connected @buffer << data - ensure_server_side_connection + establish_remote_server if @remote.nil? end rescue => e close_connection - $logger.info "#{e.class} - #{e.message}" + $logger.error "#{e.class} - #{e.message}" end - def ensure_server_side_connection - @timer.cancel if @timer - unless @server_side - commands = ProxyMachine.router.call(@buffer.join) - $logger.info "#{peer} #{commands.inspect}" - close_connection unless commands.instance_of?(Hash) - if remote = commands[:remote] - m, host, port = *remote.match(/^(.+):(.+)$/) - klass = commands[:callback] ? CallbackServerConnection : ServerConnection - if try_server_connect(host, port.to_i, klass) - @server_side.callback = commands[:callback] if commands[:callback] - - if data = commands[:data] - @buffer = [data] - end - if reply = commands[:reply] - send_data(reply) - end - send_and_clear_buffer - end - elsif close = commands[:close] - if close == true - close_connection - else - send_data(close) - close_connection_after_writing - end - elsif commands[:noop] - # do nothing - else + # Called when new data is available from the client but no remote + # server has been established. If a remote can be established, an + # attempt is made to connect and proxy to the remote server. + def establish_remote_server + fail "establish_remote_server called with remote established" if @remote + @commands = ProxyMachine.router.call(@buffer.join) + $logger.info "#{peer} #{@commands.inspect}" + close_connection unless @commands.instance_of?(Hash) + if remote = @commands[:remote] + m, host, port = *remote.match(/^(.+):(.+)$/) + @remote = [host, port] + if data = @commands[:data] + @buffer = [data] + end + if reply = @commands[:reply] + send_data(reply) + end + @connect_timeout = @commands[:connect_timeout] + @inactivity_timeout = @commands[:inactivity_timeout] + connect_to_server + elsif close = @commands[:close] + if close == true close_connection + else + send_data(close) + close_connection_after_writing end + elsif @commands[:noop] + # do nothing + else + close_connection end end - def try_server_connect(host, port, klass) + # Connect to the remote server + def connect_to_server + fail "connect_server called without remote established" if @remote.nil? + host, port = @remote + $logger.info "Establishing new connection with #{host}:#{port}" + cb = @commands[:callback] + klass = cb ? CallbackServerConnection : ServerConnection @server_side = klass.request(host, port, self) + @server_side.callback = cb if cb + @server_side.pending_connect_timeout = @connect_timeout + @server_side.comm_inactivity_timeout = @inactivity_timeout + end + + # Called by the server side immediately after the server connection was + # successfully established. Send any buffer we've accumulated and start + # raw proxying. + def server_connection_success + $logger.info "Successful connection to #{@remote.join(':')}" + @connected = true + @buffer.each { |data| @server_side.send_data(data) } + @buffer = [] proxy_incoming_to(@server_side, 10240) - $logger.info "Successful connection to #{host}:#{port}." - true - rescue => e - if @tries < 10 + end + + # Called by the server side when a connection could not be established, + # either due to a hard connection failure or to a connection timeout. + # Leave the client connection open and retry the server connection up to + # 10 times. + def server_connection_failed + @server_side = nil + if @connected + $logger.error "Connection with #{@remote.join(':')} was terminated prematurely." + close_connection + ProxyMachine.connect_error_callback.call(@remote.join(':')) + elsif @tries < 10 @tries += 1 - $logger.info "Failed on server connect attempt #{@tries}. Trying again..." - @timer.cancel if @timer - @timer = EventMachine::Timer.new(0.1) do - self.ensure_server_side_connection - end + $logger.warn "Retrying connection with #{@remote.join(':')} (##{@tries})" + EM.add_timer(0.1) { connect_to_server } else - $logger.info "Failed after ten connection attempts." + $logger.error "Connect #{@remote.join(':')} failed after ten attempts." + close_connection + ProxyMachine.connect_error_callback.call(@remote.join(':')) end - false end - def send_and_clear_buffer - if !@buffer.empty? - @buffer.each do |x| - @server_side.send_data(x) - end - @buffer = [] - end + # Called by the server when an inactivity timeout is detected. The timeout + # argument is the configured inactivity timeout in seconds as a float; the + # elapsed argument is the amount of time that actually elapsed since + # connecting but not receiving any data. + def server_inactivity_timeout(timeout, elapsed) + $logger.error "Disconnecting #{@remote.join(':')} after #{elapsed}s of inactivity (> #{timeout.inspect})" + @server_side = nil + close_connection + ProxyMachine.inactivity_error_callback.call(@remote.join(':')) end def unbind @server_side.close_connection_after_writing if @server_side ProxyMachine.decr