lib/tap/app.rb in bahuvrihi-tap-0.10.3 vs lib/tap/app.rb in bahuvrihi-tap-0.10.4
- old
+ new
@@ -1,11 +1,13 @@
require 'logger'
-require 'tap/support/run_error'
require 'tap/support/aggregator'
require 'tap/support/executable_queue'
module Tap
+ module Support
+ autoload(:Combinator, 'tap/support/combinator')
+ end
# App coordinates the setup and running of tasks, and provides an interface
# to the application directory structure. App is convenient for use within
# scripts and, with Env, provides the basis for the 'tap' command line
# application.
@@ -25,11 +27,11 @@
#
# When a task completes, the results will either be passed to the task
# <tt>on_complete</tt> block (if set) or be collected into an Aggregator;
# aggregated results may be accessed per-task, as shown above. Task
# <tt>on_complete</tt> blocks typically enque other tasks, allowing the
- # construction of workflows:
+ # construction of imperative workflows:
#
# # clear the previous results
# app.aggregator.clear
#
# t2 = Task.new {|task, input| input += 10 }
@@ -43,10 +45,33 @@
# app.results(t2) # => [11, 21]
#
# Here t1 has no results because the on_complete block passed them to t2 in
# a simple sequence.
#
+ # ==== Dependencies
+ #
+ # Tasks allow the construction of dependency-based workflows as well; tasks
+ # may be set to depend on other tasks such that the dependent task only
+ # executes after the dependencies have been resolved (ie executed with a
+ # given set of inputs).
+ #
+ # array = []
+ # t1 = Task.new {|task, *inputs| array << inputs }
+ # t2 = Task.new {|task, *inputs| array << inputs }
+ #
+ # t1.depends_on(t2,1,2,3)
+ # t1.enq(4,5,6)
+ #
+ # app.run
+ # array # => [[1,2,3], [4,5,6]]
+ #
+ # Once a dependency is resolved, it will not execute again:
+ #
+ # t1.enq(7,8)
+ # app.run
+ # array # => [[1,2,3], [4,5,6], [7,8]]
+ #
# ==== Batching
#
# Tasks can be batched, allowing the same input to be enqued to multiple
# tasks at once.
#
@@ -58,35 +83,10 @@
#
# app.run
# app.results(t1) # => [1]
# app.results(t2) # => [10]
#
- # ==== Multithreading
- #
-  # App supports multithreading; multithreaded tasks execute concurrently,
- # each on their own thread.
- #
- # lock = Mutex.new
- # array = []
- # t1 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
- # t2 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
- #
- # t1.multithread = true
- # t1.enq
- # t2.multithread = true
- # t2.enq
- #
- # app.run
- # array.length # => 2
- # array[0] == array[1] # => false
- #
- # Naturally, it is up to you to make sure each task is thread safe. Note
- # that for the most part Tap::App is NOT thread safe; only run and
- # run-related methods (ready, stop, terminate, info) are synchronized.
- # Methods enq and results act on thread-safe objects ExecutableQueue and
- # Aggregator, and should be ok to use from multiple threads.
- #
# ==== Executables
#
# App can use any Executable object in place of a task. One way to initialize
# an Executable for a method is to use the Object#_method defined by Tap. The
# result can be enqued and incorporated into workflows, but they cannot be
@@ -165,12 +165,10 @@
# o-[add_one] 3
# o-[add_five] 8
#
# See Tap::Support::Audit for more details.
class App < Root
- include MonitorMixin
-
class << self
# Sets the current app instance
attr_writer :instance
# Returns the current instance of App. If no instance has been set,
@@ -191,15 +189,15 @@
# A Tap::Support::Aggregator to collect the results of
# methods that have no <tt>on_complete</tt> block
attr_reader :aggregator
- config :max_threads, 10, &c.integer # For multithread execution
config :debug, false, &c.flag # Flag debugging
config :force, false, &c.flag # Force execution at checkpoints
config :quiet, false, &c.flag # Suppress logging
-
+ config :verbose, false, &c.flag # Enables extra logging (overrides quiet)
+
# The constants defining the possible App states.
module State
READY = 0
RUN = 1
STOP = 2
@@ -220,22 +218,19 @@
# Creates a new App with the given configuration.
def initialize(config={}, logger=DEFAULT_LOGGER)
super()
@state = State::READY
- @threads = [].extend(MonitorMixin)
- @thread_queue = nil
- @run_thread = nil
-
@queue = Support::ExecutableQueue.new
@aggregator = Support::Aggregator.new
initialize_config(config)
self.logger = logger
end
- DEFAULT_LOGGER = Logger.new(STDOUT)
+ # The default App logger writes to $stdout at level INFO.
+ DEFAULT_LOGGER = Logger.new($stdout)
DEFAULT_LOGGER.level = Logger::INFO
DEFAULT_LOGGER.formatter = lambda do |severity, time, progname, msg|
" %s[%s] %18s %s\n" % [severity[0,1], time.strftime('%H:%M:%S') , progname || '--' , msg]
end
@@ -255,11 +250,11 @@
end
# Logs the action and message at the input level (default INFO).
    # Logging is suppressed if quiet is true, unless verbose is also true.
def log(action, msg="", level=Logger::INFO)
- logger.add(level, msg, action.to_s) unless quiet
+ logger.add(level, msg, action.to_s) if !quiet || verbose
end
# Returns the configuration filepath for the specified task name,
# File.join(app['config'], task_name + ".yml"). Returns nil if
# task_name is nil.
@@ -278,229 +273,96 @@
_result = m._execute(*inputs)
aggregator.store(_result) unless m.on_complete_block
_result
end
- # Sets state = State::READY unless the app has a run_thread
- # (ie the app is running). Returns self.
+ # Sets state = State::READY unless the app is running. Returns self.
def ready
- synchronize do
- self.state = State::READY if self.run_thread == nil
- self
- end
+ self.state = State::READY unless self.state == State::RUN
+ self
end
- # Sequentially executes the methods (ie Executable objects) in queue; run
- # continues until the queue is empty and then returns self. An app can
- # only run on one thread at a time. If run is called when already running,
- # run returns immediately.
+ # Sequentially calls execute with the Executable methods and inputs in
+ # queue; run continues until the queue is empty and then returns self.
+ # Calls to run when already running will return immediately.
#
- # === The Run Cycle
- # Run can execute methods in sequential or multithreaded mode. In sequential
- # mode, run executes enqued methods in order and on the current thread. Run
- # continues until it reaches a method marked with multithread = true, at which
- # point run switches into multithreading mode.
- #
- # When multithreading, run shifts methods off of the queue and executes each
- # on their own thread (launching up to max_threads threads at one time).
- # Multithread execution continues until run reaches a non-multithread method,
- # at which point run blocks, waits for the threads to complete, and switches
- # back into sequential mode.
- #
- # Run never executes multithreaded and non-multithreaded methods at the same
- # time.
- #
- # ==== Checks
# Run checks the state of self before executing a method. If the state is
# changed to State::STOP, then no more methods will be executed; currently
    # running methods will continue to completion. If the state is changed to
- # State::TERMINATE then no more methods will be executed and currently running
- # methods will be discontinued as described below.
- #
- # ==== Error Handling and Termination
- # When unhandled errors arise during run, run enters a termination routine.
- # During termination a TerminationError is raised in each executing method so
- # that the method exits or begins executing its internal error handling code
- # (perhaps performing rollbacks).
- #
- # The TerminationError can ONLY be raised by the method itself, usually via a
- # call to Tap::Support::Framework#check_terminate. <tt>check_terminate</tt>
- # is available to all Framework objects (ex Task and Workflow), but not to
- # Executable methods generated by _method. These methods need to check the
- # state of app themselves; otherwise they will continue on to completion even
- # when app is in State::TERMINATE.
- #
- # # this task will loop until app.terminate
- # Task.new {|task| while(true) task.check_terminate end }
- #
- # # this task will NEVER terminate
- # Task.new {|task| while(true) end; task.check_terminate }
- #
- # Additional errors that arise during termination are collected and packaged
-    # with the original error into a RunError. By default all errors are logged
- # and run exits. If debug? is true, then the RunError will be raised for
- # further handling.
- #
- # Note: the method that caused the original unhandled error is no longer
-    # executing when termination begins and thus will not receive a
- # TerminationError.
+ # State::TERMINATE then no more methods will be executed and currently
+ # running methods will be discontinued as described in terminate.
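+    #
+    # A minimal usage sketch (illustrative, following the class docs; queue
+    # and results are the readers described in info and the examples above):
+    #
+    #   t = Task.new {|task, input| input += 1 }
+    #   t.enq(1)
+    #   t.enq(10)
+    #
+    #   app.run
+    #   app.queue.size    # => 0
+    #   app.results(t)    # => [2, 11]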
def run
- synchronize do
- return self unless self.ready.state == State::READY
+ return self unless state == State::READY
+ self.state = State::RUN
- self.run_thread = Thread.current
- self.state = State::RUN
- end
-
- # generate threading variables
- self.thread_queue = max_threads > 0 ? Queue.new : nil
-
# TODO: log starting run
- begin
- execution_loop do
- break if block_given? && yield(self)
-
- # if no tasks were in the queue
- # then clear the threads and
- # check for tasks again
- if queue.empty?
- clear_threads
- # break -- no executable task was found
- break if queue.empty?
- end
-
- m, inputs = queue.deq
-
- if thread_queue && m.multithread
- # TODO: log enqueuing task to thread
-
- # generate threads as needed and allowed
- # to execute the threads in the thread queue
- start_thread if threads.size < max_threads
-
- # NOTE: the producer-consumer relationship of execution
- # threads and the thread_queue means that tasks will sit
- # waiting until an execution thread opens up. in the most
- # extreme case all executing tasks and all tasks in the
- # task_queue could be the same task, each with different
- # inputs. this deviates from the idea of batch processing,
- # but should be rare and not at all fatal given execute
- # synchronization.
- thread_queue.enq [m, inputs]
-
- else
- # TODO: log execute task
-
- # wait for threads to complete
- # before executing the main thread
- clear_threads
- execute(m, inputs)
- end
+ begin
+ until queue.empty? || state != State::RUN
+ execute(*queue.deq)
end
-
- # if the run loop exited due to a STOP state,
- # tasks may still be in the thread queue and/or
- # running. be sure these are cleared
- clear_thread_queue
- clear_threads
-
- rescue
- # when an error is generated, be sure to terminate
- # all threads so they can clean up after themselves.
- # clear the thread queue first so no more tasks are
- # executed. collect any errors that arise during
- # termination.
- clear_thread_queue
- errors = [$!] + clear_threads(false)
- errors.delete_if {|error| error.kind_of?(TerminateError) }
-
- # handle the errors accordingly
- case
- when debug?
- raise Tap::Support::RunError.new(errors)
- else
- errors.each_with_index do |err, index|
- log("RunError [#{index}] #{err.class}", err.message)
- end
- end
+ rescue(TerminateError)
+ # gracefully fail for termination errors
+ rescue(Exception)
+ # handle other errors accordingly
+ raise if debug?
+ log($!.class, $!.message)
ensure
-
- # reset run variables
- self.thread_queue = nil
-
- synchronize do
- self.run_thread = nil
- self.state = State::READY
- end
+ self.state = State::READY
end
# TODO: log run complete
self
end
# Signals a running application to stop executing tasks in the
- # queue by setting state = State::STOP. Currently executing
- # tasks will continue their execution uninterrupted.
+ # queue by setting state = State::STOP. The task currently
+ # executing will continue uninterrupted to completion.
#
# Does nothing unless state is State::RUN.
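+    #
+    # An illustrative sketch (assumes Task#app, as in the enq check below);
+    # the first task stops the app, so the second stays in the queue:
+    #
+    #   t1 = Task.new {|task| task.app.stop }
+    #   t2 = Task.new {|task| }
+    #
+    #   t1.enq
+    #   t2.enq
+    #
+    #   app.run
+    #   app.queue.size    # => 1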
def stop
- synchronize do
- self.state = State::STOP if self.state == State::RUN
- self
- end
+ self.state = State::STOP if self.state == State::RUN
+ self
end
- # Signals a running application to terminate executing tasks
- # by setting state = State::TERMINATE. When running tasks
- # reach a termination check, the task raises a TerminationError,
- # thus allowing executing tasks to invoke their specific
- # error handling code, perhaps performing rollbacks.
+ # Signals a running application to terminate execution by setting
+ # state = State::TERMINATE. In this state, an executing task
+    # will raise a TerminateError upon check_terminate, allowing
+    # task-specific termination code to run, perhaps performing
+    # rollbacks (see Tap::Task#check_terminate).
#
- # Termination checks can be manually specified in a task
- # using the check_terminate method (see Tap::Task#check_terminate).
- # Termination checks automatically occur before each task execution.
- #
# Does nothing if state == State::READY.
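+    #
+    # An illustrative sketch, adapted from the 0.10.3 run docs above; the
+    # looping task exits only when terminate is called and check_terminate
+    # raises a TerminateError:
+    #
+    #   # this task will loop until app.terminate
+    #   t = Task.new {|task| task.check_terminate while true }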
def terminate
- synchronize do
- self.state = State::TERMINATE unless self.state == State::READY
- self
- end
+ self.state = State::TERMINATE unless self.state == State::READY
+ self
end
# Returns an information string for the App.
#
- # App.instance.info # => 'state: 0 (READY) queue: 0 thread_queue: 0 threads: 0 results: 0'
+ # App.instance.info # => 'state: 0 (READY) queue: 0 results: 0'
#
# Provided information:
#
# state:: the integer and string values of self.state
# queue:: the number of methods currently in the queue
- # thread_queue:: number of objects in the thread queue, waiting
- # to be run on an execution thread (methods, and
- # perhaps nils to signal threads to clear)
- # threads:: the number of execution threads
# results:: the total number of results in aggregator
def info
- synchronize do
- "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} thread_queue: #{thread_queue ? thread_queue.size : 0} threads: #{threads.size} results: #{aggregator.size}"
- end
+ "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}"
end
# Enques the task with the inputs. If the task is batched, then each
# task in task.batch will be enqued with the inputs. Returns task.
#
    # An Executable may be provided instead of a task.
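+    #
+    # A minimal sketch, mirroring the examples in the class docs:
+    #
+    #   t = Task.new {|task, input| input += 1 }
+    #   app.enq(t, 1)
+    #
+    #   app.run
+    #   app.results(t)    # => [2]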
def enq(task, *inputs)
case task
when Tap::Task
- raise "not assigned to enqueing app: #{task}" unless task.app == self
+ raise ArgumentError, "not assigned to enqueing app: #{task}" unless task.app == self
task.enq(*inputs)
when Support::Executable
queue.enq(task, inputs)
else
- raise "Not a Task or Executable: #{task}"
+ raise ArgumentError, "not a Task or Executable: #{task}"
end
task
end
# Method enque. Enques the specified method from object with the inputs.
@@ -508,18 +370,19 @@
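+    #
+    # A minimal sketch; the underscore method comes from Object#_method,
+    # mentioned in the Executables section above:
+    #
+    #   array = []
+    #   app.mq(array, :push, 1, 2)
+    #
+    #   app.run
+    #   array    # => [1, 2]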
def mq(object, method_name, *inputs)
m = object._method(method_name)
enq(m, *inputs)
end
- # Sets a sequence workflow pattern for the tasks such that the
-    # completion of a task enqueues the next task with its results.
- # Batched tasks will have the pattern set for each task in the
- # batch. The current audited results are yielded to the block,
- # if given, before the next task is enqued.
+ # Sets a sequence workflow pattern for the tasks; each task will enque
+    # the next task with its results.
#
-    # Executables may be provided as well as tasks.
- def sequence(*tasks) # :yields: _result
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+    # - Executables may be provided as well as tasks.
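+    #
+    # A minimal sketch, following the sequence example in the class docs:
+    #
+    #   t1 = Task.new {|task, input| input += 1 }
+    #   t2 = Task.new {|task, input| input += 10 }
+    #   app.sequence([t1, t2])
+    #
+    #   t1.enq(0)
+    #   app.run
+    #   app.results(t2)    # => [11]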
+ def sequence(tasks) # :yields: _result
current_task = tasks.shift
tasks.each do |next_task|
# simply pass results from one task to the next.
current_task.on_complete do |_result|
yield(_result) if block_given?
@@ -527,44 +390,124 @@
end
current_task = next_task
end
end
- # Sets a fork workflow pattern for the tasks such that each of the
- # targets will be enqueued with the results of the source when the
- # source completes. Batched tasks will have the pattern set for each
- # task in the batch. The source audited results are yielded to the
- # block, if given, before the targets are enqued.
+ # Sets a fork workflow pattern for the source task; each target
+    # will be enqued with the results of source.
#
-    # Executables may be provided as well as tasks.
- def fork(source, *targets) # :yields: _result
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+    # - Executables may be provided as well as tasks.
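+    #
+    # A minimal sketch (illustrative); each target receives the source result:
+    #
+    #   t1 = Task.new {|task, input| input += 1 }
+    #   t2 = Task.new {|task, input| input += 10 }
+    #   t3 = Task.new {|task, input| input += 100 }
+    #   app.fork(t1, [t2, t3])
+    #
+    #   t1.enq(0)
+    #   app.run
+    #   app.results(t2)    # => [11]
+    #   app.results(t3)    # => [101]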
+ def fork(source, targets) # :yields: _result
source.on_complete do |_result|
targets.each do |target|
yield(_result) if block_given?
enq(target, _result)
end
end
end
-
- # Sets a merge workflow pattern for the tasks such that the results
- # of each source will be enqueued to the target when the source
- # completes. Batched tasks will have the pattern set for each
- # task in the batch. The source audited results are yielded to
- # the block, if given, before the target is enqued.
+
+ # Sets a simple merge workflow pattern for the source tasks. Each source
+    # enques the target with its result; no synchronization occurs, nor are
+ # results grouped before being sent to the target.
#
-    # Executables may be provided as well as tasks.
- def merge(target, *sources) # :yields: _result
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+    # - Executables may be provided as well as tasks.
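+    #
+    # A minimal sketch (illustrative); each source passes its result to the
+    # target as it completes:
+    #
+    #   t1 = Task.new {|task, input| input += 1 }
+    #   t2 = Task.new {|task, input| input += 10 }
+    #   target = Task.new {|task, input| input }
+    #   app.merge(target, [t1, t2])
+    #
+    #   t1.enq(0)
+    #   t2.enq(0)
+    #   app.run
+    #   app.results(target)    # => [1, 10]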
+ def merge(target, sources) # :yields: _result
sources.each do |source|
# merging can use the existing audit trails... each distinct
# input is getting sent to one place (the target)
source.on_complete do |_result|
yield(_result) if block_given?
enq(target, _result)
end
end
end
+ # Sets a synchronized merge workflow for the source tasks. Results from
+ # each source task are collected and enqued as a single group to the target.
+ # The target is not enqued until all sources have completed. Raises an
+ # error if a source returns twice before the target is enqued.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+    # - Executables may be provided as well as tasks.
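+    #
+    # An illustrative sketch; the grouped results are merged and enqued to
+    # the target as a single input (the exact form of the merged result is
+    # covered by the TODO below):
+    #
+    #   t1 = Task.new {|task| 1 }
+    #   t2 = Task.new {|task| 2 }
+    #   target = Task.new {|task, input| input }
+    #   app.sync_merge(target, [t1, t2])
+    #
+    #   t1.enq
+    #   t2.enq
+    #   app.run    # target is enqued once, with the grouped result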
+ #
+ #-- TODO: add notes on testing and the way results are received
+ # (ie as a single object)
+ def sync_merge(target, sources) # :yields: _result
+ group = Array.new(sources.length, nil)
+ sources.each_with_index do |source, index|
+ batch_map = Hash.new(0)
+ batch_length = if source.kind_of?(Support::Batchable)
+ source.batch.each_with_index {|obj, i| batch_map[obj] = i }
+ source.batch.length
+ else
+ 1
+ end
+
+ group[index] = Array.new(batch_length, nil)
+
+ source.on_complete do |_result|
+ batch_index = batch_map[_result._current_source]
+
+ if group[index][batch_index] != nil
+ raise "sync_merge collision... already got a result for #{_result._current_source}"
+ end
+
+ group[index][batch_index] = _result
+
+ unless group.flatten.include?(nil)
+ Support::Combinator.new(*group).each do |*combination|
+ # merge the source audits
+ _group_result = Support::Audit.merge(*combination)
+
+ yield(_group_result) if block_given?
+ target.enq(_group_result)
+ end
+
+ # reset the group array
+ group.collect! {|i| nil }
+ end
+ end
+ end
+ end
+
+ # Sets a choice workflow pattern for the source task. When the
+ # source task completes, switch yields the audited result to the
+ # block which then returns the index of the target to enque with
+ # the results. No target will be enqued if the index is false or
+ # nil; an error is raised if no target can be found for the
+ # specified index.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+    # - Executables may be provided as well as tasks.
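+    #
+    # A minimal sketch (illustrative; assumes Audit#_current returns the
+    # current value, in line with _current_source used by sync_merge):
+    #
+    #   even = Task.new {|task, input| "#{input} is even" }
+    #   odd = Task.new {|task, input| "#{input} is odd" }
+    #   t = Task.new {|task, input| input }
+    #   app.switch(t, [even, odd]) {|_result| _result._current % 2 }
+    #
+    #   t.enq(4)
+    #   app.run
+    #   app.results(even)    # => ["4 is even"]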
+ def switch(source, targets) # :yields: _result
+ source.on_complete do |_result|
+ if index = yield(_result)
+ unless target = targets[index]
+ raise "no switch target for index: #{index}"
+ end
+
+ enq(target, _result)
+ else
+ aggregator.store(_result)
+ end
+ end
+ end
+
# Returns all aggregated, audited results for the specified tasks.
# Results are joined into a single array. Arrays of tasks are
# allowed as inputs. See results.
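+    #
+    # A minimal sketch (illustrative; assumes Audit#_current returns the
+    # current value): _results returns Audit objects, while results returns
+    # the current values.
+    #
+    #   t = Task.new {|task, input| input += 1 }
+    #   t.enq(0)
+    #   app.run
+    #
+    #   app._results(t).collect {|_result| _result._current }    # => [1]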
def _results(*tasks)
aggregator.retrieve_all(*tasks.flatten)
@@ -597,124 +540,12 @@
false
end
# Sets the state of the application
attr_writer :state
-
- # The thread on which run is executing tasks.
- attr_accessor :run_thread
-
- # An array containing the execution threads in use by run.
- attr_accessor :threads
-
- # A Queue containing multithread tasks waiting to be run
- # on the execution threads. Nil if max_threads == 0
- attr_accessor :thread_queue
-
- private
- def execution_loop
- while true
- case state
- when State::STOP
- break
- when State::TERMINATE
- # if an execution thread handles the termination error,
- # then the thread may end up here -- terminated but still
- # running. Raise another termination error to enter the
- # termination (rescue) code.
- raise TerminateError.new
- end
-
- yield
- end
- end
-
- def clear_thread_queue
- return unless thread_queue
-
- # clear the queue and enque the thread complete
- # signals, so that the thread will exit normally
- dequeued = []
- while !thread_queue.empty?
- dequeued << thread_queue.deq
- end
-
- # add dequeued tasks back, in order, to the task
- # queue so no tasks get lost due to the stop
- #
- # BUG: this will result in an already-newly-queued
-      # task being promoted along with its inputs
- dequeued.reverse_each do |task, inputs|
- # TODO: log about not executing
- queue.unshift(task, inputs) unless task.nil?
- end
- end
-
- def clear_threads(raise_errors=true)
- threads.synchronize do
- errors = []
- return errors if threads.empty?
-
- # clears threads gracefully by enqueuing nils, to break
- # the threads out of their loops, then waiting for the
- # threads to work through the queue to the nils
- #
- threads.size.times { thread_queue.enq nil }
- while true
- # TODO -- add a time out?
-
- threads.dup.each do |thread|
- next if thread.alive?
- threads.delete(thread)
- error = thread["error"]
-
- next if error.nil?
- raise error if raise_errors
-
- errors << error
- end
-
- break if threads.empty?
- Thread.pass
- end
-
- errors
- end
- end
-
- def start_thread
- threads.synchronize do
- # start a new thread and add it to threads.
- # threads simply loop and wait for a task to
- # be queued. the thread will block until a
- # task is available (due to thread_queue.deq)
- #
- # TODO -- track thread index like?
- # thread["index"] = threads.length
- threads << Thread.new do
- # TODO - log thread start
-
- begin
- execution_loop do
- m, inputs = thread_queue.deq
- break if m.nil?
-
- # TODO: log execute task on thread #
- execute(m, inputs)
- end
- rescue
- # an unhandled error should immediately
- # terminate all threads
- terminate
- Thread.current["error"] = $!
- end
- end
- end
- end
-
- # TerminateErrors are raised to kill executing tasks when terminate
-    # is called on a running App. They are handled by the run rescue code.
+ # TerminateErrors are raised to kill executing tasks when terminate is
+    # called on a running App. They are handled by the run rescue code.
class TerminateError < RuntimeError
end
end
end
\ No newline at end of file