lib/chronicle/etl/job_definition.rb in chronicle-etl-0.5.5 vs lib/chronicle/etl/job_definition.rb in chronicle-etl-0.6.1

- old
+ new

@@ -1,5 +1,7 @@ +# frozen_string_literal: true + require 'active_support/core_ext/hash/deep_merge' module Chronicle module ETL class JobDefinition @@ -7,24 +9,26 @@ incremental: false, extractor: { name: 'stdin', options: {} }, - transformer: { - name: 'null', - options: {} - }, + transformers: [ + { + name: 'null', + options: {} + } + ], loader: { - name: 'table', + name: 'json', options: {} } }.freeze attr_reader :errors attr_accessor :definition - def initialize() + def initialize @definition = SKELETON_DEFINITION end def valid? validate @@ -32,30 +36,29 @@ end def validate @errors = {} - Chronicle::ETL::Registry::Connectors::PHASES.each do |phase| - __send__("#{phase}_klass".to_sym) - rescue Chronicle::ETL::PluginError => e - @errors[:plugins] ||= [] - @errors[:plugins] << e - end + extractor_klass + transformer_klasses + loader_klass + rescue Chronicle::ETL::PluginError => e + @errors[:plugins] ||= [] + @errors[:plugins] << e end def plugins_missing? validate return false unless @errors[:plugins]&.any? @errors[:plugins] - .filter { |e| e.instance_of?(Chronicle::ETL::PluginNotInstalledError) } - .any? + .any? { |e| e.instance_of?(Chronicle::ETL::PluginNotInstalledError) } end def validate! - raise(Chronicle::ETL::JobDefinitionError.new(self), "Job definition is invalid") unless valid? + raise(Chronicle::ETL::JobDefinitionError.new(self), 'Job definition is invalid') unless valid? true end # Add config hash to this definition @@ -64,23 +67,24 @@ load_credentials end # For each connector in this job, mix in secrets into the options def apply_default_secrets - Chronicle::ETL::Registry::Connectors::PHASES.each do |phase| + # FIXME: handle transformer secrets + %i[extractor loader].each do |phase| # If the option have a `secrets` key, we look up those secrets and - # mix them in. If not, use the connector's plugin name and look up + # mix them in. If not, use the connector's plugin name and look up # secrets with the same namespace if @definition[phase][:options][:secrets] namespace = @definition[phase][:options][:secrets] else # We don't want to do this lookup for built-in connectors - next if __send__("#{phase}_klass".to_sym).connector_registration.built_in? + next if __send__(:"#{phase}_klass").connector_registration.built_in? # infer plugin name from connector name and use it for secrets # namesepace - namespace = @definition[phase][:name].split(":").first + namespace = @definition[phase][:name].split(':').first end # Reverse merge secrets into connector's options (we want to preserve # options that came from job file or CLI options) secrets = Chronicle::ETL::Secrets.read(namespace) @@ -96,40 +100,48 @@ def dry_run? @definition[:dry_run] end def extractor_klass - load_klass(:extractor, @definition[:extractor][:name]) + find_connector_klass(:extractor, @definition[:extractor][:name]) end - def transformer_klass - load_klass(:transformer, @definition[:transformer][:name]) + def transformer_klasses + @definition[:transformers].map do |transformer| + find_connector_klass(:transformer, transformer[:name]) + end end def loader_klass - load_klass(:loader, @definition[:loader][:name]) + find_connector_klass(:loader, @definition[:loader][:name]) end def extractor_options @definition[:extractor][:options] end def transformer_options - @definition[:transformer][:options] + @definition[:transformers].map do |transformer| + transformer[:options] + end end def loader_options @definition[:loader][:options] end private - def load_klass(phase, identifier) + def find_schema_transformer_klass(source_klass, target) + Chronicle::ETL::Registry::Connectors.find_converter_for_source(source_klass, target).klass + end + + def find_connector_klass(phase, identifier) Chronicle::ETL::Registry::Connectors.find_by_phase_and_identifier(phase, identifier).klass end def load_credentials - Chronicle::ETL::Registry::Connectors::PHASES.each do |phase| + %i[extractor loader].each do |phase| credentials_name = @definition[phase].dig(:options, :credentials) if credentials_name credentials = Chronicle::ETL::Config.load_credentials(credentials_name) @definition[phase][:options].deep_merge(credentials) end