lib/kjess/connection.rb in kjess-1.0.0 vs lib/kjess/connection.rb in kjess-1.1.0

- old
+ new

@@ -1,74 +1,114 @@ require 'fcntl' -require 'socket' require 'resolv' require 'resolv-replace' require 'kjess/error' +require 'kjess/socket' module KJess # Connection class Connection class Error < KJess::Error; end - CRLF = "\r\n" + # Public: The hostname/ip address to connect to. + def host + socket.host + end - # Public: - # The hostname/ip address to connect to - attr_reader :host + # Public: The port number to connect to. Default 22133 + def port + socket.port + end - # Public - # The port number to connect to. Default 22133 - attr_reader :port + # Public: The timeout for connecting in seconds. Defaults to 2 + def connect_timeout + socket.connect_timeout + end - def initialize( host, port = 22133 ) - @host = host - @port = Float( port ).to_i - @socket = nil + # Public: The timeout for reading in seconds. Defaults to 2 + def read_timeout + socket.read_timeout end - # Internal: Return the raw socket that is connected to the Kestrel server - # - # Returns the raw socket. If the socket is not connected it will connect and - # then return it. - # - # Returns a TCPSocket - def socket - return @socket if @socket and not @socket.closed? - return @socket = connect() + # Public: The timeout for writing in seconds. Defaults to 2 + def write_timeout + socket.write_timeout end - # Internal: Create the socket we use to talk to the Kestrel server - # - # Returns a TCPSocket - def connect - sock = TCPSocket.new( host, port ) + # Internal: return thekeepalive timeout + def keepalive_active? + socket.keepalive_active? + end - # close file descriptors if we exec or something like that - sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) + # Internal: return the keepalive count + # The keepalive count + def keepalive_count + socket.keepalive_count + end - # Disable Nagle's algorithm - sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + # Internal: return the keepalive interval + def keepalive_interval + socket.keepalive_interval + end - # limit only to IPv4? - # addr = ::Socket.getaddrinfo(host, nil, Socket::AF_INET) - # sock = ::Socket.new(::Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) - # saddr = ::Socket.pack_sockaddr_in(port, addr[0][3]) + # Internal: return the keepalive idle + def keepalive_idle + socket.keepalive_idle + end - # tcp keepalive - # :SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT].all?{|c| Socket.const_defined? c} - # @sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) - # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time]) - # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl]) - # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes]) - return sock + # TODO: make port an option at next major version number change + def initialize( host, port = 22133, options = {} ) + if port.is_a?(Hash) + options = port + port = 22133 + end + + @options = options.dup + @options[:host] = host + @options[:port] = Float( port ).to_i + @socket = nil + @pid = nil + @read_buffer = '' end + # Internal: Adds time to the read timeout + # + # additional_timeout - additional number of seconds to the read timeout + # + # Returns nothing + def with_additional_read_timeout(additional_timeout, &block) + old_read_timeout = socket.read_timeout + socket.read_timeout += additional_timeout + block.call + ensure + @read_timeout = old_read_timeout + end + + # Internal: Return the socket that is connected to the Kestrel server + # + # Returns the socket. If the socket is not connected it will connect and + # then return it. + # + # Make sure that we close the socket if we are not the same process that + # opened that socket to begin with. + # + # Returns a KJess::Socket + def socket + close if @pid && @pid != Process.pid + return @socket if @socket and not @socket.closed? + @socket = Socket.connect( @options ) + @pid = Process.pid + @read_buffer = '' + return @socket + end + # Internal: close the socket if it is not already closed # # Returns nothing def close @socket.close if @socket and not @socket.closed? + @read_buffer = '' @socket = nil end # Internal: is the socket closed # @@ -83,37 +123,59 @@ # # msg - the message to write # # Returns nothing def write( msg ) - $stderr.write "--> #{msg}" if $DEBUG + $stderr.puts "--> #{msg}" if $DEBUG socket.write( msg ) + rescue KJess::Error + close + raise end # Internal: read a single line from the socket # # eom - the End Of Mesasge delimiter (default: "\r\n") # # Returns a String def readline( eom = Protocol::CRLF ) - while line = socket.readline( eom ) do - $stderr.write "<-- #{line}" if $DEBUG + while true + while (idx = @read_buffer.index(eom)) == nil + @read_buffer << socket.readpartial(10240) + end + + line = @read_buffer.slice!(0, idx + eom.length) + $stderr.puts "<-- #{line}" if $DEBUG break unless line.strip.length == 0 end return line + rescue KJess::Error + close + raise rescue EOFError close return "EOF" + rescue => e + close + raise Error, "Could not read from #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace end # Internal: Read from the socket # - # args - this method takes the same arguments as IO#read + # nbytes - this method takes the number of bytes to read # # Returns what IO#read returns - def read( *args ) - d = socket.read( *args ) - $stderr.puts "<-- #{d}" if $DEBUG - return d + def read( nbytes ) + while @read_buffer.length < nbytes + @read_buffer << socket.readpartial(nbytes - @read_buffer.length) + end + + result = @read_buffer.slice!(0, nbytes) + + $stderr.puts "<-- #{result}" if $DEBUG + return result + rescue KJess::Error + close + raise end end end