require 'tap/support/framework'

module Tap

  # Tasks are the basic organizational unit of Tap.  Tasks provide
  # a standard backbone for creating the working parts of an application
  # by facilitating configuration, batched execution of methods, and
  # documentation.
  #
  # The functionality of Task is built from several base modules:
  # - Tap::Support::Batchable
  # - Tap::Support::Configurable
  # - Tap::Support::Executable
  #
  # Tap::Workflow is built on the same foundations; the sections on
  # configuration and batching apply equally to Workflows as Tasks.
  #
  # === Task Definition
  #
  # Tasks are instantiated with a task block; when the task is run
  # the block gets called with the enqueued inputs.  As such, the block
  # should specify the same number of inputs as you enqueue (plus the
  # task itself, which is a standard input).
  #
  #   no_inputs = Task.new {|task| }
  #   one_input = Task.new {|task, input| }
  #   mixed_inputs = Task.new {|task, a, b, *args| }
  #
  #   no_inputs.enq
  #   one_input.enq(:a)
  #   mixed_inputs.enq(:a, :b)
  #   mixed_inputs.enq(:a, :b, 1, 2, 3)
  #
  # Subclasses of Task specify executable code by overriding the process
  # method.  In this case the number of enqueued inputs should correspond to
  # process (passing the task would be redundant).
  #
  #   class NoInput < Tap::Task
  #     def process() end
  #   end
  #
  #   class OneInput < Tap::Task
  #     def process(input) end
  #   end
  #
  #   class MixedInputs < Tap::Task
  #     def process(a, b, *args) end
  #   end
  #
  #   NoInput.new.enq
  #   OneInput.new.enq(:a)
  #   MixedInputs.new.enq(:a, :b)
  #   MixedInputs.new.enq(:a, :b, 1, 2, 3)
  #
  # === Configuration
  #
  # Tasks are configurable.  By default each task will be configured
  # with the default class configurations, which can be set when the
  # class is defined.
  #
  #   class ConfiguredTask < Tap::Task
  #     config :one, 'one'
  #     config :two, 'two'
  #   end
  #
  #   t = ConfiguredTask.new
  #   t.name            # => "configured_task"
  #   t.config          # => {:one => 'one', :two => 'two'}
  #
  # Configurations can be validated or processed using an optional
  # block.  Tap::Support::Validation pre-packages several common
  # validation/processing blocks, and can be accessed through the
  # class method 'c':
  #
  #   class ValidatingTask < Tap::Task
  #     # string config validated to be a string
  #     config :string, 'str', &c.check(String)
  #
  #     # integer config; string inputs are converted using YAML
  #     config :integer, 1, &c.yaml(Integer)
  #   end
  #
  #   t = ValidatingTask.new
  #   t.string = 1         # !> ValidationError
  #   t.integer = 1.1      # !> ValidationError
  #
  #   t.integer = "1"
  #   t.integer == 1       # => true
  #
  # Tasks have a name that gets used in auditing, and as a relative
  # filepath to find associated files (for instance config files).
  # By default the task name is based on the task class, such that
  # Tap::Task has the default name 'tap/task'.  Configurations
  # and custom names can be provided when a task is initialized.
  #
  #   t = ConfiguredTask.new({:one => 'ONE', :three => 'three'}, "example")
  #   t.name            # => "example"
  #   t.config          # => {:one => 'ONE', :two => 'two', :three => 'three'}
  #
  # === Batches
  #
  # Tasks can be assembled into batches that enqueue and execute collectively.
  # Batched tasks are often alternatively-configured derivatives of one
  # parent task, although they can be manually assembled using Task.batch.
  #
  #   app = Tap::App.instance
  #   t1 = Tap::Task.new(:key => 'one') do |task, input|
  #     input + task.config[:key]
  #   end
  #   t1.batch          # => [t1]
  #
  #   t2 = t1.initialize_batch_obj(:key => 'two')
  #   t1.batch          # => [t1, t2]
  #   t2.batch          # => [t1, t2]
  #
  #   t1.enq 't1_by_'
  #   t2.enq 't2_by_'
  #   app.run
  #
  #   app.results(t1)   # => ["t1_by_one", "t2_by_one"]
  #   app.results(t2)   # => ["t1_by_two", "t2_by_two"]
  #
  # Here the results reflect that t1 and t2 were run in succession with the
  # input to t1, and then the input to t2.
  #
  # === Subclassing
  # Tasks can be subclassed normally, with one reminder related to batching.
  #
  # Batched tasks are generated by duplicating an existing instance, hence
  # all instance variables will point to the same object in the batched
  # and original task.  At times (as with configurations), this is
  # undesirable; the batched task should have its own copy of an
  # instance variable.
  #
  # In these cases, the initialize_copy should be overridden
  # and should re-initialize the appropriate variables.  Be sure to call
  # super to invoke the default initialize_copy:
  #
  #   class SubclassTask < Tap::Task
  #     attr_accessor :array
  #     def initialize(*args)
  #       @array = []
  #       super
  #     end
  #
  #     def initialize_copy(orig)
  #       @array = orig.array.dup
  #       super
  #     end
  #   end
  #
  #   t1 = SubclassTask.new
  #   t2 = t1.initialize_batch_obj
  #   t1.array == t2.array                         # => true
  #   t1.array.object_id == t2.array.object_id     # => false
  #
  class Task
    include Support::Executable
    include Support::Framework

    # The block called by process with the task and its inputs; nil when
    # neither a block nor a default_task_block was provided.
    attr_reader :task_block

    # Creates a new Task.  The config, name, and app arguments are passed
    # straight through to super.  The optional block becomes the task_block;
    # when no block is given, default_task_block supplies it (nil unless a
    # subclass overrides that hook).
    def initialize(config={}, name=nil, app=App.instance, &task_block)
      super(config, name, app)

      # A block argument is either a Proc or nil, so || selects the default
      # exactly when no block was given.
      @task_block = task_block || default_task_block

      # NOTE(review): these three variables are not read anywhere in this
      # file; presumably they are the state expected by Support::Executable
      # -- confirm against that module.
      @multithread = false
      @on_complete_block = nil
      @_method_name = :execute
    end

    # Enqueues self and self.batch to app with the inputs.
    # The number of inputs provided should match the number
    # of inputs specified by the arity of the _method_name method.
    def enq(*inputs)
      app.queue.enq(self, inputs)
    end

    # batch_function is a class-level macro defined outside this file
    # (presumably it makes the named methods apply across the whole batch
    # -- confirm in the Batchable support module).
    batch_function :enq, :multithread=
    batch_function(:on_complete) {}

    # Executes self with the given inputs.  Execute provides hooks for subclasses
    # to insert standard execution code: before_execute, on_execute_error,
    # and after_execute.  Override any/all of these methods as needed.
    #
    # Execute passes the inputs to process and returns the result.
    #
    # If process raises a StandardError, the error is handed to
    # on_execute_error.  The default hook re-raises, in which case
    # after_execute is not called.  If an overridden hook returns normally,
    # after_execute still runs and execute returns nil.
    def execute(*inputs)
      before_execute
      begin
        result = process(*inputs)
      rescue => err
        # result stays nil on this path; the return value of the hook is
        # intentionally not used as the result.
        on_execute_error(err)
      end
      after_execute

      result
    end

    # The method for processing inputs into outputs.  Override this method in
    # subclasses to provide class-specific process logic.  The number of
    # arguments specified by process corresponds to the number of arguments
    # the task should have when enqueued.
    #
    #   class TaskWithTwoInputs < Tap::Task
    #     def process(a, b)
    #       [b,a]
    #     end
    #   end
    #
    #   t = TaskWithTwoInputs.new
    #   t.enq(1,2).enq(3,4)
    #   t.app.run
    #   t.app.results(t)         # => [[2,1], [4,3]]
    #
    # By default process passes self and the input(s) to the task_block
    # provided during initialization.  In this case the task block dictates
    # the number of arguments enq should receive.  Simply returns the inputs
    # if no task_block is set.
    #
    #   # two arguments in addition to task are specified
    #   # so this Task must be enqueued with two inputs...
    #   t = Task.new {|task, a, b| [b,a] }
    #   t.enq(1,2).enq(3,4)
    #   t.app.run
    #   t.app.results(t)         # => [[2,1], [4,3]]
    #
    # Raises an ArgumentError when the number of arguments (the inputs plus
    # the task itself) is not compatible with the arity of the task_block.
    def process(*inputs)
      return inputs if task_block.nil?

      # The task itself is always the first argument to the block.  inputs
      # is the fresh array built by the splat, so prepending to it does not
      # touch the caller's objects.
      inputs.unshift(self)

      arity = task_block.arity
      n = inputs.length

      # A non-negative arity must match exactly.  A negative arity encodes
      # -(required + 1), so (-1 - n) <= arity means n >= required.  The
      # check is explicit because a plain (non-lambda) Proc would otherwise
      # silently drop or pad arguments.
      unless n == arity || (arity < 0 && (-1 - n) <= arity)
        raise ArgumentError, "wrong number of arguments (#{n} for #{arity})"
      end

      task_block.call(*inputs)
    end

    protected

    # Hook to set a default task block.  By default, nil.
    def default_task_block
      nil
    end

    # Hook to execute code before inputs are processed.
    def before_execute; end

    # Hook to execute code after inputs are processed.
    def after_execute; end

    # Hook to handle unhandled errors from processing inputs on a task level.
    # By default on_execute_error simply re-raises the unhandled error.
    def on_execute_error(err)
      raise err
    end
  end
end