lib/vertica/connection.rb in vertica-0.9.0.beta3 vs lib/vertica/connection.rb in vertica-0.9.0.beta4
- old
+ new
@@ -1,236 +1,187 @@
require 'socket'
-module Vertica
- class Connection
+class Vertica::Connection
- STATUSES = {
- 'I' => :no_transaction,
- 'T' => :in_transaction,
- 'E' => :failed_transaction
- }
+ attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :parameters, :notice_handler
- attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :notifications, :parameters
+ attr_accessor :row_style, :debug
- attr_accessor :row_style, :debug
+ def self.cancel(existing_conn)
+ conn = self.new(existing_conn.options.merge(:skip_startup => true))
+ conn.write Vertica::Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key)
+ conn.write Vertica::Messages::Flush.new
+ conn.socket.close
+ end
- def self.cancel(existing_conn)
- conn = self.new(existing_conn.options.merge(:skip_startup => true))
- conn.write Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key)
- conn.write Messages::Flush.new
- conn.socket.close
- end
+ # Opens a connectio the a Vertica server
+ # @param [Hash] options The connection options to use.
+ def initialize(options = {})
+ reset_values
- def initialize(options = {})
- reset_values
+ @options = {}
+ options.each { |key, value| @options[key.to_s.to_sym] = value }
+ @options[:port] ||= 5433
- @options = {}
- options.each { |key, value| @options[key.to_s.to_sym] = value }
-
- @notices = []
-
- @row_style = @options[:row_style] ? @options[:row_style] : :hash
-
- unless options[:skip_startup]
- write Messages::Startup.new(@options[:user], @options[:database])
- process
-
- query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path]
- query("SET ROLE #{options[:role]}") if options[:role]
- end
+ @row_style = @options[:row_style] ? @options[:row_style] : :hash
+ unless options[:skip_startup]
+ startup_connection
+ initialize_connection
end
-
- def socket
- @socket ||= begin
- raw_socket = TCPSocket.new(@options[:host], @options[:port].to_s)
- if @options[:ssl]
- require 'openssl/ssl'
- raw_socket.write Messages::SslRequest.new.to_bytes
- if raw_socket.read(1) == 'S'
- raw_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, OpenSSL::SSL::SSLContext.new)
- raw_socket.sync = true
- raw_socket.connect
- else
- raise Error::ConnectionError.new("SSL requested but server doesn't support it.")
- end
+ end
+
+ def on_notice(&block)
+ @notice_handler = block
+ end
+
+ def socket
+ @socket ||= begin
+ raw_socket = TCPSocket.new(@options[:host], @options[:port].to_i)
+ if @options[:ssl]
+ require 'openssl/ssl'
+ raw_socket.write Vertica::Messages::SslRequest.new.to_bytes
+ if raw_socket.read(1) == 'S'
+ raw_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, OpenSSL::SSL::SSLContext.new)
+ raw_socket.sync = true
+ raw_socket.connect
+ else
+ raise Vertica::Error::ConnectionError.new("SSL requested but server doesn't support it.")
end
-
- raw_socket
end
+
+ raw_socket
end
+ end
- def ssl?
- socket.kind_of?(OpenSSL::SSL::SSLSocket)
- end
+ def ssl?
+ socket.kind_of?(OpenSSL::SSL::SSLSocket)
+ end
- def opened?
- @socket && @backend_pid && @transaction_status
- end
+ def opened?
+ @socket && @backend_pid && @transaction_status
+ end
- def closed?
- !opened?
- end
+ def closed?
+ !opened?
+ end
- def write(message)
- raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes)
- puts "=> #{message.inspect}" if @debug
- socket.write message.to_bytes
- end
+ def write(message)
+ raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes)
+ puts "=> #{message.inspect}" if @debug
+ socket.write message.to_bytes
+ end
- def close
- write Messages::Terminate.new
- socket.close
- @socket = nil
- rescue Errno::ENOTCONN # the backend closed the socket already
- ensure
- reset_values
- end
+ def close
+ write Vertica::Messages::Terminate.new
+ socket.close
+ @socket = nil
+ rescue Errno::ENOTCONN # the backend closed the socket already
+ ensure
+ reset_values
+ end
- def reset
- close if opened?
- reset_values
+ def reset
+ close if opened?
+ reset_values
+ end
+
+ def read_message
+ type = read_bytes(1)
+ size = read_bytes(4).unpack('N').first
+ raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
+ message = Vertica::Messages::BackendMessage.factory type, read_bytes(size - 4)
+ puts "<= #{message.inspect}" if @debug
+ return message
+ end
+
+ def process_message(message)
+ case message
+ when Vertica::Messages::ErrorResponse
+ raise Vertica::Error::ConnectionError.new(message.error_message)
+ when Vertica::Messages::NoticeResponse
+ @notice_handler.call(message) if @notice_handler
+ when Vertica::Messages::BackendKeyData
+ @backend_pid = message.pid
+ @backend_key = message.key
+ when Vertica::Messages::ParameterStatus
+ @parameters[message.name] = message.value
+ when Vertica::Messages::ReadyForQuery
+ @transaction_status = message.transaction_status
+ else
+ raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}"
end
+ end
+
- def query(query_string, &block)
- raise ArgumentError.new("Query string cannot be blank or empty.") if query_string.nil? || query_string.empty?
- reset_result
- write Messages::Query.new(query_string)
- @process_row = block
- result = process(true)
- result unless @process_row
+ def query(sql, options = {}, &block)
+ job = Vertica::Query.new(self, sql, { :row_style => @row_style }.merge(options))
+ job.row_handler = block if block_given?
+ return job.run
+ end
+
+ def copy(sql, source = nil, &block)
+ job = Vertica::Query.new(self, sql, :row_style => @row_style)
+ if block_given?
+ job.copy_handler = block
+ elsif source && File.exists?(source.to_s)
+ job.copy_handler = lambda { |data| file_copy_handler(source, data) }
+ elsif source.respond_to?(:read) && source.respond_to?(:eof?)
+ job.copy_handler = lambda { |data| io_copy_handler(source, data) }
end
-
- def prepare(name, query, params_count = 0)
- param_types = Array.new(params_count).fill(0)
-
- write Messages::Parse.new(name, query, param_types)
- write Messages::Describe.new(:prepared_statement, name)
- write Messages::Sync.new
- write Messages::Flush.new
-
- process
+ return job.run
+ end
+
+ protected
+
+ def file_copy_handler(input_file, output)
+ File.open(input_file, 'r') do |input|
+ while data = input.read(4096)
+ output << data
+ end
end
-
- def execute_prepared(name, *param_values)
- portal_name = "" # use the unnamed portal
- max_rows = 0 # return all rows
-
- reset_result
-
- write Messages::Bind.new(portal_name, name, param_values)
- write Messages::Execute.new(portal_name, max_rows)
- write Messages::Sync.new
-
- result = process(true)
-
- # Close the portal
- write Messages::Close.new(:portal, portal_name)
- write Messages::Flush.new
-
- process
-
- # Return the result from the prepared statement
- result
+ end
+
+ def io_copy_handler(input, output)
+ until input.eof?
+ output << input.read(4096)
end
+ end
- protected
-
-
- def read_bytes(n)
- bytes = socket.read(n)
- raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n
- return bytes
- end
-
- def read_message
- type = read_bytes(1)
- size = read_bytes(4).unpack('N').first
- raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
- msg = Messages::BackendMessage.factory type, read_bytes(size - 4)
- puts "<= #{msg.inspect}" if @debug
- return msg
- end
-
-
- def process(return_result = false)
- result = return_result ? Result.new(row_style) : nil
- loop do
- case message = read_message
- when Messages::Authentication
- if message.code != Messages::Authentication::OK
- write Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt})
- end
-
- when Messages::BackendKeyData
- @backend_pid = message.pid
- @backend_key = message.key
-
- when Messages::DataRow
- @process_row.call(result.format_row(message)) if @process_row && result
- result.add_row(message) if result && !@process_row
-
- when Messages::ErrorResponse
- error_class = result ? Vertica::Error::QueryError : Vertica::Error::ConnectionError
- raise error_class.new(message.error_message)
+ def read_bytes(n)
+ bytes = socket.read(n)
+ raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n
+ return bytes
+ end
-
- when Messages::NoticeResponse
- @notices << message.values
-
- when Messages::NotificationResponse
- @notifications << Notification.new(message.pid, message.condition, message.additional_info)
-
- when Messages::ParameterStatus
- @parameters[message.name] = message.value
-
- when Messages::ReadyForQuery
- @transaction_status = STATUSES[message.transaction_status]
- break unless return_result
-
- when Messages::RowDescription
- result.descriptions = message if result
-
- when Messages::Unknown
- raise Error::MessageError.new("Unknown message type: #{message.message_id}")
-
- when Messages::BindComplete,
- Messages::NoData,
- Messages::EmptyQueryResponse,
- Messages::ParameterDescription
- :nothing
-
- when Messages::CloseComplete,
- Messages::CommandComplete,
- Messages::ParseComplete,
- Messages::PortalSuspended
- break
+ def startup_connection
+ write Vertica::Messages::Startup.new(@options[:user], @options[:database])
+ message = nil
+ begin
+ case message = read_message
+ when Vertica::Messages::Authentication
+ if message.code != Vertica::Messages::Authentication::OK
+ write Vertica::Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt})
end
+ else
+ process_message(message)
end
+ end until message.kind_of?(Vertica::Messages::ReadyForQuery)
+ end
+
+ def initialize_connection
+ query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path]
+ query("SET ROLE #{options[:role]}") if options[:role]
+ end
- result
- end
-
- def reset_values
- reset_notifications
- reset_result
- @parameters = {}
- @backend_pid = nil
- @backend_key = nil
- @transaction_status = nil
- @socket = nil
- @process_row = nil
- end
-
- def reset_notifications
- @notifications = []
- end
-
- def reset_result
- @field_descriptions = []
- @field_values = []
- end
+ def reset_values
+ @parameters = {}
+ @backend_pid = nil
+ @backend_key = nil
+ @transaction_status = nil
+ @socket = nil
end
end
+require 'vertica/query'
require 'vertica/column'
require 'vertica/result'
require 'vertica/messages/message'