require 'logger'
require 'tap/support/run_error'
require 'tap/support/aggregator'
require 'tap/support/executable_queue'
module Tap
# App coordinates the setup and running of tasks, and provides an interface
# to the application directory structure. App is convenient for use within
# scripts and, with Env, provides the basis for the 'tap' command line
# application.
#
# === Running Tasks
#
# All tasks have an App (by default App.instance) through which they
# access application-wide resources like the logger. Additionally, task
# enque commands are forwarded to App#enq:
#
# t1 = Task.new {|task, input| input += 1 }
# t1.enq(0)
# app.enq(t1, 1)
#
# app.run
# app.results(t1) # => [1, 2]
#
# When a task completes, the results will either be passed to the task
# on_complete block (if set) or be collected into an Aggregator;
# aggregated results may be accessed per-task, as shown above. Task
# on_complete blocks typically enque other tasks, allowing the
# construction of workflows:
#
# # clear the previous results
# app.aggregator.clear
#
# t2 = Task.new {|task, input| input += 10 }
# t1.on_complete {|_result| t2.enq(_result) }
#
# t1.enq 0
# t1.enq 10
#
# app.run
# app.results(t1) # => []
# app.results(t2) # => [11, 21]
#
# Here t1 has no results because the on_complete block passed them to t2 in
# a simple sequence.
#
# ==== Batching
#
# Tasks can be batched, allowing the same input to be enqued to multiple
# tasks at once.
#
# t1 = Task.new {|task, input| input += 1 }
# t2 = Task.new {|task, input| input += 10 }
# Task.batch(t1, t2) # => [t1, t2]
#
# t1.enq 0
#
# app.run
# app.results(t1) # => [1]
# app.results(t2) # => [10]
#
# ==== Multithreading
#
# App supports multithreading; multithreaded tasks execute concurrently,
# each on its own thread.
#
# lock = Mutex.new
# array = []
# t1 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
# t2 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
#
# t1.multithread = true
# t1.enq
# t2.multithread = true
# t2.enq
#
# app.run
# array.length # => 2
# array[0] == array[1] # => false
#
# Naturally, it is up to you to make sure each task is thread safe. Note
# that for the most part Tap::App is NOT thread safe; only run and
# run-related methods (ready, stop, terminate, info) are synchronized.
# Methods enq and results act on thread-safe objects ExecutableQueue and
# Aggregator, and should be ok to use from multiple threads.
#
# ==== Executables
#
# App can use any Executable object in place of a task. One way to initialize
# an Executable for a method is to use the Object#_method defined by Tap. The
# result can be enqued and incorporated into workflows, but it cannot be
# batched.
#
# The mq (method enq) method generates and enques the method in one step.
#
# array = []
# m = array._method(:push)
#
# app.enq(m, 1)
# app.mq(array, :push, 2)
#
# array.empty? # => true
# app.run
# array # => [1, 2]
#
# === Auditing
#
# All results generated by executable methods are audited to track how a given
# input evolves during a workflow.
#
# To illustrate auditing, consider a workflow that uses the 'add_one' method
# to add one to an input until the result is 3, then adds five more with the
# 'add_five' method. The final result should always be 8.
#
# t1 = Tap::Task.new {|task, input| input += 1 }
# t1.name = "add_one"
#
# t2 = Tap::Task.new {|task, input| input += 5 }
# t2.name = "add_five"
#
# t1.on_complete do |_result|
# # _result is the audit; use the _current method
# # to get the current value in the audit trail
#
# _result._current < 3 ? t1.enq(_result) : t2.enq(_result)
# end
#
# t1.enq(0)
# t1.enq(1)
# t1.enq(2)
#
# app.run
# app.results(t2) # => [8,8,8]
#
# Although the results are indistinguishable, each achieved the final value
# through a different series of tasks. With auditing you can see how each
# input came to the final value of 8:
#
# # app.results returns the actual result values
# # app._results returns the audits for these values
# app._results(t2).each do |_result|
# puts "How #{_result._original} became #{_result._current}:"
# puts _result._to_s
# puts
# end
#
# Prints:
#
# How 2 became 8:
# o-[] 2
# o-[add_one] 3
# o-[add_five] 8
#
# How 1 became 8:
# o-[] 1
# o-[add_one] 2
# o-[add_one] 3
# o-[add_five] 8
#
# How 0 became 8:
# o-[] 0
# o-[add_one] 1
# o-[add_one] 2
# o-[add_one] 3
# o-[add_five] 8
#
# See Tap::Support::Audit for more details.
class App < Root
include MonitorMixin
class << self
  # Assigns the App returned by App.instance.
  attr_writer :instance

  # The shared App instance. The first call made while no instance is set
  # lazily builds one with the default configuration (App.new) and
  # remembers it for later calls.
  def instance
    @instance = App.new unless @instance
    @instance
  end
