module Pipeline
# == Pipeline Stages
#
# Each pipeline is composed of sequential stages (see Pipeline::Stage::Base).
# The stages that will be executed are defined as follows:
#
# class PrepareIngredients < Pipeline::Stage::Base
# def run
# puts "Slicing..."
# end
# end
#
# class Cook < Pipeline::Stage::Base
# def run
# puts "Cooking..."
# end
# end
#
# class MakeDinnerPipeline < Pipeline::Base
# define_stages PrepareIngredients >> Cook
# end
#
# When this pipeline executes, it will run each stage sequentially, and the output
# would be:
# Slicing...
# Cooking...
#
# A pipeline can get access to its stages through the stages association.
#
# == Error Handling
#
# There are 3 types of errors that a failed stage can specifically raise:
#
# * Recoverable (requires user action): If a stage raises RecoverableError with
# input_required? == true, the pipeline gets :paused and can be
# resumed or cancelled by calling #resume and #cancel, respectively.
#
# * Recoverable (can be automatically retried): If a stage raises
# RecoverableError with input_required? == false, the pipeline goes into
# :retry state and will be automatically retried. This is currently achieved by
# +delayed_job+'s retry mechanism. Please refer to
# http://github.com/collectiveidea/delayed_job for information about how to
# configure the maximum number of retry attempts.
#
# * Irrecoverable: If a stage fails with an IrrecoverableError, the pipeline
# gets :failed and therefore cannot be resumed or restarted.
#
# If a stage fails with any other type of error, you can choose the default behaviour
# for what happens to the pipeline. By default, the pipeline will pause, so it can be
# later resumed. This can be overridden by setting +default_failure_mode+ like:
#
# class SamplePipeline < Pipeline::Base
# self.default_failure_mode = :cancel
# end
#
# You can always go back to the default mode by calling:
# self.default_failure_mode = :pause
#
# == State Transitions
#
# The following diagram represents the state transitions a pipeline instance can
# go through during its life-cycle:
#
#   :not_started ---> :in_progress ---> :completed / :failed
#                         ^ |
#                         | v
#                   :paused / :retry
#
# [:not_started] The pipeline was instantiated but not started yet.
# [:in_progress] After being started or resumed, the pipeline remains in this state
# while the stages are running.
# [:paused] If a stage fails with a recoverable error that requires user action,
# the pipeline gets paused.
# [:retry] If a stage fails with a recoverable error that can be automatically
# retried, the pipeline goes into this state.
# [:completed] After successfully running all stages, the pipeline is completed.
# [:failed] If a stage fails with an irrecoverable error, or if the pipeline is
# cancelled, it goes into this state.
#
# == Referencing External Objects
#
# The execution of a pipeline will usually be associated to an external entity
# (e.g. a +User+ if the stages represent an internal user registration process, or a
# +Recipe+ in the examples of this page). To be able to reference the associated object
# from the stages, Pipeline::Base has an attribute external_id that can be
# used on a custom association to any external entity. Example:
#
# class MakeDinnerPipeline < Pipeline::Base
# define_stages PrepareIngredients >> Cook
# belongs_to :recipe, :foreign_key => 'external_id'
# end
#
# A Stage can reference this object as such:
#
# class Cook < Pipeline::Stage::Base
# def run
# puts "Cooking a delicious #{pipeline.recipe.name}"
# end
# end
#
# == Callbacks
#
# You can define custom callbacks to be called before (+before_pipeline+) and after
# (+after_pipeline+) executing a pipeline. Example:
#
# class PrepareIngredients < Pipeline::Stage::Base
# def run
# puts "Slicing..."
# end
# end
#
# class Cook < Pipeline::Stage::Base
# def run
# puts "Cooking..."
# end
# end
#
# class MakeDinnerPipeline < Pipeline::Base
# define_stages PrepareIngredients >> Cook
#
# before_pipeline :wash_hands
# after_pipeline :serve_dinner
#
# private
# def wash_hands
# puts "Washing hands before we start..."
# end
#
# def serve_dinner
# puts "bon appetit!"
# end
# end
#
# Pipeline.start(MakeDinnerPipeline.new)
#
# Outputs:
# Washing hands before we start...
# Slicing...
# Cooking...
# bon appetit!
#
# Callbacks can be defined as a symbol naming a private/protected method (like the
# example above), as an inline block, or as a +Callback+ object, just like regular
# +ActiveRecord+ callbacks.
class Base < ActiveRecord::Base
  set_table_name :pipeline_instances

  # Life-cycle of the +status+ attribute:
  #
  #   :not_started ---> :in_progress ---> :completed / :failed
  #                         ^ |
  #                         | v
  #                   :paused / :retry
  #
  # NOTE(review): +symbol_attr+ and +transactional_attr+ are macros defined
  # outside this file; presumably they expose the column as a Symbol and
  # persist status changes transactionally -- confirm against their source.
  symbol_attr :status
  transactional_attr :status
  # The status writer is private: only the pipeline itself changes its status.
  private :status=

  # Stage records attached to this pipeline instance. They are destroyed
  # together with the pipeline.
  has_many :stages,
           :class_name  => 'Pipeline::Stage::Base',
           :foreign_key => 'pipeline_instance_id',
           :dependent   => :destroy

  # Stage classes instantiated for every new pipeline of this class
  # (see +define_stages+ and +after_initialize+).
  class_inheritable_accessor :defined_stages, :instance_writer => false
  self.defined_stages = []

  # What happens when a stage raises an error that is neither recoverable nor
  # irrecoverable: +:pause+ (default) or +:cancel+.
  class_inheritable_accessor :failure_mode, :instance_writer => false
  self.failure_mode = :pause

  # Callback chains run by +_setup+ and +_complete_with_status+ respectively.
  define_callbacks :before_pipeline, :after_pipeline

  # Declares which stages make up this pipeline. +stages+ is anything that
  # responds to +build_chain+ (e.g. the result of <tt>StageA >> StageB</tt>);
  # the value returned by +build_chain+ is stored in +defined_stages+.
  def self.define_stages(stages)
    chain = stages.build_chain
    self.defined_stages = chain
  end

  # Chooses how the pipeline reacts to an unexpected failure.
  #
  # [:pause]  The pipeline is paused (default)
  # [:cancel] The pipeline is failed
  #
  # Any other value is treated as +:pause+.
  def self.default_failure_mode=(mode)
    self.failure_mode = case mode
                        when :pause, :cancel then mode
                        else :pause
                        end
  end

  # ActiveRecord hook invoked on instantiation. For new records it sets the
  # status to +:not_started+ and builds one stage per class listed in
  # +defined_stages+. Subclasses overriding this hook must call +super+:
  #
  #   class SamplePipeline < Pipeline::Base
  #     def after_initialize
  #       super
  #       self[:special_attribute] ||= "standard value"
  #     end
  #   end
  def after_initialize
    return unless new_record?

    self[:status] = :not_started
    self.class.defined_stages.each do |klass|
      stages << klass.new(:pipeline => self)
    end
  end

  # Runs the pipeline: every stage that is not yet completed is performed in
  # turn. Raises InvalidStatusError when the pipeline is not in a state that
  # allows execution (see +ok_to_resume?+).
  #
  # This is the entry point +delayed_job+ calls for enqueued objects. It may
  # also be called directly for synchronous execution, in which case nothing
  # here retries the pipeline automatically.
  #
  # Outcome by error raised from a stage:
  # * IrrecoverableError                       -> :failed
  # * RecoverableError, input_required? true   -> :paused
  # * RecoverableError, input_required? false  -> :retry, error re-raised
  # * anything else                            -> :failed or :paused, per +failure_mode+
  def perform
    _check_valid_status
    begin
      _setup
      stages.each do |stage|
        next if stage.completed?
        stage.perform
      end
      _complete_with_status(:completed)
    rescue IrrecoverableError
      _complete_with_status(:failed)
    rescue RecoverableError => error
      return _complete_with_status(:paused) if error.input_required?

      _complete_with_status(:retry)
      # Propagate the error to the caller after recording the :retry status.
      raise error
    rescue Exception
      # NOTE(review): rescuing Exception also captures SystemExit/Interrupt.
      # It guarantees the pipeline never stays stuck in :in_progress, but may
      # hide process-level signals -- confirm this is intended.
      if failure_mode == :cancel
        _complete_with_status(:failed)
      else
        _complete_with_status(:paused)
      end
    end
  end

  # Cancels the pipeline by moving it to +:failed+. Raises InvalidStatusError
  # when the current status does not allow it (e.g. already completed or
  # cancelled).
  def cancel
    _check_valid_status
    _complete_with_status(:failed)
  end

  # Verifies that the pipeline can be resumed, raising InvalidStatusError
  # otherwise (e.g. already completed or cancelled). This method only performs
  # the check; it does not run the stages itself.
  def resume
    _check_valid_status
  end

  private

  # True when the pipeline is in a status from which it may be (re)started.
  def ok_to_resume?
    case status
    when :not_started, :paused, :retry then true
    else false
    end
  end

  # Refreshes persisted records from the database, then raises
  # InvalidStatusError unless the status allows execution.
  def _check_valid_status
    unless new_record?
      reload
    end
    return if ok_to_resume?

    raise InvalidStatusError.new(status)
  end

  # Records a new attempt, flags the pipeline as running and fires the
  # +before_pipeline+ callbacks.
  def _setup
    self.attempts = attempts + 1
    self.status = :in_progress
    run_callbacks(:before_pipeline)
  end

  # Stores +final_status+ and fires the +after_pipeline+ callbacks.
  def _complete_with_status(final_status)
    self.status = final_status
    run_callbacks(:after_pipeline)
  end
end
end