lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.4.0 vs lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.4.1

- old
+ new

@@ -1,42 +1,49 @@ require 'csv' module Chronicle module ETL class CSVExtractor < Chronicle::ETL::Extractor - include Extractors::Helpers::FilesystemReader + include Extractors::Helpers::InputReader register_connector do |r| - r.description = 'input as CSV' + r.description = 'CSV' end setting :headers, default: true - setting :filename, default: $stdin + def prepare + @csvs = prepare_sources + end + def extract - csv = initialize_csv - csv.each do |row| - yield Chronicle::ETL::Extraction.new(data: row.to_h) + @csvs.each do |csv| + csv.read.each do |row| + yield Chronicle::ETL::Extraction.new(data: row.to_h) + end end end def results_count - CSV.read(@config.filename, headers: @config.headers).count unless stdin?(@config.filename) + @csvs.reduce(0) do |total_rows, csv| + row_count = csv.readlines.size + csv.rewind + total_rows + row_count + end end private - def initialize_csv - headers = @config.headers.is_a?(String) ? @config.headers.split(',') : @config.headers - - csv_options = { - headers: headers, - converters: :all - } - - open_from_filesystem(filename: @config.filename) do |file| - return CSV.new(file, **csv_options) + def prepare_sources + @csvs = [] + read_input do |csv_data| + csv_options = { + headers: @config.headers.is_a?(String) ? @config.headers.split(',') : @config.headers, + converters: :all + } + @csvs << CSV.new(csv_data, **csv_options) end + @csvs end end end end