module Taskinator
  # DSL object which populates a process with tasks.
  #
  # Blocks handed to the DSL methods are evaluated with `instance_eval`
  # against a builder, so the public methods below form the vocabulary
  # available inside those blocks.
  class Builder

    attr_reader :process
    attr_reader :definition
    attr_reader :args
    attr_reader :builder_options

    # @param process the process whose `tasks` collection is appended to
    # @param definition the definition handed to the executor
    # @param args arguments forwarded to the tasks which get defined;
    #   when the last argument is a Hash it is removed from the arguments
    #   and kept as the builder options
    def initialize(process, definition, *args)
      @process = process
      @definition = definition
      @builder_options = args.last.is_a?(Hash) ? args.pop : {}
      @args = args
      @executor = Taskinator::Executor.new(@definition)
    end

    # Checks the builder option stored under +key+.
    #
    # With a block, the block is yielded to only when the option is truthy
    # and its result is returned (nil otherwise).
    # Without a block, returns true or false for the option, instead of
    # failing with a LocalJumpError when the option is set.
    def option?(key)
      enabled = builder_options[key]
      return (enabled ? true : false) unless block_given?

      yield if enabled
    end

    # defines a sub process of tasks which are executed sequentially
    #
    # the sub process task is only added to the process when the block
    # defined at least one task on the sub process
    #
    # @raise [ArgumentError] when no block is given
    def sequential(options={}, &block)
      raise ArgumentError, 'block' unless block_given?

      sub_process = Process.define_sequential_process_for(@definition, options)
      build_sub_process(sub_process, options, &block)
    end

    # defines a sub process of tasks which are executed concurrently
    #
    # the sub process task is only added to the process when the block
    # defined at least one task on the sub process
    #
    # @raise [ArgumentError] when no block is given
    def concurrent(complete_on=CompleteOn::Default, options={}, &block)
      raise ArgumentError, 'block' unless block_given?

      sub_process = Process.define_concurrent_process_for(@definition, complete_on, options)
      build_sub_process(sub_process, options, &block)
    end

    # dynamically defines tasks, using the given @iterator method
    # the definition will be evaluated for each yielded item
    #
    # @raise [ArgumentError] when method is nil or no block is given
    # @raise [NoMethodError] when the executor does not respond to method
    def for_each(method, options={}, &block)
      validate_method!(method)
      raise ArgumentError, 'block' unless block_given?

      #
      # `for_each` is an exception, since it invokes the definition
      # in order to yield elements to the builder, and any options passed
      # are included with the builder options
      #
      method_args = options.any? ? [*@args, options] : @args

      # each yielded set of values becomes the arguments of a fresh builder
      # over the same process (a trailing Hash is taken as its options)
      @executor.send(method, *method_args) do |*args|
        Builder.new(@process, @definition, *args).instance_eval(&block)
      end

      nil
    end

    alias_method :transform, :for_each

    # defines a task which executes the given @method
    #
    # @raise [ArgumentError] when method is nil
    # @raise [NoMethodError] when the executor does not respond to method
    def task(method, options={})
      validate_method!(method)

      define_step_task(@process, method, @args, options)
      nil
    end

    # defines a task which executes the given @job
    # which is expected to implement a perform method either as a class or instance method
    #
    # @raise [ArgumentError] when job is nil or does not provide `perform`
    def job(job, options={})
      raise ArgumentError, 'job' if job.nil?
      raise ArgumentError, 'job' unless performable?(job)

      define_job_task(@process, job, @args, options)
      nil
    end

    # TODO: add mailer
    # TODO: add complete!
    # TODO: add fail!

    # defines a sub process task, for the given @definition
    # the definition specified must have input compatible arguments
    # to the current definition
    #
    # @raise [ArgumentError] when definition is nil or is not a Definition
    def sub_process(definition, options={})
      raise ArgumentError, 'definition' if definition.nil?
      raise ArgumentError, "#{definition.name} does not extend the #{Definition.name} module" unless definition.kind_of?(Definition)

      sub_process = definition.create_sub_process(*@args, combine_options(options))
      task = define_sub_process_task(@process, sub_process, options)
      # NOTE(review): the builder created here is discarded; it is kept in
      # case constructing it (and its executor) has side effects - confirm
      # whether this line can be removed
      Builder.new(sub_process, definition, *@args)
      @process.tasks << task if sub_process.tasks.any?
      nil
    end

    private

    # shared tail of `sequential` and `concurrent`: evaluates the block
    # against a builder for the sub process, then adds the sub process task
    # to the current process, provided the block defined any tasks
    def build_sub_process(sub_process, options, &block)
      task = define_sub_process_task(@process, sub_process, options)
      Builder.new(sub_process, @definition, *@args).instance_eval(&block)
      @process.tasks << task if sub_process.tasks.any?
      nil
    end

    # ensures a method name was given and that the executor responds to it
    def validate_method!(method)
      raise ArgumentError, 'method' if method.nil?
      raise NoMethodError, method unless @executor.respond_to?(method)
    end

    # true when the job itself responds to `perform`, or when the job is a
    # class/module whose instances do; anything which is not a Module is
    # never asked for its instance methods
    def performable?(job)
      return true if job.respond_to?(:perform)

      job.is_a?(Module) && job.method_defined?(:perform)
    end

    def define_step_task(process, method, args, options={})
      define_task(process) {
        Task.define_step_task(process, method, args, combine_options(options))
      }
    end

    def define_job_task(process, job, args, options={})
      define_task(process) {
        Task.define_job_task(process, job, args, combine_options(options))
      }
    end

    def define_sub_process_task(process, sub_process, options={})
      Task.define_sub_process_task(process, sub_process, combine_options(options))
    end

    # appends the task produced by the block to the process and returns it
    def define_task(process)
      process.tasks << (task = yield)
      task
    end

    # the builder options, overridden by the options given
    def combine_options(options={})
      builder_options.merge(options)
    end

  end
end