end
# Public attributes. Of the configurations declared after them,
# max_threads is read by run (a value of 0 or less disables execution
# threads), debug by debug?, and quiet by log.
#
# The shared logger, assigned through logger= (which also accepts nil).
attr_reader :logger
# The application queue: a Support::ExecutableQueue holding the methods
# (and their inputs) that run will execute.
attr_reader :queue
# The state of the application; one of the App::State constants.
attr_reader :state
# A Tap::Support::Aggregator that collects the audited results of
# executed methods that have no on_complete block (see execute).
attr_reader :aggregator
config :max_threads, 10, &c.integer # For multithread execution
config :debug, false, &c.flag # Flag debugging
config :force, false, &c.flag # Force execution at checkpoints
config :quiet, false, &c.flag # Suppress logging
# Integer codes for the possible App states.
module State
  READY = 0
  RUN = 1
  STOP = 2
  TERMINATE = 3

  module_function

  # Returns the name of the state constant whose value equals state, as a
  # String, or nil when no state has that value.
  #
  #   State.state_str(0)    # => 'READY'
  #   State.state_str(12)   # => nil
  def state_str(state)
    match = constants.find {|const_name| const_get(const_name) == state }
    match.nil? ? nil : match.to_s
  end
end
# Creates a new App. config holds configuration overrides (applied with
# initialize_config) and logger becomes the shared logger (DEFAULT_LOGGER
# unless one is given).
def initialize(config={}, logger=DEFAULT_LOGGER)
  super()

  # run bookkeeping: an idle app has no run thread and no thread queue
  @state = State::READY
  @run_thread = nil
  @thread_queue = nil
  @threads = [].extend(MonitorMixin)

  # pending work and collected results
  @queue = Support::ExecutableQueue.new
  @aggregator = Support::Aggregator.new

  # configuration must be applied before the logger is assigned, because
  # logger= consults debug?, which reads the debug configuration
  initialize_config(config)
  self.logger = logger
end
# The logger used when App.new is not given one. It writes to STDOUT at
# INFO level. Each line has the form
# " <severity initial>[HH:MM:SS] <progname right-justified to 18> <msg>",
# with '--' standing in for a missing progname.
DEFAULT_LOGGER = Logger.new(STDOUT)
DEFAULT_LOGGER.level = Logger::INFO
DEFAULT_LOGGER.formatter = lambda do |severity, time, progname, msg|
  initial = severity[0, 1]
  clock = time.strftime('%H:%M:%S')
  label = progname || '--'
  " %s[%s] %18s %s\n" % [initial, clock, label, msg]
end
# Truthy when the debug configuration is set. Otherwise it falls back to
# the global $DEBUG flag. Returns whichever value decided the answer.
def debug?
  flag = debug
  flag ? flag : $DEBUG
end
# Assigns the shared logger; nil is accepted. When debug? is true, a
# non-nil logger has its level lowered to Logger::DEBUG before it is
# stored.
def logger=(logger)
  if !logger.nil? && debug?
    logger.level = Logger::DEBUG
  end
  @logger = logger
