require 'stringio'
require 'csv'

module Nodepile

  # Generates "Factories" for harvesting tabular data from a source stream/file.
  # Includes facilities for parsing common file formats (CSV/TSV).
  # Includes facilities for handling common problems encountered when parsing
  # manually-created tabular data files, such as: relevant tabular data is not
  # aligned "top-left", tabular data includes blank or repeated columns,
  # tabular data ends before the end of the file, and summary rows appear in
  # the tabular data that need to be ignored.
  class TabularRecordSource
    include Enumerable

    DEFAULT_LOADING_GUIDELINES = {
      mandatory_headers: [], # this can be extremely important for correctly finding tables
      format: :csv||:tsv||:guess, # assume CSV unless told otherwise
      allow_leading_skip_rows: 10, # arbitrary content that may appear before the table
      allow_gap_rows: 2||nil, # entirely blank rows appearing mid-table, nil allows an unlimited number
      allow_gap_columns: 1, # columns which have a blank header within the table
      allow_left_offset: 5, # blank columns allowed left of the table
      duplicate_header_rule: :first||:last||:ignore||:rename||:fail, # keep the first
      ignored_header_char: '#', # header names starting with this are just plain ignored
      emit_blank_records: false, # unless true, entirely blank records are not returned
      trim_headers: true, # strip leading and trailing spaces from header names
    }.freeze

    # Create a new RecordSource intended to read from the specified input
    # and using the parsing strategy specified by the loading guidelines.
    def initialize(source,**loading_guidelines)
      (loading_guidelines.keys - DEFAULT_LOADING_GUIDELINES.keys).tap{|x| raise <<~ERRMSG unless x.empty?}
        Unrecognized named parameters used for RecordSource creation: #{x.inspect}
      ERRMSG
      @loading_guidelines = DEFAULT_LOADING_GUIDELINES.merge(loading_guidelines).freeze
      raise "The source must be non-nil" if source.nil?
      @source = source # will lazy load
      @is_mid_read = false # only relevant for non-parallel sources
      @replayable_flag = if @source.is_a?(String)
                           :parallel # simultaneous each() is okay
                         elsif @source.respond_to?(:rewind)
                           :single # can't guarantee simultaneous each() is safe
                         else
                           nil
                         end
    end #new

    # Yields the "records" of the first "table" encountered in the bound data
    # source according to the parameters it was given. The first row yielded is
    # always the header. Raises an error if a header is not found.
    # Beware... depending on the type of data source used at creation, it
    # may not be possible to rewind or retrieve data in parallel.
    # With that said, a filename or String both allow parallel retrieval.
    #
    # Also note that blank records will be passed through until the specified
    # allow_gap_rows is exceeded. This can mean trailing blanks in long files.
    #
    # @yieldparam [Array] Array includes at least two elements. The first is
    #            an Array of "fields". The second element is the record
    #            number within the source (zero-indexed). It's important
    #            to note that if any field contains embedded newlines, the
    #            record number is not the same as the line number.
    # @return [Integer,Enumerator] Returns an Enumerator if no block is given.
    #            Otherwise returns the count of records yielded excluding
    #            the header line.
    def each(&block)
      return enum_for(:each) unless block_given?
      raise "This data source type may only be read once." if @source.nil?
      raise <<~ERRMSG if @is_mid_read && @replayable_flag != :parallel
        For this type of data source, you may not read simultaneously.
      ERRMSG
      @is_mid_read = true
      scanner = self.class._make_record_stream(@source,format: @loading_guidelines[:format])
      scanner = self.class._reposition_to_header_rec(scanner,@loading_guidelines)
      raise "Unable to find any records in the data source" if scanner.nil?
      raw_header,header_pos = scanner.next
      header_range = self.class._calc_header_range(raw_header,@loading_guidelines[:allow_gap_columns]) # process the header line to create a "mask"
      header = raw_header[header_range]
      header = header.map{|s| s&.strip} if @loading_guidelines[:trim_headers]
      yield [header,header_pos] # return the trimmed header
      rec_count = self.class._emit_rows(scanner,header_range,
                                        @loading_guidelines[:emit_blank_records],
                                        tolerate_blanks: @loading_guidelines[:allow_gap_rows],
                                        &block)
      @is_mid_read = false
      @source = nil if @replayable_flag.nil? # release resources
      return rec_count
    end #each

    ###########################################################################
    private

    SEPARATOR_CHAR_LIST = {tsv: "\t", csv: ','}.freeze

    # Note: because it terminates once too many consecutive blank rows are seen,
    # this may not read to the end of the file.
    def self._emit_rows(raw_rec_enum,range_mask,emit_blank_records,
                        tolerate_blanks: nil, &block)
      contig_blank_count = 0
      emitted_record_count = 0
      loop do
        begin
          rec,pos = raw_rec_enum.next
          masked_rec = rec&.[](range_mask)
          next if masked_rec.nil?
          is_blank_record = masked_rec.all?{|s| s.nil? || /^\s*$/.match?(s)}
          contig_blank_count = is_blank_record ? (contig_blank_count+1) : 0
          if tolerate_blanks && contig_blank_count > tolerate_blanks
            return emitted_record_count # end of records
          elsif emit_blank_records || !is_blank_record
            yield [masked_rec,pos]
            emitted_record_count += 1
          end
        rescue StopIteration
          return emitted_record_count # running out of records is an okay end
        end # rescuing
      end #loop over records
      raise "Unexpected issue encountered." # should never get here
    end # self._emit_rows()

    MIN_NONBLANK_HEADER_BEFORE_GAP_ALLOWED = 3

    # Given a presumed header, identify the position of the largest contiguous
    # block of non-blank fields and return the range information to be used
    # for scraping successive rows.
    def self._calc_header_range(raw_header_record,max_blank_cols)
      max_blank_cols ||= 0 # default is no blank columns tolerated in header
      blank_tol = max_blank_cols
      ix0 = nil
      runs = Array.new
      (0..(raw_header_record.length-1)).each{|ix|
        if raw_header_record[ix].nil? || /^\s*$/.match?(raw_header_record[ix])
          if ix0.nil?
            # deliberate no-op, in the middle of a blank run
          elsif blank_tol >= 0 && ix-ix0 >= MIN_NONBLANK_HEADER_BEFORE_GAP_ALLOWED
            # at least MIN_NONBLANK_HEADER_BEFORE_GAP_ALLOWED content-filled columns
            # must be found before blanks are tolerated
            blank_tol -= 1
          else
            runs << [ix0,ix-1]
            ix0 = nil
            blank_tol = max_blank_cols
          end
        else # non blank
          ix0 ||= ix # record start of run
          blank_tol = max_blank_cols # reset tolerance for blanks
        end
      }
      runs << [ix0,raw_header_record.length-1-(max_blank_cols-blank_tol)] if ix0
      widest = runs.max{|a,b| a[1]-a[0] <=> b[1]-b[0] }
      return widest && (widest[0]..widest[1]) # range covering the widest run (nil if none)
    end

    # Opens up a record stream based on the source provided and the format rule
    # specified.
    # @param source [String,Enumerable] Many different values are possible
    #       1) a filepath to a readable text file
    #       2) a string variable (must contain at least one newline)
    #       3) An enumerable of strings where each string is a record "line" to be parsed individually
    #       4) An enumerable of arrays of strings (where the inner array is the column values).
    #          In this case the format parameter is ignored
    # @param format [:csv,:tsv,:guess,nil] Indicates how to interpret column delimiters
    #       and row delimiters. The format parameter is ignored if the source
    #       is an enumerable of arrays (case 4 above).
    # @return [Enumerator] Whose next() method returns two-element arrays
    #       where the first element is the fields of the record
    #       (in the form of an array of strings) and the second element
    #       is the zero-based index indicating the record number within the source.
    #       Note that the Enumerator returned may not be rewindable/replayable.
    def self._make_record_stream(source,format: :csv||:tsv||:guess||nil)
      col_sep = case format
                when nil
                  nil
                when :csv,:tsv
                  SEPARATOR_CHAR_LIST[format]
                when :guess
                  # in the future, we might be able to guess based on reading
                  # the first line and looking for tabs or commas
                  if source.is_a?(String) && /\.(csv|tsv)$/i.match(source)
                    SEPARATOR_CHAR_LIST[$1.downcase.to_sym]
                  else
                    raise "Format specified as :guess but unable to deduce format"
                  end
                else
                  raise "Currently unhandled format specifier [#{format}]"
                end
      case source
      in Enumerable if source.first.is_a?(String)
        # This is the most manual case because of the need to try to detect
        # lines that are split by a quoted multiline string.
        return _chunk_lines(source,col_sep)
      in Enumerable if source.first.is_a?(Array) && source.first.first.is_a?(String)
        # no need for further processing, assume it already is a record source
        return source.each_with_index
      in String if source.include?("\n")
        # if passed a string, it must be multiline to be treated as the data source
        return CSV.parse(source,col_sep: col_sep).each_with_index
      in String if !File.exist?(source)
        raise "Unable to find the indicated file: #{source}"
      in String if File.exist?(source)
        # presumed to be a valid filepath
        return CSV.foreach(source,col_sep: col_sep).each_with_index
      else
        raise "Unable to convert the provided source into a record stream"
      end #case source
    end # self._make_record_stream()

    # Tests a string to see if the last field looks like it might be part of a
    # multiline field. This is detected by checking whether the rightmost
    # field has unbalanced quote characters.
    # IMPORTANT NOTE: It relies on being told whether the line begins inside a
    # quoted field (meaning that a complete line contains at least one
    # unquoted separator).
    def self._is_dangling?(line,sep_char,started_in_quote,quot_char: '"')
      qc = Array.new # quote counts, one entry per field
      qc << (started_in_quote ? 1 : 0)
      # count quotes in each field to identify unbalanced quotes
      line.each_char{|c|
        if c == quot_char
          qc[-1] += 1
        elsif c == sep_char && qc.last.even?
          qc << 0
        end
      }
      return qc.last.odd?
    end

    # Bunches up groups of lines when they look like the last column may
    # contain a multiline value (with an embedded line break).
    def self._chunk_lines(line_enum,sep_char)
      return enum_for(:_chunk_lines,line_enum,sep_char) unless block_given?
      buf = ""
      ix = 0 # incremented before each emit; the yielded index is zero-based
      is_in_quote = false
      line_enum.each{|line|
        if _is_dangling?(line,sep_char,is_in_quote)
          is_in_quote = true
          buf.concat(line, line.end_with?("\n") ? '' : "\n")
        else
          is_in_quote = false
          rec = CSV.parse_line((buf.empty? ? line : buf.concat(line)),col_sep: sep_char)
          buf.clear
          yield [rec,(ix+=1)-1]
        end
      }
      yield [CSV.parse_line(buf,col_sep: sep_char),(ix+=1)-1] unless buf.empty?
      return nil # meaningless return value
    end

    # Assuming we are starting from the absolute top of the source, scan
    # forward looking for the header row according to the provided guidelines.
    # Note, it may have to read past the header row to ensure it has made the best
    # possible choice for that header row.
    #
    # @param raw_rec_enum [Enumerator] record enumerator for "raw" records such
    #       as is generated by _make_record_stream
    # @param guidelines [Hash] guidelines package as generated during
    #       instantiation of the class. Note this method is not intended to be
    #       called publicly.
    # @return [Enumerator,nil] Enumerator that should replace the enumerator passed
    #       in and whose first record is the header row. Returns nil if the
    #       source contained no records at all.
    # Important Note: The position of raw_rec_enum is almost certain to be
    #       changed by the calls to next() made on it. It should not be used
    #       after this call because of this and other buffering considerations.
    def self._reposition_to_header_rec(raw_rec_enum,guidelines)
      buffer = Array.new
      begin
        loop do
          buffer << raw_rec_enum.next
          break if buffer.length > guidelines[:allow_leading_skip_rows]
        end
      rescue StopIteration
        # deliberately a no-op
        return nil if buffer.empty?
      end
      scores = Hash.new{|h,ix| h[ix] = 0} # scoring for possible header rows
      mand_cols = guidelines[:mandatory_headers]
      buffer.each_with_index{|(rec,_),buf_pos|
        hdr_range = _calc_header_range(rec,guidelines[:allow_gap_columns]) # best possible header range
        next if hdr_range.nil?
        if mand_cols.empty? || (mand_cols - rec[hdr_range]).empty?
          scores[buf_pos] = 10*(hdr_range.size) +              # prefer wide runs of columns
                            (mand_cols.empty? ? 0 : 99000) +   # huge bonus for having the mandatory columns
                            (1 - buf_pos.to_f/buffer.length)   # slight preference for early records
        end
        # possible other factors for future consideration:
        #   a preceding blank line
        #   a non-blank row immediately beneath it
      } # end examination of rows in the buffer
      best_guess = scores.max{|a,b| a[1] <=> b[1] }
      if best_guess.nil?
        raise "Unable to find a header record within the first #{[buffer.length,guidelines[:allow_leading_skip_rows]].min} records examined!"
      end
      #buffer[best_guess[0]..-1].to_enum + raw_rec_enum # chain enumerators to include buffer
      return self._joined_enum(buffer,best_guess[0]...buffer.length, raw_rec_enum)
    end

    # This hack-method was added because Enumerator::Chain does not seem to
    # support the next() method, so the chaining is hand-rolled here.
    # @return [nil,Enumerator]
    def self._joined_enum(buffer1,buffer1_range,record_enum)
      return enum_for(:_joined_enum,buffer1,buffer1_range,record_enum) unless block_given?
      buffer1_range.each{|ix|
        yield buffer1[ix]
      }
      buffer1 = nil # release (in case it matters)
      begin
        loop do
          yield record_enum.next
        end
      rescue StopIteration
        # deliberately a no-op
      end
      return nil # meaningless return value
    end

  end #class TabularRecordSource
end #module Nodepile
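
###############################################################################
# Usage sketch (editor-added illustration, not part of the library's API
# surface): a minimal example showing how TabularRecordSource locates a table
# that does not start at the top of its input and yields the header first.
# The column names and sample rows below are invented for the example, and the
# demo only runs when this file is executed directly.
if __FILE__ == $PROGRAM_NAME
  sample = <<~SAMPLE
    narrative text that precedes the real table,,
    ,,
    Name,Role,Notes
    Ada,engineer,
    Grace,admiral,"notes with an
    embedded newline"
  SAMPLE
  src = Nodepile::TabularRecordSource.new(sample, mandatory_headers: ['Name','Role'])
  count = src.each do |fields, rec_num|
    # The first record yielded is always the (column-masked) header row;
    # rec_num is the record index within the source, not the physical line number.
    puts "#{rec_num}: #{fields.inspect}"
  end
  puts "data records yielded: #{count}"
end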