lib/vertica/connection.rb in vertica-0.10.2 vs lib/vertica/connection.rb in vertica-0.10.3

- old
+ new

@@ -48,11 +48,11 @@ raw_socket end end def ssl? - Object.const_defined?('OpenSSL') && socket.kind_of?(OpenSSL::SSL::SSLSocket) + Object.const_defined?('OpenSSL') && @socket.kind_of?(OpenSSL::SSL::SSLSocket) end def opened? @socket && @backend_pid && @transaction_status end @@ -67,29 +67,34 @@ def ready_for_query? @current_job.nil? end - def write(message) + def write_message(message) raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes) puts "=> #{message.inspect}" if @debug - socket.write message.to_bytes - rescue SystemCallError => e + begin + socket.write_nonblock message.to_bytes + rescue IO::WaitReadable, IO::WaitWritable => wait_error + io_select(wait_error) + retry + end + rescue SystemCallError, IOError => e close_socket raise Vertica::Error::ConnectionError.new(e.message) end def close - write Vertica::Messages::Terminate.new + write_message Vertica::Messages::Terminate.new ensure close_socket end def close_socket socket.close @socket = nil - rescue SystemCallError + rescue SystemCallError, IOError ensure reset_values end def reset_connection @@ -102,12 +107,12 @@ initialize_connection end def cancel conn = self.class.new(options.merge(:skip_startup => true)) - conn.write Vertica::Messages::CancelRequest.new(backend_pid, backend_key) - conn.write Vertica::Messages::Flush.new + conn.write_message Vertica::Messages::CancelRequest.new(backend_pid, backend_key) + conn.write_message Vertica::Messages::Flush.new conn.socket.close end def interrupt raise Vertica::Error::InterruptImpossible, "Session cannopt be interrupted because the session ID is not known!" if session_id.nil? @@ -120,23 +125,17 @@ def interruptable? !session_id.nil? end def read_message - ready = IO.select([socket], nil, nil, @options[:read_timeout]) - if ready - type = read_bytes(1) - size = read_bytes(4).unpack('N').first - raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4 - message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4) - puts "<= #{message.inspect}" if @debug - return message - else - close - raise Vertica::Error::TimedOutError.new("Connection timed out.") - end - rescue SystemCallError => e + type = read_bytes(1) + size = read_bytes(4).unpack('N').first + raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4 + message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4) + puts "<= #{message.inspect}" if @debug + return message + rescue SystemCallError, IOError => e close_socket raise Vertica::Error::ConnectionError.new(e.message) end def process_message(message) @@ -205,24 +204,33 @@ output << input.read(COPY_FROM_IO_BLOCK_SIZE) end end def read_bytes(n) - bytes = socket.read(n) + socket.read_nonblock(n) + rescue IO::WaitReadable, IO::WaitWritable => wait_error + io_select(wait_error) + retry + end - raise Errno::EIO if bytes.nil? || bytes.size != n - - return bytes + def io_select(exception) + readers, writers = nil, nil + readers = [socket] if exception.is_a?(IO::WaitReadable) + writers = [socket] if exception.is_a?(IO::WaitWritable) + if IO.select(readers, writers, nil, @options[:read_timeout]).nil? + close + raise Vertica::Error::TimedOutError.new("Connection timed out.") + end end def startup_connection - write Vertica::Messages::Startup.new(@options[:user] || @options[:username], @options[:database]) + write_message Vertica::Messages::Startup.new(@options[:user] || @options[:username], @options[:database]) message = nil begin case message = read_message when Vertica::Messages::Authentication if message.code != Vertica::Messages::Authentication::OK - write Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt}) + write_message Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt}) end else process_message(message) end end until message.kind_of?(Vertica::Messages::ReadyForQuery)