Sha256: aa205277fc1cd458a805654a2903c12a378c3134d57a0b5fc2b31581ce256016

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

module NxtPipeline
  class Pipeline
    attr_reader :burst_segment

    def initialize(*attrs)
      extract_pipe_attr_from_init_params(*attrs)
    end

    def call
      self.segments.reduce(pipe_attr) do |transformed_pipe_attr, segment|
        segment[pipe_attr_name].new(pipe_attr_name => transformed_pipe_attr).pipe_through
      rescue => error
        handle_segment_burst(error, segment)
      end
    end

    def burst?
      burst_segment.present?
    end

    class << self
      def pipe_attr(name)
        self.pipe_attr_name = name
      end

      def mount_segment(name)
        self.segments << name
      end

      def rescue_segment_burst(*errors, &block)
        self.rescueable_segment_bursts = errors
        self.rescueable_block = block
      end
    end

    private

    attr_reader :pipe_attr

    cattr_accessor :pipe_attr_name
    cattr_accessor :segments, instance_writer: false, default: []
    cattr_accessor :rescueable_segment_bursts, instance_writer: false, default: []
    cattr_accessor :rescueable_block, instance_writer: false

    def extract_pipe_attr_from_init_params(*attrs)
      raise ArgumentError, 'You need to pass a keyword param as argument to #new' unless attrs.first.is_a?(Hash)
      @pipe_attr = attrs.first.fetch(pipe_attr_name)
    end

    def handle_segment_burst(error, segment)
      @burst_segment = segment.name.split('::').last.underscore

      self.rescueable_block.call(error, burst_segment) if error.class.in?(rescueable_segment_bursts)

      raise
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nxt_pipeline-0.1.0 lib/nxt_pipeline/pipeline.rb