lib/chronicle/etl/job_definition.rb in chronicle-etl-0.5.5 vs lib/chronicle/etl/job_definition.rb in chronicle-etl-0.6.1
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
require 'active_support/core_ext/hash/deep_merge'
module Chronicle
module ETL
class JobDefinition
@@ -7,24 +9,26 @@
incremental: false,
extractor: {
name: 'stdin',
options: {}
},
- transformer: {
- name: 'null',
- options: {}
- },
+ transformers: [
+ {
+ name: 'null',
+ options: {}
+ }
+ ],
loader: {
- name: 'table',
+ name: 'json',
options: {}
}
}.freeze
attr_reader :errors
attr_accessor :definition
- def initialize()
+ def initialize
@definition = SKELETON_DEFINITION
end
def valid?
validate
@@ -32,30 +36,29 @@
end
def validate
@errors = {}
- Chronicle::ETL::Registry::Connectors::PHASES.each do |phase|
- __send__("#{phase}_klass".to_sym)
- rescue Chronicle::ETL::PluginError => e
- @errors[:plugins] ||= []
- @errors[:plugins] << e
- end
+ extractor_klass
+ transformer_klasses
+ loader_klass
+ rescue Chronicle::ETL::PluginError => e
+ @errors[:plugins] ||= []
+ @errors[:plugins] << e
end
def plugins_missing?
validate
return false unless @errors[:plugins]&.any?
@errors[:plugins]
- .filter { |e| e.instance_of?(Chronicle::ETL::PluginNotInstalledError) }
- .any?
+ .any? { |e| e.instance_of?(Chronicle::ETL::PluginNotInstalledError) }
end
def validate!
- raise(Chronicle::ETL::JobDefinitionError.new(self), "Job definition is invalid") unless valid?
+ raise(Chronicle::ETL::JobDefinitionError.new(self), 'Job definition is invalid') unless valid?
true
end
# Add config hash to this definition
@@ -64,23 +67,24 @@
load_credentials
end
# For each connector in this job, mix in secrets into the options
def apply_default_secrets
- Chronicle::ETL::Registry::Connectors::PHASES.each do |phase|
+ # FIXME: handle transformer secrets
+ %i[extractor loader].each do |phase|
# If the option have a `secrets` key, we look up those secrets and
- # mix them in. If not, use the connector's plugin name and look up
+ # mix them in. If not, use the connector's plugin name and look up
# secrets with the same namespace
if @definition[phase][:options][:secrets]
namespace = @definition[phase][:options][:secrets]
else
# We don't want to do this lookup for built-in connectors
- next if __send__("#{phase}_klass".to_sym).connector_registration.built_in?
+ next if __send__(:"#{phase}_klass").connector_registration.built_in?
# infer plugin name from connector name and use it for secrets
# namesepace
- namespace = @definition[phase][:name].split(":").first
+ namespace = @definition[phase][:name].split(':').first
end
# Reverse merge secrets into connector's options (we want to preserve
# options that came from job file or CLI options)
secrets = Chronicle::ETL::Secrets.read(namespace)
@@ -96,40 +100,48 @@
def dry_run?
@definition[:dry_run]
end
def extractor_klass
- load_klass(:extractor, @definition[:extractor][:name])
+ find_connector_klass(:extractor, @definition[:extractor][:name])
end
- def transformer_klass
- load_klass(:transformer, @definition[:transformer][:name])
+ def transformer_klasses
+ @definition[:transformers].map do |transformer|
+ find_connector_klass(:transformer, transformer[:name])
+ end
end
def loader_klass
- load_klass(:loader, @definition[:loader][:name])
+ find_connector_klass(:loader, @definition[:loader][:name])
end
def extractor_options
@definition[:extractor][:options]
end
def transformer_options
- @definition[:transformer][:options]
+ @definition[:transformers].map do |transformer|
+ transformer[:options]
+ end
end
def loader_options
@definition[:loader][:options]
end
private
- def load_klass(phase, identifier)
+ def find_schema_transformer_klass(source_klass, target)
+ Chronicle::ETL::Registry::Connectors.find_converter_for_source(source_klass, target).klass
+ end
+
+ def find_connector_klass(phase, identifier)
Chronicle::ETL::Registry::Connectors.find_by_phase_and_identifier(phase, identifier).klass
end
def load_credentials
- Chronicle::ETL::Registry::Connectors::PHASES.each do |phase|
+ %i[extractor loader].each do |phase|
credentials_name = @definition[phase].dig(:options, :credentials)
if credentials_name
credentials = Chronicle::ETL::Config.load_credentials(credentials_name)
@definition[phase][:options].deep_merge(credentials)
end