end
# Logs msg at the input level (default Logger::INFO). action.to_s is used
# as the progname. Nothing is logged when quiet is true, or when no
# logger is set (logger= accepts nil, so a nil logger must not crash
# logging).
def log(action, msg="", level=Logger::INFO)
  return if quiet || logger.nil?
  logger.add(level, msg, action.to_s)
end
# Returns the path of the YAML configuration file for name, ie
# filepath('config', "#{name}.yml"). Returns nil when name is nil.
def config_filepath(name)
  return nil if name.nil?
  filepath('config', "#{name}.yml")
end
#
# Execution methods
#
# Executes the Executable m with inputs splatted as its arguments and
# returns the audited result. The result is stored in the aggregator only
# when m has no on_complete block; otherwise that block is expected to
# consume it.
def execute(m, inputs)
  audit = m._execute(*inputs)
  aggregator.store(audit) unless m.on_complete_block
  audit
end
# Resets state to State::READY, but only while no run_thread is set (ie
# run is not in progress). Synchronized; returns self.
def ready
  synchronize do
    idle = run_thread.nil?
    self.state = State::READY if idle
    self
  end
end
# Sequentially executes the methods (ie Executable objects) in queue; run
# continues until the queue is empty and then returns self. An app can
# only run on one thread at a time. If run is called when already running,
# run returns self immediately.
#
# If a block is given, it is called with self at the top of each pass
# through the run loop. Run stops dequeuing methods as soon as the block
# returns a true value.
#
# === The Run Cycle
# Run can execute methods in sequential or multithreaded mode. In sequential
# mode, run executes enqued methods in order and on the current thread. Run
# continues until it reaches a method marked with multithread = true, at which
# point run switches into multithreading mode.
#
# When multithreading, run shifts methods off of the queue and executes each
# on their own thread (launching up to max_threads threads at one time).
# Multithread execution continues until run reaches a non-multithread method,
# at which point run blocks, waits for the threads to complete, and switches
# back into sequential mode. If max_threads is 0 (or less), every method is
# executed sequentially on the calling thread.
#
# Run never executes multithreaded and non-multithreaded methods at the same
# time.
#
# ==== Checks
# Run checks the state of self before executing a method. If the state is
# changed to State::STOP, then no more methods will be executed; currently
# running methods will continue to completion. If the state is changed to
# State::TERMINATE then no more methods will be executed and currently running
# methods will be discontinued as described below.
#
# ==== Error Handling and Termination
# When unhandled errors (StandardError) arise during run, run enters a
# termination routine. During termination a TerminateError is raised in each
# executing method so that the method exits or begins executing its internal
# error handling code (perhaps performing rollbacks).
#
# The TerminateError can ONLY be raised by the method itself, usually via a
# call to Tap::Support::Framework#check_terminate. check_terminate
# is available to all Framework objects (ex Task and Workflow), but not to
# Executable methods generated by _method. These methods need to check the
# state of app themselves; otherwise they will continue on to completion even
# when app is in State::TERMINATE.
#
#   # this task will loop until app.terminate
#   Task.new {|task| while true; task.check_terminate; end }
#
#   # this task will NEVER terminate
#   Task.new {|task| while true; end; task.check_terminate }
#
# Additional errors that arise during termination are collected and packaged
# with the original error into a RunError. TerminateErrors themselves are
# dropped from that list. By default all errors are logged and run exits.
# If debug? is true, then the RunError will be raised for further handling.
#
# However run finishes, it clears thread_queue and run_thread and resets
# the state to State::READY.
#
# Note: the method that caused the original unhandled error is no longer
# executing when termination begins and thus will not receive a
# TerminateError.
def run
synchronize do
# ready only resets the state when no run_thread is set, so a concurrent
# or re-entrant call sees a non-READY state and returns at once
return self unless self.ready.state == State::READY
self.run_thread = Thread.current
self.state = State::RUN
end
# generate threading variables; a nil thread_queue means every method
# is executed on this thread (see the multithread check below)
self.thread_queue = max_threads > 0 ? Queue.new : nil
# TODO: log starting run
begin
execution_loop do
break if block_given? && yield(self)
# if no tasks were in the queue
# then clear the threads and
# check for tasks again
if queue.empty?
clear_threads
# break -- no executable task was found
break if queue.empty?
end
m, inputs = queue.deq
if thread_queue && m.multithread
# TODO: log enqueuing task to thread
# generate threads as needed and allowed
# to execute the threads in the thread queue
start_thread if threads.size < max_threads
# NOTE: the producer-consumer relationship of execution
# threads and the thread_queue means that tasks will sit
# waiting until an execution thread opens up. in the most
# extreme case all executing tasks and all tasks in the
# task_queue could be the same task, each with different
# inputs. this deviates from the idea of batch processing,
# but should be rare and not at all fatal given execute
# synchronization.
thread_queue.enq [m, inputs]
else
# TODO: log execute task
# wait for threads to complete
# before executing the main thread
clear_threads
execute(m, inputs)
end
end
# if the run loop exited due to a STOP state,
# tasks may still be in the thread queue and/or
# running. be sure these are cleared
clear_thread_queue
clear_threads
rescue
# when an error is generated, be sure to terminate
# all threads so they can clean up after themselves.
# clear the thread queue first so no more tasks are
# executed. collect any errors that arise during
# termination.
clear_thread_queue
errors = [$!] + clear_threads(false)
# TerminateErrors are the expected side effect of termination,
# not failures in their own right, so they are not reported
errors.delete_if {|error| error.kind_of?(TerminateError) }
# handle the errors accordingly
case
when debug?
raise Tap::Support::RunError.new(errors)
else
errors.each_with_index do |err, index|
log("RunError [#{index}] #{err.class}", err.message)
end
end
ensure
# reset run variables
self.thread_queue = nil
synchronize do
self.run_thread = nil
self.state = State::READY
end
end
# TODO: log run complete
self
end
# Asks a running application to stop executing queued tasks by setting
# state to State::STOP. Tasks that are already executing finish
# uninterrupted.
#
# Has no effect unless state is State::RUN. Returns self.
def stop
  synchronize do
    running = (state == State::RUN)
    self.state = State::STOP if running
  end
  self
