lib/tap/app.rb in bahuvrihi-tap-0.10.6 vs lib/tap/app.rb in bahuvrihi-tap-0.10.7
- old
+ new
@@ -1,7 +1,8 @@
require 'logger'
require 'tap/support/aggregator'
+require 'tap/support/dependencies'
require 'tap/support/executable_queue'
module Tap
module Support
autoload(:Combinator, 'tap/support/combinator')
@@ -75,12 +76,12 @@
# Tasks can be batched, allowing the same input to be enqued to multiple
# tasks at once.
#
# t1 = Task.new {|task, input| input += 1 }
# t2 = Task.new {|task, input| input += 10 }
- # Task.batch(t1, t2) # => [t1, t2]
#
+ # t1.batch_with(t2)
# t1.enq 0
#
# app.run
# app.results(t1) # => [1]
# app.results(t2) # => [10]
@@ -189,10 +190,12 @@
# A Tap::Support::Aggregator to collect the results of
# methods that have no <tt>on_complete</tt> block
attr_reader :aggregator
+ attr_reader :dependencies
+
config :debug, false, &c.flag # Flag debugging
config :force, false, &c.flag # Force execution at checkpoints
config :quiet, false, &c.flag # Suppress logging
config :verbose, false, &c.flag # Enables extra logging (overrides quiet)
@@ -220,10 +223,11 @@
super()
@state = State::READY
@queue = Support::ExecutableQueue.new
@aggregator = Support::Aggregator.new
+ @dependencies = Support::Dependencies.new
initialize_config(config)
self.logger = logger
end
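A minimal sketch of the new collaborator (assuming the default constructor arguments work as shown); each app now carries its own dependency registry alongside the aggregator:

  app = Tap::App.new
  app.aggregator.class     # => Tap::Support::Aggregator
  app.dependencies.class   # => Tap::Support::Dependencies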
@@ -268,13 +272,11 @@
# Executes the input Executable with the inputs. Stores the result in
# aggregator unless an on_complete block is set. Returns the audited
# result.
def execute(m, inputs)
- _result = m._execute(*inputs)
- aggregator.store(_result) unless m.on_complete_block
- _result
+ m._execute(*inputs)
end
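A hedged usage sketch, assuming (as during run) that a Task instance can be passed directly as the executable; with the aggregator call removed, the caller now decides what to do with the audited result:

  t = Task.new {|task, input| input + 1 }
  _result = app.execute(t, [1])   # inputs are splatted into _execute
  _result._current                # => 2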
# Sets state = State::READY unless the app is running. Returns self.
def ready
self.state = State::READY unless self.state == State::RUN
@@ -369,145 +371,11 @@
# Returns the enqued method.
def mq(object, method_name, *inputs)
m = object._method(method_name)
enq(m, *inputs)
end
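A hypothetical sketch of mq, assuming the receiver responds to Tap's Object#_method extension; the object and method here are illustrative only:

  calc = Object.new
  def calc.add(a, b); a + b; end

  m = app.mq(calc, :add, 1, 2)   # wraps :add as an Executable and enques [1, 2]
  app.run                        # calls calc.add(1, 2) when dequeued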
-
- # Sets a sequence workflow pattern for the tasks; each task will enque
- # the next task with its results.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may be provided as well as tasks.
- def sequence(tasks) # :yields: _result
- current_task = tasks.shift
- tasks.each do |next_task|
- # simply pass results from one task to the next.
- current_task.on_complete do |_result|
- yield(_result) if block_given?
- enq(next_task, _result)
- end
- current_task = next_task
- end
- end
-
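For reference, a sketch of how the removed sequence method was used, in the style of the batching example above:

  t1 = Task.new {|task, input| input + 1 }
  t2 = Task.new {|task, input| input + 10 }

  app.sequence([t1, t2])   # t1 enques t2 with its audited result
  t1.enq 0
  app.run
  app.results(t2)          # => [11]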
- # Sets a fork workflow pattern for the source task; each target
- # will enque the results of source.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may be provided as well as tasks.
- def fork(source, targets) # :yields: _result
- source.on_complete do |_result|
- targets.each do |target|
- yield(_result) if block_given?
- enq(target, _result)
- end
- end
- end
-
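A corresponding sketch for the removed fork method, with hypothetical tasks:

  t0 = Task.new {|task, input| input + 1 }
  t1 = Task.new {|task, input| input + 10 }
  t2 = Task.new {|task, input| input + 100 }

  app.fork(t0, [t1, t2])   # t0's result is enqued to both targets
  t0.enq 0
  app.run
  app.results(t1, t2)      # => [11, 101]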
- # Sets a simple merge workflow pattern for the source tasks. Each source
- # enques the target with its result; no synchronization occurs, nor are
- # results grouped before being sent to the target.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may be provided as well as tasks.
- def merge(target, sources) # :yields: _result
- sources.each do |source|
- # merging can use the existing audit trails... each distinct
- # input is getting sent to one place (the target)
- source.on_complete do |_result|
- yield(_result) if block_given?
- enq(target, _result)
- end
- end
- end
-
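Likewise for the removed merge method; both sources feed the same hypothetical target:

  t0 = Task.new {|task, input| input + 1 }
  t1 = Task.new {|task, input| input + 10 }
  t2 = Task.new {|task, input| input + 100 }

  app.merge(t2, [t0, t1])  # each source enques t2 with its own result
  t0.enq 0
  t1.enq 0
  app.run
  app.results(t2)          # => [101, 110]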
- # Sets a synchronized merge workflow for the source tasks. Results from
- # each source task are collected and enqued as a single group to the target.
- # The target is not enqued until all sources have completed. Raises an
- # error if a source returns twice before the target is enqued.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may be provided as well as tasks.
- #
- #-- TODO: add notes on testing and the way results are received
- # (ie as a single object)
- def sync_merge(target, sources) # :yields: _result
- group = Array.new(sources.length, nil)
- sources.each_with_index do |source, index|
- batch_map = Hash.new(0)
- batch_length = if source.kind_of?(Support::Batchable)
- source.batch.each_with_index {|obj, i| batch_map[obj] = i }
- source.batch.length
- else
- 1
- end
- group[index] = Array.new(batch_length, nil)
-
- source.on_complete do |_result|
- batch_index = batch_map[_result._current_source]
-
- if group[index][batch_index] != nil
- raise "sync_merge collision... already got a result for #{_result._current_source}"
- end
-
- group[index][batch_index] = _result
-
- unless group.flatten.include?(nil)
- Support::Combinator.new(*group).each do |*combination|
- # merge the source audits
- _group_result = Support::Audit.merge(*combination)
-
- yield(_group_result) if block_given?
- target.enq(_group_result)
- end
-
- # reset the group array
- group.collect! {|i| nil }
- end
- end
- end
- end
-
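A sketch of the removed sync_merge wiring; per the TODO above, the target receives the grouped results as a single merged object, so the exact form of its input is left unspecified here:

  t0 = Task.new {|task, input| input + 1 }
  t1 = Task.new {|task, input| input + 10 }
  target = Task.new {|task, input| input }  # input is the merged group result

  app.sync_merge(target, [t0, t1])
  t0.enq 0
  t1.enq 0
  app.run   # target is enqued only after both sources have completed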
- # Sets a choice workflow pattern for the source task. When the
- # source task completes, switch yields the audited result to the
- # block which then returns the index of the target to enque with
- # the results. No target will be enqued if the index is false or
- # nil; an error is raised if no target can be found for the
- # specified index.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may be provided as well as tasks.
- def switch(source, targets) # :yields: _result
- source.on_complete do |_result|
- if index = yield(_result)
- unless target = targets[index]
- raise "no switch target for index: #{index}"
- end
-
- enq(target, _result)
- else
- aggregator.store(_result)
- end
- end
- end
-
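Finally, a sketch of the removed switch method; the block maps the audited result to the index of the target to enque:

  t0 = Task.new {|task, input| input }
  t1 = Task.new {|task, input| "one: #{input}" }
  t2 = Task.new {|task, input| "two: #{input}" }

  app.switch(t0, [t1, t2]) do |_result|
    _result._current == 1 ? 0 : 1   # 0 selects t1, 1 selects t2
  end

  t0.enq 1
  app.run
  app.results(t1)   # => ["one: 1"]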
# Returns all aggregated, audited results for the specified tasks.
# Results are joined into a single array. Arrays of tasks are
# allowed as inputs. See results.
def _results(*tasks)
aggregator.retrieve_all(*tasks.flatten)
@@ -527,9 +395,13 @@
# app.results(t1, t2.batch) # => [1, 11, 11]
# app.results(t2, t1) # => [11, 1]
#
def results(*tasks)
_results(tasks).collect {|_result| _result._current}
+ end
+
+ def inspect
+ "#<#{self.class.to_s}:#{object_id} root: #{root} >"
end
protected
# A hook for handling unknown configurations in subclasses, called from
\ No newline at end of file