lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.2.4 vs lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.3.0

- old
+ new

@@ -1,41 +1,48 @@
 require 'csv'
-class Chronicle::ETL::CsvExtractor < Chronicle::ETL::Extractor
-  DEFAULT_OPTIONS = {
-    headers: true,
-    filename: $stdin
-  }.freeze
 
-  def initialize(options = {})
-    super(DEFAULT_OPTIONS.merge(options))
-  end
+module Chronicle
+  module ETL
+    class CsvExtractor < Chronicle::ETL::Extractor
+      include Extractors::Helpers::FilesystemReader
 
-  def extract
-    csv = initialize_csv
-    csv.each do |row|
-      result = row.to_h
-      yield result
-    end
-  end
+      register_connector do |r|
+        r.description = 'input as CSV'
+      end
 
-  def results_count
-    CSV.read(@options[:filename], headers: @options[:headers]).count if read_from_file?
-  end
+      DEFAULT_OPTIONS = {
+        headers: true,
+        filename: $stdin
+      }.freeze
 
-  private
+      def initialize(options = {})
+        super(DEFAULT_OPTIONS.merge(options))
+      end
 
-  def initialize_csv
-    headers = @options[:headers].is_a?(String) ? @options[:headers].split(',') : @options[:headers]
+      def extract
+        csv = initialize_csv
+        csv.each do |row|
+          yield Chronicle::ETL::Extraction.new(data: row.to_h)
+        end
+      end
 
-    csv_options = {
-      headers: headers,
-      converters: :all
-    }
+      def results_count
+        CSV.read(@options[:filename], headers: @options[:headers]).count unless stdin?(@options[:filename])
+      end
 
-    stream = read_from_file? ? File.open(@options[:filename]) : @options[:filename]
-    CSV.new(stream, **csv_options)
-  end
+      private
 
-  def read_from_file?
-    @options[:filename] != $stdin
+      def initialize_csv
+        headers = @options[:headers].is_a?(String) ? @options[:headers].split(',') : @options[:headers]
+
+        csv_options = {
+          headers: headers,
+          converters: :all
+        }
+
+        open_from_filesystem(filename: @options[:filename]) do |file|
+          return CSV.new(file, **csv_options)
+        end
+      end
+    end
   end
 end