lib/proxymachine/client_connection.rb in fizx-proxymachine-1.3.0 vs lib/proxymachine/client_connection.rb in fizx-proxymachine-1.4.0
- old
+ new
@@ -8,11 +8,15 @@
end
def post_init
$logger.info "Accepted #{peer}"
@buffer = []
+ @remote = nil
@tries = 0
+ @connected = false
+ @connect_timeout = nil
+ @inactivity_timeout = nil
ProxyMachine.incr
end
def peer
@peer ||=
@@ -21,79 +25,106 @@
"#{ip}:#{port}"
end
end
def receive_data(data)
- if !@server_side
+ if !@connected
@buffer << data
- ensure_server_side_connection
+ establish_remote_server if @remote.nil?
end
rescue => e
close_connection
- $logger.info "#{e.class} - #{e.message}"
+ $logger.error "#{e.class} - #{e.message}"
end
- def ensure_server_side_connection
- @timer.cancel if @timer
- unless @server_side
- commands = ProxyMachine.router.call(@buffer.join)
- $logger.info "#{peer} #{commands.inspect}"
- close_connection unless commands.instance_of?(Hash)
- if remote = commands[:remote]
- m, host, port = *remote.match(/^(.+):(.+)$/)
- klass = commands[:callback] ? CallbackServerConnection : ServerConnection
- if try_server_connect(host, port.to_i, klass)
- @server_side.callback = commands[:callback] if commands[:callback]
-
- if data = commands[:data]
- @buffer = [data]
- end
- if reply = commands[:reply]
- send_data(reply)
- end
- send_and_clear_buffer
- end
- elsif close = commands[:close]
- if close == true
- close_connection
- else
- send_data(close)
- close_connection_after_writing
- end
- elsif commands[:noop]
- # do nothing
- else
+ # Called when new data is available from the client but no remote
+ # server has been established. If a remote can be established, an
+ # attempt is made to connect and proxy to the remote server.
+ def establish_remote_server
+ fail "establish_remote_server called with remote established" if @remote
+ @commands = ProxyMachine.router.call(@buffer.join)
+ $logger.info "#{peer} #{@commands.inspect}"
+ close_connection unless @commands.instance_of?(Hash)
+ if remote = @commands[:remote]
+ m, host, port = *remote.match(/^(.+):(.+)$/)
+ @remote = [host, port]
+ if data = @commands[:data]
+ @buffer = [data]
+ end
+ if reply = @commands[:reply]
+ send_data(reply)
+ end
+ @connect_timeout = @commands[:connect_timeout]
+ @inactivity_timeout = @commands[:inactivity_timeout]
+ connect_to_server
+ elsif close = @commands[:close]
+ if close == true
close_connection
+ else
+ send_data(close)
+ close_connection_after_writing
end
+ elsif @commands[:noop]
+ # do nothing
+ else
+ close_connection
end
end
- def try_server_connect(host, port, klass)
+ # Connect to the remote server
+ def connect_to_server
+ fail "connect_server called without remote established" if @remote.nil?
+ host, port = @remote
+ $logger.info "Establishing new connection with #{host}:#{port}"
+ cb = @commands[:callback]
+ klass = cb ? CallbackServerConnection : ServerConnection
@server_side = klass.request(host, port, self)
+ @server_side.callback = cb if cb
+ @server_side.pending_connect_timeout = @connect_timeout
+ @server_side.comm_inactivity_timeout = @inactivity_timeout
+ end
+
+ # Called by the server side immediately after the server connection was
+ # successfully established. Send any buffer we've accumulated and start
+ # raw proxying.
+ def server_connection_success
+ $logger.info "Successful connection to #{@remote.join(':')}"
+ @connected = true
+ @buffer.each { |data| @server_side.send_data(data) }
+ @buffer = []
proxy_incoming_to(@server_side, 10240)
- $logger.info "Successful connection to #{host}:#{port}."
- true
- rescue => e
- if @tries < 10
+ end
+
+ # Called by the server side when a connection could not be established,
+ # either due to a hard connection failure or to a connection timeout.
+ # Leave the client connection open and retry the server connection up to
+ # 10 times.
+ def server_connection_failed
+ @server_side = nil
+ if @connected
+ $logger.error "Connection with #{@remote.join(':')} was terminated prematurely."
+ close_connection
+ ProxyMachine.connect_error_callback.call(@remote.join(':'))
+ elsif @tries < 10
@tries += 1
- $logger.info "Failed on server connect attempt #{@tries}. Trying again..."
- @timer.cancel if @timer
- @timer = EventMachine::Timer.new(0.1) do
- self.ensure_server_side_connection
- end
+ $logger.warn "Retrying connection with #{@remote.join(':')} (##{@tries})"
+ EM.add_timer(0.1) { connect_to_server }
else
- $logger.info "Failed after ten connection attempts."
+ $logger.error "Connect #{@remote.join(':')} failed after ten attempts."
+ close_connection
+ ProxyMachine.connect_error_callback.call(@remote.join(':'))
end
- false
end
- def send_and_clear_buffer
- if !@buffer.empty?
- @buffer.each do |x|
- @server_side.send_data(x)
- end
- @buffer = []
- end
+ # Called by the server when an inactivity timeout is detected. The timeout
+ # argument is the configured inactivity timeout in seconds as a float; the
+ # elapsed argument is the amount of time that actually elapsed since
+ # connecting but not receiving any data.
+ def server_inactivity_timeout(timeout, elapsed)
+ $logger.error "Disconnecting #{@remote.join(':')} after #{elapsed}s of inactivity (> #{timeout.inspect})"
+ @server_side = nil
+ close_connection
+ ProxyMachine.inactivity_error_callback.call(@remote.join(':'))
end
def unbind
@server_side.close_connection_after_writing if @server_side
ProxyMachine.decr