lib/vertica/connection.rb in vertica-0.8.1 vs lib/vertica/connection.rb in vertica-0.9.0.beta1
- old
+ new
@@ -1,104 +1,87 @@
module Vertica
-
class Connection
STATUSES = {
- ?I => :no_transaction,
- ?T => :in_transaction,
- ?E => :failed_transaction
+ 'I' => :no_transaction,
+ 'T' => :in_transaction,
+ 'E' => :failed_transaction
}
+ attr_reader :options, :notices, :transaction_status, :backend_pid, :backend_key, :notifications, :parameters
+
def self.cancel(existing_conn)
conn = self.new(existing_conn.options.merge(:skip_startup => true))
conn.write Messages::CancelRequest.new(existing_conn.backend_pid, existing_conn.backend_key)
conn.write Messages::Flush.new
- conn.connection.close
+ conn.socket.close
end
def initialize(options = {})
reset_values
@options = options
+ @notices = []
unless options[:skip_startup]
- connection.write Messages::Startup.new(@options[:user], @options[:database]).to_bytes
+ write Messages::Startup.new(@options[:user], @options[:database])
process
+
+ query("SET SEARCH_PATH TO #{options[:search_path]}") if options[:search_path]
+ query("SET ROLE #{options[:role]}") if options[:role]
end
end
- def connection
- @connection ||= begin
- conn = VerticaSocket.new(@options[:host], @options[:port].to_s)
+ def socket
+ @socket ||= begin
+ conn = TCPSocket.new(@options[:host], @options[:port].to_s)
if @options[:ssl]
conn.write Messages::SslRequest.new.to_bytes
- if conn.read_byte == ?S
+ if conn.read(1) == 'S'
conn = OpenSSL::SSL::SSLSocket.new(conn, OpenSSL::SSL::SSLContext.new)
conn.sync = true
conn.connect
else
raise Error::ConnectionError.new("SSL requested but server doesn't support it.")
end
end
+
conn
end
end
+ def ssl?
+ socket.kind_of?(OpenSSL::SSL::SSLSocket)
+ end
+
def opened?
- @connection && @backend_pid && @transaction_status
+ @socket && @backend_pid && @transaction_status
end
def closed?
!opened?
end
def write(message)
- connection.write_message message
+ raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes)
+ socket.write message.to_bytes
end
def close
write Messages::Terminate.new
- connection.shutdown
- @connection = nil
- rescue Errno::ENOTCONN # the backend closed the connection already
+ socket.close
+ @socket = nil
+ rescue Errno::ENOTCONN # the backend closed the socket already
ensure
reset_values
end
def reset
close if opened?
reset_values
end
- def options
- @options.dup
- end
-
- def transaction_status
- @transaction_status
- end
-
- def backend_pid
- @backend_pid
- end
-
- def backend_key
- @backend_key
- end
-
- def notifications
- @notifications
- end
-
- def parameters
- @parameters.dup
- end
-
- def put_copy_data; raise NotImplementedError.new; end
- def put_copy_end; raise NotImplementedError.new; end
- def get_copy_data; raise NotImplementedError.new; end
-
def query(query_string, &block)
raise ArgumentError.new("Query string cannot be blank or empty.") if query_string.nil? || query_string.empty?
reset_result
write Messages::Query.new(query_string)
@process_row = block
@@ -139,14 +122,29 @@
result
end
protected
+
+ def read_bytes(n)
+ bytes = socket.read(n)
+ raise Vertica::Error::ConnectionError.new("Couldn't read #{n} characters from socket.") if bytes.nil? || bytes.size != n
+ return bytes
+ end
+
+ def read_message
+ type = read_bytes(1)
+ size = read_bytes(4).unpack('N').first
+ raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
+ Messages::BackendMessage.factory type, read_bytes(size - 4)
+ end
+
+
def process(return_result = false)
result = return_result ? Result.new : nil
loop do
- case message = connection.read_message
+ case message = read_message
when Messages::Authentication
if message.code != Messages::Authentication::OK
write Messages::Password.new(@options[:password], message.code, {:user => @options[:user], :salt => message.salt})
end
@@ -157,16 +155,16 @@
when Messages::DataRow
@process_row.call(result.format_row(message)) if @process_row && result
result.add_row(message) if result && !@process_row
when Messages::ErrorResponse
- raise Error::MessageError.new(message.error)
+ error_class = result ? Vertica::Error::QueryError : Vertica::Error::ConnectionError
+ raise error_class.new(message.error_message)
+
when Messages::NoticeResponse
- message.notices.each do |notice|
- @notices << Notice.new(notice[0], notice[1])
- end
+ @notices << message.values
when Messages::NotificationResponse
@notifications << Notification.new(message.pid, message.condition, message.additional_info)
when Messages::ParameterStatus
@@ -191,18 +189,10 @@
when Messages::CloseComplete,
Messages::CommandComplete,
Messages::ParseComplete,
Messages::PortalSuspended
break
- # when Messages::CopyData
- # # nothing
- # when Messages::CopyDone
- # # nothing
- # when Messages::CopyInResponse
- # raise 'not done'
- # when Messages::CopyOutResponse
- # raise 'not done'
end
end
result
end
@@ -212,10 +202,10 @@
reset_result
@parameters = {}
@backend_pid = nil
@backend_key = nil
@transaction_status = nil
- @connection = nil
+ @socket = nil
@process_row = nil
end
def reset_notifications
@notifications = []