lib/vertica/connection.rb in vertica-0.9.0.beta3 vs lib/vertica/connection.rb in vertica-0.9.0.beta4

- old
+ new

@@ -1,236 +1,187 @@ require 'socket' -module Vertica - class Connection +class Vertica::Connection - STATUSES = { - 'I' => :no_transaction, - 'T' => :in_transaction, - 'E' => :failed_transaction - } + attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :parameters, :notice_handler - attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :notifications, :parameters + attr_accessor :row_style, :debug - attr_accessor :row_style, :debug + def self.cancel(existing_conn) + conn = self.new(existing_conn.options.merge(:skip_startup => true)) + conn.write Vertica::Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key) + conn.write Vertica::Messages::Flush.new + conn.socket.close + end - def self.cancel(existing_conn) - conn = self.new(existing_conn.options.merge(:skip_startup => true)) - conn.write Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key) - conn.write Messages::Flush.new - conn.socket.close - end + # Opens a connectio the a Vertica server + # @param [Hash] options The connection options to use. + def initialize(options = {}) + reset_values - def initialize(options = {}) - reset_values + @options = {} + options.each { |key, value| @options[key.to_s.to_sym] = value } + @options[:port] ||= 5433 - @options = {} - options.each { |key, value| @options[key.to_s.to_sym] = value } - - @notices = [] - - @row_style = @options[:row_style] ? @options[:row_style] : :hash - - unless options[:skip_startup] - write Messages::Startup.new(@options[:user], @options[:database]) - process - - query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path] - query("SET ROLE #{options[:role]}") if options[:role] - end + @row_style = @options[:row_style] ? @options[:row_style] : :hash + unless options[:skip_startup] + startup_connection + initialize_connection end - - def socket - @socket ||= begin - raw_socket = TCPSocket.new(@options[:host], @options[:port].to_s) - if @options[:ssl] - require 'openssl/ssl' - raw_socket.write Messages::SslRequest.new.to_bytes - if raw_socket.read(1) == 'S' - raw_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, OpenSSL::SSL::SSLContext.new) - raw_socket.sync = true - raw_socket.connect - else - raise Error::ConnectionError.new("SSL requested but server doesn't support it.") - end + end + + def on_notice(&block) + @notice_handler = block + end + + def socket + @socket ||= begin + raw_socket = TCPSocket.new(@options[:host], @options[:port].to_i) + if @options[:ssl] + require 'openssl/ssl' + raw_socket.write Vertica::Messages::SslRequest.new.to_bytes + if raw_socket.read(1) == 'S' + raw_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, OpenSSL::SSL::SSLContext.new) + raw_socket.sync = true + raw_socket.connect + else + raise Vertica::Error::ConnectionError.new("SSL requested but server doesn't support it.") end - - raw_socket end + + raw_socket end + end - def ssl? - socket.kind_of?(OpenSSL::SSL::SSLSocket) - end + def ssl? + socket.kind_of?(OpenSSL::SSL::SSLSocket) + end - def opened? - @socket && @backend_pid && @transaction_status - end + def opened? + @socket && @backend_pid && @transaction_status + end - def closed? - !opened? - end + def closed? + !opened? + end - def write(message) - raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes) - puts "=> #{message.inspect}" if @debug - socket.write message.to_bytes - end + def write(message) + raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes) + puts "=> #{message.inspect}" if @debug + socket.write message.to_bytes + end - def close - write Messages::Terminate.new - socket.close - @socket = nil - rescue Errno::ENOTCONN # the backend closed the socket already - ensure - reset_values - end + def close + write Vertica::Messages::Terminate.new + socket.close + @socket = nil + rescue Errno::ENOTCONN # the backend closed the socket already + ensure + reset_values + end - def reset - close if opened? - reset_values + def reset + close if opened? + reset_values + end + + def read_message + type = read_bytes(1) + size = read_bytes(4).unpack('N').first + raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4 + message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4) + puts "<= #{message.inspect}" if @debug + return message + end + + def process_message(message) + case message + when Vertica::Messages::ErrorResponse + raise Vertica::Error::ConnectionError.new(message.error_message) + when Vertica::Messages::NoticeResponse + @notice_handler.call(message) if @notice_handler + when Vertica::Messages::BackendKeyData + @backend_pid = message.pid + @backend_key = message.key + when Vertica::Messages::ParameterStatus + @parameters[message.name] = message.value + when Vertica::Messages::ReadyForQuery + @transaction_status = message.transaction_status + else + raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}" end + end + - def query(query_string, &block) - raise ArgumentError.new("Query string cannot be blank or empty.") if query_string.nil? || query_string.empty? - reset_result - write Messages::Query.new(query_string) - @process_row = block - result = process(true) - result unless @process_row + def query(sql, options = {}, &block) + job = Vertica::Query.new(self, sql, { :row_style => @row_style }.merge(options)) + job.row_handler = block if block_given? + return job.run + end + + def copy(sql, source = nil, &block) + job = Vertica::Query.new(self, sql, :row_style => @row_style) + if block_given? + job.copy_handler = block + elsif source && File.exists?(source.to_s) + job.copy_handler = lambda { |data| file_copy_handler(source, data) } + elsif source.respond_to?(:read) && source.respond_to?(:eof?) + job.copy_handler = lambda { |data| io_copy_handler(source, data) } end - - def prepare(name, query, params_count = 0) - param_types = Array.new(params_count).fill(0) - - write Messages::Parse.new(name, query, param_types) - write Messages::Describe.new(:prepared_statement, name) - write Messages::Sync.new - write Messages::Flush.new - - process + return job.run + end + + protected + + def file_copy_handler(input_file, output) + File.open(input_file, 'r') do |input| + while data = input.read(4096) + output << data + end end - - def execute_prepared(name, *param_values) - portal_name = "" # use the unnamed portal - max_rows = 0 # return all rows - - reset_result - - write Messages::Bind.new(portal_name, name, param_values) - write Messages::Execute.new(portal_name, max_rows) - write Messages::Sync.new - - result = process(true) - - # Close the portal - write Messages::Close.new(:portal, portal_name) - write Messages::Flush.new - - process - - # Return the result from the prepared statement - result + end + + def io_copy_handler(input, output) + until input.eof? + output << input.read(4096) end + end - protected - - - def read_bytes(n) - bytes = socket.read(n) - raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n - return bytes - end - - def read_message - type = read_bytes(1) - size = read_bytes(4).unpack('N').first - raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4 - msg = Messages::BackendMessage.factory type, read_bytes(size - 4) - puts "<= #{msg.inspect}" if @debug - return msg - end - - - def process(return_result = false) - result = return_result ? Result.new(row_style) : nil - loop do - case message = read_message - when Messages::Authentication - if message.code != Messages::Authentication::OK - write Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt}) - end - - when Messages::BackendKeyData - @backend_pid = message.pid - @backend_key = message.key - - when Messages::DataRow - @process_row.call(result.format_row(message)) if @process_row && result - result.add_row(message) if result && !@process_row - - when Messages::ErrorResponse - error_class = result ? Vertica::Error::QueryError : Vertica::Error::ConnectionError - raise error_class.new(message.error_message) + def read_bytes(n) + bytes = socket.read(n) + raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n + return bytes + end - - when Messages::NoticeResponse - @notices << message.values - - when Messages::NotificationResponse - @notifications << Notification.new(message.pid, message.condition, message.additional_info) - - when Messages::ParameterStatus - @parameters[message.name] = message.value - - when Messages::ReadyForQuery - @transaction_status = STATUSES[message.transaction_status] - break unless return_result - - when Messages::RowDescription - result.descriptions = message if result - - when Messages::Unknown - raise Error::MessageError.new("Unknown message type: #{message.message_id}") - - when Messages::BindComplete, - Messages::NoData, - Messages::EmptyQueryResponse, - Messages::ParameterDescription - :nothing - - when Messages::CloseComplete, - Messages::CommandComplete, - Messages::ParseComplete, - Messages::PortalSuspended - break + def startup_connection + write Vertica::Messages::Startup.new(@options[:user], @options[:database]) + message = nil + begin + case message = read_message + when Vertica::Messages::Authentication + if message.code != Vertica::Messages::Authentication::OK + write Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt}) end + else + process_message(message) end + end until message.kind_of?(Vertica::Messages::ReadyForQuery) + end + + def initialize_connection + query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path] + query("SET ROLE #{options[:role]}") if options[:role] + end - result - end - - def reset_values - reset_notifications - reset_result - @parameters = {} - @backend_pid = nil - @backend_key = nil - @transaction_status = nil - @socket = nil - @process_row = nil - end - - def reset_notifications - @notifications = [] - end - - def reset_result - @field_descriptions = [] - @field_values = [] - end + def reset_values + @parameters = {} + @backend_pid = nil + @backend_key = nil + @transaction_status = nil + @socket = nil end end +require 'vertica/query' require 'vertica/column' require 'vertica/result' require 'vertica/messages/message'