lib/chronicle/etl/transformers/transformer.rb in chronicle-etl-0.5.5 vs lib/chronicle/etl/transformers/transformer.rb in chronicle-etl-0.6.1

- old
+ new

@@ -1,69 +1,91 @@ +# frozen_string_literal: true + module Chronicle module ETL # Abstract class representing an Transformer for an ETL job class Transformer extend Chronicle::ETL::Registry::SelfRegistering include Chronicle::ETL::Configurable + attr_reader :stashed_records + # Construct a new instance of this transformer. Options are passed in from a Runner # == Parameters: # options:: # Options for configuring this Transformer - def initialize(extraction, options = {}) - unless extraction.is_a?(Chronicle::ETL::Extraction) - raise Chronicle::ETL::RunnerTypeError, "Extracted should be a Chronicle::ETL::Extraction" - end - - @extraction = extraction + def initialize(options = {}) apply_options(options) end - # @abstract Subclass is expected to implement #transform - # @!method transform - # The main entrypoint for transforming a record. Called by a Runner on each extracted record + # Called once for each extracted record. Can return 0 or more transformed records. + def call(record, &block) + raise ArgumentError, 'Input must be a Chronicle::ETL::Record' unless record.is_a?(Record) - # The domain or provider-specific id of the record this transformer is working on. - # It is useful for: - # - de-duping records that might exist in the loader's destination - # - building a cursor so an extractor doesn't have to start from the beginning of a - # a source - def id - raise NotImplementedError + yielded = false + + transformed_data = transform(record) do |data| + new_record = update_data(record, data) + block.call(new_record) + + yielded = true + end + + return if yielded + + # Handle transformers that don't yield anything and return + # transformed data directly. Skip nil values. + [transformed_data].flatten.compact.each do |data| + new_record = update_data(record, data) + block.call(new_record) + end end - # The domain or provider-specific timestamp of the record this transformer is working on. - # Used for building a cursor so an extractor doesn't have to start from the beginning of a - # data source from the beginning. - def timestamp - raise NotImplementedError + def call_finish(&block) + remaining_records = finish + return if remaining_records.nil? + + remaining_records.each do |record| + block.call(record) + end end - # An optional, human-readable identifier for a transformation, intended for debugging or logging. - # By default, it is just the id. - def friendly_identifier - id + def transform(_record) + raise NotImplementedError, 'You must implement the transform method' end - def to_s - ts = begin - unknown = "???" - timestamp&.iso8601 || unknown - rescue TransformationError, NotImplementedError - unknown - end + # Called once after runner has processed all records + def finish; end - identifier = begin - unknown = self.class.to_s - friendly_identifier || self.class.to_s - rescue TransformationError, NotImplementedError - unknown - end + protected - "[#{ts}] #{identifier}" + def stash_record(record) + @stashed_records ||= [] + @stashed_records << record + nil end + + def flush_stashed_records + @stashed_records.tap(&:clear) + end + + def update_data(record, new_data) + new_record = record.clone + new_record.data = new_data + new_record + end end end end require_relative 'null_transformer' -require_relative 'image_file_transformer' +require_relative 'sampler_transformer' +require_relative 'buffer_transformer' +require_relative 'multiply_transformer' +require_relative 'sort_transformer' +require_relative 'chronicle_transformer' +require_relative 'format_transformer' +require_relative 'filter_fields_transformer' +require_relative 'fields_limit_transformer' +require_relative 'merge_meta_transformer' +require_relative 'filter_transformer' +require_relative 'chronobase_transformer'