module Tap # Workflow is a specialized type of Task allowing the encapsulation and reuse of # workflow logic. Workflows are still under construction. # # === Workflow Definition # # During initialization, Workflow executes the workflow method (by default the # block provided to Workflow.new) to define the workflow logic. This method # defines one or more entry_points and zero or more exit points, as well as # the internal logic for the workflow. # # Workflow.new do |w| # factor = w.config[:factor] || 1 # # t1 = Task.new {|task, input| input += 1 } # t2 = Task.new {|task, input| input += 10 } # t3 = Task.new {|task, input| input *= factor } # # w.app.sequence(t1, t2, t3) # w.entry_point = t1 # w.exit_point = t3 # end # # Or equivalently: # # class SimpleSequence < Workflow # config :factor, 1 # # def workflow # t1 = Task.new {|task, input| input += 5 } # t2 = Task.new {|task, input| input += 3 } # t3 = Task.new {|task, input| input *= factor } # # app.sequence(t1, t2, t3) # self.entry_point = t1 # self.exit_point = t3 # end # end # # To facilitate the specification of entry and exit points, workflow # can accomodate either single-task assignments or a collection. By # default both are hashes, but they can be reassigned: # # Workflow.new do |w| # w.entry_point.class # => Hash # w.exit_point.class # => Hash # w.entry_point[:main] = Task.new # end # # Workflow.new {|w| w.entry_point = Task.new } # Workflow.new {|w| w.entry_point = [Task.new, Task.new] } # # Access to the group of entry/exit points is standardized to an # array via the entry_points and exit_points methods. # # === Workflow Behavior # # The entry points act as an enque batch; when the workflow is enqued, the # entry points are enqued. The exit points act as an on_complete batch; their # on_complete blocks are set for workflow.on_complete. # # w = SimpleSequence.new # w.enq(0) # app.run # app.results(w.exit_points) # => [8] # # The batching of entry and exit points is distinct from workflow.batch itself. # Workflows can be batched like Tasks, such that all entry points from all # workflows in a batch are enqued at once. # # w1 = SimpleSequence.new nil, :factor => 1 # w2 = w1.initialize_batch_obj nil, :factor => -1 # # w1.enq(0) # app.run # app.results(w1.exit_points, w2.exit_points)) # => [8, -8] # class Workflow include Support::Framework class << self protected def define(name, klass=Tap::Task, &block) instance_var = "@#{name}".to_sym define_method(name) do |*args| raise ArgumentError, "wrong number of arguments (#{args.length} for 1)" if args.length > 1 instance_name = args[0] || name instance_variable_set(instance_var, {}) unless instance_variable_defined?(instance_var) instance_variable_get(instance_var)[instance_name] ||= task(instance_name, klass, &block) end define_method("#{name}=") do |input| input = {name => input} unless input.kind_of?(Hash) instance_variable_set(instance_var, input) end end end # The entry point for self. attr_accessor :entry_point # The exit point for self. attr_accessor :exit_point # The task block provided during initialization. attr_reader :task_block # Creates a new Task with the specified attributes. def initialize(config={}, name=nil, app=App.instance, &task_block) super(config, name, app) @task_block = (task_block == nil ? default_task_block : task_block) initialize_workflow end # Initializes a new batch object, running workflow to set the # instance-specific entry/exit points. Raises an error if # no entry points are defined. def initialize_copy(orig) super initialize_workflow end def initialize_workflow @entry_point = {} @exit_point = {} workflow end # Returns an array of entry points, determined from entry_point. def entry_points case @entry_point when Hash then @entry_point.values when Support::Executable then [@entry_point] when Array then @entry_point when nil then [] else raise "unable to determine entry points from entry_point: #{@entry_point}" end end # Returns an array of exit points, determined from exit_point. def exit_points case @exit_point when Hash then @exit_point.values when Support::Executable then [@exit_point] when Array then @exit_point when nil then [] else raise "unable to determine exit points from exit_point: #{@exit_point}" end end # Enqueues all entry points for self and self.batch to app # with the inputs. The number of inputs provided should match # the number of inputs required by all the entry points; # if the entry points have different input requirements, they # have to be enqued separately. def enq(*inputs) entry_points.each do |task| app.enq(task, *inputs) end end batch_function :enq # Sets the on_complete_block for all exit points for self and # self.batch. Use unbatched_on_complete to set the on_complete_block # for just self.exit_points. def on_complete(override=false, &block) exit_points.each do |task| task.on_complete(override, &block) end self end batch_function(:on_complete) {} def task(name, klass=Tap::Task, &block) configs = config[name] || {} raise ArgumentError, "config '#{name}' is not a hash" unless configs.kind_of?(Hash) klass.new(configs, name, &block) end # The workflow definition method. By default workflow # simply calls the task_block. In subclasses, workflow # should be overridden to provide the workflow definition. def workflow task_block.call(self) if task_block end protected # Hook to set a default task block. By default, nil. def default_task_block nil end end end