require 'socket'
require 'semantic_logger'

module ResilientSocket

  # Make Socket calls resilient by adding timeouts, retries and specific
  # exception categories
  #
  # Resilient TCP Client with:
  # * Connection Timeouts
  #   Ability to timeout if a connect does not complete within a reasonable time
  #   For example, this can occur when the server is turned off without shutting down
  #   causing clients to hang creating new connections
  #
  # * Automatic retries on startup connection failure
  #   For example, the server is being restarted while the client is starting
  #   Gives the server a few seconds to restart
  #
  # * Automatic retries on active connection failures
  #   For example, the server is restarted while the connection is in use
  #   See #retry_on_connection_failure
  #
  # Connection and Read Timeouts are fully configurable
  #
  # Raises ConnectionTimeout when the connection timeout is exceeded
  # Raises ReadTimeout when the read timeout is exceeded
  # Raises ConnectionFailure when a network error occurs whilst reading or writing
  #
  # Note: #read and #write do not reconnect by themselves. They close the
  #       connection and raise ConnectionFailure; wrap them in
  #       #retry_on_connection_failure to get an automatic reconnect and retry
  #
  # Future:
  #
  # * Add auto-reconnect feature to sysread, syswrite, etc...
  # * To be a drop-in replacement to TCPSocket should also need to implement the
  #   following TCPSocket instance methods:  :addr, :peeraddr
  #
  # Design Notes:
  # * Does not inherit from Socket or TCP Socket because the socket instance
  #   has to be completely destroyed and recreated after a connection failure
  #
  class TCPClient
    # Supports embedding user supplied data along with this connection
    # such as sequence number, etc.
    attr_accessor :user_data

    # Returns [String] Name of the server connected to including the port number
    #
    # Example:
    #   localhost:2000
    attr_reader :server

    # Returns [TrueClass|FalseClass] Whether send buffering is enabled for this connection
    attr_reader :buffered

    # Errors that result in an automatic connection retry.
    # Kept as a mutable class variable because callers are documented to
    # append to the list returned by .reconnect_on_errors
    @@reconnect_on_errors = [
      Errno::ECONNABORTED,
      Errno::ECONNREFUSED,
      Errno::ECONNRESET,
      Errno::EHOSTUNREACH,
      Errno::EIO,
      Errno::ENETDOWN,
      Errno::ENETRESET,
      Errno::EPIPE,
      Errno::ETIMEDOUT,
      EOFError,
    ]

    # Return the array of errors that will result in an automatic connection retry
    # To add any additional errors to the standard list:
    #   ResilientSocket::TCPClient.reconnect_on_errors << Errno::EPROTO
    def self.reconnect_on_errors
      @@reconnect_on_errors
    end

    # Create a connection, call the supplied block and close the connection on
    # completion of the block
    #
    # See #initialize for the list of parameters
    #
    # Example
    #   ResilientSocket::TCPClient.connect(
    #     :server                 => 'server:3300',
    #     :connect_retry_interval => 0.1,
    #     :connect_retry_count    => 5
    #   ) do |client|
    #     client.retry_on_connection_failure do
    #       client.write('Update the database')
    #     end
    #     response = client.read(20)
    #     puts "Received: #{response}"
    #   end
    #
    def self.connect(params={})
      begin
        connection = self.new(params)
        yield(connection)
      ensure
        # connection is nil when the constructor itself raised
        connection.close if connection
      end
    end

    # Create a new TCP Client connection
    #
    # Parameters:
    #   :server [String]
    #     URL of the server to connect to with port number
    #     'localhost:2000'
    #
    #   :servers [Array of String]
    #     Array of URL's of servers to connect to with port numbers
    #     ['server1:2000', 'server2:2000']
    #
    #     The second server will only be attempted once the first server
    #     cannot be connected to
    #     A read failure or timeout will not result in switching to the second
    #     server, only a connection failure or during an automatic reconnect
    #
    #   :read_timeout [Float]
    #     Time in seconds to timeout on read
    #     Can be overridden by supplying a timeout in the read call
    #     Default: 60
    #
    #   :connect_timeout [Float]
    #     Time in seconds to timeout when trying to connect to the server
    #     Default: Half of the :read_timeout ( 30 seconds )
    #
    #   :log_level [Symbol]
    #     Only set this level to override the global SemanticLogger logging level
    #     Can be used to turn on trace or debug level logging in production
    #     Any valid SemanticLogger log level:
    #       :trace, :debug, :info, :warn, :error, :fatal
    #
    #   :buffered [Boolean]
    #     Whether to use Nagle's Buffering algorithm (http://en.wikipedia.org/wiki/Nagle's_algorithm)
    #     Recommend disabling for RPC style invocations where we don't want to wait for an
    #     ACK from the server before sending the last partial segment
    #     Buffering is recommended in a browser or file transfer style environment
    #     where multiple sends are expected during a single response
    #     Default: true
    #
    #   :connect_retry_count [Fixnum]
    #     Number of times to retry connecting when a connection fails
    #     Default: 10
    #
    #   :connect_retry_interval [Float]
    #     Number of seconds between connection retry attempts after the first failed attempt
    #     Default: 0.5
    #
    #   :retry_count [Fixnum]
    #     Number of times to retry when calling #retry_on_connection_failure
    #     This is independent of :connect_retry_count which still applies with
    #     connection failures. This retry controls up to how many times to retry the
    #     supplied block should a connection failure occur during the block
    #     Default: 3
    #
    #   :on_connect [Proc]
    #     Directly after a connection is established and before it is made available
    #     for use this Block is invoked.
    #     Typical Use Cases:
    #     - Initialize per connection session sequence numbers
    #     - Pass any authentication information to the server
    #     - Perform a handshake with the server
    #
    # Raises RuntimeError when neither :server nor :servers is supplied
    # Unknown options are logged as warnings and otherwise ignored
    #
    # Example
    #   client = ResilientSocket::TCPClient.new(
    #     :server                 => 'server:3300',
    #     :connect_retry_interval => 0.1,
    #     :connect_retry_count    => 5
    #   )
    #
    #   client.retry_on_connection_failure do
    #     client.write('Update the database')
    #   end
    #
    #   # Read up to 20 characters from the server
    #   response = client.read(20)
    #
    #   puts "Received: #{response}"
    #   client.close
    def initialize(parameters={})
      # Work on a copy so that the caller's hash is not modified by #delete
      params = parameters.dup
      @read_timeout = (params.delete(:read_timeout) || 60.0).to_f
      @connect_timeout = (params.delete(:connect_timeout) || (@read_timeout/2)).to_f
      buffered = params.delete(:buffered)
      # Cannot use || here since an explicit false must be honoured
      @buffered = buffered.nil? ? true : buffered
      @connect_retry_count = params.delete(:connect_retry_count) || 10
      @retry_count = params.delete(:retry_count) || 3
      @connect_retry_interval = (params.delete(:connect_retry_interval) || 0.5).to_f
      @on_connect = params.delete(:on_connect)

      unless @servers = params.delete(:servers)
        raise "Missing mandatory :server or :servers" unless server = params.delete(:server)
        @servers = [ server ]
      end
      @logger = SemanticLogger::Logger.new("#{self.class.name} #{@servers.inspect}", params.delete(:log_level) || SemanticLogger::Logger.default_level)
      # Anything still left in params was not recognised above
      params.each_pair {|k,v| @logger.warn "Ignoring unknown option #{k} = #{v}"}

      # Connect to the Server
      connect
    end

    # Connect to the TCP server
    #
    # Raises ConnectionTimeout when the time taken to create a connection
    #        exceeds the :connect_timeout
    # Raises ConnectionFailure whenever Socket raises an error such as Error::EACCESS etc, see Socket#connect for more information
    #
    # Error handling is implemented as follows:
    # 1. TCP Socket Connect failure:
    #    Cannot reach server
    #    Server is being restarted, or is not running
    #    Retry :connect_retry_count times, sleeping :connect_retry_interval
    #    seconds between attempts, before raising a ConnectionFailure
    #    - Allows hot restart of the server process if it restarts within
    #      that window
    #
    # 2. TCP Socket Connect timeout:
    #    Timed out after :connect_timeout seconds trying to connect to the server
    #    Usually means server is busy or the remote server disappeared off the network recently
    #    No retry, just raise a ConnectionTimeout
    #
    # Note: When multiple servers are supplied it will only try to connect to
    #       the subsequent servers once the retry count has been exceeded
    #       NOTE(review): only ConnectionFailure triggers the next server here;
    #       whether a ConnectionTimeout should as well depends on the exception
    #       hierarchy, which is defined outside this file
    #
    # Note: Calling #connect on an open connection will close the current connection
    #       and create a new connection
    #
    # Returns true
    def connect
      @socket.close if @socket && !@socket.closed?
      if @servers.size > 1
        # Try each server in sequence
        @servers.each_with_index do |server, server_id|
          begin
            connect_to_server(server)
            # Connected: stop here, otherwise every remaining server would
            # also be connected to and the earlier sockets abandoned
            break
          rescue ConnectionFailure => exc
            # Raise Exception once it has also failed to connect to the last server
            raise(exc) if @servers.size <= (server_id + 1)
          end
        end
      else
        connect_to_server(@servers.first)
      end

      # Invoke user supplied Block every time a new connection has been established
      @on_connect.call(self) if @on_connect
      true
    end

    # Send data to the server
    #
    # Use #retry_on_connection_failure to add resilience to the #write method
    #
    # Raises ConnectionFailure whenever the send fails
    #        The connection is closed before the exception is raised
    # For a description of the errors, see Socket#write
    #
    def write(data)
      @logger.trace("#write ==> sending", data)
      @logger.benchmark_debug("#write ==> sent #{data.length} bytes") do
        begin
          @socket.write(data)
        rescue SystemCallError => exception
          @logger.warn "#write Connection failure: #{exception.class}: #{exception.message}"
          close
          raise ConnectionFailure.new("Send Connection failure: #{exception.class}: #{exception.message}", @server, exception)
        end
      end
    end

    # Returns a response from the server
    #
    # Raises ConnectionFailure whenever Socket raises an error such as
    #        Error::EACCESS etc, or when the server closes the connection
    #        before 'length' bytes were received
    #        Connection is closed
    # Raises ReadTimeout if the timeout has been exceeded waiting for the
    #        requested number of bytes from the server
    #        Partial data will not be returned
    #        Connection is _not_ closed and #read can be called again later
    #        to read the response from the connection
    #
    # Parameters
    #   length [Fixnum]
    #     The number of bytes to return
    #     #read will not return until 'length' bytes have been received from
    #     the server
    #
    #   buffer [String]
    #     Optional: Buffer that is passed on to Socket#read to receive the data
    #
    #   timeout [Float]
    #     Optional: Override the default read timeout for this read
    #     Number of seconds before raising ReadTimeout when no data has
    #     been returned
    #     Default: :read_timeout supplied to #initialize
    #
    #  Note: After a ResilientSocket::ReadTimeout #read can be called again on
    #        the same socket to read the response later.
    #        If the application no longer wants the connection after a
    #        ResilientSocket::ReadTimeout, then the #close method _must_ be called
    #        before calling _connect_ or _retry_on_connection_failure_ to create
    #        a new connection
    #
    def read(length, buffer=nil, timeout=nil)
      result = nil
      @logger.benchmark_debug("#read <== read #{length} bytes") do
        # Block on data to read for @read_timeout seconds
        begin
          ready = IO.select([@socket], nil, [@socket], timeout || @read_timeout)
          unless ready
            @logger.warn "#read Timeout waiting for server to reply"
            raise ReadTimeout.new("Timedout after #{timeout || @read_timeout} seconds trying to read from #{@server}")
          end
        rescue IOError => exception
          @logger.warn "#read Connection failure while waiting for data: #{exception.class}: #{exception.message}"
          close
          raise ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
        end

        # Read data from socket
        begin
          result = buffer.nil? ? @socket.read(length) : @socket.read(length, buffer)
          @logger.trace("#read <== received", result.inspect)

          # EOF before all the data was returned
          if result.nil? || (result.length < length)
            close
            @logger.warn "#read server closed the connection before #{length} bytes were returned"
            raise ConnectionFailure.new("Connection lost while reading data", @server, EOFError.new("end of file reached"))
          end
        rescue SystemCallError, IOError => exception
          close
          @logger.warn "#read Connection failure while reading data: #{exception.class}: #{exception.message}"
          raise ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
        end
      end
      result
    end

    # Send and/or receive data with automatic retry on connection failure
    #
    # On a connection failure, it will close the connection and retry the block
    # Returns immediately on exception ReadTimeout
    #
    # 1. Example of a resilient _readonly_ request:
    #
    #    When reading data from a server that does not change state on the server
    #    Wrap both the write and the read with #retry_on_connection_failure
    #    since it is safe to send the same data twice to the server
    #
    #    # Since the write can be sent many times it is safe to also put the receive
    #    # inside the retry block
    #    value = client.retry_on_connection_failure do
    #      client.write("GETVALUE:count\n")
    #      client.read(20).strip.to_i
    #    end
    #
    # 2. Example of a resilient request that _modifies_ data on the server:
    #
    #    When changing state on the server, for example when updating a value
    #    Wrap _only_ the write with #retry_on_connection_failure
    #    The read must be outside the #retry_on_connection_failure since we must
    #    not retry the write if the connection fails during the #read
    #
    #    value = 45
    #    # Only the write is within the retry block since we cannot re-send once
    #    # the write was successful since the server may have made the change
    #    client.retry_on_connection_failure do
    #      client.write("SETVALUE:#{value}\n")
    #    end
    #    # Server returns "SAVED" if the call was successful
    #    result = client.read(20).strip
    #
    # Error handling is implemented as follows:
    #    If a network failure occurs during the block invocation the block
    #    will be called again with a new connection to the server.
    #    It will only be retried up to :retry_count times (Default: 3)
    #    Only failures whose cause is listed in .reconnect_on_errors are retried
    #    The re-connect will independently retry and timeout using all the
    #    rules of #connect
    def retry_on_connection_failure
      retries = 0
      begin
        connect if closed?
        yield(self)
      rescue ConnectionFailure => exception
        # Connection no longer usable. The next call to #retry_on_connection_failure
        # will create a new connection since this one is now closed
        close

        exc_str = exception.cause ? "#{exception.cause.class}: #{exception.cause.message}" : exception.message
        # Re-raise exceptions that should not be retried
        if !self.class.reconnect_on_errors.include?(exception.cause.class)
          @logger.warn "#retry_on_connection_failure not configured to retry: #{exc_str}"
          raise exception
        elsif retries < @retry_count
          retries += 1
          @logger.warn "#retry_on_connection_failure retry #{retries} due to #{exception.class}: #{exception.message}"
          connect
          retry
        end
        @logger.error "#retry_on_connection_failure Connection failure: #{exception.class}: #{exception.message}. Giving up after #{retries} retries"
        raise ConnectionFailure.new("After #{retries} retries to host '#{server}': #{exc_str}", @server, exception.cause)
      rescue Exception => exc
        # With any other exception we have to close the connection since the connection
        # is now in an unknown state
        # The exception is always re-raised, it is never swallowed
        close
        raise exc
      end
    end

    # Close the socket only if it is not already closed
    # Does nothing when no socket has been created yet
    #
    # Logs a warning if an error occurs trying to close the socket
    def close
      @socket.close if @socket && !@socket.closed?
    rescue IOError => exception
      @logger.warn "IOError when attempting to close socket: #{exception.class}: #{exception.message}"
    end

    # Returns whether the socket is closed
    # Also returns true when no socket has been created yet
    def closed?
      @socket.nil? || @socket.closed?
    end

    # Returns whether the connection to the server is alive
    #
    # It is useful to call this method before making a call to the server
    # that would change data on the server
    #
    # Note: This method is only useful if the server closed the connection or
    #       if a previous connection failure occurred.
    #       If the server is hard killed this will still return true until one
    #       or more writes are attempted
    #
    # Note: In testing the overhead of this call is rather low, with the ability to
    # make about 120,000 calls per second against an active connection.
    # I.e. About 8.3 micro seconds per call
    def alive?
      return false if closed?

      # Zero timeout: only check whether the socket is readable right now
      if IO.select([@socket], nil, nil, 0)
        # Readable means either data is waiting or the peer has closed;
        # eof? tells the two apart. Any error is treated as not alive
        begin
          !@socket.eof?
        rescue StandardError
          false
        end
      else
        true
      end
    rescue IOError
      false
    end

    # See: Socket#setsockopt
    def setsockopt(level, optname, optval)
      @socket.setsockopt(level, optname, optval)
    end

    #############################################
    protected

    # Try connecting to a single server
    # Sets @socket to the connected socket and @server to the supplied server
    #
    # Parameters
    #   server [String]
    #     Host name and port number separated by a colon: 'localhost:2000'
    #
    # Raises ConnectionTimeout when the connection timeout has been exceeded
    #        The partially connected socket is closed
    # Raises ConnectionFailure once the connect retries have been exhausted,
    #        or immediately for errors not listed in .reconnect_on_errors
    #        The failed socket is closed
    def connect_to_server(server)
      # Have to use Socket internally instead of TCPSocket since TCPSocket
      # does not offer async connect API amongst others:
      # :accept, :accept_nonblock, :bind, :connect, :connect_nonblock, :getpeereid,
      # :ipv6only!, :listen, :recvfrom_nonblock, :sysaccept
      retries = 0
      @logger.benchmark_info "Connecting to server #{server}" do
        host_name, port = server.split(":")
        port = port.to_i

        address = Socket.getaddrinfo(host_name, nil, Socket::AF_INET)
        socket_address = Socket.pack_sockaddr_in(port, address[0][3])

        begin
          begin
            @socket = Socket.new(Socket.const_get(address[0][0]), Socket::SOCK_STREAM, 0)
            # Disable Nagle's algorithm when buffering is not wanted
            @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) unless buffered
            @socket.connect_nonblock(socket_address)
          rescue Errno::EINPROGRESS
            # Connect is in progress: wait until the socket becomes writable
            resp = IO.select(nil, [@socket], nil, @connect_timeout)
            unless resp
              # Do not leave a half-open socket behind on timeout
              close
              raise(ConnectionTimeout.new("Timedout after #{@connect_timeout} seconds trying to connect to #{server}"))
            end
            begin
              # Second call reports the outcome of the connect:
              # EISCONN means connected, any other error is raised
              @socket.connect_nonblock(socket_address)
            rescue Errno::EISCONN
            end
          end
          # Note: no `break` here. This is the end of the block, and a break
          # would make the yielding logger method return early
        rescue SystemCallError => exception
          # Release the failed socket before it is replaced on retry or abandoned
          close
          if retries < @connect_retry_count && self.class.reconnect_on_errors.include?(exception.class)
            retries += 1
            @logger.warn "Connection failure: #{exception.class}: #{exception.message}. Retry: #{retries}"
            sleep @connect_retry_interval
            retry
          end
          @logger.error "Connection failure: #{exception.class}: #{exception.message}. Giving up after #{retries} retries"
          # Report the server that was being connected to. @server still holds
          # the previous server (or nil on the very first connect)
          raise ConnectionFailure.new("After #{retries} connection attempts to host '#{server}': #{exception.class}: #{exception.message}", server, exception)
        end
      end
      @server = server
    end

  end
end