lib/vertica/query.rb in vertica-0.12.0 vs lib/vertica/query.rb in vertica-1.0.0.rc1
- old
+ new
@@ -1,85 +1,104 @@
class Vertica::Query
- attr_reader :connection, :sql, :result, :error
- attr_accessor :row_handler, :copy_handler, :row_style
+ attr_reader :connection, :sql, :error, :result
- def initialize(connection, sql, options = {})
+ def initialize(connection, sql, row_handler: nil, copy_handler: nil)
@connection, @sql = connection, sql
-
- @row_style = options[:row_style] || @connection.row_style || :hash
- @row_handler = options[:row_handler]
- @copy_handler = options[:copy_handler]
-
- @error = nil
- @result = Vertica::Result.new(row_style)
+ if row_handler.nil?
+ @buffer = []
+ @row_handler = lambda { |row| buffer_row(row) }
+ else
+ @row_handler = row_handler
+ end
+ @copy_handler = copy_handler
+ @error = nil
end
def run
- @connection.write_message(Vertica::Messages::Query.new(sql))
+ @connection.write_message(Vertica::Protocol::Query.new(sql))
begin
process_message(message = @connection.read_message)
- end until message.kind_of?(Vertica::Messages::ReadyForQuery)
+ end until message.kind_of?(Vertica::Protocol::ReadyForQuery)
raise error unless error.nil?
return result
end
- def write(data)
- @connection.write_message(Vertica::Messages::CopyData.new(data))
- return self
- end
-
- alias_method :<<, :write
-
def to_s
@sql
end
protected
+ def buffer_rows?
+ !!@buffer
+ end
+
def process_message(message)
case message
- when Vertica::Messages::ErrorResponse
+ when Vertica::Protocol::ErrorResponse
@error = Vertica::Error::QueryError.from_error_response(message, @sql)
- when Vertica::Messages::EmptyQueryResponse
+ when Vertica::Protocol::EmptyQueryResponse
@error = Vertica::Error::EmptyQueryError.new("A SQL string was expected, but the given string was blank or only contained SQL comments.")
- when Vertica::Messages::CopyInResponse
- handle_copy_from_stdin
- when Vertica::Messages::RowDescription
- result.descriptions = message
- when Vertica::Messages::DataRow
- handle_datarow(message)
- when Vertica::Messages::CommandComplete
- result.tag = message.tag
+ when Vertica::Protocol::CopyInResponse
+ handle_copy_in_response(message)
+ when Vertica::Protocol::RowDescription
+ handle_row_description(message)
+ when Vertica::Protocol::DataRow
+ handle_data_row(message)
+ when Vertica::Protocol::CommandComplete
+ handle_command_complete(message)
else
@connection.process_message(message)
end
end
- def handle_copy_from_stdin
- if copy_handler.nil?
- @connection.write_message(Vertica::Messages::CopyFail.new('no handler provided'))
+ def handle_row_description(message)
+ @row_description = Vertica::RowDescription.build(message)
+ end
+
+ def handle_data_row(message)
+ @row_handler.call(@row_description.build_row(message))
+ end
+
+ def handle_command_complete(message)
+ if buffer_rows?
+ @result = Vertica::Result.build(row_description: @row_description, rows: @buffer, tag: message.tag)
+ @row_description, @buffer = nil, nil
else
+ @result = message.tag
+ end
+ end
+
+ def handle_copy_in_response(_message)
+ if @copy_handler.nil?
+ @connection.write_message(Vertica::Protocol::CopyFail.new('no handler provided'))
+ else
begin
- if copy_handler.call(self) == :rollback
- @connection.write_message(Vertica::Messages::CopyFail.new("rollback"))
- else
- @connection.write_message(Vertica::Messages::CopyDone.new)
- end
+ @copy_handler.call(CopyFromStdinWriter.new(connection))
+ @connection.write_message(Vertica::Protocol::CopyDone.new)
rescue => e
- @connection.write_message(Vertica::Messages::CopyFail.new(e.message))
+ @connection.write_message(Vertica::Protocol::CopyFail.new(e.message))
end
end
end
- def handle_datarow(datarow_message)
- record = result.format_row(datarow_message)
- result.add_row(record) if buffer_rows?
- row_handler.call(record) if row_handler
+ def buffer_row(row)
+ @buffer << row
end
- def buffer_rows?
- row_handler.nil? && copy_handler.nil?
+ class CopyFromStdinWriter
+ def initialize(connection)
+ @connection = connection
+ end
+
+ def write(data)
+ @connection.write_message(Vertica::Protocol::CopyData.new(data))
+ return self
+ end
+
+ alias_method :<<, :write
end
+ private_constant :CopyFromStdinWriter
end