lib/tap/app.rb in bahuvrihi-tap-0.10.7 vs lib/tap/app.rb in bahuvrihi-tap-0.10.8
- old
+ new
@@ -2,13 +2,10 @@
require 'tap/support/aggregator'
require 'tap/support/dependencies'
require 'tap/support/executable_queue'
module Tap
- module Support
- autoload(:Combinator, 'tap/support/combinator')
- end
# App coordinates the setup and running of tasks, and provides an interface
# to the application directory structure. App is convenient for use within
# scripts and, with Env, provides the basis for the 'tap' command line
# application.
@@ -17,27 +14,26 @@
#
# All tasks have an App (by default App.instance) through which tasks access
# access application-wide resources like the logger. Additionally, task
# enque command are forwarded to App#enq:
#
- # t1 = Task.new {|task, input| input += 1 }
+ # t1 = Task.intern {|task, input| input += 1 }
# t1.enq(0)
# app.enq(t1, 1)
#
# app.run
# app.results(t1) # => [1, 2]
#
- # When a task completes, the results will either be passed to the task
- # <tt>on_complete</tt> block (if set) or be collected into an Aggregator;
- # aggregated results may be accessed per-task, as shown above. Task
- # <tt>on_complete</tt> blocks typically enque other tasks, allowing the
- # construction of imperative workflows:
+ # When a task completes, the results will be passed to the task on_complete
+ # block, if set, or be collected into an Aggregator (aggregated results may
+ # be accessed per-task, as shown above); on_complete blocks typically enque
+ # other tasks, allowing the construction of imperative workflows:
#
# # clear the previous results
# app.aggregator.clear
#
- # t2 = Task.new {|task, input| input += 10 }
+ # t2 = Task.intern {|task, input| input += 10 }
# t1.on_complete {|_result| t2.enq(_result) }
#
# t1.enq 0
# t1.enq 10
#
@@ -50,51 +46,47 @@
#
# ==== Dependencies
#
# Tasks allow the construction of dependency-based workflows as well; tasks
# may be set to depend on other tasks such that the dependent task only
- # executes after the dependencies have been resolved (ie executed with a
- # given set of inputs).
+ # executes after the dependencies have been resolved.
#
# array = []
- # t1 = Task.new {|task, *inputs| array << inputs }
- # t2 = Task.new {|task, *inputs| array << inputs }
+ # t1 = Task.intern {|task, *inputs| array << inputs }
+ # t2 = Task.intern {|task| array << self }
#
- # t1.depends_on(t2,1,2,3)
+ # t1.depends_on(t2)
# t1.enq(4,5,6)
#
# app.run
- # array # => [[1,2,3], [4,5,6]]
+ # array # => [t2, [4,5,6]]
#
# Once a dependency is resolved, it will not execute again:
#
# t1.enq(7,8)
# app.run
- # array # => [[1,2,3], [4,5,6], [7,8]]
+ # array # => [t2, [4,5,6], [7,8]]
#
# ==== Batching
#
# Tasks can be batched, allowing the same input to be enqued to multiple
# tasks at once.
#
- # t1 = Task.new {|task, input| input += 1 }
- # t2 = Task.new {|task, input| input += 10 }
+ # t1 = Task.intern {|task, input| input += 1 }
+ # t2 = Task.intern {|task, input| input += 10 }
#
# t1.batch_with(t2)
# t1.enq 0
#
# app.run
# app.results(t1) # => [1]
# app.results(t2) # => [10]
#
# ==== Executables
#
- # App can use any Executable object in place of a task. One way to initialize
- # an Executable for a method is to use the Object#_method defined by Tap. The
- # result can be enqued and incorporated into workflows, but they cannot be
- # batched.
- #
+ # App can enque and run any Executable object. One way to initialize an
+ # Executable for a method is to use the Object#_method added by Tap.
# The mq (method enq) method generates and enques the method in one step.
#
# array = []
# m = array._method(:push)
#
@@ -105,21 +97,19 @@
# app.run
# array # => [1, 2]
#
# === Auditing
#
- # All results generated by executable methods are audited to track how a given
- # input evolves during a workflow.
- #
+ # All results are audited to track how a given input evolves during a workflow.
# To illustrate auditing, consider a workflow that uses the 'add_one' method
# to add one to an input until the result is 3, then adds five more with the
# 'add_five' method. The final result should always be 8.
#
- # t1 = Tap::Task.new {|task, input| input += 1 }
+ # t1 = Tap::Task.intern {|task, input| input += 1 }
# t1.name = "add_one"
#
- # t2 = Tap::Task.new {|task, input| input += 5 }
+ # t2 = Tap::Task.intern {|task, input| input += 5 }
# t2.name = "add_five"
#
# t1.on_complete do |_result|
# # _result is the audit; use the _current method
# # to get the current value in the audit trail
@@ -187,13 +177,14 @@
# The state of the application (see App::State)
attr_reader :state
# A Tap::Support::Aggregator to collect the results of
- # methods that have no <tt>on_complete</tt> block
+ # methods that have no on_complete block
attr_reader :aggregator
+ # A Tap::Support::Dependencies to track dependencies.
attr_reader :dependencies
config :debug, false, &c.flag # Flag debugging
config :force, false, &c.flag # Force execution at checkpoints
config :quiet, false, &c.flag # Suppress logging
@@ -268,49 +259,50 @@
#
# Execution methods
#
- # Executes the input Executable with the inputs. Stores the result in
- # aggregator unless an on_complete block is set. Returns the audited
- # result.
- def execute(m, inputs)
- m._execute(*inputs)
- end
-
# Sets state = State::READY unless the app is running. Returns self.
def ready
- self.state = State::READY unless self.state == State::RUN
+ @state = State::READY unless state == State::RUN
self
end
- # Sequentially calls execute with the Executable methods and inputs in
- # queue; run continues until the queue is empty and then returns self.
- # Calls to run when already running will return immediately.
+ # Sequentially calls execute with the [executable, inputs] pairs in
+ # queue; run continues until the queue is empty and then returns self.
#
- # Run checks the state of self before executing a method. If the state is
- # changed to State::STOP, then no more methods will be executed; currently
- # running methods will continute to completion. If the state is changed to
- # State::TERMINATE then no more methods will be executed and currently
- # running methods will be discontinued as described in terminate.
+ # ==== Run State
+ #
+ # Run checks the state of self before executing a method. If state
+ # changes from State::RUN, the following behaviors result:
+ #
+ # State::STOP:: No more executables will be executed; the current
+ # executable will continute to completion.
+ # State::TERMINATE:: No more executables will be executed and the
+ # currently running executable will be
+ # discontinued as described in terminate.
+ #
+ # Calls to run when the state is not State::READY do nothing and
+ # return immediately.
def run
return self unless state == State::READY
- self.state = State::RUN
+ @state = State::RUN
# TODO: log starting run
begin
until queue.empty? || state != State::RUN
- execute(*queue.deq)
+ executable, inputs = queue.deq
+ executable._execute(*inputs)
end
rescue(TerminateError)
# gracefully fail for termination errors
rescue(Exception)
# handle other errors accordingly
raise if debug?
log($!.class, $!.message)
ensure
- self.state = State::READY
+ @state = State::READY
end
# TODO: log run complete
self
end
@@ -319,35 +311,30 @@
# queue by setting state = State::STOP. The task currently
# executing will continue uninterrupted to completion.
#
# Does nothing unless state is State::RUN.
def stop
- self.state = State::STOP if self.state == State::RUN
+ @state = State::STOP if state == State::RUN
self
end
# Signals a running application to terminate execution by setting
# state = State::TERMINATE. In this state, an executing task
# will then raise a TerminateError upon check_terminate, thus
# allowing the invocation of task-specific termination, perhaps
- # performing rollbacks. (see Tap::Task#check_terminate).
+ # performing rollbacks. (see Tap::Support::Executable#check_terminate).
#
# Does nothing if state == State::READY.
def terminate
- self.state = State::TERMINATE unless self.state == State::READY
+ @state = State::TERMINATE unless state == State::READY
self
end
# Returns an information string for the App.
#
# App.instance.info # => 'state: 0 (READY) queue: 0 results: 0'
#
- # Provided information:
- #
- # state:: the integer and string values of self.state
- # queue:: the number of methods currently in the queue
- # results:: the total number of results in aggregator
def info
"state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}"
end
# Enques the task with the inputs. If the task is batched, then each
@@ -382,12 +369,12 @@
end
# Returns all aggregated results for the specified tasks. Results are
# joined into a single array. Arrays of tasks are allowed as inputs.
#
- # t1 = Task.new {|task, input| input += 1 }
- # t2 = Task.new {|task, input| input += 10 }
+ # t1 = Task.intern {|task, input| input += 1 }
+ # t2 = Task.intern {|task, input| input += 10 }
# t3 = t2.initialize_batch_obj
#
# t1.enq(0)
# t2.enq(1)
#
@@ -401,21 +388,9 @@
def inspect
"#<#{self.class.to_s}:#{object_id} root: #{root} >"
end
- protected
-
- # A hook for handling unknown configurations in subclasses, called from
- # configure. If handle_configuration evaluates to false, then configure
- # raises an error.
- def handle_configuation(key, value)
- false
- end
-
- # Sets the state of the application
- attr_writer :state
-
# TerminateErrors are raised to kill executing tasks when terminate is
# called on an running App. They are handled by the run rescue code.
class TerminateError < RuntimeError
end
end
\ No newline at end of file