require 'logger'
require 'tap/support/aggregator'
require 'tap/support/executable_queue'

module Tap
  module Support
    autoload(:Combinator, 'tap/support/combinator')
  end

  # App coordinates the setup and running of tasks, and provides an interface
  # to the application directory structure. App is convenient for use within
  # scripts and, with Env, provides the basis for the 'tap' command line
  # application.
  #
  # === Running Tasks
  #
  # All tasks have an App (by default App.instance) through which tasks
  # access application-wide resources like the logger. Additionally, task
  # enque commands are forwarded to App#enq:
  #
  #   t1 = Task.new {|task, input| input += 1 }
  #   t1.enq(0)
  #   app.enq(t1, 1)
  #
  #   app.run
  #   app.results(t1)                # => [1, 2]
  #
  # When a task completes, the results will either be passed to the task
  # on_complete block (if set) or be collected into an Aggregator;
  # aggregated results may be accessed per-task, as shown above. Task
  # on_complete blocks typically enque other tasks, allowing the
  # construction of imperative workflows:
  #
  #   # clear the previous results
  #   app.aggregator.clear
  #
  #   t2 = Task.new {|task, input| input += 10 }
  #   t1.on_complete {|_result| t2.enq(_result) }
  #
  #   t1.enq 0
  #   t1.enq 10
  #
  #   app.run
  #   app.results(t1)                # => []
  #   app.results(t2)                # => [11, 21]
  #
  # Here t1 has no results because the on_complete block passed them to t2 in
  # a simple sequence.
  #
  # ==== Dependencies
  #
  # Tasks allow the construction of dependency-based workflows as well; tasks
  # may be set to depend on other tasks such that the dependent task only
  # executes after the dependencies have been resolved (ie executed with a
  # given set of inputs).
  #
  #   array = []
  #   t1 = Task.new {|task, *inputs| array << inputs }
  #   t2 = Task.new {|task, *inputs| array << inputs }
  #
  #   t1.depends_on(t2,1,2,3)
  #   t1.enq(4,5,6)
  #
  #   app.run
  #   array                          # => [[1,2,3], [4,5,6]]
  #
  # Once a dependency is resolved, it will not execute again:
  #
  #   t1.enq(7,8)
  #   app.run
  #   array                          # => [[1,2,3], [4,5,6], [7,8]]
  #
  # ==== Batching
  #
  # Tasks can be batched, allowing the same input to be enqued to multiple
  # tasks at once.
  #
  #   t1 = Task.new {|task, input| input += 1 }
  #   t2 = Task.new {|task, input| input += 10 }
  #   Task.batch(t1, t2)             # => [t1, t2]
  #
  #   t1.enq 0
  #
  #   app.run
  #   app.results(t1)                # => [1]
  #   app.results(t2)                # => [10]
  #
  # ==== Executables
  #
  # App can use any Executable object in place of a task. One way to initialize
  # an Executable for a method is to use the Object#_method defined by Tap. The
  # result can be enqued and incorporated into workflows, but they cannot be
  # batched.
  #
  # The mq (method enq) method generates and enques the method in one step.
  #
  #   array = []
  #   m = array._method(:push)
  #
  #   app.enq(m, 1)
  #   app.mq(array, :push, 2)
  #
  #   array.empty?                   # => true
  #   app.run
  #   array                          # => [1, 2]
  #
  # === Auditing
  #
  # All results generated by executable methods are audited to track how a given
  # input evolves during a workflow.
  #
  # To illustrate auditing, consider a workflow that uses the 'add_one' method
  # to add one to an input until the result is 3, then adds five more with the
  # 'add_five' method. The final result should always be 8.
  #
  #   t1 = Tap::Task.new {|task, input| input += 1 }
  #   t1.name = "add_one"
  #
  #   t2 = Tap::Task.new {|task, input| input += 5 }
  #   t2.name = "add_five"
  #
  #   t1.on_complete do |_result|
  #     # _result is the audit; use the _current method
  #     # to get the current value in the audit trail
  #
  #     _result._current < 3 ? t1.enq(_result) : t2.enq(_result)
  #   end
  #
  #   t1.enq(0)
  #   t1.enq(1)
  #   t1.enq(2)
  #
  #   app.run
  #   app.results(t2)                # => [8,8,8]
  #
  # Although the results are indistinguishable, each achieved the final value
  # through a different series of tasks. With auditing you can see how each
  # input came to the final value of 8:
  #
  #   # app.results returns the actual result values
  #   # app._results returns the audits for these values
  #   app._results(t2).each do |_result|
  #     puts "How #{_result._original} became #{_result._current}:"
  #     puts _result._to_s
  #     puts
  #   end
  #
  # Prints:
  #
  #   How 2 became 8:
  #   o-[] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  #   How 1 became 8:
  #   o-[] 1
  #   o-[add_one] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  #   How 0 became 8:
  #   o-[] 0
  #   o-[add_one] 1
  #   o-[add_one] 2
  #   o-[add_one] 3
  #   o-[add_five] 8
  #
  # See Tap::Support::Audit for more details.
  class App < Root
    class << self
      # Sets the current app instance
      attr_writer :instance

      # Returns the current instance of App. If no instance has been set,
      # then a new App with the default configuration will be initialized.
      def instance
        @instance ||= App.new
      end
    end

    # The shared logger
    attr_reader :logger

    # The application queue
    attr_reader :queue

    # The state of the application (see App::State)
    attr_reader :state

    # A Tap::Support::Aggregator to collect the results of
    # methods that have no on_complete block
    attr_reader :aggregator

    config :debug, false, &c.flag                 # Flag debugging
    config :force, false, &c.flag                 # Force execution at checkpoints
    config :quiet, false, &c.flag                 # Suppress logging
    config :verbose, false, &c.flag               # Enables extra logging (overrides quiet)

    # The constants defining the possible App states.
    module State
      READY = 0
      RUN = 1
      STOP = 2
      TERMINATE = 3

      module_function

      # Returns the string corresponding to the input state value.
      # Returns nil for unknown states.
      #
      #   State.state_str(0)         # => 'READY'
      #   State.state_str(12)        # => nil
      def state_str(state)
        constants.inject(nil) do |str, s|
          const_get(s) == state ? s.to_s : str
        end
      end
    end

    # Creates a new App with the given configuration.
    #
    # The logger is assigned after the configuration is initialized so that
    # logger= can see the configured debug flag.
    def initialize(config={}, logger=DEFAULT_LOGGER)
      super()

      @state = State::READY
      @queue = Support::ExecutableQueue.new
      @aggregator = Support::Aggregator.new

      initialize_config(config)
      self.logger = logger
    end

    # The default App logger writes to $stdout at level INFO.
    DEFAULT_LOGGER = Logger.new($stdout)
    DEFAULT_LOGGER.level = Logger::INFO
    DEFAULT_LOGGER.formatter = lambda do |severity, time, progname, msg|
      " %s[%s] %18s %s\n" % [severity[0,1], time.strftime('%H:%M:%S') , progname || '--' , msg]
    end

    # True if debug or the global variable $DEBUG is true.
    def debug?
      debug || $DEBUG
    end

    # Sets the current logger. The logger level is set to Logger::DEBUG if
    # debug? is true. A nil logger is allowed.
    def logger=(logger)
      logger.level = Logger::DEBUG if !logger.nil? && debug?
      @logger = logger
    end

    # Logs the action and message at the input level (default INFO).
    # Logging is suppressed if quiet is true, unless verbose is also true.
    # Does nothing when no logger is set (logger= accepts nil).
    def log(action, msg="", level=Logger::INFO)
      return nil if logger.nil?
      logger.add(level, msg, action.to_s) if !quiet || verbose
    end

    # Returns the configuration filepath for the specified task name,
    # File.join(app['config'], task_name + ".yml"). Returns nil if
    # task_name is nil.
    def config_filepath(name)
      return nil if name.nil?
      filepath('config', "#{name}.yml")
    end

    #
    # Execution methods
    #

    # Executes the input Executable with the inputs. Stores the result in
    # aggregator unless an on_complete block is set. Returns the audited
    # result.
    def execute(m, inputs)
      _result = m._execute(*inputs)
      aggregator.store(_result) unless m.on_complete_block
      _result
    end

    # Sets state = State::READY unless the app is running. Returns self.
    def ready
      self.state = State::READY unless self.state == State::RUN
      self
    end

    # Sequentially calls execute with the Executable methods and inputs in
    # queue; run continues until the queue is empty and then returns self.
    # Calls to run when already running will return immediately.
    #
    # Run checks the state of self before executing a method. If the state is
    # changed to State::STOP, then no more methods will be executed; currently
    # running methods will continue to completion. If the state is changed to
    # State::TERMINATE then no more methods will be executed and currently
    # running methods will be discontinued as described in terminate.
    #
    # Errors raised during execution are logged (or re-raised when debug? is
    # true), and the state is always returned to State::READY. SystemExit,
    # signals (including Interrupt) and NoMemoryError are never swallowed;
    # they propagate to the caller after the state is reset.
    def run
      return self unless state == State::READY

      self.state = State::RUN

      # TODO: log starting run
      begin
        until queue.empty? || state != State::RUN
          execute(*queue.deq)
        end
      rescue TerminateError
        # gracefully fail for termination errors
      rescue SystemExit, SignalException, NoMemoryError
        # process-control exceptions must not be downgraded to a log line
        raise
      rescue Exception => e
        # handle other errors accordingly
        raise if debug?
        log(e.class, e.message)
      ensure
        self.state = State::READY
      end

      # TODO: log run complete
      self
    end

    # Signals a running application to stop executing tasks in the
    # queue by setting state = State::STOP. The task currently
    # executing will continue uninterrupted to completion.
    #
    # Does nothing unless state is State::RUN.
    def stop
      self.state = State::STOP if self.state == State::RUN
      self
    end

    # Signals a running application to terminate execution by setting
    # state = State::TERMINATE. In this state, an executing task
    # will then raise a TerminateError upon check_terminate, thus
    # allowing the invocation of task-specific termination, perhaps
    # performing rollbacks. (see Tap::Task#check_terminate).
    #
    # Does nothing if state == State::READY.
    def terminate
      self.state = State::TERMINATE unless self.state == State::READY
      self
    end

    # Returns an information string for the App.
    #
    #   App.instance.info   # => 'state: 0 (READY) queue: 0 results: 0'
    #
    # Provided information:
    #
    # state:: the integer and string values of self.state
    # queue:: the number of methods currently in the queue
    # results:: the total number of results in aggregator
    def info
      "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}"
    end

    # Enques the task with the inputs. If the task is batched, then each
    # task in task.batch will be enqued with the inputs. Returns task.
    #
    # An Executable may be provided instead of a task.
    #
    # Raises an ArgumentError if the task is assigned to a different app,
    # or if the input is neither a Task nor an Executable.
    def enq(task, *inputs)
      case task
      when Tap::Task
        raise ArgumentError, "not assigned to enqueing app: #{task}" unless task.app == self
        task.enq(*inputs)
      when Support::Executable
        queue.enq(task, inputs)
      else
        raise ArgumentError, "not a Task or Executable: #{task}"
      end

      task
    end

    # Method enque. Enques the specified method from object with the inputs.
    # Returns the enqued method.
    def mq(object, method_name, *inputs)
      m = object._method(method_name)
      enq(m, *inputs)
    end

    # Sets a sequence workflow pattern for the tasks; each task will enque
    # the next task with its results. The input array is not modified.
    # Returns tasks.
    #
    # Notes:
    # - Batched tasks will have the pattern set for each task in the batch
    # - The current audited results are yielded to the block, if given,
    #   before the next task is enqued.
    # - Executables may be provided as well as tasks.
    def sequence(tasks) # :yields: _result
      # destructure rather than shift, so the caller's array is left intact
      current_task, *remaining = tasks

      remaining.each do |next_task|
        # simply pass results from one task to the next.
        current_task.on_complete do |_result|
          yield(_result) if block_given?
          enq(next_task, _result)
        end
        current_task = next_task
      end

      tasks
    end

    # Sets a fork workflow pattern for the source task; each target
    # will enque the results of source.
    #
    # Notes:
    # - Batched tasks will have the pattern set for each task in the batch
    # - The current audited results are yielded to the block, if given,
    #   before the next task is enqued.
    # - Executables may be provided as well as tasks.
    def fork(source, targets) # :yields: _result
      source.on_complete do |_result|
        targets.each do |target|
          yield(_result) if block_given?
          enq(target, _result)
        end
      end
    end

    # Sets a simple merge workflow pattern for the source tasks. Each source
    # enques target with its result; no synchronization occurs, nor are
    # results grouped before being sent to the target.
    #
    # Notes:
    # - Batched tasks will have the pattern set for each task in the batch
    # - The current audited results are yielded to the block, if given,
    #   before the next task is enqued.
    # - Executables may be provided as well as tasks.
    def merge(target, sources) # :yields: _result
      sources.each do |source|
        # merging can use the existing audit trails... each distinct
        # input is getting sent to one place (the target)
        source.on_complete do |_result|
          yield(_result) if block_given?
          enq(target, _result)
        end
      end
    end

    # Sets a synchronized merge workflow for the source tasks. Results from
    # each source task are collected and enqued as a single group to the target.
    # The target is not enqued until all sources have completed. Raises an
    # error if a source returns twice before the target is enqued.
    #
    # Once the target has been enqued the collected results are cleared, so
    # the same workflow can synchronize another round of results. The target
    # is enqued through enq, as in the other workflow patterns.
    #
    # Notes:
    # - Batched tasks will have the pattern set for each task in the batch
    # - The current audited results are yielded to the block, if given,
    #   before the next task is enqued.
    # - Executables may be provided as well as tasks.
    #
    #-- TODO: add notes on testing and the way results are received
    # (ie as a single object)
    def sync_merge(target, sources) # :yields: _result
      # group holds one array of result slots per source; each slot array
      # has one entry per task in the source batch (or a single entry)
      group = Array.new(sources.length, nil)

      sources.each_with_index do |source, index|
        # maps each batched task to its slot; unknown sources map to slot 0
        batch_map = Hash.new(0)
        batch_length = if source.kind_of?(Support::Batchable)
          source.batch.each_with_index {|obj, i| batch_map[obj] = i }
          source.batch.length
        else
          1
        end

        group[index] = Array.new(batch_length, nil)

        source.on_complete do |_result|
          batch_index = batch_map[_result._current_source]

          unless group[index][batch_index].nil?
            raise "sync_merge collision... already got a result for #{_result._current_source}"
          end

          group[index][batch_index] = _result

          # inspect the slots directly rather than flattening the group, so
          # that array-like results are never flattened along with it
          if group.all? {|slots| !slots.include?(nil) }
            Support::Combinator.new(*group).each do |*combination|
              # merge the source audits
              _group_result = Support::Audit.merge(*combination)

              yield(_group_result) if block_given?
              enq(target, _group_result)
            end

            # reset the slots in place; the per-source arrays must be kept
            # (not replaced by nil) so later results can be stored again
            group.each {|slots| slots.fill(nil) }
          end
        end
      end
    end

    # Sets a choice workflow pattern for the source task. When the
    # source task completes, switch yields the audited result to the
    # block which then returns the index of the target to enque with
    # the results. If the index is false or nil, no target is enqued
    # and the result is stored in the aggregator; an error is raised
    # if no target can be found for the specified index.
    #
    # Notes:
    # - Batched tasks will have the pattern set for each task in the batch
    # - The current audited results are yielded to the block, if given,
    #   before the next task is enqued.
    # - Executables may be provided as well as tasks.
    def switch(source, targets) # :yields: _result
      source.on_complete do |_result|
        if index = yield(_result)
          unless target = targets[index]
            raise "no switch target for index: #{index}"
          end

          enq(target, _result)
        else
          aggregator.store(_result)
        end
      end
    end

    # Returns all aggregated, audited results for the specified tasks.
    # Results are joined into a single array. Arrays of tasks are
    # allowed as inputs. See results.
    def _results(*tasks)
      aggregator.retrieve_all(*tasks.flatten)
    end

    # Returns all aggregated results for the specified tasks. Results are
    # joined into a single array. Arrays of tasks are allowed as inputs.
    #
    #   t1 = Task.new {|task, input| input += 1 }
    #   t2 = Task.new {|task, input| input += 10 }
    #   t3 = t2.initialize_batch_obj
    #
    #   t1.enq(0)
    #   t2.enq(1)
    #
    #   app.run
    #   app.results(t1, t2.batch)     # => [1, 11, 11]
    #   app.results(t2, t1)           # => [11, 1]
    #
    def results(*tasks)
      _results(tasks).collect {|_result| _result._current}
    end

    protected

    # A hook for handling unknown configurations in subclasses, called from
    # configure. If handle_configuration evaluates to false, then configure
    # raises an error.
    #
    # NOTE(review): this method is spelled handle_configuation, which does
    # not match the handle_configuration name referred to above. Confirm
    # against the superclass which name configure actually calls before
    # renaming.
    def handle_configuation(key, value)
      false
    end

    # Sets the state of the application
    attr_writer :state

    # TerminateErrors are raised to kill executing tasks when terminate is
    # called on a running App. They are handled by the run rescue code.
    class TerminateError < RuntimeError
    end
  end
end