require 'logger'
require 'monitor'
require 'thread'
require 'tap/support/run_error'
require 'tap/support/aggregator'
require 'tap/support/executable_queue'

module Tap

  # App coordinates the setup and running of tasks, and provides an interface
  # to the application directory structure.  App is convenient for use within
  # scripts and, with Env, provides the basis for the 'tap' command line
  # application.
  #
  # === Running Tasks
  #
  # All tasks have an App (by default App.instance) through which tasks
  # access application-wide resources like the logger.  Additionally, task
  # enque commands are forwarded to App#enq:
  #
  #   t1 = Task.new {|task, input| input += 1 }
  #   t1.enq(0)
  #   app.enq(t1, 1)
  #
  #   app.run
  #   app.results(t1)                # => [1, 2]
  #
  # When a task completes, the results will either be passed to the task
  # on_complete block (if set) or be collected into an Aggregator;
  # aggregated results may be accessed per-task, as shown above.  Task
  # on_complete blocks typically enque other tasks, allowing the
  # construction of workflows:
  #
  #   # clear the previous results
  #   app.aggregator.clear
  #
  #   t2 = Task.new {|task, input| input += 10 }
  #   t1.on_complete {|_result| t2.enq(_result) }
  #
  #   t1.enq 0
  #   t1.enq 10
  #
  #   app.run
  #   app.results(t1)                # => []
  #   app.results(t2)                # => [11, 21]
  #
  # Here t1 has no results because the on_complete block passed them to t2 in
  # a simple sequence.
  #
  # ==== Batching
  #
  # Tasks can be batched, allowing the same input to be enqued to multiple
  # tasks at once.
  #
  #   t1 = Task.new {|task, input| input += 1 }
  #   t2 = Task.new {|task, input| input += 10 }
  #   Task.batch(t1, t2)             # => [t1, t2]
  #
  #   t1.enq 0
  #
  #   app.run
  #   app.results(t1)                # => [1]
  #   app.results(t2)                # => [10]
  #
  # ==== Multithreading
  #
  # App supports multithreading; multithreaded tasks execute concurrently,
  # each on their own thread.
  #
  #   lock = Mutex.new
  #   array = []
  #   t1 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
  #   t2 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
  #
  #   t1.multithread = true
  #   t1.enq
  #   t2.multithread = true
  #   t2.enq
  #
  #   app.run
  #   array.length                   # => 2
  #   array[0] == array[1]           # => false
  #
  # Naturally, it is up to you to make sure each task is thread safe.  Note
  # that for the most part Tap::App is NOT thread safe; only run and
  # run-related methods (ready, stop, terminate, info) are synchronized.
  # Methods enq and results act on thread-safe objects ExecutableQueue and
  # Aggregator, and should be ok to use from multiple threads.
  #
  # ==== Executables
  #
  # App can use any Executable object in place of a task.  One way to initialize
  # an Executable for a method is to use the Object#_method defined by Tap.  The
  # result can be enqued and incorporated into workflows, but it cannot be
  # batched.
  #
  # The mq (method enq) method generates and enques the method in one step.
  #
  #   array = []
  #   m = array._method(:push)
  #
  #   app.enq(m, 1)
  #   app.mq(array, :push, 2)
  #
  #   array.empty?                   # => true
  #   app.run
  #   array                          # => [1, 2]
  #
  # === Auditing
  #
  # All results generated by executable methods are audited to track how a given
  # input evolves during a workflow.
  #
  # To illustrate auditing, consider a workflow that uses the 'add_one' method
  # to add one to an input until the result is 3, then adds five more with the
  # 'add_five' method.  The final result should always be 8.
  #
  #   t1 = Tap::Task.new {|task, input| input += 1 }
  #   t1.name = "add_one"
  #
  #   t2 = Tap::Task.new {|task, input| input += 5 }
  #   t2.name = "add_five"
  #
  #   t1.on_complete do |_result|
  #     # _result is the audit; use the _current method
  #     # to get the current value in the audit trail
  #
  #     _result._current < 3 ? t1.enq(_result) : t2.enq(_result)
  #   end
  #
  #   t1.enq(0)
  #   t1.enq(1)
  #   t1.enq(2)
  #
  #   app.run
  #   app.results(t2)                # => [8,8,8]
  #
  # Although the results are indistinguishable, each achieved the final value
  # through a different series of tasks.  With auditing you can see how each
  # input came to the final value of 8:
  #
  #   # app.results returns the actual result values
  #   # app._results returns the audits for these values
  #   app._results(t2).each do |_result|
  #     puts "How #{_result._original} became #{_result._current}:"
  #     puts _result._to_s
  #     puts
  #   end
  #
  # Prints:
  #
  #   How 2 became 8:
  #   o-[] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  #   How 1 became 8:
  #   o-[] 1
  #   o-[add_one] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  #   How 0 became 8:
  #   o-[] 0
  #   o-[add_one] 1
  #   o-[add_one] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  # See Tap::Support::Audit for more details.
  class App < Root
    include MonitorMixin

    class << self
      # Sets the current app instance
      attr_writer :instance

      # Returns the current instance of App.  If no instance has been set,
      # then a new App with the default configuration will be initialized.
      def instance
        @instance ||= App.new
      end
    end

    # The shared logger
    attr_reader :logger

    # The application queue
    attr_reader :queue

    # The state of the application (see App::State)
    attr_reader :state

    # A Tap::Support::Aggregator to collect the results of
    # methods that have no on_complete block
    attr_reader :aggregator

    config :max_threads, 10, &c.integer       # For multithread execution
    config :debug, false, &c.flag             # Flag debugging
    config :force, false, &c.flag             # Force execution at checkpoints
    config :quiet, false, &c.flag             # Suppress logging

    # The constants defining the possible App states.
    module State
      READY = 0
      RUN = 1
      STOP = 2
      TERMINATE = 3

      module_function

      # Returns the string corresponding to the input state value.
      # Returns nil for unknown states.
      #
      #   State.state_str(0)        # => 'READY'
      #   State.state_str(12)       # => nil
      def state_str(state)
        constants.inject(nil) {|str, s| const_get(s) == state ? s.to_s : str }
      end
    end

    # Creates a new App with the given configuration.  The app starts in
    # State::READY with an empty queue and aggregator; the logger is assigned
    # after the configuration so that logger= can honor the debug config.
    def initialize(config={}, logger=DEFAULT_LOGGER)
      super()

      @state = State::READY
      # the execution threads; extended so that start_thread and
      # clear_threads can synchronize on the array itself
      @threads = [].extend(MonitorMixin)
      @thread_queue = nil
      @run_thread = nil

      @queue = Support::ExecutableQueue.new
      @aggregator = Support::Aggregator.new

      initialize_config(config)
      self.logger = logger
    end

    # The default logger.  Logs to STDOUT at INFO level; each entry is
    # formatted as the severity initial, an HH:MM:SS timestamp, the
    # right-aligned progname (or '--' when unset), and the message.
    DEFAULT_LOGGER = Logger.new(STDOUT)
    DEFAULT_LOGGER.level = Logger::INFO
    DEFAULT_LOGGER.formatter = lambda do |severity, time, progname, msg|
      " %s[%s] %18s %s\n" % [severity[0,1], time.strftime('%H:%M:%S'), progname || '--', msg]
    end

    # True if debug or the global variable $DEBUG is true.
    def debug?
      debug || $DEBUG
    end

    # Sets the current logger.  The logger level is set to Logger::DEBUG if
    # debug? is true.  Note the level is set on the logger object itself,
    # which may be the shared DEFAULT_LOGGER.
    def logger=(logger)
      logger.level = Logger::DEBUG if !logger.nil? && debug?
      @logger = logger
    end

    # Logs the action and message at the input level (default INFO).  The
    # action is used as the log entry's progname.  Logging is suppressed if
    # quiet is true.
    def log(action, msg="", level=Logger::INFO)
      logger.add(level, msg, action.to_s) unless quiet
    end

    # Returns the configuration filepath for the specified name,
    # filepath('config', "#{name}.yml").  Returns nil if name is nil.
    def config_filepath(name)
      name.nil? ? nil : filepath('config', "#{name}.yml")
    end

    #
    # Execution methods
    #

    # Executes the input Executable with the inputs.  Stores the result in
    # aggregator unless an on_complete block is set.  Returns the audited
    # result.
    def execute(m, inputs)
      _result = m._execute(*inputs)
      aggregator.store(_result) unless m.on_complete_block
      _result
    end

    # Sets state = State::READY unless the app has a run_thread
    # (ie the app is running).  Returns self.
    def ready
      synchronize do
        self.state = State::READY if run_thread.nil?
        self
      end
    end

    # Sequentially executes the methods (ie Executable objects) in queue; run
    # continues until the queue is empty and then returns self.  An app can
    # only run on one thread at a time.  If run is called when already running,
    # run returns immediately.  If a block is given, it is yielded self before
    # each method is dequeued; a truthy return from the block ends the run.
    #
    # === The Run Cycle
    # Run can execute methods in sequential or multithreaded mode.  In sequential
    # mode, run executes enqued methods in order and on the current thread.  Run
    # continues until it reaches a method marked with multithread = true, at which
    # point run switches into multithreading mode.  (When max_threads is 0, there
    # is no thread queue and every method runs sequentially.)
    #
    # When multithreading, run shifts methods off of the queue and executes each
    # on their own thread (launching up to max_threads threads at one time).
    # Multithread execution continues until run reaches a non-multithread method,
    # at which point run blocks, waits for the threads to complete, and switches
    # back into sequential mode.
    #
    # Run never executes multithreaded and non-multithreaded methods at the same
    # time.
    #
    # ==== Checks
    # Run checks the state of self before executing a method.  If the state is
    # changed to State::STOP, then no more methods will be executed; currently
    # running methods will continue to completion.  If the state is changed to
    # State::TERMINATE then no more methods will be executed and currently running
    # methods will be discontinued as described below.
    #
    # ==== Error Handling and Termination
    # When unhandled errors arise during run, run enters a termination routine
    # (the app state is set to State::TERMINATE).  During termination a
    # TerminateError is raised in each executing method so that the method
    # exits or begins executing its internal error handling code (perhaps
    # performing rollbacks).
    #
    # The TerminateError can ONLY be raised by the method itself, usually via a
    # call to Tap::Support::Framework#check_terminate.  check_terminate
    # is available to all Framework objects (ex Task and Workflow), but not to
    # Executable methods generated by _method.  These methods need to check the
    # state of app themselves; otherwise they will continue on to completion even
    # when app is in State::TERMINATE.
    #
    #   # this task will loop until app.terminate
    #   Task.new {|task| while(true); task.check_terminate; end }
    #
    #   # this task will NEVER terminate
    #   Task.new {|task| while(true); end; task.check_terminate }
    #
    # Additional errors that arise during termination are collected and packaged
    # with the original error into a RunError.  By default all errors are logged
    # and run exits.  If debug? is true, then the RunError will be raised for
    # further handling.  TerminateErrors themselves are never reported.
    #
    # Note: the method that caused the original unhandled error is no longer
    # executing when termination begins and thus will not receive a
    # TerminateError.
    def run
      synchronize do
        return self unless self.ready.state == State::READY

        self.run_thread = Thread.current
        self.state = State::RUN
      end

      # generate threading variables
      self.thread_queue = max_threads > 0 ? Queue.new : nil

      # TODO: log starting run
      begin
        execution_loop do
          break if block_given? && yield(self)

          # if no tasks were in the queue
          # then clear the threads and
          # check for tasks again
          if queue.empty?
            clear_threads
            # break -- no executable task was found
            break if queue.empty?
          end

          m, inputs = queue.deq

          if thread_queue && m.multithread
            # TODO: log enqueuing task to thread

            # generate threads as needed and allowed
            # to execute the threads in the thread queue
            start_thread if threads.size < max_threads

            # NOTE: the producer-consumer relationship of execution
            # threads and the thread_queue means that tasks will sit
            # waiting until an execution thread opens up.  in the most
            # extreme case all executing tasks and all tasks in the
            # task_queue could be the same task, each with different
            # inputs.  this deviates from the idea of batch processing,
            # but should be rare and not at all fatal given execute
            # synchronization.
            thread_queue.enq [m, inputs]
          else
            # TODO: log execute task

            # wait for threads to complete
            # before executing the main thread
            clear_threads
            execute(m, inputs)
          end
        end

        # if the run loop exited due to a STOP state,
        # tasks may still be in the thread queue and/or
        # running.  be sure these are cleared
        clear_thread_queue
        clear_threads

      rescue => run_error
        # when an error is generated, terminate so that executing
        # methods can clean up after themselves (via check_terminate).
        # clear the thread queue so no more tasks are executed, then
        # collect any errors that arise during termination.
        terminate
        clear_thread_queue
        errors = [run_error] + clear_threads(false)
        errors.delete_if {|error| error.kind_of?(TerminateError) }

        # handle the errors accordingly
        if debug?
          raise Tap::Support::RunError.new(errors)
        else
          errors.each_with_index do |err, index|
            log("RunError [#{index}] #{err.class}", err.message)
          end
        end
      ensure

        # reset run variables
        self.thread_queue = nil

        synchronize do
          self.run_thread = nil
          self.state = State::READY
        end
      end

      # TODO: log run complete
      self
    end

    # Signals a running application to stop executing tasks in the
    # queue by setting state = State::STOP.  Currently executing
    # tasks will continue their execution uninterrupted.
    #
    # Does nothing unless state is State::RUN.  Returns self.
    def stop
      synchronize do
        self.state = State::STOP if self.state == State::RUN
        self
      end
    end

    # Signals a running application to terminate executing tasks
    # by setting state = State::TERMINATE.  When running tasks
    # reach a termination check, the task raises a TerminateError,
    # thus allowing executing tasks to invoke their specific
    # error handling code, perhaps performing rollbacks.
    #
    # Termination checks can be manually specified in a task
    # using the check_terminate method (see Tap::Task#check_terminate).
    # Termination checks automatically occur before each task execution.
    #
    # Does nothing if state == State::READY.  Returns self.
    def terminate
      synchronize do
        self.state = State::TERMINATE unless self.state == State::READY
        self
      end
    end

    # Returns an information string for the App.
    #
    #   App.instance.info   # => 'state: 0 (READY) queue: 0 thread_queue: 0 threads: 0 results: 0'
    #
    # Provided information:
    #
    # state:: the integer and string values of self.state
    # queue:: the number of methods currently in the queue
    # thread_queue:: number of objects in the thread queue, waiting
    #                to be run on an execution thread (methods, and
    #                perhaps nils to signal threads to clear)
    # threads:: the number of execution threads
    # results:: the total number of results in aggregator
    def info
      synchronize do
        "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} thread_queue: #{thread_queue ? thread_queue.size : 0} threads: #{threads.size} results: #{aggregator.size}"
      end
    end

    # Enques the task with the inputs.  If the task is batched, then each
    # task in task.batch will be enqued with the inputs.  Returns task.
    #
    # An Executable may be provided instead of a task.  Raises an error if
    # a Task is assigned to a different app, or if task is neither a Task
    # nor an Executable.
    def enq(task, *inputs)
      case task
      when Tap::Task
        raise "not assigned to enqueing app: #{task}" unless task.app == self
        task.enq(*inputs)
      when Support::Executable
        queue.enq(task, inputs)
      else
        raise "Not a Task or Executable: #{task}"
      end
      task
    end

    # Method enque.  Enques the specified method from object with the inputs.
    # Returns the enqued method.
    def mq(object, method_name, *inputs)
      m = object._method(method_name)
      enq(m, *inputs)
    end

    # Sets a sequence workflow pattern for the tasks such that the
    # completion of a task enqueues the next task with its results.
    # Batched tasks will have the pattern set for each task in the
    # batch.  The current audited results are yielded to the block,
    # if given, before the next task is enqued.
    #
    # Executables may be provided as well as tasks.
    def sequence(*tasks) # :yields: _result
      current_task = tasks.shift
      tasks.each do |next_task|
        # simply pass results from one task to the next.
        current_task.on_complete do |_result|
          yield(_result) if block_given?
          enq(next_task, _result)
        end
        current_task = next_task
      end
    end

    # Sets a fork workflow pattern for the tasks such that each of the
    # targets will be enqueued with the results of the source when the
    # source completes.  Batched tasks will have the pattern set for each
    # task in the batch.  The source audited results are yielded to the
    # block, if given, before each target is enqued (ie once per target).
    #
    # Executables may be provided as well as tasks.
    def fork(source, *targets) # :yields: _result
      source.on_complete do |_result|
        targets.each do |target|
          yield(_result) if block_given?
          enq(target, _result)
        end
      end
    end

    # Sets a merge workflow pattern for the tasks such that the results
    # of each source will be enqueued to the target when the source
    # completes.  Batched tasks will have the pattern set for each
    # task in the batch.  The source audited results are yielded to
    # the block, if given, before the target is enqued.
    #
    # Executables may be provided as well as tasks.
    def merge(target, *sources) # :yields: _result
      sources.each do |source|
        # merging can use the existing audit trails... each distinct
        # input is getting sent to one place (the target)
        source.on_complete do |_result|
          yield(_result) if block_given?
          enq(target, _result)
        end
      end
    end

    # Returns all aggregated, audited results for the specified tasks.
    # Results are joined into a single array.  Arrays of tasks are
    # allowed as inputs.  See results.
    def _results(*tasks)
      aggregator.retrieve_all(*tasks.flatten)
    end

    # Returns all aggregated results for the specified tasks.  Results are
    # joined into a single array.  Arrays of tasks are allowed as inputs.
    #
    #   t1 = Task.new {|task, input| input += 1 }
    #   t2 = Task.new {|task, input| input += 10 }
    #   t3 = t2.initialize_batch_obj
    #
    #   t1.enq(0)
    #   t2.enq(1)
    #
    #   app.run
    #   app.results(t1, t2.batch)     # => [1, 11, 11]
    #   app.results(t2, t1)           # => [11, 1]
    #
    def results(*tasks)
      _results(tasks).collect {|_result| _result._current }
    end

    protected

    # A hook for handling unknown configurations in subclasses, called from
    # configure.  If handle_configuration evaluates to false, then configure
    # raises an error.
    #
    # NOTE(review): the method name below is spelled 'handle_configuation'
    # (missing an 'r') while this comment says 'handle_configuration'.
    # Confirm which name Root#configure actually calls before renaming.
    def handle_configuation(key, value)
      false
    end

    # Sets the state of the application
    attr_writer :state

    # The thread on which run is executing tasks.
    attr_accessor :run_thread

    # An array containing the execution threads in use by run.
    attr_accessor :threads

    # A Queue containing multithread tasks waiting to be run
    # on the execution threads.  Nil if max_threads == 0
    attr_accessor :thread_queue

    private

    # Yields repeatedly until the block breaks.  Before each iteration the
    # app state is checked: State::STOP ends the loop and State::TERMINATE
    # raises a TerminateError.  Used by run and by each execution thread.
    #
    # (A bare while loop is used rather than Kernel#loop so that a
    # StopIteration raised by a task is not silently swallowed.)
    def execution_loop
      while true
        case state
        when State::STOP
          break
        when State::TERMINATE
          # if an execution thread handles the termination error,
          # then the thread may end up here -- terminated but still
          # running.  Raise another termination error to enter the
          # termination (rescue) code.
          raise TerminateError.new
        end

        yield
      end
    end

    # Empties the thread queue and returns any dequeued [method, inputs]
    # pairs to the front of the main queue, in their original order, so no
    # tasks are lost.  nil entries (thread exit signals) are discarded.
    # Does nothing when there is no thread queue.
    def clear_thread_queue
      return unless thread_queue

      # dequeue without blocking: execution threads may be consuming the
      # same queue concurrently, so an empty? check followed by a blocking
      # deq could wait forever if a thread takes the last item in between.
      dequeued = []
      while true
        begin
          dequeued << thread_queue.deq(true)
        rescue ThreadError
          # the queue is empty
          break
        end
      end

      # add dequeued tasks back, in order, to the task
      # queue so no tasks get lost due to the stop
      #
      # BUG: this will result in an already-newly-queued
      # task being promoted along with its inputs
      dequeued.reverse_each do |task, inputs|
        # TODO: log about not executing
        queue.unshift(task, inputs) unless task.nil?
      end
    end

    # Signals every execution thread to exit by enqueuing one nil per
    # thread, then waits until all threads have finished, removing each
    # from threads.  A dead thread's error (stored under the "error"
    # thread-local) is raised immediately when raise_errors is true;
    # otherwise errors are collected.  Returns the array of collected
    # errors (empty when there were no threads).
    def clear_threads(raise_errors=true)
      threads.synchronize do
        errors = []
        return errors if threads.empty?

        # clears threads gracefully by enqueuing nils, to break
        # the threads out of their loops, then waiting for the
        # threads to work through the queue to the nils
        #
        threads.size.times { thread_queue.enq nil }
        while true
          # TODO -- add a time out?

          threads.dup.each do |thread|
            next if thread.alive?
            threads.delete(thread)
            error = thread["error"]

            next if error.nil?
            raise error if raise_errors

            errors << error
          end

          break if threads.empty?
          Thread.pass
        end

        errors
      end
    end

    # Starts a new execution thread and adds it to threads.  The thread
    # loops (via execution_loop), blocking on thread_queue.deq until a
    # [method, inputs] pair is available, and exits when it dequeues nil.
    # An unhandled error terminates the app and is stored on the thread
    # under the "error" thread-local for clear_threads to collect.
    def start_thread
      threads.synchronize do
        # TODO -- track thread index like?
        #  thread["index"] = threads.length
        threads << Thread.new do
          # TODO - log thread start

          begin
            execution_loop do
              m, inputs = thread_queue.deq
              break if m.nil?

              # TODO: log execute task on thread #
              execute(m, inputs)
            end
          rescue
            # an unhandled error should immediately
            # terminate all threads
            terminate
            Thread.current["error"] = $!
          end
        end
      end
    end

    # TerminateErrors are raised to kill executing tasks when terminate
    # is called on a running App.  They are handled by the run rescue code.
    class TerminateError < RuntimeError
    end
  end
end