lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.2.4 vs lib/chronicle/etl/extractors/csv_extractor.rb in chronicle-etl-0.3.0
- old
+ new
@@ -1,41 +1,48 @@
require 'csv'
-class Chronicle::ETL::CsvExtractor < Chronicle::ETL::Extractor
- DEFAULT_OPTIONS = {
- headers: true,
- filename: $stdin
- }.freeze
- def initialize(options = {})
- super(DEFAULT_OPTIONS.merge(options))
- end
+module Chronicle
+ module ETL
+ class CsvExtractor < Chronicle::ETL::Extractor
+ include Extractors::Helpers::FilesystemReader
- def extract
- csv = initialize_csv
- csv.each do |row|
- result = row.to_h
- yield result
- end
- end
+ register_connector do |r|
+ r.description = 'input as CSV'
+ end
- def results_count
- CSV.read(@options[:filename], headers: @options[:headers]).count if read_from_file?
- end
+ DEFAULT_OPTIONS = {
+ headers: true,
+ filename: $stdin
+ }.freeze
- private
+ def initialize(options = {})
+ super(DEFAULT_OPTIONS.merge(options))
+ end
- def initialize_csv
- headers = @options[:headers].is_a?(String) ? @options[:headers].split(',') : @options[:headers]
+ def extract
+ csv = initialize_csv
+ csv.each do |row|
+ yield Chronicle::ETL::Extraction.new(data: row.to_h)
+ end
+ end
- csv_options = {
- headers: headers,
- converters: :all
- }
+ def results_count
+ CSV.read(@options[:filename], headers: @options[:headers]).count unless stdin?(@options[:filename])
+ end
- stream = read_from_file? ? File.open(@options[:filename]) : @options[:filename]
- CSV.new(stream, **csv_options)
- end
+ private
- def read_from_file?
- @options[:filename] != $stdin
+ def initialize_csv
+ headers = @options[:headers].is_a?(String) ? @options[:headers].split(',') : @options[:headers]
+
+ csv_options = {
+ headers: headers,
+ converters: :all
+ }
+
+ open_from_filesystem(filename: @options[:filename]) do |file|
+ return CSV.new(file, **csv_options)
+ end
+ end
+ end
end
end