end
# Asks a running application to terminate executing tasks by setting
# state to State::TERMINATE. When a running task reaches a termination
# check, a TerminateError is raised inside it. This lets the task run its
# own error handling code, perhaps performing rollbacks.
#
# Tasks can make termination checks manually with check_terminate (see
# Tap::Task#check_terminate). A check also happens before each task is
# executed.
#
# Has no effect when state == State::READY. Returns self.
def terminate
  synchronize do
    self.state = State::TERMINATE unless state == State::READY
  end
  self
end
# Returns a one-line status summary for the App, built under the app's
# monitor.
#
#   App.instance.info  # => 'state: 0 (READY) queue: 0 thread_queue: 0 threads: 0 results: 0'
#
# Fields:
#
# state::        the integer and string values of self.state
# queue::        the number of methods currently in the queue
# thread_queue:: the number of entries waiting for an execution thread
#                (methods, and perhaps nils that signal threads to exit);
#                0 when there is no thread queue
# threads::      the number of execution threads
# results::      the total number of results in aggregator
def info
  synchronize do
    current = state
    pending = thread_queue ? thread_queue.size : 0
    "state: #{current} (#{State.state_str(current)}) queue: #{queue.size} thread_queue: #{pending} threads: #{threads.size} results: #{aggregator.size}"
  end
