lib/tap/support/executable.rb in bahuvrihi-tap-0.10.7 vs lib/tap/support/executable.rb in bahuvrihi-tap-0.10.8
- old
+ new
@@ -1,28 +1,26 @@
require 'tap/support/audit'
module Tap
module Support
- # Executable wraps methods to make them executable by App. Methods are
- # wrapped by extending the object that receives them; the easiest way
- # to make an object executable is to use Object#_method.
+ # Executable wraps objects to make them executable by App.
module Executable
- # The Tap::App the Executable belongs to.
+ # The App receiving self during enq
attr_reader :app
- # The method called when an Executable is executed via _execute
+ # The method called during _execute
attr_reader :_method_name
- # Stores the on complete block
+ # The block called when _execute completes
attr_reader :on_complete_block
- # An array of dependency indexes that will be resolved on _execute
+ # An array of dependency indicies that will be resolved on _execute
attr_reader :dependencies
- # The batch for the Executable.
+ # The batch for self
attr_reader :batch
public
# Extends obj with Executable and sets up all required variables. The
@@ -39,12 +37,12 @@
obj
end
# Initializes a new batch object and adds the object to batch.
# The object will be a duplicate of self. (Note this method
- # can raise an error for objects that don't support dup, like
- # methods generated by Object#_method).
+ # can raise an error for objects that don't support dup,
+ # notably Method objects generated by Object#_method).
def initialize_batch_obj
obj = self.dup
if obj.kind_of?(Executable)
batch << obj
@@ -53,16 +51,16 @@
Executable.initialize(obj, _method_name, app, batch, dependencies, &on_complete_block)
end
end
# Returns true if the batch size is greater than one
- # (the one being self).
+ # (the one is assumed to be self).
def batched?
batch.length > 1
end
- # Returns the index of the self in batch.
+ # Returns the index of self in batch.
def batch_index
batch.index(self)
end
# Merges the batches for self and the specified Executables,
@@ -123,13 +121,13 @@
merged.uniq!
batches.each {|batch| batch.concat(merged) }
self
end
- # Enqueues self and self.batch to app with the inputs. The number
- # of inputs provided should match the number of inputs specified
- # by the arity of the _method_name method.
+ # Enqueues each member of batch (and implicitly self) to app with the
+ # inputs. The number of inputs provided should match the number of
+ # inputs for the _method_name method.
def enq(*inputs)
batch.each do |executable|
executable.unbatched_enq(*inputs)
end
self
@@ -139,153 +137,114 @@
def unbatched_enq(*inputs)
app.queue.enq(self, inputs)
self
end
- # Sets a block to receive the results of _execute. Raises an
- # error if an on_complete block is already set. Override an
- # existing on_complete block by specifying override = true.
+ # Sets a block to receive the results of _execute for each member of
+ # batch (and implicitly self). Raises an error if on_complete_block
+ # is already set within the batch. Override the existing
+ # on_complete_block by specifying override = true.
#
- # Note the block recieves an audited result and not
- # the result itself (see Audit for more information).
+ # Note: the block recieves an audited result and not the result
+ # itself (see Audit for more information).
def on_complete(override=false, &block) # :yields: _result
batch.each do |executable|
executable.unbatched_on_complete(override, &block)
end
self
end
- # Like on_complete, but only sets on_complete_block for self.
+ # Like on_complete, but only sets the on_complete_block for self.
def unbatched_on_complete(override=false, &block) # :yields: _result
unless on_complete_block == nil || override
raise "on_complete_block already set: #{self}"
end
@on_complete_block = block
self
end
- # Sets a sequence workflow pattern for the tasks; each task will enque
- # the next task with it's results.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may provided as well as tasks.
+ # Sets a sequence workflow pattern for the tasks; each task
+ # enques the next task with it's results, starting with self.
+ # See Joins::Sequence.
def sequence(*tasks, &block) # :yields: _result
Joins::Sequence.join(self, tasks, &block)
end
- # Sets a fork workflow pattern for the source task; each target
- # will enque the results of source.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may provided as well as tasks.
+ # Sets a fork workflow pattern for self; each target
+ # will enque the results of self. See Joins::Fork.
def fork(*targets, &block) # :yields: _result
Joins::Fork.join(self, targets, &block)
end
- # Sets a simple merge workflow pattern for the source tasks. Each source
- # enques target with it's result; no synchronization occurs, nor are
- # results grouped before being sent to the target.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may provided as well as tasks.
+ # Sets a simple merge workflow pattern for the source tasks. Each
+ # source enques self with it's result; no synchronization occurs,
+ # nor are results grouped before being enqued. See Joins::Merge.
def merge(*sources, &block) # :yields: _result
Joins::Merge.join(self, sources, &block)
end
- # Sets a synchronized merge workflow for the source tasks. Results from
- # each source task are collected and enqued as a single group to the target.
- # The target is not enqued until all sources have completed. Raises an
- # error if a source returns twice before the target is enqued.
+ # Sets a synchronized merge workflow for the source tasks. Results
+ # from each source are collected and enqued as a single group to
+ # self. The collective results are not enqued until all sources
+ # have completed. See Joins::SyncMerge.
#
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may provided as well as tasks.
- #
- #-- TODO: add notes on testing and the way results are received
- # (ie as a single object)
- #
- # - note stacking with a sync merge is *somewhat* pointless;
- # it does not change the run order, but of course it does
- # allow other tasks to be interleaved.
- #
+ # Raises an error if a source returns twice before the target is enqued.
def sync_merge(*sources, &block) # :yields: _result
Joins::SyncMerge.join(self, sources, &block)
end
- # Sets a choice workflow pattern for the source task. When the
- # source task completes, switch yields the audited result to the
- # block which then returns the index of the target to enque with
- # the results. No target will be enqued if the index is false or
- # nil; an error is raised if no target can be found for the
- # specified index.
- #
- # Notes:
- # - Batched tasks will have the pattern set for each task in the batch
- # - The current audited results are yielded to the block, if given,
- # before the next task is enqued.
- # - Executables may provided as well as tasks.
+ # Sets a switch workflow pattern for self. When _execute completes,
+ # switch yields the audited result to the block which should return
+ # the index of the target to enque with the results. No target will
+ # be enqued if the index is false or nil; an error is raised if no
+ # target can be found for the specified index. See Joins::Switch.
def switch(*targets, &block) # :yields: _result
Joins::Switch.join(self, targets, &block)
end
- def unbatched_depends_on(dependency, *inputs)
- raise ArgumentError, "not an Executable: #{dependency}" unless dependency.kind_of?(Executable)
- raise ArgumentError, "cannot depend on self" if dependency == self
-
- index = app.dependencies.register(dependency, inputs)
- dependencies << index unless dependencies.include?(index)
- index
- end
-
- # Adds the dependency to self, making self dependent on the dependency.
- # The dependency will be resolved by calling dependency._execute with
- # the input arguments during resolve_dependencies.
- def depends_on(dependency, *inputs)
- index = unbatched_depends_on(dependency, *inputs)
+ # Adds the dependency to each member in batch (and implicitly self).
+ # The dependency will be resolved with the input arguments during
+ # _execute, using resolve_dependencies.
+ def depends_on(dependency)
batch.each do |e|
- e.dependencies << index unless e.dependencies.include?(index)
+ e.unbatched_depends_on(dependency)
end
- index
+ self
end
- # Resolves dependencies by calling dependency._execute with
- # the dependency arguments. (See Dependable#resolve).
+ # Like depends_on, but only adds the dependency to self.
+ def unbatched_depends_on(dependency)
+ raise ArgumentError, "cannot depend on self" if dependency == self
+
+ app.dependencies.register(dependency)
+ dependencies << dependency unless dependencies.include?(dependency)
+ self
+ end
+
+ # Resolves dependencies. (See Dependency#resolve).
def resolve_dependencies
- app.dependencies.resolve(dependencies)
+ dependencies.each {|dependency| dependency.resolve }
self
end
- # Resets dependencies so they will be re-resolved on resolve_dependencies.
- # (See Dependable#reset).
+ # Resets dependencies so they will be re-resolved on
+ # resolve_dependencies. (See Dependency#reset).
def reset_dependencies
- app.dependencies.reset(dependencies)
+ dependencies.each {|dependency| dependency.reset }
self
end
- # Auditing method call. Executes _method_name for self, but audits
- # the result. Sends the audited result to the on_complete_block if set.
+ # Auditing method call. Resolves dependencies, executes _method_name,
+ # and sends the audited result to the on_complete_block (if set).
#
# Audits are initialized in the follwing manner:
- # no inputs:: create a new, empty Audit. The first value of the audit
- # will be the result of call
- # one input:: forks the input if it is an audit, otherwise initializes
- # a new audit using the input
- # multiple inputs:: merges the inputs into a new Audit.
+ # no inputs:: Creates a new, empty Audit. The first value of the audit
+ # will be the result of call.
+ # one input:: Forks the input if it is an audit, otherwise initializes
+ # a new audit using the input.
+ # multiple inputs:: Merges the inputs into a new Audit.
#
- # Dependencies are resolved using resolve_dependencies before
- # _method_name is executed.
def _execute(*inputs)
resolve_dependencies
audit = case inputs.length
when 0 then Audit.new
@@ -315,10 +274,19 @@
on_complete_block ? on_complete_block.call(audit) : app.aggregator.store(audit)
audit
end
+ # Raises a TerminateError if app.state == State::TERMINATE.
+ # check_terminate may be called at any time to provide a
+ # breakpoint in long-running processes.
+ def check_terminate
+ if app.state == App::State::TERMINATE
+ raise App::TerminateError.new
+ end
+ end
+
def inspect
"#<#{self.class.to_s}:#{object_id} _method: #{_method_name} batch_length: #{batch.length} app: #{app}>"
end
end
@@ -339,13 +307,14 @@
#
# array # => [[1],[2,3]]
#
class Object
- # Initializes a Tap::Support::Executable using the Method returned by
- # Object#method(method_name), setting the on_complete block as specified.
+ # Initializes a Tap::Support::Executable using the object returned by
+ # Object#method(method_name).
+ #
# Returns nil if Object#method returns nil.
- def _method(method_name, app=Tap::App.instance) # :yields: _result
+ def _method(method_name, app=Tap::App.instance)
return nil unless m = method(method_name)
Tap::Support::Executable.initialize(m, :call, app)
end
end
\ No newline at end of file