lib/vertica/query.rb in vertica-0.12.0 vs lib/vertica/query.rb in vertica-1.0.0.rc1

- old
+ new

@@ -1,85 +1,104 @@ class Vertica::Query - attr_reader :connection, :sql, :result, :error - attr_accessor :row_handler, :copy_handler, :row_style + attr_reader :connection, :sql, :error, :result - def initialize(connection, sql, options = {}) + def initialize(connection, sql, row_handler: nil, copy_handler: nil) @connection, @sql = connection, sql - - @row_style = options[:row_style] || @connection.row_style || :hash - @row_handler = options[:row_handler] - @copy_handler = options[:copy_handler] - - @error = nil - @result = Vertica::Result.new(row_style) + if row_handler.nil? + @buffer = [] + @row_handler = lambda { |row| buffer_row(row) } + else + @row_handler = row_handler + end + @copy_handler = copy_handler + @error = nil end def run - @connection.write_message(Vertica::Messages::Query.new(sql)) + @connection.write_message(Vertica::Protocol::Query.new(sql)) begin process_message(message = @connection.read_message) - end until message.kind_of?(Vertica::Messages::ReadyForQuery) + end until message.kind_of?(Vertica::Protocol::ReadyForQuery) raise error unless error.nil? return result end - def write(data) - @connection.write_message(Vertica::Messages::CopyData.new(data)) - return self - end - - alias_method :<<, :write - def to_s @sql end protected + def buffer_rows? + !!@buffer + end + def process_message(message) case message - when Vertica::Messages::ErrorResponse + when Vertica::Protocol::ErrorResponse @error = Vertica::Error::QueryError.from_error_response(message, @sql) - when Vertica::Messages::EmptyQueryResponse + when Vertica::Protocol::EmptyQueryResponse @error = Vertica::Error::EmptyQueryError.new("A SQL string was expected, but the given string was blank or only contained SQL comments.") - when Vertica::Messages::CopyInResponse - handle_copy_from_stdin - when Vertica::Messages::RowDescription - result.descriptions = message - when Vertica::Messages::DataRow - handle_datarow(message) - when Vertica::Messages::CommandComplete - result.tag = message.tag + when Vertica::Protocol::CopyInResponse + handle_copy_in_response(message) + when Vertica::Protocol::RowDescription + handle_row_description(message) + when Vertica::Protocol::DataRow + handle_data_row(message) + when Vertica::Protocol::CommandComplete + handle_command_complete(message) else @connection.process_message(message) end end - def handle_copy_from_stdin - if copy_handler.nil? - @connection.write_message(Vertica::Messages::CopyFail.new('no handler provided')) + def handle_row_description(message) + @row_description = Vertica::RowDescription.build(message) + end + + def handle_data_row(message) + @row_handler.call(@row_description.build_row(message)) + end + + def handle_command_complete(message) + if buffer_rows? + @result = Vertica::Result.build(row_description: @row_description, rows: @buffer, tag: message.tag) + @row_description, @buffer = nil, nil else + @result = message.tag + end + end + + def handle_copy_in_response(_message) + if @copy_handler.nil? + @connection.write_message(Vertica::Protocol::CopyFail.new('no handler provided')) + else begin - if copy_handler.call(self) == :rollback - @connection.write_message(Vertica::Messages::CopyFail.new("rollback")) - else - @connection.write_message(Vertica::Messages::CopyDone.new) - end + @copy_handler.call(CopyFromStdinWriter.new(connection)) + @connection.write_message(Vertica::Protocol::CopyDone.new) rescue => e - @connection.write_message(Vertica::Messages::CopyFail.new(e.message)) + @connection.write_message(Vertica::Protocol::CopyFail.new(e.message)) end end end - def handle_datarow(datarow_message) - record = result.format_row(datarow_message) - result.add_row(record) if buffer_rows? - row_handler.call(record) if row_handler + def buffer_row(row) + @buffer << row end - def buffer_rows? - row_handler.nil? && copy_handler.nil? + class CopyFromStdinWriter + def initialize(connection) + @connection = connection + end + + def write(data) + @connection.write_message(Vertica::Protocol::CopyData.new(data)) + return self + end + + alias_method :<<, :write end + private_constant :CopyFromStdinWriter end