end
# Enques task with the inputs and returns task. A Tap::Task must belong to
# this app, and it enques itself through Task#enq (so a batched task enques
# every task in its batch). Any other Support::Executable is pushed
# directly onto queue.
#
# Raises a RuntimeError for a task assigned to another app, or for an
# object that is neither a Task nor an Executable.
def enq(task, *inputs)
  if Tap::Task === task
    unless task.app == self
      raise "not assigned to enqueing app: #{task}"
    end
    task.enq(*inputs)
  elsif Support::Executable === task
    queue.enq(task, inputs)
  else
    raise "Not a Task or Executable: #{task}"
  end

  task
end
# Method enque: wraps method_name on object as an Executable (via
# object._method), enques it with the inputs, and returns it.
def mq(object, method_name, *inputs)
  enq(object._method(method_name), *inputs)
end
# Sets a sequence workflow pattern for the tasks. When a task completes,
# the next task in the list is enqued with its audited result. Batched
# tasks will have the pattern set for each task in the batch. The audited
# result is yielded to the block, if given, before the next task is
# enqued.
#
# Executables may be provided as well as tasks.
def sequence(*tasks) # :yields: _result
  tasks.each_cons(2) do |current_task, next_task|
    # simply pass results from one task to the next
    current_task.on_complete do |_result|
      yield(_result) if block_given?
      enq(next_task, _result)
    end
  end
  tasks.drop(1)
end
# Sets a fork workflow pattern for the tasks. When source completes, each
# of the targets is enqued with the source's audited result. Batched
# tasks will have the pattern set for each task in the batch. If a block
# is given, the result is yielded to it once per target, just before
# that target is enqued.
#
# Executables may be provided as well as tasks.
def fork(source, *targets) # :yields: _result
  distribute = proc do |_result|
    targets.each do |target|
      yield(_result) if block_given?
      enq(target, _result)
    end
  end
  source.on_complete(&distribute)
end
# Sets a merge workflow pattern for the tasks. When any source completes,
# its audited result is enqued to target. Batched tasks will have the
# pattern set for each task in the batch. The result is yielded to the
# block, if given, before the target is enqued.
#
# Executables may be provided as well as tasks.
def merge(target, *sources) # :yields: _result
  # merging can reuse each existing audit trail, since every distinct
  # input is sent to a single place (the target)
  sources.each {|src|
    src.on_complete {|_result|
      yield(_result) if block_given?
      enq(target, _result)
    }
  }
end
# Returns the aggregated, audited results for the given tasks, joined
# into one array. Nested arrays of tasks are flattened first. See results.
def _results(*tasks)
  flat = tasks.flatten
  aggregator.retrieve_all(*flat)
end
# Returns the aggregated result values (the _current value of each
# audit) for the given tasks, joined into one array. Arrays of tasks are
# allowed as inputs.
#
#   t1 = Task.new {|task, input| input += 1 }
#   t2 = Task.new {|task, input| input += 10 }
#   t3 = t2.initialize_batch_obj
#
#   t1.enq(0)
#   t2.enq(1)
#
#   app.run
#   app.results(t1, t2.batch)     # => [1, 11, 11]
#   app.results(t2, t1)           # => [11, 1]
#
def results(*tasks)
  _results(tasks).map {|audit| audit._current }
end
protected
# A hook for handling unknown configurations in subclasses, called from
# configure. If handle_configuration evaluates to false, then configure
# raises an error. This implementation always returns false.
#
# NOTE(review): the method is spelled handle_configuation (missing an
# 'r'), which does not match the handle_configuration name documented
# above. If configure calls handle_configuration, this definition is never
# invoked. Confirm against the caller before renaming, because a rename
# would start overriding whatever the superclass currently provides.
def handle_configuation(key, value)
false
end
# Writer for the application state (one of the App::State constants).
attr_writer :state

