lib/tap/app.rb in bahuvrihi-tap-0.10.6 vs lib/tap/app.rb in bahuvrihi-tap-0.10.7

- old
+ new

@@ -1,7 +1,8 @@ require 'logger' require 'tap/support/aggregator' +require 'tap/support/dependencies' require 'tap/support/executable_queue' module Tap module Support autoload(:Combinator, 'tap/support/combinator') @@ -75,12 +76,12 @@ # Tasks can be batched, allowing the same input to be enqued to multiple # tasks at once. # # t1 = Task.new {|task, input| input += 1 } # t2 = Task.new {|task, input| input += 10 } - # Task.batch(t1, t2) # => [t1, t2] # + # t1.batch_with(t2) # t1.enq 0 # # app.run # app.results(t1) # => [1] # app.results(t2) # => [10] @@ -189,10 +190,12 @@ # A Tap::Support::Aggregator to collect the results of # methods that have no <tt>on_complete</tt> block attr_reader :aggregator + attr_reader :dependencies + config :debug, false, &c.flag # Flag debugging config :force, false, &c.flag # Force execution at checkpoints config :quiet, false, &c.flag # Suppress logging config :verbose, false, &c.flag # Enables extra logging (overrides quiet) @@ -220,10 +223,11 @@ super() @state = State::READY @queue = Support::ExecutableQueue.new @aggregator = Support::Aggregator.new + @dependencies = Support::Dependencies.new initialize_config(config) self.logger = logger end @@ -268,13 +272,11 @@ # Executes the input Executable with the inputs. Stores the result in # aggregator unless an on_complete block is set. Returns the audited # result. def execute(m, inputs) - _result = m._execute(*inputs) - aggregator.store(_result) unless m.on_complete_block - _result + m._execute(*inputs) end # Sets state = State::READY unless the app is running. Returns self. def ready self.state = State::READY unless self.state == State::RUN @@ -369,145 +371,11 @@ # Returns the enqued method. def mq(object, method_name, *inputs) m = object._method(method_name) enq(m, *inputs) end - - # Sets a sequence workflow pattern for the tasks; each task will enque - # the next task with it's results. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - def sequence(tasks) # :yields: _result - current_task = tasks.shift - tasks.each do |next_task| - # simply pass results from one task to the next. - current_task.on_complete do |_result| - yield(_result) if block_given? - enq(next_task, _result) - end - current_task = next_task - end - end - - # Sets a fork workflow pattern for the source task; each target - # will enque the results of source. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - def fork(source, targets) # :yields: _result - source.on_complete do |_result| - targets.each do |target| - yield(_result) if block_given? - enq(target, _result) - end - end - end - - # Sets a simple merge workflow pattern for the source tasks. Each source - # enques target with it's result; no synchronization occurs, nor are - # results grouped before being sent to the target. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - def merge(target, sources) # :yields: _result - sources.each do |source| - # merging can use the existing audit trails... each distinct - # input is getting sent to one place (the target) - source.on_complete do |_result| - yield(_result) if block_given? - enq(target, _result) - end - end - end - - # Sets a synchronized merge workflow for the source tasks. Results from - # each source task are collected and enqued as a single group to the target. - # The target is not enqued until all sources have completed. Raises an - # error if a source returns twice before the target is enqued. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - # - #-- TODO: add notes on testing and the way results are received - # (ie as a single object) - def sync_merge(target, sources) # :yields: _result - group = Array.new(sources.length, nil) - sources.each_with_index do |source, index| - batch_map = Hash.new(0) - batch_length = if source.kind_of?(Support::Batchable) - source.batch.each_with_index {|obj, i| batch_map[obj] = i } - source.batch.length - else - 1 - end - group[index] = Array.new(batch_length, nil) - - source.on_complete do |_result| - batch_index = batch_map[_result._current_source] - - if group[index][batch_index] != nil - raise "sync_merge collision... already got a result for #{_result._current_source}" - end - - group[index][batch_index] = _result - - unless group.flatten.include?(nil) - Support::Combinator.new(*group).each do |*combination| - # merge the source audits - _group_result = Support::Audit.merge(*combination) - - yield(_group_result) if block_given? - target.enq(_group_result) - end - - # reset the group array - group.collect! {|i| nil } - end - end - end - end - - # Sets a choice workflow pattern for the source task. When the - # source task completes, switch yields the audited result to the - # block which then returns the index of the target to enque with - # the results. No target will be enqued if the index is false or - # nil; an error is raised if no target can be found for the - # specified index. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - def switch(source, targets) # :yields: _result - source.on_complete do |_result| - if index = yield(_result) - unless target = targets[index] - raise "no switch target for index: #{index}" - end - - enq(target, _result) - else - aggregator.store(_result) - end - end - end - # Returns all aggregated, audited results for the specified tasks. # Results are joined into a single array. Arrays of tasks are # allowed as inputs. See results. def _results(*tasks) aggregator.retrieve_all(*tasks.flatten) @@ -527,9 +395,13 @@ # app.results(t1, t2.batch) # => [1, 11, 11] # app.results(t2, t1) # => [11, 1] # def results(*tasks) _results(tasks).collect {|_result| _result._current} + end + + def inspect + "#<#{self.class.to_s}:#{object_id} root: #{root} >" end protected # A hook for handling unknown configurations in subclasses, called from \ No newline at end of file