lib/chronicle/etl/transformers/transformer.rb in chronicle-etl-0.5.5 vs lib/chronicle/etl/transformers/transformer.rb in chronicle-etl-0.6.1
- old
+ new
@@ -1,69 +1,91 @@
+# frozen_string_literal: true
+
module Chronicle
module ETL
# Abstract class representing an Transformer for an ETL job
class Transformer
extend Chronicle::ETL::Registry::SelfRegistering
include Chronicle::ETL::Configurable
+ attr_reader :stashed_records
+
# Construct a new instance of this transformer. Options are passed in from a Runner
# == Parameters:
# options::
# Options for configuring this Transformer
- def initialize(extraction, options = {})
- unless extraction.is_a?(Chronicle::ETL::Extraction)
- raise Chronicle::ETL::RunnerTypeError, "Extracted should be a Chronicle::ETL::Extraction"
- end
-
- @extraction = extraction
+ def initialize(options = {})
apply_options(options)
end
- # @abstract Subclass is expected to implement #transform
- # @!method transform
- # The main entrypoint for transforming a record. Called by a Runner on each extracted record
+ # Called once for each extracted record. Can return 0 or more transformed records.
+ def call(record, &block)
+ raise ArgumentError, 'Input must be a Chronicle::ETL::Record' unless record.is_a?(Record)
- # The domain or provider-specific id of the record this transformer is working on.
- # It is useful for:
- # - de-duping records that might exist in the loader's destination
- # - building a cursor so an extractor doesn't have to start from the beginning of a
- # a source
- def id
- raise NotImplementedError
+ yielded = false
+
+ transformed_data = transform(record) do |data|
+ new_record = update_data(record, data)
+ block.call(new_record)
+
+ yielded = true
+ end
+
+ return if yielded
+
+ # Handle transformers that don't yield anything and return
+ # transformed data directly. Skip nil values.
+ [transformed_data].flatten.compact.each do |data|
+ new_record = update_data(record, data)
+ block.call(new_record)
+ end
end
- # The domain or provider-specific timestamp of the record this transformer is working on.
- # Used for building a cursor so an extractor doesn't have to start from the beginning of a
- # data source from the beginning.
- def timestamp
- raise NotImplementedError
+ def call_finish(&block)
+ remaining_records = finish
+ return if remaining_records.nil?
+
+ remaining_records.each do |record|
+ block.call(record)
+ end
end
- # An optional, human-readable identifier for a transformation, intended for debugging or logging.
- # By default, it is just the id.
- def friendly_identifier
- id
+ def transform(_record)
+ raise NotImplementedError, 'You must implement the transform method'
end
- def to_s
- ts = begin
- unknown = "???"
- timestamp&.iso8601 || unknown
- rescue TransformationError, NotImplementedError
- unknown
- end
+ # Called once after runner has processed all records
+ def finish; end
- identifier = begin
- unknown = self.class.to_s
- friendly_identifier || self.class.to_s
- rescue TransformationError, NotImplementedError
- unknown
- end
+ protected
- "[#{ts}] #{identifier}"
+ def stash_record(record)
+ @stashed_records ||= []
+ @stashed_records << record
+ nil
end
+
+ def flush_stashed_records
+ @stashed_records.tap(&:clear)
+ end
+
+ def update_data(record, new_data)
+ new_record = record.clone
+ new_record.data = new_data
+ new_record
+ end
end
end
end
require_relative 'null_transformer'
-require_relative 'image_file_transformer'
+require_relative 'sampler_transformer'
+require_relative 'buffer_transformer'
+require_relative 'multiply_transformer'
+require_relative 'sort_transformer'
+require_relative 'chronicle_transformer'
+require_relative 'format_transformer'
+require_relative 'filter_fields_transformer'
+require_relative 'fields_limit_transformer'
+require_relative 'merge_meta_transformer'
+require_relative 'filter_transformer'
+require_relative 'chronobase_transformer'