lib/tap/app.rb in bahuvrihi-tap-0.10.3 vs lib/tap/app.rb in bahuvrihi-tap-0.10.4

- old
+ new

@@ -1,11 +1,13 @@
  require 'logger'
- require 'tap/support/run_error'
  require 'tap/support/aggregator'
  require 'tap/support/executable_queue'

  module Tap
+   module Support
+     autoload(:Combinator, 'tap/support/combinator')
+   end

    # App coordinates the setup and running of tasks, and provides an interface
    # to the application directory structure. App is convenient for use within
    # scripts and, with Env, provides the basis for the 'tap' command line
    # application.
@@ -25,11 +27,11 @@
    #
    # When a task completes, the results will either be passed to the task
    # <tt>on_complete</tt> block (if set) or be collected into an Aggregator;
    # aggregated results may be accessed per-task, as shown above. Task
    # <tt>on_complete</tt> blocks typically enque other tasks, allowing the
-   # construction of workflows:
+   # construction of imperative workflows:
    #
    #   # clear the previous results
    #   app.aggregator.clear
    #
    #   t2 = Task.new {|task, input| input += 10 }
@@ -43,10 +45,33 @@
    #   app.results(t2)                # => [11, 21]
    #
    # Here t1 has no results because the on_complete block passed them to t2 in
    # a simple sequence.
    #
+   # ==== Dependencies
+   #
+   # Tasks allow the construction of dependency-based workflows as well; tasks
+   # may be set to depend on other tasks such that the dependent task only
+   # executes after the dependencies have been resolved (ie executed with a
+   # given set of inputs).
+   #
+   #   array = []
+   #   t1 = Task.new {|task, *inputs| array << inputs }
+   #   t2 = Task.new {|task, *inputs| array << inputs }
+   #
+   #   t1.depends_on(t2,1,2,3)
+   #   t1.enq(4,5,6)
+   #
+   #   app.run
+   #   array                          # => [[1,2,3], [4,5,6]]
+   #
+   # Once a dependency is resolved, it will not execute again:
+   #
+   #   t1.enq(7,8)
+   #   app.run
+   #   array                          # => [[1,2,3], [4,5,6], [7,8]]
+   #
    # ==== Batching
    #
    # Tasks can be batched, allowing the same input to be enqued to multiple
    # tasks at once.
    #
@@ -58,35 +83,10 @@
    #
    #   app.run
    #   app.results(t1)                # => [1]
    #   app.results(t2)                # => [10]
    #
-   # ==== Multithreading
-   #
-   # App supports multithreading; multithreaded tasks execute cosynchronously,
-   # each on their own thread.
-   #
-   #   lock = Mutex.new
-   #   array = []
-   #   t1 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
-   #   t2 = Task.new {|task| lock.synchronize { array << Thread.current.object_id }; sleep 0.1 }
-   #
-   #   t1.multithread = true
-   #   t1.enq
-   #   t2.multithread = true
-   #   t2.enq
-   #
-   #   app.run
-   #   array.length                   # => 2
-   #   array[0] == array[1]           # => false
-   #
-   # Naturally, it is up to you to make sure each task is thread safe. Note
-   # that for the most part Tap::App is NOT thread safe; only run and
-   # run-related methods (ready, stop, terminate, info) are synchronized.
-   # Methods enq and results act on thread-safe objects ExecutableQueue and
-   # Aggregator, and should be ok to use from multiple threads.
-   #
    # ==== Executables
    #
    # App can use any Executable object in place of a task. One way to initialize
    # an Executable for a method is to use the Object#_method defined by Tap. The
    # result can be enqued and incorporated into workflows, but they cannot be
@@ -165,12 +165,10 @@
    #   o-[add_one] 3
    #   o-[add_five] 8
    #
    # See Tap::Support::Audit for more details.
    class App < Root
-     include MonitorMixin
-
      class << self
        # Sets the current app instance
        attr_writer :instance

        # Returns the current instance of App. If no instance has been set,
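
The Executables support described in the class documentation above pairs with the mq helper defined later in this file. A minimal sketch of wrapping a plain method as an Executable; it assumes the usual require 'tap' setup and the shared Tap::App.instance used throughout the documentation examples:

    require 'tap'

    app = Tap::App.instance

    # an ordinary object with an ordinary method
    o = Object.new
    def o.add_one(input); input + 1; end

    # Object#_method wraps the method as an Executable; mq wraps and enques in one step
    m = o._method(:add_one)
    app.enq(m, 1)             # enque the wrapped method directly
    app.mq(o, :add_one, 10)   # or look it up and enque it in one call

    app.run                   # results are aggregated per Executable, like task results
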
@@ -191,15 +189,15 @@
      # A Tap::Support::Aggregator to collect the results of
      # methods that have no <tt>on_complete</tt> block
      attr_reader :aggregator

-     config :max_threads, 10, &c.integer    # For multithread execution
      config :debug, false, &c.flag          # Flag debugging
      config :force, false, &c.flag          # Force execution at checkpoints
      config :quiet, false, &c.flag          # Suppress logging
-
+     config :verbose, false, &c.flag        # Enables extra logging (overrides quiet)
+
      # The constants defining the possible App states.
      module State
        READY = 0
        RUN = 1
        STOP = 2
@@ -220,22 +218,19 @@
      # Creates a new App with the given configuration.
      def initialize(config={}, logger=DEFAULT_LOGGER)
        super()

        @state = State::READY
-       @threads = [].extend(MonitorMixin)
-       @thread_queue = nil
-       @run_thread = nil
-
        @queue = Support::ExecutableQueue.new
        @aggregator = Support::Aggregator.new

        initialize_config(config)
        self.logger = logger
      end

-     DEFAULT_LOGGER = Logger.new(STDOUT)
+     # The default App logger writes to $stdout at level INFO.
+     DEFAULT_LOGGER = Logger.new($stdout)
      DEFAULT_LOGGER.level = Logger::INFO
      DEFAULT_LOGGER.formatter = lambda do |severity, time, progname, msg|
        " %s[%s] %18s %s\n" % [severity[0,1], time.strftime('%H:%M:%S') , progname || '--' , msg]
      end
@@ -255,11 +250,11 @@
      end

      # Logs the action and message at the input level (default INFO).
      # Logging is suppressed if quiet is true.
      def log(action, msg="", level=Logger::INFO)
-       logger.add(level, msg, action.to_s) unless quiet
+       logger.add(level, msg, action.to_s) if !quiet || verbose
      end

      # Returns the configuration filepath for the specified task name,
      # File.join(app['config'], task_name + ".yml"). Returns nil if
      # task_name is nil.
@@ -278,229 +273,96 @@
        _result = m._execute(*inputs)
        aggregator.store(_result) unless m.on_complete_block
        _result
      end

-     # Sets state = State::READY unless the app has a run_thread
-     # (ie the app is running). Returns self.
+     # Sets state = State::READY unless the app is running. Returns self.
      def ready
-       synchronize do
-         self.state = State::READY if self.run_thread == nil
-         self
-       end
+       self.state = State::READY unless self.state == State::RUN
+       self
      end
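
The config flags above (debug, force, quiet, and the new verbose) are plain boolean configurations, and the reworked log method suppresses output only when quiet is set and verbose is not. A small sketch of that behavior; passing a symbol-keyed config hash to App.new is an assumption about the normal configuration style, not something this diff shows:

    require 'tap'

    quiet_app   = Tap::App.new(:quiet => true)
    verbose_app = Tap::App.new(:quiet => true, :verbose => true)

    quiet_app.log(:action, "suppressed message")   # no output, quiet wins
    verbose_app.log(:action, "still logged")       # verbose overrides quiet
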
-     # Sequentially executes the methods (ie Executable objects) in queue; run
-     # continues until the queue is empty and then returns self. An app can
-     # only run on one thread at a time. If run is called when already running,
-     # run returns immediately.
+     # Sequentially calls execute with the Executable methods and inputs in
+     # queue; run continues until the queue is empty and then returns self.
+     # Calls to run when already running will return immediately.
      #
-     # === The Run Cycle
-     # Run can execute methods in sequential or multithreaded mode. In sequential
-     # mode, run executes enqued methods in order and on the current thread. Run
-     # continues until it reaches a method marked with multithread = true, at which
-     # point run switches into multithreading mode.
-     #
-     # When multithreading, run shifts methods off of the queue and executes each
-     # on their own thread (launching up to max_threads threads at one time).
-     # Multithread execution continues until run reaches a non-multithread method,
-     # at which point run blocks, waits for the threads to complete, and switches
-     # back into sequential mode.
-     #
-     # Run never executes multithreaded and non-multithreaded methods at the same
-     # time.
-     #
-     # ==== Checks
      # Run checks the state of self before executing a method. If the state is
      # changed to State::STOP, then no more methods will be executed; currently
      # running methods will continue to completion. If the state is changed to
-     # State::TERMINATE then no more methods will be executed and currently running
-     # methods will be discontinued as described below.
-     #
-     # ==== Error Handling and Termination
-     # When unhandled errors arise during run, run enters a termination routine.
-     # During termination a TerminationError is raised in each executing method so
-     # that the method exits or begins executing its internal error handling code
-     # (perhaps performing rollbacks).
-     #
-     # The TerminationError can ONLY be raised by the method itself, usually via a
-     # call to Tap::Support::Framework#check_terminate. <tt>check_terminate</tt>
-     # is available to all Framework objects (ex Task and Workflow), but not to
-     # Executable methods generated by _method. These methods need to check the
-     # state of app themselves; otherwise they will continue on to completion even
-     # when app is in State::TERMINATE.
-     #
-     #   # this task will loop until app.terminate
-     #   Task.new {|task| while(true) task.check_terminate end }
-     #
-     #   # this task will NEVER terminate
-     #   Task.new {|task| while(true) end; task.check_terminate }
-     #
-     # Additional errors that arise during termination are collected and packaged
-     # with the orignal error into a RunError. By default all errors are logged
-     # and run exits. If debug? is true, then the RunError will be raised for
-     # further handling.
-     #
-     # Note: the method that caused the original unhandled error is no longer
-     # executing when termination begins and thus will not recieve a
-     # TerminationError.
+     # State::TERMINATE then no more methods will be executed and currently
+     # running methods will be discontinued as described in terminate.
      def run
-       synchronize do
-         return self unless self.ready.state == State::READY
+       return self unless state == State::READY
+       self.state = State::RUN
-         self.run_thread = Thread.current
-         self.state = State::RUN
-       end
-
-       # generate threading variables
-       self.thread_queue = max_threads > 0 ? Queue.new : nil

        # TODO: log starting run
        begin
-         execution_loop do
-           break if block_given? && yield(self)
-
-           # if no tasks were in the queue
-           # then clear the threads and
-           # check for tasks again
-           if queue.empty?
-             clear_threads
-             # break -- no executable task was found
-             break if queue.empty?
-           end
-
-           m, inputs = queue.deq
-
-           if thread_queue && m.multithread
-             # TODO: log enqueuing task to thread
-
-             # generate threads as needed and allowed
-             # to execute the threads in the thread queue
-             start_thread if threads.size < max_threads
-
-             # NOTE: the producer-consumer relationship of execution
-             # threads and the thread_queue means that tasks will sit
-             # waiting until an execution thread opens up. in the most
-             # extreme case all executing tasks and all tasks in the
-             # task_queue could be the same task, each with different
-             # inputs. this deviates from the idea of batch processing,
-             # but should be rare and not at all fatal given execute
-             # synchronization.
-             thread_queue.enq [m, inputs]
-
-           else
-             # TODO: log execute task
-
-             # wait for threads to complete
-             # before executing the main thread
-             clear_threads
-             execute(m, inputs)
-           end
+         until queue.empty? || state != State::RUN
+           execute(*queue.deq)
          end
-
-         # if the run loop exited due to a STOP state,
-         # tasks may still be in the thread queue and/or
-         # running. be sure these are cleared
-         clear_thread_queue
-         clear_threads
-
-       rescue
-         # when an error is generated, be sure to terminate
-         # all threads so they can clean up after themselves.
-         # clear the thread queue first so no more tasks are
-         # executed. collect any errors that arise during
-         # termination.
-         clear_thread_queue
-         errors = [$!] + clear_threads(false)
-         errors.delete_if {|error| error.kind_of?(TerminateError) }
-
-         # handle the errors accordingly
-         case
-         when debug?
-           raise Tap::Support::RunError.new(errors)
-         else
-           errors.each_with_index do |err, index|
-             log("RunError [#{index}] #{err.class}", err.message)
-           end
-         end
+       rescue(TerminateError)
+         # gracefully fail for termination errors
+       rescue(Exception)
+         # handle other errors accordingly
+         raise if debug?
+         log($!.class, $!.message)
        ensure
-
-         # reset run variables
-         self.thread_queue = nil
-
-         synchronize do
-           self.run_thread = nil
-           self.state = State::READY
-         end
+         self.state = State::READY
        end

        # TODO: log run complete
        self
      end
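
The new run body rescues TerminateError so that a terminated run still resets the state to READY. A sketch of the check_terminate handshake referenced in the terminate documentation below; the counter only stands in for real work, and the example assumes the usual require 'tap' setup with tasks bound to Tap::App.instance:

    require 'tap'

    app = Tap::App.instance
    count = 0

    # check_terminate (see Tap::Task#check_terminate) raises TerminateError once
    # app.terminate has been called; run rescues the error and returns normally
    t = Task.new do |task|
      loop do
        task.check_terminate
        app.terminate if (count += 1) >= 3   # request termination after some work
      end
    end

    t.enq
    app.run
    count   # => 3
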
      # Signals a running application to stop executing tasks in the
-     # queue by setting state = State::STOP. Currently executing
-     # tasks will continue their execution uninterrupted.
+     # queue by setting state = State::STOP. The task currently
+     # executing will continue uninterrupted to completion.
      #
      # Does nothing unless state is State::RUN.
      def stop
-       synchronize do
-         self.state = State::STOP if self.state == State::RUN
-         self
-       end
+       self.state = State::STOP if self.state == State::RUN
+       self
      end

-     # Signals a running application to terminate executing tasks
-     # by setting state = State::TERMINATE. When running tasks
-     # reach a termination check, the task raises a TerminationError,
-     # thus allowing executing tasks to invoke their specific
-     # error handling code, perhaps performing rollbacks.
+     # Signals a running application to terminate execution by setting
+     # state = State::TERMINATE. In this state, an executing task
+     # will then raise a TerminateError upon check_terminate, thus
+     # allowing the invocation of task-specific termination, perhaps
+     # performing rollbacks (see Tap::Task#check_terminate).
      #
-     # Termination checks can be manually specified in a task
-     # using the check_terminate method (see Tap::Task#check_terminate).
-     # Termination checks automatically occur before each task execution.
-     #
      # Does nothing if state == State::READY.
      def terminate
-       synchronize do
-         self.state = State::TERMINATE unless self.state == State::READY
-         self
-       end
+       self.state = State::TERMINATE unless self.state == State::READY
+       self
      end

      # Returns an information string for the App.
      #
-     #   App.instance.info   # => 'state: 0 (READY) queue: 0 thread_queue: 0 threads: 0 results: 0'
+     #   App.instance.info   # => 'state: 0 (READY) queue: 0 results: 0'
      #
      # Provided information:
      #
      # state:: the integer and string values of self.state
      # queue:: the number of methods currently in the queue
-     # thread_queue:: number of objects in the thread queue, waiting
-     #                to be run on an execution thread (methods, and
-     #                perhaps nils to signal threads to clear)
-     # threads:: the number of execution threads
      # results:: the total number of results in aggregator
      def info
-       synchronize do
-         "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} thread_queue: #{thread_queue ? thread_queue.size : 0} threads: #{threads.size} results: #{aggregator.size}"
-       end
+       "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}"
      end
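
stop only prevents further methods from being dequeued; whatever is mid-execution finishes normally and anything left in the queue stays there for a later run. A sketch, again assuming the usual require 'tap' setup:

    require 'tap'

    app = Tap::App.instance

    t1 = Task.new {|task| app.stop }   # halt the run from within a task
    t2 = Task.new {|task| }
    t1.enq
    t2.enq

    app.run
    app.info   # reports state READY with one method still in the queue (t2)
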
      # Enques the task with the inputs. If the task is batched, then each
      # task in task.batch will be enqued with the inputs. Returns task.
      #
      # An Executable may be provided instead of a task.
      def enq(task, *inputs)
        case task
        when Tap::Task
-         raise "not assigned to enqueing app: #{task}" unless task.app == self
+         raise ArgumentError, "not assigned to enqueing app: #{task}" unless task.app == self
          task.enq(*inputs)
        when Support::Executable
          queue.enq(task, inputs)
        else
-         raise "Not a Task or Executable: #{task}"
+         raise ArgumentError, "not a Task or Executable: #{task}"
        end
        task
      end

      # Method enque. Enques the specified method from object with the inputs.
@@ -508,18 +370,19 @@
      def mq(object, method_name, *inputs)
        m = object._method(method_name)
        enq(m, *inputs)
      end

-     # Sets a sequence workflow pattern for the tasks such that the
-     # completion of a task enqueues the next task with it's results.
-     # Batched tasks will have the pattern set for each task in the
-     # batch. The current audited results are yielded to the block,
-     # if given, before the next task is enqued.
+     # Sets a sequence workflow pattern for the tasks; each task will enque
+     # the next task with its results.
      #
-     # Executables may provided as well as tasks.
-     def sequence(*tasks) # :yields: _result
+     # Notes:
+     # - Batched tasks will have the pattern set for each task in the batch
+     # - The current audited results are yielded to the block, if given,
+     #   before the next task is enqued.
+     # - Executables may be provided as well as tasks.
+     def sequence(tasks) # :yields: _result
        current_task = tasks.shift
        tasks.each do |next_task|
          # simply pass results from one task to the next.
          current_task.on_complete do |_result|
            yield(_result) if block_given?
@@ -527,44 +390,124 @@
          end
          current_task = next_task
        end
      end

-     # Sets a fork workflow pattern for the tasks such that each of the
-     # targets will be enqueued with the results of the source when the
-     # source completes. Batched tasks will have the pattern set for each
-     # task in the batch. The source audited results are yielded to the
-     # block, if given, before the targets are enqued.
+     # Sets a fork workflow pattern for the source task; each target
+     # will enque the results of source.
      #
-     # Executables may provided as well as tasks.
-     def fork(source, *targets) # :yields: _result
+     # Notes:
+     # - Batched tasks will have the pattern set for each task in the batch
+     # - The current audited results are yielded to the block, if given,
+     #   before the next task is enqued.
+     # - Executables may be provided as well as tasks.
+     def fork(source, targets) # :yields: _result
        source.on_complete do |_result|
          targets.each do |target|
            yield(_result) if block_given?
            enq(target, _result)
          end
        end
      end
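
Note that sequence and fork (and merge, sync_merge, and switch below) now take their task collections as a single array argument rather than a splat. A sketch of fork with the new signature, using the same arithmetic-task style as the class documentation:

    require 'tap'

    app = Tap::App.instance
    app.aggregator.clear

    t1 = Task.new {|task, input| input + 1 }
    t2 = Task.new {|task, input| input + 10 }
    t3 = Task.new {|task, input| input + 100 }

    # 0.10.3 used a splat (fork(source, *targets)); 0.10.4 takes an array
    app.fork(t1, [t2, t3])

    t1.enq(0)
    app.run
    app.results(t2)   # => [11]
    app.results(t3)   # => [101]
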
-
-     # Sets a merge workflow pattern for the tasks such that the results
-     # of each source will be enqueued to the target when the source
-     # completes. Batched tasks will have the pattern set for each
-     # task in the batch. The source audited results are yielded to
-     # the block, if given, before the target is enqued.
+
+     # Sets a simple merge workflow pattern for the source tasks. Each source
+     # enques target with its result; no synchronization occurs, nor are
+     # results grouped before being sent to the target.
      #
-     # Executables may provided as well as tasks.
-     def merge(target, *sources) # :yields: _result
+     # Notes:
+     # - Batched tasks will have the pattern set for each task in the batch
+     # - The current audited results are yielded to the block, if given,
+     #   before the next task is enqued.
+     # - Executables may be provided as well as tasks.
+     def merge(target, sources) # :yields: _result
        sources.each do |source|
          # merging can use the existing audit trails... each distinct
          # input is getting sent to one place (the target)
          source.on_complete do |_result|
            yield(_result) if block_given?
            enq(target, _result)
          end
        end
      end

+     # Sets a synchronized merge workflow for the source tasks. Results from
+     # each source task are collected and enqued as a single group to the target.
+     # The target is not enqued until all sources have completed. Raises an
+     # error if a source returns twice before the target is enqued.
+     #
+     # Notes:
+     # - Batched tasks will have the pattern set for each task in the batch
+     # - The current audited results are yielded to the block, if given,
+     #   before the next task is enqued.
+     # - Executables may be provided as well as tasks.
+     #
+     #-- TODO: add notes on testing and the way results are received
+     # (ie as a single object)
+     def sync_merge(target, sources) # :yields: _result
+       group = Array.new(sources.length, nil)
+       sources.each_with_index do |source, index|
+         batch_map = Hash.new(0)
+         batch_length = if source.kind_of?(Support::Batchable)
+           source.batch.each_with_index {|obj, i| batch_map[obj] = i }
+           source.batch.length
+         else
+           1
+         end
+
+         group[index] = Array.new(batch_length, nil)
+
+         source.on_complete do |_result|
+           batch_index = batch_map[_result._current_source]
+
+           if group[index][batch_index] != nil
+             raise "sync_merge collision... already got a result for #{_result._current_source}"
+           end
+
+           group[index][batch_index] = _result
+
+           unless group.flatten.include?(nil)
+             Support::Combinator.new(*group).each do |*combination|
+               # merge the source audits
+               _group_result = Support::Audit.merge(*combination)
+
+               yield(_group_result) if block_given?
+               target.enq(_group_result)
+             end
+
+             # reset the group array
+             group.collect! {|i| nil }
+           end
+         end
+       end
+     end
+
+     # Sets a choice workflow pattern for the source task. When the
+     # source task completes, switch yields the audited result to the
+     # block which then returns the index of the target to enque with
+     # the results. No target will be enqued if the index is false or
+     # nil; an error is raised if no target can be found for the
+     # specified index.
+     #
+     # Notes:
+     # - Batched tasks will have the pattern set for each task in the batch
+     # - The current audited results are yielded to the block, if given,
+     #   before the next task is enqued.
+     # - Executables may be provided as well as tasks.
+     def switch(source, targets) # :yields: _result
+       source.on_complete do |_result|
+         if index = yield(_result)
+           unless target = targets[index]
+             raise "no switch target for index: #{index}"
+           end
+
+           enq(target, _result)
+         else
+           aggregator.store(_result)
+         end
+       end
+     end
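
A sketch of switch: the block receives the audited result and returns the index of the target to enque, or nil/false to let the result fall through to the aggregator. The constant index used here just keeps the example small:

    require 'tap'

    app = Tap::App.instance
    app.aggregator.clear

    t1 = Task.new {|task, input| input + 1 }
    t2 = Task.new {|task, input| input + 10 }
    t3 = Task.new {|task, input| input + 100 }

    # always route t1's results to the first target; an unknown index raises,
    # and returning nil would store the result in the aggregator instead
    app.switch(t1, [t2, t3]) {|_result| 0 }

    t1.enq(1)
    app.run
    app.results(t2)   # => [12]
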

      # Returns all aggregated, audited results for the specified tasks.
      # Results are joined into a single array. Arrays of tasks are
      # allowed as inputs. See results.
      def _results(*tasks)
        aggregator.retrieve_all(*tasks.flatten)
@@ -597,124 +540,12 @@
        false
      end

      # Sets the state of the application
      attr_writer :state
-
-     # The thread on which run is executing tasks.
-     attr_accessor :run_thread
-
-     # An array containing the execution threads in use by run.
-     attr_accessor :threads
-
-     # A Queue containing multithread tasks waiting to be run
-     # on the execution threads. Nil if max_threads == 0
-     attr_accessor :thread_queue
-
-     private
-
-     def execution_loop
-       while true
-         case state
-         when State::STOP
-           break
-         when State::TERMINATE
-           # if an execution thread handles the termination error,
-           # then the thread may end up here -- terminated but still
-           # running. Raise another termination error to enter the
-           # termination (rescue) code.
-           raise TerminateError.new
-         end
-
-         yield
-       end
-     end
-
-     def clear_thread_queue
-       return unless thread_queue
-
-       # clear the queue and enque the thread complete
-       # signals, so that the thread will exit normally
-       dequeued = []
-       while !thread_queue.empty?
-         dequeued << thread_queue.deq
-       end
-
-       # add dequeued tasks back, in order, to the task
-       # queue so no tasks get lost due to the stop
-       #
-       # BUG: this will result in an already-newly-queued
-       # task being promoted along with it's inputs
-       dequeued.reverse_each do |task, inputs|
-         # TODO: log about not executing
-         queue.unshift(task, inputs) unless task.nil?
-       end
-     end
-
-     def clear_threads(raise_errors=true)
-       threads.synchronize do
-         errors = []
-         return errors if threads.empty?
-
-         # clears threads gracefully by enqueuing nils, to break
-         # the threads out of their loops, then waiting for the
-         # threads to work through the queue to the nils
-         #
-         threads.size.times { thread_queue.enq nil }
-         while true
-           # TODO -- add a time out?
-
-           threads.dup.each do |thread|
-             next if thread.alive?
-             threads.delete(thread)
-             error = thread["error"]
-
-             next if error.nil?
-             raise error if raise_errors
-
-             errors << error
-           end
-
-           break if threads.empty?
-           Thread.pass
-         end
-
-         errors
-       end
-     end
-
-     def start_thread
-       threads.synchronize do
-         # start a new thread and add it to threads.
-         # threads simply loop and wait for a task to
-         # be queued. the thread will block until a
-         # task is available (due to thread_queue.deq)
-         #
-         # TODO -- track thread index like?
-         # thread["index"] = threads.length
-         threads << Thread.new do
-           # TODO - log thread start
-
-           begin
-             execution_loop do
-               m, inputs = thread_queue.deq
-               break if m.nil?
-
-               # TODO: log execute task on thread #
-               execute(m, inputs)
-             end
-           rescue
-             # an unhandled error should immediately
-             # terminate all threads
-             terminate
-             Thread.current["error"] = $!
-           end
-         end
-       end
-     end
-
-     # TerminateErrors are raised to kill executing tasks when terminate
-     # is called on an running App. They are handled by the run rescue code.
+     # TerminateErrors are raised to kill executing tasks when terminate is
+     # called on a running App. They are handled by the run rescue code.
      class TerminateError < RuntimeError
      end
    end
  end
\ No newline at end of file
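
For completeness, a sketch of the new sync_merge pattern defined above. The target is enqued only once all sources have returned; the exact packaging of the grouped results is flagged as a TODO in the source, so this example only prints whatever arrives rather than asserting a value:

    require 'tap'

    app = Tap::App.instance

    a = Task.new {|task, input| input + 1 }
    b = Task.new {|task, input| input + 10 }

    # the target fires once, after BOTH a and b have completed
    merged = Task.new {|task, *inputs| puts inputs.inspect }

    app.sync_merge(merged, [a, b])
    a.enq(0)
    b.enq(0)
    app.run
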