lib/tap/support/executable.rb in bahuvrihi-tap-0.10.6 vs lib/tap/support/executable.rb in bahuvrihi-tap-0.10.7
- old
+ new
@@ -1,77 +1,279 @@
require 'tap/support/audit'
-require 'tap/support/dependable'
module Tap
module Support
# Executable wraps methods to make them executable by App. Methods are
# wrapped by extending the object that receives them; the easiest way
# to make an object executable is to use Object#_method.
module Executable
- extend Dependable
+ # The Tap::App the Executable belongs to.
+ attr_reader :app
+
# The method called when an Executable is executed via _execute
attr_reader :_method_name
# Stores the on complete block
attr_reader :on_complete_block
# An array of dependency indexes that will be resolved on _execute
attr_reader :dependencies
+ # The batch for the Executable.
+ attr_reader :batch
+
public
# Extends obj with Executable and sets up all required variables. The
# specified method will be called on _execute.
- def self.initialize(obj, method_name, &on_complete_block)
+ def self.initialize(obj, method_name, app=App.instance, batch=[], dependencies=[], &on_complete_block)
obj.extend Executable
+ obj.instance_variable_set(:@app, app)
obj.instance_variable_set(:@_method_name, method_name)
obj.instance_variable_set(:@on_complete_block, on_complete_block)
- obj.instance_variable_set(:@dependencies, [])
+ obj.instance_variable_set(:@dependencies, dependencies)
+ obj.instance_variable_set(:@batch, batch)
+ batch << obj
+
obj
end
- # Sets a block to receive the results of _execute. Raises an error
- # if an on_complete block is already set. Override an existing
- # on_complete block by specifying override = true.
+ # Initializes a new batch object and adds the object to batch.
+ # The object will be a duplicate of self. (Note this method
+ # can raise an error for objects that don't support dup, like
+ # methods generated by Object#_method).
+ def initialize_batch_obj
+ obj = self.dup
+
+ if obj.kind_of?(Executable)
+ batch << obj
+ obj
+ else
+ Executable.initialize(obj, _method_name, app, batch, dependencies, &on_complete_block)
+ end
+ end
+
+ # Returns true if the batch size is greater than one
+ # (the one being self).
+ def batched?
+ batch.length > 1
+ end
+
+ # Returns the index of the self in batch.
+ def batch_index
+ batch.index(self)
+ end
+
+ # Merges the batches for self and the specified Executables,
+ # removing duplicates.
#
+ # class BatchExecutable
+ # include Tap::Support::Executable
+ # def initialize(batch=[])
+ # @batch = batch
+ # batch << self
+ # end
+ # end
+ #
+ # b1 = BatchExecutable.new
+ # b2 = BatchExecutable.new
+ # b3 = BatchExecutable.new
+ #
+ # b1.batch_with(b2, b3)
+ # b1.batch # => [b1, b2, b3]
+ # b3.batch # => [b1, b2, b3]
+ #
+ # Note that batch_with is not recursive (ie it does not
+ # merge the batches of each member in the batch):
+ #
+ # b4 = BatchExecutable.new
+ # b4.batch_with(b3)
+ #
+ # b4.batch # => [b4, b1, b2, b3]
+ # b3.batch # => [b4, b1, b2, b3]
+ # b2.batch # => [b1, b2, b3]
+ # b1.batch # => [b1, b2, b3]
+ #
+ # However it does affect all objects that share the same
+ # underlying batch:
+ #
+ # b5 = BatchExecutable.new(b1.batch)
+ # b6 = BatchExecutable.new
+ #
+ # b5.batch.object_id # => b1.batch.object_id
+ # b5.batch # => [b1, b2, b3, b5]
+ #
+ # b5.batch_with(b6)
+ #
+ # b5.batch # => [b1, b2, b3, b5, b6]
+ # b1.batch # => [b1, b2, b3, b5, b6]
+ #
+ # Returns self.
+ def batch_with(*executables)
+ batches = [batch] + executables.collect {|executable| executable.batch }
+ batches.uniq!
+
+ merged = []
+ batches.each do |batch|
+ merged.concat(batch)
+ batch.clear
+ end
+
+ merged.uniq!
+ batches.each {|batch| batch.concat(merged) }
+ self
+ end
+
+ # Enqueues self and self.batch to app with the inputs. The number
+ # of inputs provided should match the number of inputs specified
+ # by the arity of the _method_name method.
+ def enq(*inputs)
+ batch.each do |executable|
+ executable.unbatched_enq(*inputs)
+ end
+ self
+ end
+
+ # Like enq, but only enques self.
+ def unbatched_enq(*inputs)
+ app.queue.enq(self, inputs)
+ self
+ end
+
+ # Sets a block to receive the results of _execute. Raises an
+ # error if an on_complete block is already set. Override an
+ # existing on_complete block by specifying override = true.
+ #
# Note the block recieves an audited result and not
# the result itself (see Audit for more information).
def on_complete(override=false, &block) # :yields: _result
+ batch.each do |executable|
+ executable.unbatched_on_complete(override, &block)
+ end
+ self
+ end
+
+ # Like on_complete, but only sets on_complete_block for self.
+ def unbatched_on_complete(override=false, &block) # :yields: _result
unless on_complete_block == nil || override
raise "on_complete_block already set: #{self}"
end
@on_complete_block = block
+ self
end
- # Adds the dependency to self, making self dependent on the dependency.
- # The dependency will be resolved by calling dependency._execute with
- # the input arguments during resolve_dependencies.
- def depends_on(dependency, *inputs)
+ # Sets a sequence workflow pattern for the tasks; each task will enque
+ # the next task with it's results.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+ # - Executables may provided as well as tasks.
+ def sequence(*tasks, &block) # :yields: _result
+ Joins::Sequence.join(self, tasks, &block)
+ end
+
+ # Sets a fork workflow pattern for the source task; each target
+ # will enque the results of source.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+ # - Executables may provided as well as tasks.
+ def fork(*targets, &block) # :yields: _result
+ Joins::Fork.join(self, targets, &block)
+ end
+
+ # Sets a simple merge workflow pattern for the source tasks. Each source
+ # enques target with it's result; no synchronization occurs, nor are
+ # results grouped before being sent to the target.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+ # - Executables may provided as well as tasks.
+ def merge(*sources, &block) # :yields: _result
+ Joins::Merge.join(self, sources, &block)
+ end
+
+ # Sets a synchronized merge workflow for the source tasks. Results from
+ # each source task are collected and enqued as a single group to the target.
+ # The target is not enqued until all sources have completed. Raises an
+ # error if a source returns twice before the target is enqued.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+ # - Executables may provided as well as tasks.
+ #
+ #-- TODO: add notes on testing and the way results are received
+ # (ie as a single object)
+ #
+ # - note stacking with a sync merge is *somewhat* pointless;
+ # it does not change the run order, but of course it does
+ # allow other tasks to be interleaved.
+ #
+ def sync_merge(*sources, &block) # :yields: _result
+ Joins::SyncMerge.join(self, sources, &block)
+ end
+
+ # Sets a choice workflow pattern for the source task. When the
+ # source task completes, switch yields the audited result to the
+ # block which then returns the index of the target to enque with
+ # the results. No target will be enqued if the index is false or
+ # nil; an error is raised if no target can be found for the
+ # specified index.
+ #
+ # Notes:
+ # - Batched tasks will have the pattern set for each task in the batch
+ # - The current audited results are yielded to the block, if given,
+ # before the next task is enqued.
+ # - Executables may provided as well as tasks.
+ def switch(*targets, &block) # :yields: _result
+ Joins::Switch.join(self, targets, &block)
+ end
+
+ def unbatched_depends_on(dependency, *inputs)
raise ArgumentError, "not an Executable: #{dependency}" unless dependency.kind_of?(Executable)
raise ArgumentError, "cannot depend on self" if dependency == self
- index = Executable.register(dependency, inputs)
+ index = app.dependencies.register(dependency, inputs)
dependencies << index unless dependencies.include?(index)
index
end
+ # Adds the dependency to self, making self dependent on the dependency.
+ # The dependency will be resolved by calling dependency._execute with
+ # the input arguments during resolve_dependencies.
+ def depends_on(dependency, *inputs)
+ index = unbatched_depends_on(dependency, *inputs)
+ batch.each do |e|
+ e.dependencies << index unless e.dependencies.include?(index)
+ end
+ index
+ end
+
# Resolves dependencies by calling dependency._execute with
# the dependency arguments. (See Dependable#resolve).
def resolve_dependencies
- Executable.resolve(dependencies)
+ app.dependencies.resolve(dependencies)
self
end
# Resets dependencies so they will be re-resolved on resolve_dependencies.
# (See Dependable#reset).
def reset_dependencies
- Executable.reset(dependencies)
+ app.dependencies.reset(dependencies)
self
end
-
+
# Auditing method call. Executes _method_name for self, but audits
# the result. Sends the audited result to the on_complete_block if set.
#
# Audits are initialized in the follwing manner:
# no inputs:: create a new, empty Audit. The first value of the audit
@@ -108,14 +310,19 @@
end
Audit.new(inputs, sources)
end
audit._record(self, send(_method_name, *inputs))
- on_complete_block.call(audit) if on_complete_block
-
+ on_complete_block ? on_complete_block.call(audit) : app.aggregator.store(audit)
+
audit
end
+
+ def inspect
+ "#<#{self.class.to_s}:#{object_id} _method: #{_method_name} batch_length: #{batch.length} app: #{app}>"
+ end
+
end
end
end
# Tap extends Object with <tt>_method</tt> to generate executable methods
@@ -135,10 +342,10 @@
class Object
# Initializes a Tap::Support::Executable using the Method returned by
# Object#method(method_name), setting the on_complete block as specified.
# Returns nil if Object#method returns nil.
- def _method(method_name, &on_complete_block) # :yields: _result
+ def _method(method_name, app=Tap::App.instance) # :yields: _result
return nil unless m = method(method_name)
- Tap::Support::Executable.initialize(m, :call, &on_complete_block)
+ Tap::Support::Executable.initialize(m, :call, app)
end
end
\ No newline at end of file