lib/tap/app.rb in bahuvrihi-tap-0.10.7 vs lib/tap/app.rb in bahuvrihi-tap-0.10.8

- old
+ new

@@ -2,13 +2,10 @@ require 'tap/support/aggregator' require 'tap/support/dependencies' require 'tap/support/executable_queue' module Tap - module Support - autoload(:Combinator, 'tap/support/combinator') - end # App coordinates the setup and running of tasks, and provides an interface # to the application directory structure. App is convenient for use within # scripts and, with Env, provides the basis for the 'tap' command line # application. @@ -17,27 +14,26 @@ # # All tasks have an App (by default App.instance) through which tasks access # access application-wide resources like the logger. Additionally, task # enque command are forwarded to App#enq: # - # t1 = Task.new {|task, input| input += 1 } + # t1 = Task.intern {|task, input| input += 1 } # t1.enq(0) # app.enq(t1, 1) # # app.run # app.results(t1) # => [1, 2] # - # When a task completes, the results will either be passed to the task - # <tt>on_complete</tt> block (if set) or be collected into an Aggregator; - # aggregated results may be accessed per-task, as shown above. Task - # <tt>on_complete</tt> blocks typically enque other tasks, allowing the - # construction of imperative workflows: + # When a task completes, the results will be passed to the task on_complete + # block, if set, or be collected into an Aggregator (aggregated results may + # be accessed per-task, as shown above); on_complete blocks typically enque + # other tasks, allowing the construction of imperative workflows: # # # clear the previous results # app.aggregator.clear # - # t2 = Task.new {|task, input| input += 10 } + # t2 = Task.intern {|task, input| input += 10 } # t1.on_complete {|_result| t2.enq(_result) } # # t1.enq 0 # t1.enq 10 # @@ -50,51 +46,47 @@ # # ==== Dependencies # # Tasks allow the construction of dependency-based workflows as well; tasks # may be set to depend on other tasks such that the dependent task only - # executes after the dependencies have been resolved (ie executed with a - # given set of inputs). + # executes after the dependencies have been resolved. # # array = [] - # t1 = Task.new {|task, *inputs| array << inputs } - # t2 = Task.new {|task, *inputs| array << inputs } + # t1 = Task.intern {|task, *inputs| array << inputs } + # t2 = Task.intern {|task| array << self } # - # t1.depends_on(t2,1,2,3) + # t1.depends_on(t2) # t1.enq(4,5,6) # # app.run - # array # => [[1,2,3], [4,5,6]] + # array # => [t2, [4,5,6]] # # Once a dependency is resolved, it will not execute again: # # t1.enq(7,8) # app.run - # array # => [[1,2,3], [4,5,6], [7,8]] + # array # => [t2, [4,5,6], [7,8]] # # ==== Batching # # Tasks can be batched, allowing the same input to be enqued to multiple # tasks at once. # - # t1 = Task.new {|task, input| input += 1 } - # t2 = Task.new {|task, input| input += 10 } + # t1 = Task.intern {|task, input| input += 1 } + # t2 = Task.intern {|task, input| input += 10 } # # t1.batch_with(t2) # t1.enq 0 # # app.run # app.results(t1) # => [1] # app.results(t2) # => [10] # # ==== Executables # - # App can use any Executable object in place of a task. One way to initialize - # an Executable for a method is to use the Object#_method defined by Tap. The - # result can be enqued and incorporated into workflows, but they cannot be - # batched. - # + # App can enque and run any Executable object. One way to initialize an + # Executable for a method is to use the Object#_method added by Tap. # The mq (method enq) method generates and enques the method in one step. # # array = [] # m = array._method(:push) # @@ -105,21 +97,19 @@ # app.run # array # => [1, 2] # # === Auditing # - # All results generated by executable methods are audited to track how a given - # input evolves during a workflow. - # + # All results are audited to track how a given input evolves during a workflow. # To illustrate auditing, consider a workflow that uses the 'add_one' method # to add one to an input until the result is 3, then adds five more with the # 'add_five' method. The final result should always be 8. # - # t1 = Tap::Task.new {|task, input| input += 1 } + # t1 = Tap::Task.intern {|task, input| input += 1 } # t1.name = "add_one" # - # t2 = Tap::Task.new {|task, input| input += 5 } + # t2 = Tap::Task.intern {|task, input| input += 5 } # t2.name = "add_five" # # t1.on_complete do |_result| # # _result is the audit; use the _current method # # to get the current value in the audit trail @@ -187,13 +177,14 @@ # The state of the application (see App::State) attr_reader :state # A Tap::Support::Aggregator to collect the results of - # methods that have no <tt>on_complete</tt> block + # methods that have no on_complete block attr_reader :aggregator + # A Tap::Support::Dependencies to track dependencies. attr_reader :dependencies config :debug, false, &c.flag # Flag debugging config :force, false, &c.flag # Force execution at checkpoints config :quiet, false, &c.flag # Suppress logging @@ -268,49 +259,50 @@ # # Execution methods # - # Executes the input Executable with the inputs. Stores the result in - # aggregator unless an on_complete block is set. Returns the audited - # result. - def execute(m, inputs) - m._execute(*inputs) - end - # Sets state = State::READY unless the app is running. Returns self. def ready - self.state = State::READY unless self.state == State::RUN + @state = State::READY unless state == State::RUN self end - # Sequentially calls execute with the Executable methods and inputs in - # queue; run continues until the queue is empty and then returns self. - # Calls to run when already running will return immediately. + # Sequentially calls execute with the [executable, inputs] pairs in + # queue; run continues until the queue is empty and then returns self. # - # Run checks the state of self before executing a method. If the state is - # changed to State::STOP, then no more methods will be executed; currently - # running methods will continute to completion. If the state is changed to - # State::TERMINATE then no more methods will be executed and currently - # running methods will be discontinued as described in terminate. + # ==== Run State + # + # Run checks the state of self before executing a method. If state + # changes from State::RUN, the following behaviors result: + # + # State::STOP:: No more executables will be executed; the current + # executable will continute to completion. + # State::TERMINATE:: No more executables will be executed and the + # currently running executable will be + # discontinued as described in terminate. + # + # Calls to run when the state is not State::READY do nothing and + # return immediately. def run return self unless state == State::READY - self.state = State::RUN + @state = State::RUN # TODO: log starting run begin until queue.empty? || state != State::RUN - execute(*queue.deq) + executable, inputs = queue.deq + executable._execute(*inputs) end rescue(TerminateError) # gracefully fail for termination errors rescue(Exception) # handle other errors accordingly raise if debug? log($!.class, $!.message) ensure - self.state = State::READY + @state = State::READY end # TODO: log run complete self end @@ -319,35 +311,30 @@ # queue by setting state = State::STOP. The task currently # executing will continue uninterrupted to completion. # # Does nothing unless state is State::RUN. def stop - self.state = State::STOP if self.state == State::RUN + @state = State::STOP if state == State::RUN self end # Signals a running application to terminate execution by setting # state = State::TERMINATE. In this state, an executing task # will then raise a TerminateError upon check_terminate, thus # allowing the invocation of task-specific termination, perhaps - # performing rollbacks. (see Tap::Task#check_terminate). + # performing rollbacks. (see Tap::Support::Executable#check_terminate). # # Does nothing if state == State::READY. def terminate - self.state = State::TERMINATE unless self.state == State::READY + @state = State::TERMINATE unless state == State::READY self end # Returns an information string for the App. # # App.instance.info # => 'state: 0 (READY) queue: 0 results: 0' # - # Provided information: - # - # state:: the integer and string values of self.state - # queue:: the number of methods currently in the queue - # results:: the total number of results in aggregator def info "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}" end # Enques the task with the inputs. If the task is batched, then each @@ -382,12 +369,12 @@ end # Returns all aggregated results for the specified tasks. Results are # joined into a single array. Arrays of tasks are allowed as inputs. # - # t1 = Task.new {|task, input| input += 1 } - # t2 = Task.new {|task, input| input += 10 } + # t1 = Task.intern {|task, input| input += 1 } + # t2 = Task.intern {|task, input| input += 10 } # t3 = t2.initialize_batch_obj # # t1.enq(0) # t2.enq(1) # @@ -401,21 +388,9 @@ def inspect "#<#{self.class.to_s}:#{object_id} root: #{root} >" end - protected - - # A hook for handling unknown configurations in subclasses, called from - # configure. If handle_configuration evaluates to false, then configure - # raises an error. - def handle_configuation(key, value) - false - end - - # Sets the state of the application - attr_writer :state - # TerminateErrors are raised to kill executing tasks when terminate is # called on an running App. They are handled by the run rescue code. class TerminateError < RuntimeError end end \ No newline at end of file