Sha256: d47b6d03e1989369c25a8560050ea59d3f5d57cb6c816eb309072497924f8456

Contents?: true

Size: 2 KB

Versions: 1

Compression:

Stored size: 2 KB

Contents

module NxtPipeline
  class Pipeline
    include ActiveSupport::Callbacks
    define_callbacks :each_step_pipe_through
    
    attr_reader :failed_step

    def initialize(*attrs)
      extract_pipe_attr_from_init_params(*attrs)
    end

    def run
      self.class.steps.reduce(pipe_attr) do |transformed_pipe_attr, step|
        run_callbacks :each_step_pipe_through do
          step[self.class.pipe_attr_name].new(self.class.pipe_attr_name => transformed_pipe_attr).pipe_through
        rescue => error
          handle_segment_burst(error, step)
        end
      end
    end

    def failed?
      failed_step.present?
    end

    class << self
      def pipe_attr(name)
        @pipe_attr_name = name
      end

      def step(name)
        self.steps << name
      end

      def rescue_errors(*errors, &block)
        @rescueable_errors = errors
        self.rescueable_block = block
      end
      
      def before_each_step(*filters, &block)
        set_callback :each_step_pipe_through, :before, *filters, &block
      end
      
      def after_each_step(*filters, &block)
        set_callback :each_step_pipe_through, :after, *filters, &block
      end
      
      def around_each_step(*filters, &block)
        set_callback :each_step_pipe_through, :around, *filters, &block
      end
      
      attr_reader :pipe_attr_name
      attr_accessor :rescueable_block
      
      def steps
        @steps ||= []
      end
      
      def rescueable_errors
        @rescueable_errors ||= []
      end
    end

    private

    attr_reader :pipe_attr

    def extract_pipe_attr_from_init_params(*attrs)
      raise ArgumentError, 'You need to pass a keyword param as argument to #new' unless attrs.first.is_a?(Hash)
      @pipe_attr = attrs.first.fetch(self.class.pipe_attr_name)
    end

    def handle_segment_burst(error, step)
      @failed_step = step.name.split('::').last.underscore

      self.class.rescueable_block.call(error, failed_step) if error.class.in?(self.class.rescueable_errors)

      raise
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nxt_pipeline-0.2.0 lib/nxt_pipeline/pipeline.rb