# run_thread::   the thread on which run is executing tasks (nil while idle)
# threads::      an array of the execution threads in use by run
# thread_queue:: a Queue of multithread tasks waiting for an execution
#                thread; nil if max_threads == 0
attr_accessor :run_thread, :threads, :thread_queue
private
# Calls the given block repeatedly, checking the app state before every
# pass. State::STOP ends the loop (returning nil) and State::TERMINATE
# raises a TerminateError. Otherwise the loop continues until the block
# breaks out. (A plain while loop is used on purpose, because Kernel#loop
# would silently swallow a StopIteration raised by the block.)
def execution_loop
  while true
    current = state
    break if current == State::STOP

    # an execution thread that handled a termination error may end up
    # back here, terminated but still running; raise again so control
    # reaches the termination (rescue) code
    raise TerminateError.new if current == State::TERMINATE

    yield
  end
end
# Drains thread_queue and puts every pending [task, inputs] entry back at
# the front of queue, in the original order, so that no task is lost when
# a run stops. nil entries (thread exit signals) are dropped. Does nothing
# when there is no thread_queue.
#
# BUG: this will result in an already-newly-queued task being promoted
# along with its inputs.
def clear_thread_queue
  return unless thread_queue

  pending = []
  pending << thread_queue.deq until thread_queue.empty?

  # unshift in reverse so the first pending entry ends up first in queue
  pending.reverse_each do |task, inputs|
    # TODO: log about not executing
    queue.unshift(task, inputs) unless task.nil?
  end
end
# Shuts down the execution threads while holding the threads monitor.
# It enqueues one nil per thread on thread_queue (a worker breaks out of
# its loop when it dequeues nil, see start_thread). It then polls with
# Thread.pass until every thread has exited, removing each dead thread
# from threads.
#
# raise_errors:: when true, the first error found on a finished thread
#                (stored in thread["error"] by start_thread) is raised at
#                once; when false, such errors are collected instead.
#
# Returns the array of collected errors. It is always empty when
# raise_errors is true and nothing was raised.
#
# NOTE(review): when an error is raised, threads not yet reaped stay in
# threads, and nils enqueued for them may remain on thread_queue. run's
# rescue path calls clear_thread_queue and then clear_threads(false) to
# finish the cleanup.
def clear_threads(raise_errors=true)
threads.synchronize do
errors = []
return errors if threads.empty?
# clears threads gracefully by enqueuing nils, to break
# the threads out of their loops, then waiting for the
# threads to work through the queue to the nils
#
threads.size.times { thread_queue.enq nil }
while true
# TODO -- add a time out?
# iterate over a copy because dead threads are deleted as we go
threads.dup.each do |thread|
next if thread.alive?
threads.delete(thread)
error = thread["error"]
next if error.nil?
raise error if raise_errors
errors << error
end
break if threads.empty?
Thread.pass
end
errors
end
end
# Starts one execution thread and appends it to threads (under the
# threads monitor). The thread repeatedly takes a [method, inputs] entry
# from thread_queue, blocking until one is available, and executes it. It
# exits when it dequeues nil, or when execution_loop sees State::STOP.
#
# If a StandardError escapes (including the TerminateError that
# execution_loop raises under State::TERMINATE), the thread calls
# terminate so the rest of the run shuts down. It then stores the error in
# Thread.current["error"] for clear_threads to collect.
def start_thread
threads.synchronize do
# start a new thread and add it to threads.
# threads simply loop and wait for a task to
# be queued. the thread will block until a
# task is available (due to thread_queue.deq)
#
# TODO -- track thread index like?
# thread["index"] = threads.length
threads << Thread.new do
# TODO - log thread start
begin
execution_loop do
m, inputs = thread_queue.deq
# a nil entry is the exit signal enqueued by clear_threads
break if m.nil?
# TODO: log execute task on thread #
execute(m, inputs)
end
rescue
# an unhandled error should immediately
# terminate all threads
terminate
Thread.current["error"] = $!
end
end
end
end
# Raised to unwind executing tasks after terminate has been called on a
# running App. run's rescue code handles these and leaves them out of the
# errors it reports.
TerminateError = Class.new(RuntimeError)
end
end