lib/imw/dataset/workflow.rb in imw-0.1.0 vs lib/imw/dataset/workflow.rb in imw-0.1.1

- old
+ new

@@ -1,81 +1,142 @@ -# -# lib/imw/workflow.rb -- implements the workflow class -# -# == About -# -# This file implements the <tt>IMW::Workflow</tt> class which tailors -# the functionality of Rake for IMW objects. -# -# Author:: Philip flip Kromer for infochimps.org (mailto:coders@infochimps.org) -# Copyright:: Copyright (c) 2008 infochimps.org -# License:: GPL 3.0 -# Website:: http://infinitemonkeywrench.org/ -# - -require 'imw/dataset/scaffold' require 'imw/dataset/task' +require 'ostruct' module IMW - # The <tt>IMW::Workflow</tt> module is a collection of methods which - # define Rake[http://rake.rubyforge.org/] tasks specialized for each - # dataset. + # IMW encourages you to view a data transformation as a network of + # dependencies. By default, IMW defines five main steps: + # + # rip:: + # Obtain data via HTTP, FTP, SCP, RSYNC, database query, &c. + # + # extract:: + # Extract data from its ripped form to a form which can be + # parsed. + # + # parse:: + # Parse data into a structured form. + # + # munge:: + # Combine, filter, reconcile, and transform already structured + # data into a desired form. + # + # package:: + # Archive, compress, and deliver data in its final form to some + # location (HTTP, FTP, SCP, RSYNC, S3, EBS, &c.). + # + # Each step depends upon the one before it. The steps are blank by + # default so there's no need to write code for steps you don't need + # to use. + # + # Each step corresponds to a named directory in IMW::Workflow::DIRS. module Workflow - # The functions called here define the default tasks associated - # with each dataset. - def create_default_tasks - create_directories_task - create_symlinks_task - create_initialize_task - create_delete_data_task - create_destroy_task - create_workflow_tasks - end + # The <tt>Rake::TaskManager</tt> module allows the + # <tt>IMW::Dataset</tt> class to leverage the functionality of the + # Rake[http://rake.rubyforge.org/] library to manage tasks + # associated with the processing of this dataset. + include Rake::TaskManager - # Sets the default tasks in this workflow. + # Default options passed to <tt>Rake</tt>. Any class including + # the <tt>Rake::TaskManager</tt> module must define a constant by + # this name. + DEFAULT_OPTIONS = { + :dry_run => false, + :trace => false, + :verbose => false + } + + # The standard IMW workflow steps. + STEPS = [:rip, :extract, :parse, :munge, :package] + + # The steps of the IMW workflow each correspond to a directory in + # which it is customary that they deposit their files <em>once + # they are finished processing</em> (so ripped files wind up in + # the +ripd+ directory, packaged files in the +pkgd+ directory, + # and so on). + DIRS = [:ripd, :xtrd, :prsd, :mungd, :pkgd ] + + # Each workflow step can be configured to take default actions, + # each action being a proc in the array for the step in this hash. # - # The default tasks constitute a set of consecutive actions that - # must be taken in order: <tt>:rip</tt>, <tt>parse</tt>, - # <tt>munge</tt>, <tt>fix</tt>, and <tt>package</tt>. Each task - # is a <tt>Rake::Task</tt> which depends on the one before it. - # - # Each task does nothing by default other than create directories - # to hold files for this dataset as it undergoes the workflow. - def set_default_tasks - define_task(Rake::Task, {:rip => []}) - define_task(Rake::Task, {:parse => :rip}) - define_task(Rake::Task, {:munge => :parse}) - define_task(Rake::Task, {:fix => :munge}) - define_task(Rake::Task, {:package => :fix}) - comment_default_tasks + # This allows classes which include IMW::Workflow to use class + # methods named after each step (+rip+, +parse+, &c.) to directly + # define tasks. + STEPS_TASKS = returning({}) do |steps_procs| + STEPS.each do |step| + steps_procs[step] = [] + end end - # Set the initial comments for each of the default tasks. - def comment_default_tasks - self[:rip].comment = "Rip dataset from an origin" - self[:parse].comment = "Parse dataset into intermediate form" - self[:munge].comment = "Munge dataset's structure into desired form" - self[:fix].comment = "Fix and format dataset" - self[:package].comment = "Package dataset into a final format" + protected + def self.included klass + STEPS.each do |step| + klass.class_eval <<EOF +def self.#{step}(deps=nil, &block) + STEPS_TASKS[:#{step}] << [deps, block] +end +EOF + end + + end - # Creates the task dependency chain <tt>:package => :fix => :munge - # => :peel => :rip => :initialize</tt>. - def create_workflow_tasks - @last_description = "Obtain data from some source." - define_task(IMW::Task, :rip => [:initialize]) - @last_description = "Extract datafiles from ripped data." - define_task(IMW::Task, :peel => [:rip]) - @last_description = "Transform records in a dataset." - define_task(IMW::Task, :munge => [:peel]) - @last_description = "Reconcile records." - define_task(IMW::Task, :fix => [:munge]) - @last_description = "Package dataset in final form." - define_task(IMW::Task, :package => [:fix]) + def define_workflow_task deps, comment + @last_description = comment + define_task(IMW::Task, deps) + step = deps.respond_to?(:keys) ? deps.keys.first : deps + STEPS_TASKS[step].each do |deps, block| + self[step].enhance(deps) do + self.instance_eval(&block) + end + end end + # Create all the instance variables required by Rake::TaskManager + # and define default tasks for this dataset. + def initialize_workflow + @tasks = Hash.new + @rules = Array.new + @scope = Array.new + @last_description = nil + @options = OpenStruct.new(DEFAULT_OPTIONS) + define_create_directories_task + define_workflow_tasks + define_destroy_task + end + + # Creates a task <tt>:create_directories</tt> to create the + # directory structure for this dataset. + def define_create_directories_task + @last_description = "Creates workflow directories for this dataset." + define_task(IMW::Task, {:create_directories => []}) do + DIRS.each do |dir| + FileUtils.mkdir_p(path_to(dir)) unless File.exist?(path_to(dir)) + end + end + end + + # Creates a task <tt>:destroy</tt> which removes dataset's + # workflow directories. + def define_destroy_task + @last_description = "Get rid of all traces of this dataset." + define_task(IMW::Task, :destroy => [:create_directories]) do + DIRS.each do |dir| + FileUtils.rm_rf(path_to(dir)) + end + end + end + + # Creates the task dependency chain <tt>:package => :munge => + # :parse => :extract => :rip => :initialize</tt> of the + # IMW::Workflow. + def define_workflow_tasks + define_workflow_task({:rip => [:create_directories]}, "Obtain data from some source." ) + define_workflow_task({:extract => [:rip]}, "Extract data so it's ready to parse." ) + define_workflow_task({:parse => [:extract]}, "Parse data into a structured form." ) + define_workflow_task({:munge => [:parse]}, "Munge structured data into desired form.") + define_workflow_task({:package => [:munge]}, "Package dataset in final form." ) + end + end end - -# puts "#{File.basename(__FILE__)}: You find your flow next to a tall tree. Ahhhh."