require "time"
require "ostruct"
require "bigdecimal"
require "logger"

#
# Models a MonetDB RecordSet.
#
# A Data object sends one query through a connection, collects the tuples the
# server replies with (requesting further blocks when the first reply does not
# hold the whole result) and exposes cursor-style accessors over them.
#
class MonetDB
  class Data
    # NOTE(review): not read by any method of this class; kept in case other code references it.
    @@DEBUG = false

    # @param connection [Object] must respond to +lang+, +send+, +receive+ and
    #   +set_export+ (presumably a MonetDB connection object — confirm).
    def initialize(connection)
      @connection = connection
      @lang = @connection.lang

      reset_state
    end

    # Fires query +q+ and stores the server response.
    #
    # For SQL (and XQuery when XQUERY_OUTPUT_SEQ is set) the tuples of a SELECT
    # are collected, fetching any blocks beyond the first reply, and returned as
    # a frozen array of unparsed rows. For XQuery without sequence output the
    # raw response is returned untouched.
    #
    # @param q [String] query text, without the leading "s" / trailing ";"
    # @return [Array<String>, String, nil] the frozen record set, the raw
    #   response (XQuery non-sequence output), or nil when nothing was received
    # @raise [MonetDB::QueryError] on a server error line, or when the number
    #   of rows received differs from the number announced in the query header
    def execute(q)
      @connection.send(format_query(q))
      data = @connection.receive
      return if data.nil?

      # Parse the first reply exactly once: receive_record_set appends the
      # schema lines to @header, so a second pass would duplicate them.
      record_set = receive_record_set(data)
      # format_query matches the language case-insensitively; do the same here
      # so both agree on which branch a query takes.
      lang = @lang.downcase

      if lang == LANG_SQL || (lang == LANG_XQUERY && XQUERY_OUTPUT_SEQ)
        # the fired query is a SELECT: store the whole record set
        if @action == Q_TABLE
          @header = parse_header_table(@header)
          @header.freeze

          record_set << receive_remaining_blocks if @row_index.to_i < @row_count.to_i
        end

        # Author's rationale: Ruby string handling did not cope well with the
        # MSG_PROMPT escape character, so to avoid data loss the rows are split
        # only once every tuple has been retrieved.
        @record_set = record_set.split("\t]\n")

        if @record_set.length != @query['rows'].to_i
          raise MonetDB::QueryError, "Warning: Query #{@query['id']} declared to result in #{@query['rows']} but #{@record_set.length} returned instead"
        end
      elsif lang == LANG_XQUERY && !XQUERY_OUTPUT_SEQ
        return data # return an xml file
      end

      @record_set.freeze
    end

    # Drops the connection reference and clears all stored result state.
    def free
      @connection = nil
      reset_state
    end

    # Returns the record set as a hash mapping each column name to the array
    # of that column's values (in row order); keys are inserted in column order.
    def fetch_all_hash
      tuples = @record_set.map { |row| parse_tuple(row) }

      columns = {}
      @header["columns_name"].each do |col_name|
        position = @header["columns_order"].fetch(col_name)
        columns[col_name] = tuples.map { |tuple| tuple[position] }
      end
      columns
    end

    # Returns the next record as a hash of column name => value and advances
    # the cursor, or false once every record has been returned.
    def fetch_hash
      return false if @index >= @query["rows"].to_i

      tuple = parse_tuple(@record_set[@index]) # parse the row once, not once per column
      columns = {}
      @header["columns_name"].each do |col_name|
        columns[col_name] = tuple[@header["columns_order"].fetch(col_name)]
      end
      @index += 1
      columns
    end

    # Returns the values of the column named +field+ for every record, in row order.
    # @raise [KeyError] when +field+ is not a column of the record set (Hash#fetch)
    def fetch_column_name(field = "")
      position = @header["columns_order"].fetch(field)
      @record_set.map { |row| parse_tuple(row)[position] }
    end

    # Returns the next record as a frozen array of field values and advances
    # the cursor, or false once every record has been returned.
    def fetch
      return false if @index >= @query["rows"].to_i

      tuple = parse_tuple(@record_set[@index])
      @index += 1
      tuple
    end

    # Returns every record as an array of field arrays and moves the cursor
    # past the last record.
    # @raise [MonetDB::DataError] when the last query did not produce a table
    def fetch_all
      unless @query['type'] == Q_TABLE
        raise MonetDB::DataError, "There is no record set currently available"
      end

      rows = @record_set.map { |row| parse_tuple(row) }
      @index = rows.length
      rows
    end

    # Returns the number of rows in the record set.
    def num_rows
      @query["rows"].to_i
    end

    # Returns the number of fields in the record set.
    def num_fields
      @query["columns"].to_i
    end

    # Returns the (ordered) name of the columns in the record set.
    def name_fields
      @header["columns_name"]
    end

    # Returns a hash mapping each column name to its type name as listed in
    # the result's schema header.
    def type_fields
      @header["columns_type"]
    end

    private

    # Resets the per-query result state to its initial, empty values.
    def reset_state
      @header = []
      @query = {}

      @record_set = []
      @index = 0 # position of the next record returned by the fetch methods

      @row_count = 0
      @row_offset = 10 # size of the next block to request; next_block grows it
      # first row not yet requested; the first reply is assumed to hold REPLY_SIZE rows
      @row_index = Integer(REPLY_SIZE)
    end

    # Requests every block still missing after the first reply and returns
    # their tuple lines concatenated (unparsed).
    def receive_remaining_blocks
      block_rows = ""
      while next_block
        block_rows << receive_record_set(@connection.receive)
      end
      block_rows
    end

    # Scans one server response line by line: records query/block headers,
    # raises on error lines, collects schema header lines into @header and
    # returns the tuple lines concatenated into one (unparsed) String.
    # @raise [MonetDB::QueryError] on a MSG_INFO line
    def receive_record_set(response)
      rows = ""
      response.each_line do |row|
        marker = row[0].chr
        if marker == MSG_QUERY
          case row[1].chr
          when Q_TABLE
            @action = Q_TABLE
            @query = parse_header_query(row).freeze
            @row_count = @query['rows'].to_i # total number of rows in the result
          when Q_BLOCK
            # remember the block header; its tuples follow on the next lines
            @action = Q_BLOCK
            @block = parse_header_query(row)
          when Q_TRANSACTION
            @action = Q_TRANSACTION
          when Q_CREATE
            @action = Q_CREATE
          end
        elsif marker == MSG_INFO
          raise MonetDB::QueryError, row
        elsif marker == MSG_SCHEMA_HEADER
          # NOTE(review): once execute has parsed the schema, @header is a frozen
          # Hash; this assumes follow-up block replies carry no schema lines.
          @header << row
        elsif marker == MSG_TUPLE
          rows << row
        elsif row[0] == MSG_PROMPT
          return rows
        end
      end
      rows
    end

    # Asks the server to export the next block of the current result set.
    # Returns false once every row has been requested, true otherwise.
    def next_block
      return false if @row_index >= @row_count

      # Author's rationale: blocks start small and grow by one row per request,
      # because larger steps made Ruby's socket reads slower.
      @row_offset = [@row_offset, @row_count - @row_index].min

      # export offset amount
      @connection.set_export(@query['id'], @row_index.to_s, @row_offset.to_s)
      @row_index += @row_offset
      @row_offset += 1
      true
    end

    # Formats a query string so that it can be parsed by the server.
    # @raise [LanguageNotSupported] for a language other than LANG_SQL / LANG_XQUERY
    def format_query(q)
      lang = @lang.downcase
      if lang == LANG_SQL
        "s#{q};"
      elsif lang == LANG_XQUERY
        "s#{q}"
      else
        raise LanguageNotSupported, @lang
      end
    end

    # Splits one raw tuple ("[ v1,\tv2,\t...") into a frozen array of field
    # strings, stripping every backslash and double quote from each field.
    # NOTE(review): this loses escaped quotes/backslashes inside string values
    # and mis-splits values containing ",\t"; kept as is for compatibility.
    def parse_tuple(tuple)
      # strip the leading "[" marker and the whitespace after it
      body = tuple.gsub(/^\[\s+/, '')
      fields = body.split(/,\t/).map { |f| f.gsub(/\\/, '').gsub(/"/, '') }
      fields.freeze
    end

    # Parses a query ("&...") header line into a frozen hash. Space-separated
    # fields are read positionally: for Q_TABLE id, rows, columns, returned;
    # for Q_BLOCK id, columns, remains, offset; other types keep only "type".
    def parse_header_query(row)
      type = row[1].chr
      tokens = row.split(' ')

      header =
        if type == Q_TABLE
          # SELECT: table size, query id, total number of records and returned
          { "id" => tokens[1], "type" => type, "rows" => tokens[2],
            "columns" => tokens[3], "returned" => tokens[4] }
        elsif type == Q_BLOCK
          { "id" => tokens[1], "type" => type, "remains" => tokens[3],
            "columns" => tokens[2], "offset" => tokens[4] }
        else
          { "type" => type }
        end

      header.freeze
    end

    # Parses the schema lines of a Q_TABLE reply (header_t[0..3]: table name,
    # column names, column types, column lengths) into a frozen hash.
    # Returns nil unless the current query is a Q_TABLE and header_t is non-nil.
    def parse_header_table(header_t)
      return nil unless @query["type"] == Q_TABLE && !header_t.nil?

      name_t = header_t[0].split(' ')[1].gsub(/,$/, '')
      name_cols = header_values(header_t[1])

      type_cols = {}
      header_values(header_t[2]).each_with_index { |col, i| type_cols[name_cols[i]] = col }

      length_cols = {}
      header_values(header_t[3]).each_with_index { |col, i| length_cols[name_cols[i]] = col }

      columns_order = {}
      name_cols.each_with_index { |col, i| columns_order[col] = i }

      { "table_name" => name_t, "columns_name" => name_cols, "columns_type" => type_cols,
        "columns_length" => length_cols, "columns_order" => columns_order }.freeze
    end

    # Returns the values of one "% v1,\tv2 # label" schema line: the text
    # between the first '%' and the first '#', split on whitespace, with each
    # value's trailing comma removed.
    def header_values(line)
      line.split('%')[1].split('#')[0].split(' ').map { |value| value.gsub(/,$/, '') }
    end
  end
end