lib/chronicle/etl/job.rb in chronicle-etl-0.2.4 vs lib/chronicle/etl/job.rb in chronicle-etl-0.3.0
- old
+ new
@@ -1,8 +1,13 @@
+require 'forwardable'
module Chronicle
module ETL
class Job
+ extend Forwardable
+
+ def_delegators :@job_definition, :dry_run?
+
attr_accessor :name,
:extractor_klass,
:extractor_options,
:transformer_klass,
:transformer_options,
@@ -10,62 +15,58 @@
:loader_options
# TODO: build a proper id system
alias id name
- def initialize(definition)
- definition = definition.definition # FIXME
- @name = definition[:name]
- @extractor_klass = load_klass(:extractor, definition[:extractor][:name])
- @extractor_options = definition[:extractor][:options] || {}
+ def initialize(job_definition)
+ @job_definition = job_definition
+ @name = @job_definition.definition[:name]
+ @extractor_options = @job_definition.extractor_options
+ @transformer_options = @job_definition.transformer_options
+ @loader_options = @job_definition.loader_options
- @transformer_klass = load_klass(:transformer, definition[:transformer][:name])
- @transformer_options = definition[:transformer][:options] || {}
-
- @loader_klass = load_klass(:loader, definition[:loader][:name])
- @loader_options = definition[:loader][:options] || {}
-
- set_continuation if load_continuation?
+ set_continuation if use_continuation?
yield self if block_given?
end
def instantiate_extractor
- instantiate_klass(:extractor)
+ @extractor_klass = @job_definition.extractor_klass
+ @extractor_klass.new(@extractor_options)
end
- def instantiate_transformer(data)
- instantiate_klass(:transformer, data)
+ def instantiate_transformer(extraction)
+ @transformer_klass = @job_definition.transformer_klass
+ @transformer_klass.new(@transformer_options, extraction)
end
def instantiate_loader
- instantiate_klass(:loader)
+ @loader_klass = @job_definition.loader_klass
+ @loader_klass.new(@loader_options)
end
def save_log?
# TODO: this needs more nuance
return !id.nil?
end
- private
-
- def instantiate_klass(phase, *args)
- options = self.send("#{phase.to_s}_options")
- args = args.unshift(options)
- klass = self.send("#{phase.to_s}_klass")
- klass.new(*args)
+ def to_s
+ output = "Job"
+ output += " '#{name}'".bold if name
+ output += "\n"
+ output += " → Extracting from #{@job_definition.extractor_klass.description}\n"
+ output += " → Transforming #{@job_definition.transformer_klass.description}\n"
+ output += " → Loading to #{@job_definition.loader_klass.description}\n"
end
- def load_klass phase, identifier
- Chronicle::ETL::Catalog.phase_and_identifier_to_klass(phase, identifier)
- end
+ private
def set_continuation
- continuation = Chronicle::ETL::JobLogger.load_latest(@job_id)
+ continuation = Chronicle::ETL::JobLogger.load_latest(@id)
@extractor_options[:continuation] = continuation
end
- def load_continuation?
- save_log?
+ def use_continuation?
+ @job_definition.incremental?
end
end
end
end