lib/remi/data_source/csv_file.rb in remi-0.2.19 vs lib/remi/data_source/csv_file.rb in remi-0.2.20
- old
+ new
@@ -1,10 +1,12 @@
module Remi
module DataSource
class CsvFile
include DataSource
+ using Remi::Refinements::Daru
+
def self.default_csv_options
CSV::DEFAULT_OPTIONS.merge({
headers: true,
header_converters: Remi::FieldSymbolizers[:standard],
col_sep: ',',
@@ -12,14 +14,15 @@
quote_char: '"'
})
end
- def initialize(fields: {}, extractor:, csv_options: {}, logger: Remi::Settings.logger)
+ def initialize(fields: {}, extractor:, csv_options: {}, filename_field: nil, logger: Remi::Settings.logger)
@fields = fields
self.extractor = extractor
@csv_options = self.class.default_csv_options.merge(csv_options)
+ @filename_field = filename_field
@logger = logger
end
attr_accessor :fields
attr_reader :extractor
@@ -28,29 +31,34 @@
def field_symbolizer
self.class.default_csv_options[:header_converters]
end
def extract
- Array(@extractor.extract).tap { |x| raise "Multiple files not supported" if x.size > 1 }
+ @extracted = Array(@extractor.extract)
end
+ def extracted
+ @extracted || extract
+ end
+
def extractor=(arg)
case arg
when Extractor::SftpFile, Extractor::LocalFile
@extractor = arg
when String
- @extractor = Extractor::LocalFile.new(arg)
+ @extractor = Extractor::LocalFile.new(path: arg)
when Regexp
raise "Adding regex matching to local files would be easy, not done yet"
else
raise "Unknown extractor of type #{arg.class}: #{arg}"
end
end
# Only going to support single file for now
def source_filename
- @source_filename ||= extract.first
+ raise "Multiple source files detected" if extracted.size > 1
+ @source_filename ||= extracted.first
end
def first_line
# Readline assumes \n line endings. Strip out \r if it is a DOS file.
@first_line ||= File.open(source_filename) do |f|
@@ -65,11 +73,24 @@
def valid_headers?
(fields.keys - headers).empty?
end
def to_dataframe
- @logger.info "Converting #{source_filename} to a dataframe"
- Daru::DataFrame.from_csv source_filename, @csv_options
+ # Assumes that each file has exactly the same structure
+ result_df = nil
+ extracted.each_with_index do |filename, idx|
+ @logger.info "Converting #{filename} to a dataframe"
+ csv_df = Daru::DataFrame.from_csv filename, @csv_options
+
+ csv_df[@filename_field] = Daru::Vector.new([filename] * csv_df.size, index: csv_df.index) if @filename_field
+ if idx == 0
+ result_df = csv_df
+ else
+ result_df = result_df.concat csv_df
+ end
+ end
+
+ result_df
end
def df
@dataframe ||= to_dataframe
end