lib/remi/job.rb in remi-0.2.42 vs lib/remi/job.rb in remi-0.3.0

- old
+ new

@@ -1,176 +1,342 @@ module Remi - module Job - module JobClassMethods - attr_accessor :params - attr_accessor :sources - attr_accessor :targets - attr_accessor :transforms - def define_param(key, value) - @params ||= Hash.new { |h, key| raise "Parameter #{key} is not defined" } - @params[key] = value + # The Job class is the foundation for all Remi ETL jobs. It + # provides a DSL for defining Remi jobs in a way that is natural for + # ETL style applications. In a Remi job, the user defines all of + # the sources, transforms, and targets necessary to transform data. + # Any number of sources, transforms, and targets can be defined. + # Transforms can call other parameterized sub-transforms. Jobs can + # collect data from other parameterized sub-jobs, pass data to other + # sub-jobs, or both pass and collect data from other sub-jobs. + # + # Jobs are executed by calling the `#execute` method in an instance + # of the job. This triggers all transforms to be executed in the + # order they are defined. Sub-transforms are only executed if they + # are referenced in a transform. After all transforms have + # executed, the targets are loaded in the order they are defined. + # + # + # + # @example + # + # class MyJob < Remi::Job + # source :my_csv_file do + # extractor my_extractor + # parser my_parser + # enforce_types + # end + # + # target :my_transformed_file do + # loader my_loader + # end + # + # transform :transform_data do + # # Data sources are converted into a dataframe the first time the #df method is called. + # transform_work = my_csv_file.df.dup # => a copy of the my_csv_file.df dataframe + # + # # Any arbitrary Ruby is allowed in a transform block. Remi provides a convenient + # # source to target map DSL to map fields from sources to targets + # Remi::SourceToTargetMap.apply(transform_work, my_transformed_file.df) do + # map source(:source_field_id) .target(:prefixed_id) + # .transform(->(v) { "PREFIX#{v}" }) + # end + # end + # end + # + # # The job is executed when `#execute` is called on an instance of the job. + # # Transforms are executed in the order they are defined. Targets are loaded + # # in the order they are defined after all transforms have been executed. + # job = MyJob.new + # job.execute + # + # + # + # @todo MOAR Examples! Subtransforms, subjobs, parameters, references to even more + # complete sample jobs. + class Job + class << self + + def inherited(base) + base.instance_variable_set(:@params, params.clone) + base.instance_variable_set(:@sources, sources.dup) + base.instance_variable_set(:@targets, targets.dup) + base.instance_variable_set(:@transforms, transforms.dup) + base.instance_variable_set(:@sub_jobs, sub_jobs.dup) end - def define_source(name, type_class, **options) - @sources ||= [] - @sources << name unless @sources.include? name + # @return [Job::Parameters] all parameters defined at the class level + def params + @params ||= Parameters.new + end - define_method(name) do - iv_name = instance_variable_get("@#{name}") - return iv_name if iv_name + # Defines a job parameter. + # @example + # + # class MyJob < Job + # param(:my_param) { 'the best parameter' } + # end + # + # job = MyJob.new + # job.params[:my_param] #=> 'the best parameter' + def param(name, &block) + params.__define__(name, &block) + end - source = type_class.new(options) - instance_variable_set("@#{name}", source) - end + # @return [Array<Symbol>] the list of data source names + def sources + @sources ||= [] end - def define_target(name, type_class, **options) - @targets ||= [] - @targets << name unless @targets.include? name - define_method(name) do - iv_name = instance_variable_get("@#{name}") - return iv_name if iv_name + # @return [Array<Symbol>] the list of sub-jobs + def sub_jobs + @sub_jobs ||= [] + end - target = type_class.new(options) - instance_variable_set("@#{name}", target) + # Defines a sub job resource for this job. + # Note that the return value of the DSL block must be an instance of a Remi::Job + # @example + # + # class MyJob < Job + # sub_job(:my_sub_job) { MySubJob.new } + # end + # + # job = MyJob.new + # job.sub_job.job #=> An instance of MySubJob + def sub_job(name, &block) + sub_jobs << name unless sub_jobs.include? name + attr_accessor name + + define_method("__init_#{name}__".to_sym) do + sub_job = Job::SubJob.new(self, name: name, &block) + instance_variable_set("@#{name}", sub_job) end end - def define_transform(name, sources: [], targets: [], &block) - @transforms ||= {} - @transforms[name] = { sources: Array(sources), targets: Array(targets) } + # Defines a data source. + # @example + # + # class MyJob < Job + # source :my_source do + # extractor my_extractor + # parser my_parser + # end + # end + # + # job = MyJob.new + # job.my_source.df #=> a dataframe generated after extracting and parsing + def source(name, &block) + sources << name unless sources.include? name + attr_accessor name - define_method(name) do - instance_eval { @logger.info "Running transformation #{__method__}" } - instance_eval(&block) + define_method("__init_#{name}__".to_sym) do + source = DataSource.new(self, name: name, &block) + instance_variable_set("@#{name}", source) end end - def params - @params || {} + # @return [Array<Symbol>] the list of data target names + def targets + @targets ||= [] end - def sources - @sources || [] - end + # Defines a data target. + # @example + # + # class MyJob < Job + # target :my_target do + # extractor my_extractor + # parser my_parser + # end + # end + # + # job = MyJob.new + # job.my_target.df #=> a dataframe generated after extracting and parsing + def target(name, &block) + targets << name unless targets.include? name + attr_accessor name - def targets - @targets || [] + define_method("__init_#{name}__".to_sym) do + target = DataTarget.new(self, name: name, &block) + instance_variable_set("@#{name}", target) + end end + # @return [Array<Symbol>] the list of transform names def transforms - @transforms || {} + @transforms ||= [] end + # Defines a transform. + # @example + # + # class MyJob < Job + # transform :my_transform do + # puts "hello from my_transform!" + # end + # end + # + # job = MyJob.new + # job.my_transform.execute #=>(stdout) 'hello from my_transform!' + def transform(name, &block) + transforms << name unless transforms.include? name + attr_accessor name - def work_dir - Settings.work_dir + define_method("__init_#{name}__".to_sym) do + transform = Transform.new(self, name: name, &block) + instance_variable_set("@#{name}", transform) + end end - def self.extended(receiver) + # Defines a sub-transform. + # @example + # + # class MyJob < Job + # sub_transform :my_sub_transform, greeting: 'hello' do + # puts "#{params[:greeting]} from my_sub_transform!" + # end + # + # transform :my_transform do + # import :my_sub_transform, greeting: 'bonjour' do + # end + # end + # end + # + # job = MyJob.new + # job.my_transform.execute #=>(stdout) 'bonjour from my_sub_transform!' + def sub_transform(name, **kargs, &block) + define_method(name) do + Transform.new(self, name: name, **kargs, &block) + end end - - def included(receiver) - receiver.extend(JobClassMethods) - receiver.params = self.params.merge(receiver.params) - receiver.sources = self.sources + receiver.sources - receiver.targets = self.targets + receiver.targets - receiver.transforms = self.transforms.merge(receiver.transforms) - end end - def self.included(receiver) - receiver.extend(JobClassMethods) + # Initializes the job + # + # @param work_dir [String, Path] sets the working directory for this job + # @param logger [Object] sets the logger for the job + # @param kargs [Hash] Optional job parameters (can be referenced in the job via `#params`) + def initialize(work_dir: Settings.work_dir, logger: Settings.logger, **kargs) + @work_dir = work_dir + @logger = logger + create_work_dir + + __init_params__ **kargs + __init_sub_jobs__ + __init_sources__ + __init_targets__ + __init_transforms__ end + # @return [String] the working directory used for temporary data + attr_reader :work_dir - def params - self.class.params + # @return [Object] the logging object + attr_reader :logger + + # @return [Job::Parameters] parameters defined at the class level or during instantiation + attr_reader :params + + # @return [Array] list of sub_jobs defined in the job + attr_reader :sub_jobs + + # @return [Array] list of sources defined in the job + attr_reader :sources + + # @return [Array] list of targets defined in the job + attr_reader :targets + + # @return [Array] list of transforms defined in the job + attr_reader :transforms + + + # Creates a temporary working directory for the job + def create_work_dir + @logger.info "Creating working directory #{work_dir}" + FileUtils.mkdir_p work_dir end - def sources - self.class.sources + + # @return [self] the job object (needed to reference parent job in transform DSL) + def job + self end - def targets - self.class.targets + def to_s + inspect end - def transforms - self.class.transforms + def inspect + "#<#{Remi::Job}>: #{self.class}\n" + + " parameters: #{params.to_h.keys}\n" + + " sources: #{sources}\n" + + " targets: #{targets}\n" + + " transforms: #{transforms}" end - - def initialize(runtime_params: {}, delete_work_dir: true, logger: Settings.logger) - @runtime_params = runtime_params - @delete_work_dir = delete_work_dir - @logger = logger - create_work_dir + # Execute the specified components of the job. + # + # @param components [Array<symbol>] list of components to execute (e.g., `:transforms`, `:load_targets`) + # + # @return [self] + def execute(*components) + execute_transforms if components.empty? || components.include?(:transforms) + execute_load_targets if components.empty? || components.include?(:load_targets) + self end - attr_accessor :runtime_params + private - def work_dir - self.class.work_dir + def __init_params__(**kargs) + @params = self.class.params.clone + add_params **kargs + params.context = self end - def finalize - delete_work_dir + def __init_sub_jobs__ + @sub_jobs = self.class.sub_jobs + @sub_jobs.each do |sub_job| + send("__init_#{sub_job}__".to_sym) + end end - def delete_work_dir - if @delete_work_dir && (work_dir.match /^#{Dir.tmpdir}/) - @logger.info "Deleting temporary directory #{work_dir}" - FileUtils.rm_r work_dir - else - @logger.debug "Not going to delete working directory #{work_dir}" - nil + def __init_sources__ + @sources = self.class.sources + @sources.each do |source| + send("__init_#{source}__".to_sym) end end - def create_work_dir - @logger.info "Creating working directory #{work_dir}" - FileUtils.mkdir_p work_dir + def __init_targets__ + @targets = self.class.targets + @targets.each do |target| + send("__init_#{target}__".to_sym) + end end - # Public: Runs any transforms that use the sources and targets selected. If - # source and target is not specified, then all transforms will be run. - # If only the source is specified, then all transforms that use any of the - # sources will be run. Same for specified transforms. - # - # sources - Array of source names - # targets - Array of target names - # - # Returns an array containing the result of each transform. - def run_transforms_using(sources: nil, targets: nil) - transforms.map do |t, st| - selected_sources = (st[:sources] & Array(sources || st[:sources])).size > 0 - selected_targets = (st[:targets] & Array(targets || st[:targets])).size > 0 - self.send(t) if selected_sources && selected_targets + def __init_transforms__ + @transforms = self.class.transforms + @transforms.each do |transform| + send("__init_#{transform}__".to_sym) end end - def run_all_transforms - transforms.map { |t, st| self.send(t) } + # Executes all transforms defined + def execute_transforms + transforms.map { |t| send(t).execute } + self end - def load_all_targets - targets.each do |target| - @logger.info "Loading target #{target}" - self.send(target).tap { |t| t.respond_to?(:load) ? t.load : nil } - end + # Loads all targets defined + def execute_load_targets + targets.each { |t| send(t).load } + self end - # Public: Runs all transforms defined in the job. - # - # Returns the job instance. - def run - # Do all of the stuff here - run_all_transforms - load_all_targets - self + # Adds all parameters listed to the job parameters + def add_params(**kargs) + kargs.each { |k,v| params[k] = v } end end end