lib/kjess/connection.rb in kjess-1.0.0 vs lib/kjess/connection.rb in kjess-1.1.0
- old
+ new
@@ -1,74 +1,114 @@
require 'fcntl'
-require 'socket'
require 'resolv'
require 'resolv-replace'
require 'kjess/error'
+require 'kjess/socket'
module KJess
# Connection
class Connection
class Error < KJess::Error; end
- CRLF = "\r\n"
+ # Public: The hostname/ip address to connect to.
+ def host
+ socket.host
+ end
- # Public:
- # The hostname/ip address to connect to
- attr_reader :host
+ # Public: The port number to connect to. Default 22133
+ def port
+ socket.port
+ end
- # Public
- # The port number to connect to. Default 22133
- attr_reader :port
+ # Public: The timeout for connecting in seconds. Defaults to 2
+ def connect_timeout
+ socket.connect_timeout
+ end
- def initialize( host, port = 22133 )
- @host = host
- @port = Float( port ).to_i
- @socket = nil
+ # Public: The timeout for reading in seconds. Defaults to 2
+ def read_timeout
+ socket.read_timeout
end
- # Internal: Return the raw socket that is connected to the Kestrel server
- #
- # Returns the raw socket. If the socket is not connected it will connect and
- # then return it.
- #
- # Returns a TCPSocket
- def socket
- return @socket if @socket and not @socket.closed?
- return @socket = connect()
+ # Public: The timeout for writing in seconds. Defaults to 2
+ def write_timeout
+ socket.write_timeout
end
- # Internal: Create the socket we use to talk to the Kestrel server
- #
- # Returns a TCPSocket
- def connect
- sock = TCPSocket.new( host, port )
+ # Internal: return thekeepalive timeout
+ def keepalive_active?
+ socket.keepalive_active?
+ end
- # close file descriptors if we exec or something like that
- sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
+ # Internal: return the keepalive count
+ # The keepalive count
+ def keepalive_count
+ socket.keepalive_count
+ end
- # Disable Nagle's algorithm
- sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
+ # Internal: return the keepalive interval
+ def keepalive_interval
+ socket.keepalive_interval
+ end
- # limit only to IPv4?
- # addr = ::Socket.getaddrinfo(host, nil, Socket::AF_INET)
- # sock = ::Socket.new(::Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
- # saddr = ::Socket.pack_sockaddr_in(port, addr[0][3])
+ # Internal: return the keepalive idle
+ def keepalive_idle
+ socket.keepalive_idle
+ end
- # tcp keepalive
- # :SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT].all?{|c| Socket.const_defined? c}
- # @sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
- # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time])
- # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl])
- # @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes])
- return sock
+ # TODO: make port an option at next major version number change
+ def initialize( host, port = 22133, options = {} )
+ if port.is_a?(Hash)
+ options = port
+ port = 22133
+ end
+
+ @options = options.dup
+ @options[:host] = host
+ @options[:port] = Float( port ).to_i
+ @socket = nil
+ @pid = nil
+ @read_buffer = ''
end
+ # Internal: Adds time to the read timeout
+ #
+ # additional_timeout - additional number of seconds to the read timeout
+ #
+ # Returns nothing
+ def with_additional_read_timeout(additional_timeout, &block)
+ old_read_timeout = socket.read_timeout
+ socket.read_timeout += additional_timeout
+ block.call
+ ensure
+ @read_timeout = old_read_timeout
+ end
+
+ # Internal: Return the socket that is connected to the Kestrel server
+ #
+ # Returns the socket. If the socket is not connected it will connect and
+ # then return it.
+ #
+ # Make sure that we close the socket if we are not the same process that
+ # opened that socket to begin with.
+ #
+ # Returns a KJess::Socket
+ def socket
+ close if @pid && @pid != Process.pid
+ return @socket if @socket and not @socket.closed?
+ @socket = Socket.connect( @options )
+ @pid = Process.pid
+ @read_buffer = ''
+ return @socket
+ end
+
# Internal: close the socket if it is not already closed
#
# Returns nothing
def close
@socket.close if @socket and not @socket.closed?
+ @read_buffer = ''
@socket = nil
end
# Internal: is the socket closed
#
@@ -83,37 +123,59 @@
#
# msg - the message to write
#
# Returns nothing
def write( msg )
- $stderr.write "--> #{msg}" if $DEBUG
+ $stderr.puts "--> #{msg}" if $DEBUG
socket.write( msg )
+ rescue KJess::Error
+ close
+ raise
end
# Internal: read a single line from the socket
#
# eom - the End Of Mesasge delimiter (default: "\r\n")
#
# Returns a String
def readline( eom = Protocol::CRLF )
- while line = socket.readline( eom ) do
- $stderr.write "<-- #{line}" if $DEBUG
+ while true
+ while (idx = @read_buffer.index(eom)) == nil
+ @read_buffer << socket.readpartial(10240)
+ end
+
+ line = @read_buffer.slice!(0, idx + eom.length)
+ $stderr.puts "<-- #{line}" if $DEBUG
break unless line.strip.length == 0
end
return line
+ rescue KJess::Error
+ close
+ raise
rescue EOFError
close
return "EOF"
+ rescue => e
+ close
+ raise Error, "Could not read from #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace
end
# Internal: Read from the socket
#
- # args - this method takes the same arguments as IO#read
+ # nbytes - this method takes the number of bytes to read
#
# Returns what IO#read returns
- def read( *args )
- d = socket.read( *args )
- $stderr.puts "<-- #{d}" if $DEBUG
- return d
+ def read( nbytes )
+ while @read_buffer.length < nbytes
+ @read_buffer << socket.readpartial(nbytes - @read_buffer.length)
+ end
+
+ result = @read_buffer.slice!(0, nbytes)
+
+ $stderr.puts "<-- #{result}" if $DEBUG
+ return result
+ rescue KJess::Error
+ close
+ raise
end
end
end