# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

require "csv"
require "pathname"
require "time"

module Arrow
  # Loads CSV content into a table.
  #
  # The loader first tries the Arrow CSV reader (CSVReader). When the
  # given options can't be mapped to CSVReadOptions, or when that
  # reader raises Arrow::Error::Invalid or Gio::Error, it falls back to
  # Ruby's CSV library and builds the table column by column.
  class CSVLoader
    class << self
      # Shortcut for `new(path_or_data, **options).load`.
      def load(path_or_data, **options)
        new(path_or_data, **options).load
      end
    end

    # @param path_or_data [Pathname, String, Object] A Pathname, a
    #   single-line string ending with ".csv" (treated as a path) or
    #   anything else (treated as CSV data).
    # @param options [Hash] Reader options. `:delimiter` is accepted as
    #   an alias of `:col_sep`. `:compression` is removed from the
    #   options and kept separately for decompressing the input.
    def initialize(path_or_data, **options)
      @path_or_data = path_or_data
      @options = options
      if @options.key?(:delimiter)
        @options[:col_sep] = @options.delete(:delimiter)
      end
      @compression = @options.delete(:compression)
    end

    # Loads the CSV given to the constructor.
    #
    # @return The result of `CSVReader#read`, or the `Table` built via
    #   Ruby's CSV (`nil` when that fallback reads no values).
    def load
      case @path_or_data
      when Pathname
        load_from_path(@path_or_data.to_path)
      when /\A.+\.csv\z/i
        load_from_path(@path_or_data)
      else
        load_data(@path_or_data)
      end
    end

    private
    # Opens `path` with Ruby's CSV and yields the CSV object.
    def open_csv(path, **options)
      CSV.open(path, **options) do |csv|
        yield(csv)
      end
    end

    # Parses in-memory `data` with Ruby's CSV and yields the CSV
    # object. The CSV object is always closed afterwards.
    def parse_csv_data(data, **options)
      csv = CSV.new(data, **options)
      begin
        yield(csv)
      ensure
        csv.close
      end
    end

    # Returns the plain field values of `row`, which is either a
    # CSV::Row (header/value pairs) or an Array of values.
    def row_values(row)
      if row.is_a?(CSV::Row)
        row.collect(&:last)
      else
        row
      end
    end

    # Reads all rows from `csv` and builds a Table from them.
    #
    # Values are gathered per column. Rows may have different numbers
    # of fields: missing fields are filled with nil so that every
    # column has one entry per row and values stay aligned with the
    # row they came from. Rows without any field are skipped.
    #
    # Column names are `csv.headers` when available, otherwise the
    # column indexes as strings ("0", "1", ...).
    #
    # @return [Table, nil] nil when no value was read.
    def read_csv(csv)
      values_set = []
      n_rows = 0
      csv.each do |row|
        row = row_values(row)
        next if row.empty?
        row.each_with_index do |value, i|
          # A column that first appears on a later row is back-filled
          # with nil for all preceding rows. `::Array` is required
          # because `Array` may resolve to `Arrow::Array` here.
          values = (values_set[i] ||= ::Array.new(n_rows))
          values << value
        end
        # Pad the columns this (shorter) row didn't provide.
        (row.size...values_set.size).each do |i|
          values_set[i] << nil
        end
        n_rows += 1
      end
      return nil if values_set.empty?

      arrays = values_set.collect do |values|
        ArrayBuilder.build(values)
      end
      if csv.headers
        names = csv.headers
      else
        names = arrays.size.times.collect(&:to_s)
      end
      raw_table = {}
      names.each_with_index do |name, i|
        raw_table[name] = arrays[i]
      end
      Table.new(raw_table)
    end

    # Converts `@options` to a CSVReadOptions.
    #
    # @return [CSVReadOptions, nil] nil when an option can't be
    #   expressed with CSVReadOptions (a String `:headers` or a key
    #   without a matching setter). Callers then use Ruby's CSV.
    def reader_options
      options = CSVReadOptions.new
      @options.each do |key, value|
        case key
        when :headers
          case value
          when ::Array
            options.column_names = value
          when String
            return nil
          else
            # Truthy: the input provides the names. Falsy: generate them.
            options.generate_column_names = !value
          end
        when :column_types
          value.each do |name, type|
            options.add_column_type(name, type)
          end
        when :schema
          options.add_schema(value)
        when :encoding
          # process encoding on opening input
        when :col_sep
          options.delimiter = value
        else
          setter = "#{key}="
          if options.respond_to?(setter)
            options.__send__(setter, value)
          else
            return nil
          end
        end
      end
      options
    end

    # Yields `raw_input`, wrapped in a CompressedInputStream when
    # `:compression` was given.
    def open_decompress_input(raw_input)
      if @compression
        codec = Codec.new(@compression)
        CompressedInputStream.open(codec, raw_input) do |input|
          yield(input)
        end
      else
        yield(raw_input)
      end
    end

    # Yields `raw_input`, wrapped in a stream that converts from the
    # `:encoding` option to UTF-8 when `:encoding` was given.
    def open_encoding_convert_stream(raw_input, &block)
      encoding = @options[:encoding]
      if encoding
        converter = Gio::CharsetConverter.new("UTF-8", encoding)
        convert_input_stream =
          Gio::ConverterInputStream.new(raw_input, converter)
        GIOInputStream.open(convert_input_stream, &block)
      else
        yield(raw_input)
      end
    end

    # Applies decompression and then encoding conversion to
    # `raw_input` and yields the resulting input.
    def wrap_input(raw_input)
      open_decompress_input(raw_input) do |decompressed_input|
        open_encoding_convert_stream(decompressed_input) do |input|
          yield(input)
        end
      end
    end

    # Loads the CSV file at `path`.
    def load_from_path(path)
      options = reader_options
      if options
        begin
          MemoryMappedInputStream.open(path) do |raw_input|
            wrap_input(raw_input) do |input|
              return CSVReader.new(input, options).read
            end
          end
        rescue Arrow::Error::Invalid, Gio::Error
          # Deliberately ignored: fall back to Ruby's CSV below.
        end
      end

      options = update_csv_parse_options(@options, :open_csv, path)
      open_csv(path, **options) do |csv|
        read_csv(csv)
      end
    end

    # Loads CSV from in-memory `data`.
    def load_data(data)
      options = reader_options
      if options
        begin
          BufferInputStream.open(Buffer.new(data)) do |raw_input|
            wrap_input(raw_input) do |input|
              return CSVReader.new(input, options).read
            end
          end
        rescue Arrow::Error::Invalid, Gio::Error
          # Deliberately ignored: fall back to Ruby's CSV below.
        end
      end

      options = update_csv_parse_options(@options, :parse_csv_data, data)
      parse_csv_data(data, **options) do |csv|
        read_csv(csv)
      end
    end

    # Builds a CSV converter that applies the given block only to the
    # fields of the column at `target_index` (every column when
    # `target_index` is nil). Other fields are returned untouched.
    def selective_converter(target_index)
      lambda do |field, field_info|
        if target_index.nil? || field_info.index == target_index
          yield(field)
        else
          field
        end
      end
    end

    # Converts "true"/"false" to true/false. Any other field, or a
    # field that can't be encoded to CSV::ConverterEncoding, is
    # returned as is.
    BOOLEAN_CONVERTER = lambda do |field|
      begin
        encoded_field = field.encode(CSV::ConverterEncoding)
      rescue EncodingError
        field
      else
        case encoded_field
        when "true"
          true
        when "false"
          false
        else
          field
        end
      end
    end

    # Converts an ISO 8601 string to a Time. Any other field, or a
    # field that can't be encoded to CSV::ConverterEncoding, is
    # returned as is.
    ISO8601_CONVERTER = lambda do |field|
      begin
        encoded_field = field.encode(CSV::ConverterEncoding)
      rescue EncodingError
        field
      else
        begin
          ::Time.iso8601(encoded_field)
        rescue ArgumentError
          field
        end
      end
    end

    # Keyword parameter names of CSV#initialize. It is empty when
    # CSV#initialize declares no keyword parameters; options are not
    # filtered in that case.
    AVAILABLE_CSV_PARSE_OPTIONS = {}
    CSV.instance_method(:initialize).parameters.each do |type, name|
      AVAILABLE_CSV_PARSE_OPTIONS[name] = true if type == :key
    end
    AVAILABLE_CSV_PARSE_OPTIONS.freeze

    # Builds the options passed to Ruby's CSV.
    #
    # Options CSV#initialize doesn't accept are dropped. When
    # `:headers` isn't given, the input is parsed once to guess whether
    # the first row is a header. When `:converters` isn't given, the
    # input is parsed once more to choose per column converters.
    #
    # @param options [Hash] The user options. It isn't modified.
    # @param create_csv [Symbol] The name of the method that yields a
    #   CSV object (`:open_csv` or `:parse_csv_data`).
    # @param args [Array] The positional arguments for `create_csv`.
    # @return [Hash] The options for Ruby's CSV.
    def update_csv_parse_options(options, create_csv, *args)
      if options.key?(:converters)
        new_options = options.dup
      else
        converters = [:all, BOOLEAN_CONVERTER, ISO8601_CONVERTER]
        new_options = options.merge(converters: converters)
      end

      # TODO: Support :schema and :column_types

      unless AVAILABLE_CSV_PARSE_OPTIONS.empty?
        new_options.select! do |key, _value|
          AVAILABLE_CSV_PARSE_OPTIONS.key?(key)
        end
      end

      unless options.key?(:headers)
        __send__(create_csv, *args, **new_options) do |csv|
          new_options[:headers] = have_header?(csv)
        end
      end
      unless options.key?(:converters)
        __send__(create_csv, *args, **new_options) do |csv|
          new_options[:converters] = detect_robust_converters(csv)
        end
      end

      new_options
    end

    # Guesses whether the first row of `csv` is a header by comparing
    # the first two rows. This consumes up to two rows from `csv`.
    #
    # @return [Object] The `:headers` option when it was given.
    #   Otherwise true (header), false (no header) or nil (unknown).
    def have_header?(csv)
      if @options.key?(:headers)
        return @options[:headers]
      end

      row1 = csv.shift
      return false if row1.nil?
      return false if row1.any?(&:nil?)

      row2 = csv.shift
      return nil if row2.nil?
      return true if row2.any?(&:nil?)

      # A header consists of strings only.
      return false if row1.any? {|value| !value.is_a?(String)}

      # All strings in the first row but other types in the second one.
      if row1.collect(&:class) != row2.collect(&:class)
        return true
      end

      nil
    end

    # Scans all rows of `csv` (already converted by the converters
    # `csv` was created with) and builds per column converters.
    #
    # A column that mixes integers and floats is treated as float. Any
    # other mix of types, or a non-empty string, makes the column a
    # string column, which gets no converter. nil and empty strings
    # don't affect the detected type.
    #
    # @return [Array<Proc>] Converters for Ruby's CSV.
    def detect_robust_converters(csv)
      column_types = []
      csv.each do |row|
        row_values(row).each_with_index do |value, i|
          current_column_type = column_types[i]
          next if current_column_type == :string

          candidate_type = nil
          case value
          when nil
            next
          when "true", "false", true, false
            candidate_type = :boolean
          when Integer
            candidate_type = :integer
            if current_column_type == :float
              candidate_type = :float
            end
          when Float
            candidate_type = :float
            if current_column_type == :integer
              # Promote the integer column to float.
              column_types[i] = candidate_type
            end
          when ::Time
            candidate_type = :time
          when DateTime
            candidate_type = :date_time
          when Date
            candidate_type = :date
          when String
            next if value.empty?
            candidate_type = :string
          else
            candidate_type = :string
          end

          column_types[i] ||= candidate_type
          if column_types[i] != candidate_type
            column_types[i] = :string
          end
        end
      end

      converters = []
      column_types.each_with_index do |type, i|
        case type
        when :boolean
          converters << selective_converter(i, &BOOLEAN_CONVERTER)
        when :integer
          converters << selective_converter(i) do |field|
            if field.nil? || field.empty?
              nil
            else
              CSV::Converters[:integer].call(field)
            end
          end
        when :float
          converters << selective_converter(i) do |field|
            if field.nil? || field.empty?
              nil
            else
              CSV::Converters[:float].call(field)
            end
          end
        when :time
          converters << selective_converter(i, &ISO8601_CONVERTER)
        when :date_time
          converters << selective_converter(i, &CSV::Converters[:date_time])
        when :date
          converters << selective_converter(i, &CSV::Converters[:date])
        end
      end
      converters
    end
  end
end