require 'tap/support/framework'
module Tap
# Tasks are the basic organizational unit of Tap. Tasks provide
# a standard backbone for creating the working parts of an application
# by facilitating configuration, batched execution of methods, and
# documentation.
#
# The functionality of Task is built from several base modules:
# - Tap::Support::Batchable
# - Tap::Support::Configurable
# - Tap::Support::Executable
#
# Tap::Workflow is built on the same foundations; the sectons on
# configuration and batching apply equally to Workflows as Tasks.
#
# === Task Definition
#
# Tasks are instantiated with a task block; when the task is run
# the block gets called with the enqued inputs. As such, the block
# should specify the same number of inputs as you enque (plus the
# task itself, which is a standard input).
#
# no_inputs = Task.new {|task| }
# one_input = Task.new {|task, input| }
# mixed_inputs = Task.new {|task, a, b, *args| }
#
# no_inputs.enq
# one_input.enq(:a)
# mixed_inputs.enq(:a, :b)
# mixed_inputs.enq(:a, :b, 1, 2, 3)
#
# Subclasses of Task specify executable code by overridding the process
# method. In this case the number of enqued inputs should correspond to
# process (passing the task would be redundant).
#
# class NoInput < Tap::Task
# def process() end
# end
#
# class OneInput < Tap::Task
# def process(input) end
# end
#
# class MixedInputs < Tap::Task
# def process(a, b, *args) end
# end
#
# NoInput.new.enq
# OneInput.new.enq(:a)
# MixedInputs.new.enq(:a, :b)
# MixedInputs.new.enq(:a, :b, 1, 2, 3)
#
# === Configuration
#
# Tasks are configurable. By default each task will be configured
# with the default class configurations, which can be set when the
# class is defined.
#
# class ConfiguredTask < Tap::Task
# config :one, 'one'
# config :two, 'two'
# end
#
# t = ConfiguredTask.new
# t.name # => "configured_task"
# t.config # => {:one => 'one', :two => 'two'}
#
# Configurations can be validated or processed using an optional
# block. Tap::Support::Validation pre-packages several common
# validation/processing blocks, and can be accessed through the
# class method 'c':
#
# class ValidatingTask < Tap::Task
# # string config validated to be a string
# config :string, 'str', &c.check(String)
#
# # integer config; string inputs are converted using YAML
# config :integer, 1, &c.yaml(Integer)
# end
#
# t = ValidatingTask.new
# t.string = 1 # !> ValidationError
# t.integer = 1.1 # !> ValidationError
#
# t.integer = "1"
# t.integer == 1 # => true
#
# Tasks have a name that gets used in auditing, and as a relative
# filepath to find associated files (for instance config files).
# By default the task name is based on the task class, such that
# Tap::Task has the default name 'tap/task'. Configurations
# and custom names can be provided when a task is initialized.
#
# t = ConfiguredTask.new({:one => 'ONE', :three => 'three'}, "example")
# t.name # => "example"
# t.config # => {:one => 'ONE', :two => 'two', :three => 'three'}
#
# === Batches
#
# Tasks can be assembled into batches that enque and execute collectively.
# Batched tasks are often alternatively-configured derivatives of one
# parent task, although they can be manually assembled using Task.batch.
#
# app = Tap::App.instance
# t1 = Tap::Task.new(:key => 'one') do |task, input|
# input + task.config[:key]
# end
# t1.batch # => [t1]
#
# t2 = t1.initialize_batch_obj(:key => 'two')
# t1.batch # => [t1, t2]
# t2.batch # => [t1, t2]
#
# t1.enq 't1_by_'
# t2.enq 't2_by_'
# app.run
#
# app.results(t1) # => ["t1_by_one", "t2_by_one"]
# app.results(t2) # => ["t1_by_two", "t2_by_two"]
#
# Here the results reflects that t1 and t2 were run in succession with the
# input to t1, and then the input to t2.
#
# === Subclassing
# Tasks can be subclassed normally, with one reminder related to batching.
#
# Batched tasks are generated by duplicating an existing instance, hence
# all instance variables will point to the same object in the batched
# and original task. At times (as with configurations), this is
# undesirable; the batched task should have it's own copy of an
# instance variable.
#
# In these cases, the initialize_copy should be overridden
# and should re-initialize the appropriate variables. Be sure to call
# super to invoke the default initialize_copy:
#
# class SubclassTask < Tap::Task
# attr_accessor :array
# def initialize(*args)
# @array = []
# super
# end
#
# def initialize_copy(orig)
# @array = orig.array.dup
# super
# end
# end
#
# t1 = SubclassTask.new
# t2 = t1.initialize_batch_obj
# t1.array == t2.array # => true
# t1.array.object_id == t2.array.object_id # => false
#
class Task
include Support::Executable
include Support::Framework
attr_reader :task_block
def initialize(config={}, name=nil, app=App.instance, &task_block)
super(config, name, app)
@task_block = (task_block == nil ? default_task_block : task_block)
@multithread = false
@on_complete_block = nil
@_method_name = :execute
end
# Enqueues self and self.batch to app with the inputs.
# The number of inputs provided should match the number
# of inputs specified by the arity of the _method_name method.
def enq(*inputs)
app.queue.enq(self, inputs)
end
batch_function :enq, :multithread=
batch_function(:on_complete) {}
# Executes self with the given inputs. Execute provides hooks for subclasses
# to insert standard execution code: before_execute, on_execute_error,
# and after_execute. Override any/all of these methods as needed.
#
# Execute passes the inputs to process and returns the result.
def execute(*inputs)
before_execute
begin
result = process(*inputs)
rescue
on_execute_error($!)
end
after_execute
result
end
# The method for processing inputs into outputs. Override this method in
# subclasses to provide class-specific process logic. The number of
# arguments specified by process corresponds to the number of arguments
# the task should have when enqued.
#
# class TaskWithTwoInputs < Tap::Task
# def process(a, b)
# [b,a]
# end
# end
#
# t = TaskWithTwoInputs.new
# t.enq(1,2).enq(3,4)
# t.app.run
# t.app.results(t) # => [[2,1], [4,3]]
#
# By default process passes self and the input(s) to the task_block
# provided during initialization. In this case the task block dictates
# the number of arguments enq should receive. Simply returns the inputs
# if no task_block is set.
#
# # two arguments in addition to task are specified
# # so this Task must be enqued with two inputs...
# t = Task.new {|task, a, b| [b,a] }
# t.enq(1,2).enq(3,4)
# t.app.run
# t.app.results(t) # => [[2,1], [4,3]]
#
def process(*inputs)
return inputs if task_block == nil
inputs.unshift(self)
arity = task_block.arity
n = inputs.length
unless n == arity || (arity < 0 && (-1-n) <= arity)
raise ArgumentError.new("wrong number of arguments (#{n} for #{arity})")
end
task_block.call(*inputs)
end
protected
# Hook to set a default task block. By default, nil.
def default_task_block
nil
end
# Hook to execute code before inputs are processed.
def before_execute() end
# Hook to execute code after inputs are processed.
def after_execute() end
# Hook to handle unhandled errors from processing inputs on a task level.
# By default on_execute_error simply re-raises the unhandled error.
def on_execute_error(err)
raise err
end
end
end