lib/vertica/connection.rb in vertica-0.10.2 vs lib/vertica/connection.rb in vertica-0.10.3
- old
+ new
@@ -48,11 +48,11 @@
raw_socket
end
end
def ssl?
- Object.const_defined?('OpenSSL') && socket.kind_of?(OpenSSL::SSL::SSLSocket)
+ Object.const_defined?('OpenSSL') && @socket.kind_of?(OpenSSL::SSL::SSLSocket)
end
def opened?
@socket && @backend_pid && @transaction_status
end
@@ -67,29 +67,34 @@
def ready_for_query?
@current_job.nil?
end
- def write(message)
+ def write_message(message)
raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes)
puts "=> #{message.inspect}" if @debug
- socket.write message.to_bytes
- rescue SystemCallError => e
+ begin
+ socket.write_nonblock message.to_bytes
+ rescue IO::WaitReadable, IO::WaitWritable => wait_error
+ io_select(wait_error)
+ retry
+ end
+ rescue SystemCallError, IOError => e
close_socket
raise Vertica::Error::ConnectionError.new(e.message)
end
def close
- write Vertica::Messages::Terminate.new
+ write_message Vertica::Messages::Terminate.new
ensure
close_socket
end
def close_socket
socket.close
@socket = nil
- rescue SystemCallError
+ rescue SystemCallError, IOError
ensure
reset_values
end
def reset_connection
@@ -102,12 +107,12 @@
initialize_connection
end
def cancel
conn = self.class.new(options.merge(:skip_startup => true))
- conn.write Vertica::Messages::CancelRequest.new(backend_pid, backend_key)
- conn.write Vertica::Messages::Flush.new
+ conn.write_message Vertica::Messages::CancelRequest.new(backend_pid, backend_key)
+ conn.write_message Vertica::Messages::Flush.new
conn.socket.close
end
def interrupt
raise Vertica::Error::InterruptImpossible, "Session cannopt be interrupted because the session ID is not known!" if session_id.nil?
@@ -120,23 +125,17 @@
def interruptable?
!session_id.nil?
end
def read_message
- ready = IO.select([socket], nil, nil, @options[:read_timeout])
- if ready
- type = read_bytes(1)
- size = read_bytes(4).unpack('N').first
- raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
- message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4)
- puts "<= #{message.inspect}" if @debug
- return message
- else
- close
- raise Vertica::Error::TimedOutError.new("Connection timed out.")
- end
- rescue SystemCallError => e
+ type = read_bytes(1)
+ size = read_bytes(4).unpack('N').first
+ raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
+ message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4)
+ puts "<= #{message.inspect}" if @debug
+ return message
+ rescue SystemCallError, IOError => e
close_socket
raise Vertica::Error::ConnectionError.new(e.message)
end
def process_message(message)
@@ -205,24 +204,33 @@
output << input.read(COPY_FROM_IO_BLOCK_SIZE)
end
end
def read_bytes(n)
- bytes = socket.read(n)
+ socket.read_nonblock(n)
+ rescue IO::WaitReadable, IO::WaitWritable => wait_error
+ io_select(wait_error)
+ retry
+ end
- raise Errno::EIO if bytes.nil? || bytes.size != n
-
- return bytes
+ def io_select(exception)
+ readers, writers = nil, nil
+ readers = [socket] if exception.is_a?(IO::WaitReadable)
+ writers = [socket] if exception.is_a?(IO::WaitWritable)
+ if IO.select(readers, writers, nil, @options[:read_timeout]).nil?
+ close
+ raise Vertica::Error::TimedOutError.new("Connection timed out.")
+ end
end
def startup_connection
- write Vertica::Messages::Startup.new(@options[:user] || @options[:username], @options[:database])
+ write_message Vertica::Messages::Startup.new(@options[:user] || @options[:username], @options[:database])
message = nil
begin
case message = read_message
when Vertica::Messages::Authentication
if message.code != Vertica::Messages::Authentication::OK
- write Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt})
+ write_message Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt})
end
else
process_message(message)
end
end until message.kind_of?(Vertica::Messages::ReadyForQuery)