lib/vertica/connection.rb in vertica-0.8.1 vs lib/vertica/connection.rb in vertica-0.9.0.beta1

- old
+ new

@@ -1,104 +1,87 @@ module Vertica - class Connection STATUSES = { - ?I => :no_transaction, - ?T => :in_transaction, - ?E => :failed_transaction + 'I' => :no_transaction, + 'T' => :in_transaction, + 'E' => :failed_transaction } + attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :notifications, :parameters + def self.cancel(existing_conn) conn = self.new(existing_conn.options.merge(:skip_startup => true)) conn.write Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key) conn.write Messages::Flush.new - conn.connection.close + conn.socket.close end def initialize(options = {}) reset_values @options = options + @notices = [] unless options[:skip_startup] - connection.write Messages::Startup.new(@options[:user], @options[:database]).to_bytes + write Messages::Startup.new(@options[:user], @options[:database]) process + + query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path] + query("SET ROLE #{options[:role]}") if options[:role] end end - def connection - @connection ||= begin - conn = VerticaSocket.new(@options[:host], @options[:port].to_s) + def socket + @socket ||= begin + conn = TCPSocket.new(@options[:host], @options[:port].to_s) if @options[:ssl] conn.write Messages::SslRequest.new.to_bytes - if conn.read_byte == ?S + if conn.read(1) == 'S' conn = OpenSSL::SSL::SSLSocket.new(conn, OpenSSL::SSL::SSLContext.new) conn.sync = true conn.connect else raise Error::ConnectionError.new("SSL requested but server doesn't support it.") end end + conn end end + def ssl? + socket.kind_of?(OpenSSL::SSL::SSLSocket) + end + def opened? - @connection && @backend_pid && @transaction_status + @socket && @backend_pid && @transaction_status end def closed? !opened? end def write(message) - connection.write_message message + raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes) + socket.write message.to_bytes end def close write Messages::Terminate.new - connection.shutdown - @connection = nil - rescue Errno::ENOTCONN # the backend closed the connection already + socket.close + @socket = nil + rescue Errno::ENOTCONN # the backend closed the socket already ensure reset_values end def reset close if opened? reset_values end - def options - @options.dup - end - - def transaction_status - @transaction_status - end - - def backend_pid - @backend_pid - end - - def backend_key - @backend_key - end - - def notifications - @notifications - end - - def parameters - @parameters.dup - end - - def put_copy_data; raise NotImplementedError.new; end - def put_copy_end; raise NotImplementedError.new; end - def get_copy_data; raise NotImplementedError.new; end - def query(query_string, &block) raise ArgumentError.new("Query string cannot be blank or empty.") if query_string.nil? || query_string.empty? reset_result write Messages::Query.new(query_string) @process_row = block @@ -139,14 +122,29 @@ result end protected + + def read_bytes(n) + bytes = socket.read(n) + raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n + return bytes + end + + def read_message + type = read_bytes(1) + size = read_bytes(4).unpack('N').first + raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4 + Messages::BackendMessage.factory type, read_bytes(size - 4) + end + + def process(return_result = false) result = return_result ? Result.new : nil loop do - case message = connection.read_message + case message = read_message when Messages::Authentication if message.code != Messages::Authentication::OK write Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt}) end @@ -157,16 +155,16 @@ when Messages::DataRow @process_row.call(result.format_row(message)) if @process_row && result result.add_row(message) if result && !@process_row when Messages::ErrorResponse - raise Error::MessageError.new(message.error) + error_class = result ? Vertica::Error::QueryError : Vertica::Error::ConnectionError + raise error_class.new(message.error_message) + when Messages::NoticeResponse - message.notices.each do |notice| - @notices << Notice.new(notice[0], notice[1]) - end + @notices << message.values when Messages::NotificationResponse @notifications << Notification.new(message.pid, message.condition, message.additional_info) when Messages::ParameterStatus @@ -191,18 +189,10 @@ when Messages::CloseComplete, Messages::CommandComplete, Messages::ParseComplete, Messages::PortalSuspended break - # when Messages::CopyData - # # nothing - # when Messages::CopyDone - # # nothing - # when Messages::CopyInResponse - # raise 'not done' - # when Messages::CopyOutResponse - # raise 'not done' end end result end @@ -212,10 +202,10 @@ reset_result @parameters = {} @backend_pid = nil @backend_key = nil @transaction_status = nil - @connection = nil + @socket = nil @process_row = nil end def reset_notifications @notifications = []