lib/remi/job.rb in remi-0.2.42 vs lib/remi/job.rb in remi-0.3.0
- old
+ new
@@ -1,176 +1,342 @@
module Remi
- module Job
- module JobClassMethods
- attr_accessor :params
- attr_accessor :sources
- attr_accessor :targets
- attr_accessor :transforms
- def define_param(key, value)
- @params ||= Hash.new { |h, key| raise "Parameter #{key} is not defined" }
- @params[key] = value
+ # The Job class is the foundation for all Remi ETL jobs. It
+ # provides a DSL for defining Remi jobs in a way that is natural for
+ # ETL style applications. In a Remi job, the user defines all of
+ # the sources, transforms, and targets necessary to transform data.
+ # Any number of sources, transforms, and targets can be defined.
+ # Transforms can call other parameterized sub-transforms. Jobs can
+ # collect data from other parameterized sub-jobs, pass data to other
+ # sub-jobs, or both pass and collect data from other sub-jobs.
+ #
+ # Jobs are executed by calling the `#execute` method in an instance
+ # of the job. This triggers all transforms to be executed in the
+ # order they are defined. Sub-transforms are only executed if they
+ # are referenced in a transform. After all transforms have
+ # executed, the targets are loaded in the order they are defined.
+ #
+ #
+ #
+ # @example
+ #
+ # class MyJob < Remi::Job
+ # source :my_csv_file do
+ # extractor my_extractor
+ # parser my_parser
+ # enforce_types
+ # end
+ #
+ # target :my_transformed_file do
+ # loader my_loader
+ # end
+ #
+ # transform :transform_data do
+ # # Data sources are converted into a dataframe the first time the #df method is called.
+ # transform_work = my_csv_file.df.dup # => a copy of the my_csv_file.df dataframe
+ #
+ # # Any arbitrary Ruby is allowed in a transform block. Remi provides a convenient
+ # # source to target map DSL to map fields from sources to targets
+ # Remi::SourceToTargetMap.apply(transform_work, my_transformed_file.df) do
+ # map source(:source_field_id) .target(:prefixed_id)
+ # .transform(->(v) { "PREFIX#{v}" })
+ # end
+ # end
+ # end
+ #
+ # # The job is executed when `#execute` is called on an instance of the job.
+ # # Transforms are executed in the order they are defined. Targets are loaded
+ # # in the order they are defined after all transforms have been executed.
+ # job = MyJob.new
+ # job.execute
+ #
+ #
+ #
+ # @todo MOAR Examples! Subtransforms, subjobs, parameters, references to even more
+ # complete sample jobs.
+ class Job
+ class << self
+
+ def inherited(base)
+ base.instance_variable_set(:@params, params.clone)
+ base.instance_variable_set(:@sources, sources.dup)
+ base.instance_variable_set(:@targets, targets.dup)
+ base.instance_variable_set(:@transforms, transforms.dup)
+ base.instance_variable_set(:@sub_jobs, sub_jobs.dup)
end
- def define_source(name, type_class, **options)
- @sources ||= []
- @sources << name unless @sources.include? name
+ # @return [Job::Parameters] all parameters defined at the class level
+ def params
+ @params ||= Parameters.new
+ end
- define_method(name) do
- iv_name = instance_variable_get("@#{name}")
- return iv_name if iv_name
+ # Defines a job parameter.
+ # @example
+ #
+ # class MyJob < Job
+ # param(:my_param) { 'the best parameter' }
+ # end
+ #
+ # job = MyJob.new
+ # job.params[:my_param] #=> 'the best parameter'
+ def param(name, &block)
+ params.__define__(name, &block)
+ end
- source = type_class.new(options)
- instance_variable_set("@#{name}", source)
- end
+ # @return [Array<Symbol>] the list of data source names
+ def sources
+ @sources ||= []
end
- def define_target(name, type_class, **options)
- @targets ||= []
- @targets << name unless @targets.include? name
- define_method(name) do
- iv_name = instance_variable_get("@#{name}")
- return iv_name if iv_name
+ # @return [Array<Symbol>] the list of sub-jobs
+ def sub_jobs
+ @sub_jobs ||= []
+ end
- target = type_class.new(options)
- instance_variable_set("@#{name}", target)
+ # Defines a sub job resource for this job.
+ # Note that the return value of the DSL block must be an instance of a Remi::Job
+ # @example
+ #
+ # class MyJob < Job
+ # sub_job(:my_sub_job) { MySubJob.new }
+ # end
+ #
+ # job = MyJob.new
+ # job.sub_job.job #=> An instance of MySubJob
+ def sub_job(name, &block)
+ sub_jobs << name unless sub_jobs.include? name
+ attr_accessor name
+
+ define_method("__init_#{name}__".to_sym) do
+ sub_job = Job::SubJob.new(self, name: name, &block)
+ instance_variable_set("@#{name}", sub_job)
end
end
- def define_transform(name, sources: [], targets: [], &block)
- @transforms ||= {}
- @transforms[name] = { sources: Array(sources), targets: Array(targets) }
+ # Defines a data source.
+ # @example
+ #
+ # class MyJob < Job
+ # source :my_source do
+ # extractor my_extractor
+ # parser my_parser
+ # end
+ # end
+ #
+ # job = MyJob.new
+ # job.my_source.df #=> a dataframe generated after extracting and parsing
+ def source(name, &block)
+ sources << name unless sources.include? name
+ attr_accessor name
- define_method(name) do
- instance_eval { @logger.info "Running transformation #{__method__}" }
- instance_eval(&block)
+ define_method("__init_#{name}__".to_sym) do
+ source = DataSource.new(self, name: name, &block)
+ instance_variable_set("@#{name}", source)
end
end
- def params
- @params || {}
+ # @return [Array<Symbol>] the list of data target names
+ def targets
+ @targets ||= []
end
- def sources
- @sources || []
- end
+ # Defines a data target.
+ # @example
+ #
+ # class MyJob < Job
+ # target :my_target do
+ # extractor my_extractor
+ # parser my_parser
+ # end
+ # end
+ #
+ # job = MyJob.new
+ # job.my_target.df #=> a dataframe generated after extracting and parsing
+ def target(name, &block)
+ targets << name unless targets.include? name
+ attr_accessor name
- def targets
- @targets || []
+ define_method("__init_#{name}__".to_sym) do
+ target = DataTarget.new(self, name: name, &block)
+ instance_variable_set("@#{name}", target)
+ end
end
+ # @return [Array<Symbol>] the list of transform names
def transforms
- @transforms || {}
+ @transforms ||= []
end
+ # Defines a transform.
+ # @example
+ #
+ # class MyJob < Job
+ # transform :my_transform do
+ # puts "hello from my_transform!"
+ # end
+ # end
+ #
+ # job = MyJob.new
+ # job.my_transform.execute #=>(stdout) 'hello from my_transform!'
+ def transform(name, &block)
+ transforms << name unless transforms.include? name
+ attr_accessor name
- def work_dir
- Settings.work_dir
+ define_method("__init_#{name}__".to_sym) do
+ transform = Transform.new(self, name: name, &block)
+ instance_variable_set("@#{name}", transform)
+ end
end
- def self.extended(receiver)
+ # Defines a sub-transform.
+ # @example
+ #
+ # class MyJob < Job
+ # sub_transform :my_sub_transform, greeting: 'hello' do
+ # puts "#{params[:greeting]} from my_sub_transform!"
+ # end
+ #
+ # transform :my_transform do
+ # import :my_sub_transform, greeting: 'bonjour' do
+ # end
+ # end
+ # end
+ #
+ # job = MyJob.new
+ # job.my_transform.execute #=>(stdout) 'bonjour from my_sub_transform!'
+ def sub_transform(name, **kargs, &block)
+ define_method(name) do
+ Transform.new(self, name: name, **kargs, &block)
+ end
end
-
- def included(receiver)
- receiver.extend(JobClassMethods)
- receiver.params = self.params.merge(receiver.params)
- receiver.sources = self.sources + receiver.sources
- receiver.targets = self.targets + receiver.targets
- receiver.transforms = self.transforms.merge(receiver.transforms)
- end
end
- def self.included(receiver)
- receiver.extend(JobClassMethods)
+ # Initializes the job
+ #
+ # @param work_dir [String, Path] sets the working directory for this job
+ # @param logger [Object] sets the logger for the job
+ # @param kargs [Hash] Optional job parameters (can be referenced in the job via `#params`)
+ def initialize(work_dir: Settings.work_dir, logger: Settings.logger, **kargs)
+ @work_dir = work_dir
+ @logger = logger
+ create_work_dir
+
+ __init_params__ **kargs
+ __init_sub_jobs__
+ __init_sources__
+ __init_targets__
+ __init_transforms__
end
+ # @return [String] the working directory used for temporary data
+ attr_reader :work_dir
- def params
- self.class.params
+ # @return [Object] the logging object
+ attr_reader :logger
+
+ # @return [Job::Parameters] parameters defined at the class level or during instantiation
+ attr_reader :params
+
+ # @return [Array] list of sub_jobs defined in the job
+ attr_reader :sub_jobs
+
+ # @return [Array] list of sources defined in the job
+ attr_reader :sources
+
+ # @return [Array] list of targets defined in the job
+ attr_reader :targets
+
+ # @return [Array] list of transforms defined in the job
+ attr_reader :transforms
+
+
+ # Creates a temporary working directory for the job
+ def create_work_dir
+ @logger.info "Creating working directory #{work_dir}"
+ FileUtils.mkdir_p work_dir
end
- def sources
- self.class.sources
+
+ # @return [self] the job object (needed to reference parent job in transform DSL)
+ def job
+ self
end
- def targets
- self.class.targets
+ def to_s
+ inspect
end
- def transforms
- self.class.transforms
+ def inspect
+ "#<#{Remi::Job}>: #{self.class}\n" +
+ " parameters: #{params.to_h.keys}\n" +
+ " sources: #{sources}\n" +
+ " targets: #{targets}\n" +
+ " transforms: #{transforms}"
end
-
- def initialize(runtime_params: {}, delete_work_dir: true, logger: Settings.logger)
- @runtime_params = runtime_params
- @delete_work_dir = delete_work_dir
- @logger = logger
- create_work_dir
+ # Execute the specified components of the job.
+ #
+ # @param components [Array<symbol>] list of components to execute (e.g., `:transforms`, `:load_targets`)
+ #
+ # @return [self]
+ def execute(*components)
+ execute_transforms if components.empty? || components.include?(:transforms)
+ execute_load_targets if components.empty? || components.include?(:load_targets)
+ self
end
- attr_accessor :runtime_params
+ private
- def work_dir
- self.class.work_dir
+ def __init_params__(**kargs)
+ @params = self.class.params.clone
+ add_params **kargs
+ params.context = self
end
- def finalize
- delete_work_dir
+ def __init_sub_jobs__
+ @sub_jobs = self.class.sub_jobs
+ @sub_jobs.each do |sub_job|
+ send("__init_#{sub_job}__".to_sym)
+ end
end
- def delete_work_dir
- if @delete_work_dir && (work_dir.match /^#{Dir.tmpdir}/)
- @logger.info "Deleting temporary directory #{work_dir}"
- FileUtils.rm_r work_dir
- else
- @logger.debug "Not going to delete working directory #{work_dir}"
- nil
+ def __init_sources__
+ @sources = self.class.sources
+ @sources.each do |source|
+ send("__init_#{source}__".to_sym)
end
end
- def create_work_dir
- @logger.info "Creating working directory #{work_dir}"
- FileUtils.mkdir_p work_dir
+ def __init_targets__
+ @targets = self.class.targets
+ @targets.each do |target|
+ send("__init_#{target}__".to_sym)
+ end
end
- # Public: Runs any transforms that use the sources and targets selected. If
- # source and target is not specified, then all transforms will be run.
- # If only the source is specified, then all transforms that use any of the
- # sources will be run. Same for specified transforms.
- #
- # sources - Array of source names
- # targets - Array of target names
- #
- # Returns an array containing the result of each transform.
- def run_transforms_using(sources: nil, targets: nil)
- transforms.map do |t, st|
- selected_sources = (st[:sources] & Array(sources || st[:sources])).size > 0
- selected_targets = (st[:targets] & Array(targets || st[:targets])).size > 0
- self.send(t) if selected_sources && selected_targets
+ def __init_transforms__
+ @transforms = self.class.transforms
+ @transforms.each do |transform|
+ send("__init_#{transform}__".to_sym)
end
end
- def run_all_transforms
- transforms.map { |t, st| self.send(t) }
+ # Executes all transforms defined
+ def execute_transforms
+ transforms.map { |t| send(t).execute }
+ self
end
- def load_all_targets
- targets.each do |target|
- @logger.info "Loading target #{target}"
- self.send(target).tap { |t| t.respond_to?(:load) ? t.load : nil }
- end
+ # Loads all targets defined
+ def execute_load_targets
+ targets.each { |t| send(t).load }
+ self
end
- # Public: Runs all transforms defined in the job.
- #
- # Returns the job instance.
- def run
- # Do all of the stuff here
- run_all_transforms
- load_all_targets
- self
+ # Adds all parameters listed to the job parameters
+ def add_params(**kargs)
+ kargs.each { |k,v| params[k] = v }
end
end
end