lib/chronicle/etl/job.rb in chronicle-etl-0.2.4 vs lib/chronicle/etl/job.rb in chronicle-etl-0.3.0

- old
+ new

@@ -1,8 +1,13 @@ +require 'forwardable' module Chronicle module ETL class Job + extend Forwardable + + def_delegators :@job_definition, :dry_run? + attr_accessor :name, :extractor_klass, :extractor_options, :transformer_klass, :transformer_options, @@ -10,62 +15,58 @@ :loader_options # TODO: build a proper id system alias id name - def initialize(definition) - definition = definition.definition # FIXME - @name = definition[:name] - @extractor_klass = load_klass(:extractor, definition[:extractor][:name]) - @extractor_options = definition[:extractor][:options] || {} + def initialize(job_definition) + @job_definition = job_definition + @name = @job_definition.definition[:name] + @extractor_options = @job_definition.extractor_options + @transformer_options = @job_definition.transformer_options + @loader_options = @job_definition.loader_options - @transformer_klass = load_klass(:transformer, definition[:transformer][:name]) - @transformer_options = definition[:transformer][:options] || {} - - @loader_klass = load_klass(:loader, definition[:loader][:name]) - @loader_options = definition[:loader][:options] || {} - - set_continuation if load_continuation? + set_continuation if use_continuation? yield self if block_given? end def instantiate_extractor - instantiate_klass(:extractor) + @extractor_klass = @job_definition.extractor_klass + @extractor_klass.new(@extractor_options) end - def instantiate_transformer(data) - instantiate_klass(:transformer, data) + def instantiate_transformer(extraction) + @transformer_klass = @job_definition.transformer_klass + @transformer_klass.new(@transformer_options, extraction) end def instantiate_loader - instantiate_klass(:loader) + @loader_klass = @job_definition.loader_klass + @loader_klass.new(@loader_options) end def save_log? # TODO: this needs more nuance return !id.nil? end - private - - def instantiate_klass(phase, *args) - options = self.send("#{phase.to_s}_options") - args = args.unshift(options) - klass = self.send("#{phase.to_s}_klass") - klass.new(*args) + def to_s + output = "Job" + output += " '#{name}'".bold if name + output += "\n" + output += " → Extracting from #{@job_definition.extractor_klass.description}\n" + output += " → Transforming #{@job_definition.transformer_klass.description}\n" + output += " → Loading to #{@job_definition.loader_klass.description}\n" end - def load_klass phase, identifier - Chronicle::ETL::Catalog.phase_and_identifier_to_klass(phase, identifier) - end + private def set_continuation - continuation = Chronicle::ETL::JobLogger.load_latest(@job_id) + continuation = Chronicle::ETL::JobLogger.load_latest(@id) @extractor_options[:continuation] = continuation end - def load_continuation? - save_log? + def use_continuation? + @job_definition.incremental? end end end end