lib/tap/support/executable.rb in bahuvrihi-tap-0.10.7 vs lib/tap/support/executable.rb in bahuvrihi-tap-0.10.8

- old
+ new

@@ -1,28 +1,26 @@ require 'tap/support/audit' module Tap module Support - # Executable wraps methods to make them executable by App. Methods are - # wrapped by extending the object that receives them; the easiest way - # to make an object executable is to use Object#_method. + # Executable wraps objects to make them executable by App. module Executable - # The Tap::App the Executable belongs to. + # The App receiving self during enq attr_reader :app - # The method called when an Executable is executed via _execute + # The method called during _execute attr_reader :_method_name - # Stores the on complete block + # The block called when _execute completes attr_reader :on_complete_block - # An array of dependency indexes that will be resolved on _execute + # An array of dependency indicies that will be resolved on _execute attr_reader :dependencies - # The batch for the Executable. + # The batch for self attr_reader :batch public # Extends obj with Executable and sets up all required variables. The @@ -39,12 +37,12 @@ obj end # Initializes a new batch object and adds the object to batch. # The object will be a duplicate of self. (Note this method - # can raise an error for objects that don't support dup, like - # methods generated by Object#_method). + # can raise an error for objects that don't support dup, + # notably Method objects generated by Object#_method). def initialize_batch_obj obj = self.dup if obj.kind_of?(Executable) batch << obj @@ -53,16 +51,16 @@ Executable.initialize(obj, _method_name, app, batch, dependencies, &on_complete_block) end end # Returns true if the batch size is greater than one - # (the one being self). + # (the one is assumed to be self). def batched? batch.length > 1 end - # Returns the index of the self in batch. + # Returns the index of self in batch. def batch_index batch.index(self) end # Merges the batches for self and the specified Executables, @@ -123,13 +121,13 @@ merged.uniq! batches.each {|batch| batch.concat(merged) } self end - # Enqueues self and self.batch to app with the inputs. The number - # of inputs provided should match the number of inputs specified - # by the arity of the _method_name method. + # Enqueues each member of batch (and implicitly self) to app with the + # inputs. The number of inputs provided should match the number of + # inputs for the _method_name method. def enq(*inputs) batch.each do |executable| executable.unbatched_enq(*inputs) end self @@ -139,153 +137,114 @@ def unbatched_enq(*inputs) app.queue.enq(self, inputs) self end - # Sets a block to receive the results of _execute. Raises an - # error if an on_complete block is already set. Override an - # existing on_complete block by specifying override = true. + # Sets a block to receive the results of _execute for each member of + # batch (and implicitly self). Raises an error if on_complete_block + # is already set within the batch. Override the existing + # on_complete_block by specifying override = true. # - # Note the block recieves an audited result and not - # the result itself (see Audit for more information). + # Note: the block recieves an audited result and not the result + # itself (see Audit for more information). def on_complete(override=false, &block) # :yields: _result batch.each do |executable| executable.unbatched_on_complete(override, &block) end self end - # Like on_complete, but only sets on_complete_block for self. + # Like on_complete, but only sets the on_complete_block for self. def unbatched_on_complete(override=false, &block) # :yields: _result unless on_complete_block == nil || override raise "on_complete_block already set: #{self}" end @on_complete_block = block self end - # Sets a sequence workflow pattern for the tasks; each task will enque - # the next task with it's results. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. + # Sets a sequence workflow pattern for the tasks; each task + # enques the next task with it's results, starting with self. + # See Joins::Sequence. def sequence(*tasks, &block) # :yields: _result Joins::Sequence.join(self, tasks, &block) end - # Sets a fork workflow pattern for the source task; each target - # will enque the results of source. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. + # Sets a fork workflow pattern for self; each target + # will enque the results of self. See Joins::Fork. def fork(*targets, &block) # :yields: _result Joins::Fork.join(self, targets, &block) end - # Sets a simple merge workflow pattern for the source tasks. Each source - # enques target with it's result; no synchronization occurs, nor are - # results grouped before being sent to the target. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. + # Sets a simple merge workflow pattern for the source tasks. Each + # source enques self with it's result; no synchronization occurs, + # nor are results grouped before being enqued. See Joins::Merge. def merge(*sources, &block) # :yields: _result Joins::Merge.join(self, sources, &block) end - # Sets a synchronized merge workflow for the source tasks. Results from - # each source task are collected and enqued as a single group to the target. - # The target is not enqued until all sources have completed. Raises an - # error if a source returns twice before the target is enqued. + # Sets a synchronized merge workflow for the source tasks. Results + # from each source are collected and enqued as a single group to + # self. The collective results are not enqued until all sources + # have completed. See Joins::SyncMerge. # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. - # - #-- TODO: add notes on testing and the way results are received - # (ie as a single object) - # - # - note stacking with a sync merge is *somewhat* pointless; - # it does not change the run order, but of course it does - # allow other tasks to be interleaved. - # + # Raises an error if a source returns twice before the target is enqued. def sync_merge(*sources, &block) # :yields: _result Joins::SyncMerge.join(self, sources, &block) end - # Sets a choice workflow pattern for the source task. When the - # source task completes, switch yields the audited result to the - # block which then returns the index of the target to enque with - # the results. No target will be enqued if the index is false or - # nil; an error is raised if no target can be found for the - # specified index. - # - # Notes: - # - Batched tasks will have the pattern set for each task in the batch - # - The current audited results are yielded to the block, if given, - # before the next task is enqued. - # - Executables may provided as well as tasks. + # Sets a switch workflow pattern for self. When _execute completes, + # switch yields the audited result to the block which should return + # the index of the target to enque with the results. No target will + # be enqued if the index is false or nil; an error is raised if no + # target can be found for the specified index. See Joins::Switch. def switch(*targets, &block) # :yields: _result Joins::Switch.join(self, targets, &block) end - def unbatched_depends_on(dependency, *inputs) - raise ArgumentError, "not an Executable: #{dependency}" unless dependency.kind_of?(Executable) - raise ArgumentError, "cannot depend on self" if dependency == self - - index = app.dependencies.register(dependency, inputs) - dependencies << index unless dependencies.include?(index) - index - end - - # Adds the dependency to self, making self dependent on the dependency. - # The dependency will be resolved by calling dependency._execute with - # the input arguments during resolve_dependencies. - def depends_on(dependency, *inputs) - index = unbatched_depends_on(dependency, *inputs) + # Adds the dependency to each member in batch (and implicitly self). + # The dependency will be resolved with the input arguments during + # _execute, using resolve_dependencies. + def depends_on(dependency) batch.each do |e| - e.dependencies << index unless e.dependencies.include?(index) + e.unbatched_depends_on(dependency) end - index + self end - # Resolves dependencies by calling dependency._execute with - # the dependency arguments. (See Dependable#resolve). + # Like depends_on, but only adds the dependency to self. + def unbatched_depends_on(dependency) + raise ArgumentError, "cannot depend on self" if dependency == self + + app.dependencies.register(dependency) + dependencies << dependency unless dependencies.include?(dependency) + self + end + + # Resolves dependencies. (See Dependency#resolve). def resolve_dependencies - app.dependencies.resolve(dependencies) + dependencies.each {|dependency| dependency.resolve } self end - # Resets dependencies so they will be re-resolved on resolve_dependencies. - # (See Dependable#reset). + # Resets dependencies so they will be re-resolved on + # resolve_dependencies. (See Dependency#reset). def reset_dependencies - app.dependencies.reset(dependencies) + dependencies.each {|dependency| dependency.reset } self end - # Auditing method call. Executes _method_name for self, but audits - # the result. Sends the audited result to the on_complete_block if set. + # Auditing method call. Resolves dependencies, executes _method_name, + # and sends the audited result to the on_complete_block (if set). # # Audits are initialized in the follwing manner: - # no inputs:: create a new, empty Audit. The first value of the audit - # will be the result of call - # one input:: forks the input if it is an audit, otherwise initializes - # a new audit using the input - # multiple inputs:: merges the inputs into a new Audit. + # no inputs:: Creates a new, empty Audit. The first value of the audit + # will be the result of call. + # one input:: Forks the input if it is an audit, otherwise initializes + # a new audit using the input. + # multiple inputs:: Merges the inputs into a new Audit. # - # Dependencies are resolved using resolve_dependencies before - # _method_name is executed. def _execute(*inputs) resolve_dependencies audit = case inputs.length when 0 then Audit.new @@ -315,10 +274,19 @@ on_complete_block ? on_complete_block.call(audit) : app.aggregator.store(audit) audit end + # Raises a TerminateError if app.state == State::TERMINATE. + # check_terminate may be called at any time to provide a + # breakpoint in long-running processes. + def check_terminate + if app.state == App::State::TERMINATE + raise App::TerminateError.new + end + end + def inspect "#<#{self.class.to_s}:#{object_id} _method: #{_method_name} batch_length: #{batch.length} app: #{app}>" end end @@ -339,13 +307,14 @@ # # array # => [[1],[2,3]] # class Object - # Initializes a Tap::Support::Executable using the Method returned by - # Object#method(method_name), setting the on_complete block as specified. + # Initializes a Tap::Support::Executable using the object returned by + # Object#method(method_name). + # # Returns nil if Object#method returns nil. - def _method(method_name, app=Tap::App.instance) # :yields: _result + def _method(method_name, app=Tap::App.instance) return nil unless m = method(method_name) Tap::Support::Executable.initialize(m, :call, app) end end \ No newline at end of file