lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.4.0 vs lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.4.1
- old
+ new
@@ -1,42 +1,49 @@
require 'csv'
module Chronicle
module ETL
class CSVExtractor < Chronicle::ETL::Extractor
- include Extractors::Helpers::FilesystemReader
+ include Extractors::Helpers::InputReader
register_connector do |r|
- r.description = 'input as CSV'
+ r.description = 'CSV'
end
setting :headers, default: true
- setting :filename, default: $stdin
+ def prepare
+ @csvs = prepare_sources
+ end
+
def extract
- csv = initialize_csv
- csv.each do |row|
- yield Chronicle::ETL::Extraction.new(data: row.to_h)
+ @csvs.each do |csv|
+ csv.read.each do |row|
+ yield Chronicle::ETL::Extraction.new(data: row.to_h)
+ end
end
end
def results_count
- CSV.read(@config.filename, headers: @config.headers).count unless stdin?(@config.filename)
+ @csvs.reduce(0) do |total_rows, csv|
+ row_count = csv.readlines.size
+ csv.rewind
+ total_rows + row_count
+ end
end
private
- def initialize_csv
- headers = @config.headers.is_a?(String) ? @config.headers.split(',') : @config.headers
-
- csv_options = {
- headers: headers,
- converters: :all
- }
-
- open_from_filesystem(filename: @config.filename) do |file|
- return CSV.new(file, **csv_options)
+ def prepare_sources
+ @csvs = []
+ read_input do |csv_data|
+ csv_options = {
+ headers: @config.headers.is_a?(String) ? @config.headers.split(',') : @config.headers,
+ converters: :all
+ }
+ @csvs << CSV.new(csv_data, **csv_options)
end
+ @csvs
end
end
end
end