require 'tap/support/batchable' require 'tap/support/executable' require 'tap/support/command_line' module Tap # Tasks are the basic organizational unit of Tap. Tasks provide # a standard backbone for creating the working parts of an application # by facilitating configuration, batched execution of methods, and # documentation. # # The functionality of Task is built from several base modules: # - Tap::Support::Batchable # - Tap::Support::Configurable # - Tap::Support::Executable # # Tap::Workflow is built on the same foundations; the sectons on # configuration and batching apply equally to Workflows as Tasks. # # === Task Definition # # Tasks are instantiated with a task block; when the task is run # the block gets called with the enqued inputs. As such, the block # should specify the same number of inputs as you enque (plus the # task itself, which is a standard input). # # no_inputs = Task.new {|task| } # one_input = Task.new {|task, input| } # mixed_inputs = Task.new {|task, a, b, *args| } # # no_inputs.enq # one_input.enq(:a) # mixed_inputs.enq(:a, :b) # mixed_inputs.enq(:a, :b, 1, 2, 3) # # Subclasses of Task specify executable code by overridding the process # method. In this case the number of enqued inputs should correspond to # process (passing the task would be redundant). # # class NoInput < Tap::Task # def process() end # end # # class OneInput < Tap::Task # def process(input) end # end # # class MixedInputs < Tap::Task # def process(a, b, *args) end # end # # NoInput.new.enq # OneInput.new.enq(:a) # MixedInputs.new.enq(:a, :b) # MixedInputs.new.enq(:a, :b, 1, 2, 3) # # === Configuration # # Tasks are configurable. By default each task will be configured # with the default class configurations, which can be set when the # class is defined. # # class ConfiguredTask < Tap::Task # config :one, 'one' # config :two, 'two' # end # # t = ConfiguredTask.new # t.name # => "configured_task" # t.config # => {:one => 'one', :two => 'two'} # # Configurations can be validated or processed using an optional # block. Tap::Support::Validation pre-packages several common # validation/processing blocks, and can be accessed through the # class method 'c': # # class ValidatingTask < Tap::Task # # string config validated to be a string # config :string, 'str', &c.check(String) # # # integer config; string inputs are converted using YAML # config :integer, 1, &c.yaml(Integer) # end # # t = ValidatingTask.new # t.string = 1 # !> ValidationError # t.integer = 1.1 # !> ValidationError # # t.integer = "1" # t.integer == 1 # => true # # Tasks have a name that gets used in auditing, and as a relative # filepath to find associated files (for instance config files). # By default the task name is based on the task class, such that # Tap::Task has the default name 'tap/task'. Configurations # and custom names can be provided when a task is initialized. # # t = ConfiguredTask.new({:one => 'ONE', :three => 'three'}, "example") # t.name # => "example" # t.config # => {:one => 'ONE', :two => 'two', :three => 'three'} # # === Batches # # Tasks can be assembled into batches that enque and execute collectively. # Batched tasks are often alternatively-configured derivatives of one # parent task, although they can be manually assembled using Task.batch. # # app = Tap::App.instance # t1 = Tap::Task.new(:key => 'one') do |task, input| # input + task.config[:key] # end # t1.batch # => [t1] # # t2 = t1.initialize_batch_obj(:key => 'two') # t1.batch # => [t1, t2] # t2.batch # => [t1, t2] # # t1.enq 't1_by_' # t2.enq 't2_by_' # app.run # # app.results(t1) # => ["t1_by_one", "t2_by_one"] # app.results(t2) # => ["t1_by_two", "t2_by_two"] # # Here the results reflects that t1 and t2 were run in succession with the # input to t1, and then the input to t2. # # === Subclassing # Tasks can be subclassed normally, with one reminder related to batching. # # Batched tasks are generated by duplicating an existing instance, hence # all instance variables will point to the same object in the batched # and original task. At times (as with configurations), this is # undesirable; the batched task should have it's own copy of an # instance variable. # # In these cases, the initialize_copy should be overridden # and should re-initialize the appropriate variables. Be sure to call # super to invoke the default initialize_copy: # # class SubclassTask < Tap::Task # attr_accessor :array # def initialize(*args) # @array = [] # super # end # # def initialize_copy(orig) # @array = orig.array.dup # super # end # end # # t1 = SubclassTask.new # t2 = t1.initialize_batch_obj # t1.array == t2.array # => true # t1.array.object_id == t2.array.object_id # => false # class Task include Support::Batchable include Support::Configurable include Support::Executable class << self # Returns the default name for the class: to_s.underscore attr_accessor :default_name # Returns class dependencies attr_reader :dependencies def inherited(child) unless child.instance_variable_defined?(:@source_file) caller.first =~ Support::Lazydoc::CALLER_REGEXP child.instance_variable_set(:@source_file, File.expand_path($1)) end child.instance_variable_set(:@default_name, child.to_s.underscore) child.instance_variable_set(:@dependencies, dependencies.dup) super end # Returns an instance of self; the instance is a kind of 'global' # instance used in class-level dependencies. See depends_on. def instance @instance ||= new end # Generates or updates the specified subclass of self. def subclass(const_name, configs={}, dependencies=[], options={}, &block) # # Lookup or create the subclass constant. # current, constants = const_name.to_s.constants_split subclass = if constants.empty? # The constant exists; validate the constant is a subclass of self. unless current.kind_of?(Class) && current.ancestors.include?(self) raise ArgumentError, "#{current} is already defined and is not a subclass of #{self}!" end current else # Generate the nesting module subclass_const = constants.pop constants.each {|const| current = current.const_set(const, Module.new)} # Create and set the subclass constant current.const_set(subclass_const, Class.new(self)) end # # Define the subclass # subclass.define_configurations(configs) subclass.define_dependencies(dependencies) subclass.define_process(block) if block_given? # # Register documentation # const_name = subclass.to_s caller.each_with_index do |line, index| case line when /\/tap\/support\/declarations.rb/ then next when Support::Lazydoc::CALLER_REGEXP subclass.source_file = File.expand_path($1) lzd = subclass.lazydoc(false) lzd[const_name, false]['manifest'] = lzd.register($3.to_i - 1) break end end arity = options[:arity] || (block_given? ? block.arity : -1) comment = Support::Comment.new comment.subject = case when arity > 0 Array.new(arity, "INPUT").join(' ') when arity < 0 array = Array.new(-1 * arity - 1, "INPUT") array << "INPUTS..." array.join(' ') else "" end subclass.lazydoc(false)[const_name, false]['args'] ||= comment subclass.default_name = const_name.underscore subclass end def instantiate(argv, app=Tap::App.instance) # => instance, argv opts = OptionParser.new # Add configurations config = {} unless configurations.empty? opts.separator "" opts.separator "configurations:" end configurations.each do |receiver, key, configuration| opts.on(*Support::CommandLine.configv(configuration)) do |value| config[key] = value end end # Add options on_tail, giving priority to configurations opts.separator "" opts.separator "options:" opts.on_tail("-h", "--help", "Print this help") do opts.banner = "#{help}usage: tap run -- #{to_s.underscore} #{args.subject}" puts opts exit end # Add option for name name = default_name opts.on_tail('--name NAME', /^[^-].*/, 'Specify a name') do |value| name = value end # Add option to add args use_args = [] opts.on_tail('--use FILE', /^[^-].*/, 'Loads inputs from file') do |value| obj = YAML.load_file(value) case obj when Hash obj.values.each do |array| # error if value isn't an array use_args.concat(array) end when Array use_args.concat(obj) else use_args << obj end end opts.parse!(argv) obj = new({}, name, app) path_configs = load_config(app.config_filepath(name)) if path_configs.kind_of?(Array) path_configs.each_with_index do |path_config, i| obj.initialize_batch_obj(path_config, "#{name}_#{i}") unless i == 0 end path_configs = path_configs[0] end argv = (argv + use_args).collect {|str| str =~ /\A---\s*\n/ ? YAML.load(str) : str } [obj.reconfigure(path_configs).reconfigure(config), argv] end def lazydoc(resolve=true) lazydoc = super(false) lazydoc.register_method_pattern('args', :process) unless lazydoc.resolved? super end DEFAULT_HELP_TEMPLATE = %Q{<% manifest = task_class.manifest %> <%= task_class %><%= manifest.subject.to_s.strip.empty? ? '' : ' -- ' %><%= manifest.subject %> <% unless manifest.empty? %> <%= '-' * 80 %> <% manifest.wrap(77, 2, nil).each do |line| %> <%= line %> <% end %> <%= '-' * 80 %> <% end %> } def help Tap::Support::Templater.new(DEFAULT_HELP_TEMPLATE, :task_class => self).build end # Sets a class-level dependency. When task class B depends_on another task # class A, instances of B are initialized to depend on A.instance, with the # specified arguments. Returns self. def depends_on(dependency_class, *args) unless dependency_class.respond_to?(:instance) raise ArgumentError, "dependency_class does not respond to instance: #{dependency_class}" end (dependencies << [dependency_class, args]).uniq! self end protected def dependency(name, dependency_class, *args) depends_on(dependency_class, *args) define_method(name) do index = Support::Executable.index(dependency_class.instance, args) Support::Executable.resolve([index]) Support::Executable.results[index]._current end public(name) end def define(name, klass=Tap::Task, &block) instance_var = "@#{name}".to_sym define_method(name) do |*args| raise ArgumentError, "wrong number of arguments (#{args.length} for 1)" if args.length > 1 instance_name = args[0] || name instance_variable_set(instance_var, {}) unless instance_variable_defined?(instance_var) instance_variable_get(instance_var)[instance_name] ||= config_task(instance_name, klass, &block) end define_method("#{name}=") do |input| input = {name => input} unless input.kind_of?(Hash) instance_variable_set(instance_var, input) end public(name, "#{name}=") end def define_configurations(configs) case configs when Hash # hash configs are simply added as default configurations attr_accessor(*configs.keys) configs.each_pair do |key, value| configurations.add(key, value) end public(*configs.keys) when Array # array configs define configuration methods configs.each do |method, key, value, opts, config_block| send(method, key, value, opts, &config_block) end else raise ArgumentError, "cannot define configurations from: #{configs}" end end def define_dependencies(dependencies) dependencies.each do |name, dependency_class, args| dependency(name, dependency_class, *(args ? args : [])) end if dependencies end def define_process(block) send(:define_method, :process, &block) end end instance_variable_set(:@source_file, __FILE__) instance_variable_set(:@default_name, 'tap/task') instance_variable_set(:@dependencies, []) lazy_attr :manifest lazy_attr :args # The application used to load config_file templates # (and hence, to initialize batched objects). attr_reader :app # The name of self. #-- # Currently names may be any object. Audit makes use of name # via to_s, as does app when figuring configuration filepaths. attr_accessor :name # The task block provided during initialization. attr_reader :task_block # Initializes a new instance and associated batch objects. Batch # objects will be initialized for each configuration template # specified by app.each_config_template(config_file) where # config_file = app.config_filepath(name). def initialize(config={}, name=nil, app=App.instance, &task_block) super() @app = app @name = name || self.class.default_name @task_block = (task_block == nil ? default_task_block : task_block) @_method_name = :execute @on_complete_block = nil @dependencies = [] case config when Support::InstanceConfiguration @config = config config.bind(self) else initialize_config(config) end self.class.dependencies.each do |task_class, args| depends_on(task_class.instance, *args) end end # Creates a new batched object and adds the object to batch. The batched object # will be a duplicate of the current object but with a new name and/or # configurations. def initialize_batch_obj(overrides={}, name=nil) obj = super().reconfigure(overrides) obj.name = name if name obj end # Enqueues self and self.batch to app with the inputs. # The number of inputs provided should match the number # of inputs specified by the arity of the _method_name method. def enq(*inputs) app.queue.enq(self, inputs) end batch_function :enq batch_function(:on_complete) {} # Convenience method, equivalent to: # self.app.sequence([self] + tasks) def sequence(*tasks) app.sequence([self] + tasks) end # Convenience method, equivalent to: # self.app.fork(self, targets) def fork(*targets) app.fork(self, targets) end # Convenience method, equivalent to: # self.app.merge(self, sources) def merge(*sources) app.merge(self, sources) end # Convenience method, equivalent to: # self.app.sync_merge(self, sources) def sync_merge(*sources) app.sync_merge(self, sources) end # Convenience method, equivalent to: # self.app.switch(self, targets, &block) def switch(*targets, &block) app.switch(self, targets, &block) end # Executes self with the given inputs. Execute provides hooks for subclasses # to insert standard execution code: before_execute, on_execute_error, # and after_execute. Override any/all of these methods as needed. # # Execute passes the inputs to process and returns the result. def execute(*inputs) before_execute begin result = process(*inputs) rescue on_execute_error($!) end after_execute result end # The method for processing inputs into outputs. Override this method in # subclasses to provide class-specific process logic. The number of # arguments specified by process corresponds to the number of arguments # the task should have when enqued. # # class TaskWithTwoInputs < Tap::Task # def process(a, b) # [b,a] # end # end # # t = TaskWithTwoInputs.new # t.enq(1,2).enq(3,4) # t.app.run # t.app.results(t) # => [[2,1], [4,3]] # # By default process passes self and the input(s) to the task_block # provided during initialization. In this case the task block dictates # the number of arguments enq should receive. Simply returns the inputs # if no task_block is set. # # # two arguments in addition to task are specified # # so this Task must be enqued with two inputs... # t = Task.new {|task, a, b| [b,a] } # t.enq(1,2).enq(3,4) # t.app.run # t.app.results(t) # => [[2,1], [4,3]] # def process(*inputs) return inputs if task_block == nil inputs.unshift(self) arity = task_block.arity n = inputs.length unless n == arity || (arity < 0 && (-1-n) <= arity) raise ArgumentError.new("wrong number of arguments (#{n} for #{arity})") end task_block.call(*inputs) end # Logs the inputs to the application logger (via app.log) def log(action, msg="", level=Logger::INFO) # TODO - add a task identifier? app.log(action, msg, level) end # Raises a TerminateError if app.state == State::TERMINATE. # check_terminate may be called at any time to provide a # breakpoint in long-running processes. def check_terminate if app.state == App::State::TERMINATE raise App::TerminateError.new end end # Returns self.name def to_s name.to_s end # Provides an abbreviated version of the default inspect, with only # the task class, object_id, name, and configurations listed. def inspect "#<#{self.class.to_s}:#{object_id} #{name} #{config.to_hash.inspect} >" end protected # Hook to set a default task block. By default, nil. def default_task_block nil end # Hook to execute code before inputs are processed. def before_execute() end # Hook to execute code after inputs are processed. def after_execute() end # Hook to handle unhandled errors from processing inputs on a task level. # By default on_execute_error simply re-raises the unhandled error. def on_execute_error(err) raise err end private def config_task(name, klass=Tap::Task, &block) configs = config[name] || {} raise ArgumentError, "config '#{name}' is not a hash" unless configs.kind_of?(Hash) klass.new(configs, name, &block) end end end