Sha256: aa205277fc1cd458a805654a2903c12a378c3134d57a0b5fc2b31581ce256016
Contents?: true
Size: 1.5 KB
Versions: 1
Compression:
Stored size: 1.5 KB
Contents
module NxtPipeline
  # Base class for pipelines that pass a single named attribute through an
  # ordered list of mounted segments.
  #
  # Subclasses configure themselves with the class-level DSL:
  #
  #   pipe_attr :name                      # key of the attribute piped through
  #   mount_segment SomeSegment            # appended in call order
  #   rescue_segment_burst(SomeError) { |error, burst_segment| ... }
  #
  # Instances are built with a Hash that contains the configured pipe
  # attribute name as a key, e.g. `new(name: value)`.
  #
  # NOTE(review): relies on ActiveSupport core extensions (`class_attribute`,
  # `present?`, `underscore`) being loaded by the gem's entry point - confirm.
  class Pipeline
    # Underscored, demodulized name of the segment that raised during the most
    # recent #call, or nil when no segment raised.
    attr_reader :burst_segment

    # Configuration is kept in inheritable class attributes rather than
    # `cattr_accessor` class variables: class variables are shared by the base
    # class and *every* subclass, so segments mounted on one pipeline would
    # also run in all other pipelines. They are declared before `private` so
    # that the generated readers stay public, as they were before.
    class_attribute :pipe_attr_name
    class_attribute :segments, instance_writer: false, default: []
    class_attribute :rescueable_segment_bursts, instance_writer: false, default: []
    class_attribute :rescueable_block, instance_writer: false

    # @param attrs [Array] the first element must be a Hash containing the
    #   configured pipe attribute name as a key
    # @raise [ArgumentError] if the first argument is not a Hash
    # @raise [KeyError] if the Hash lacks the configured pipe attribute name
    def initialize(*attrs)
      extract_pipe_attr_from_init_params(*attrs)
    end

    # Pipes the initial attribute through every mounted segment in mount order;
    # the result of one segment becomes the input of the next.
    #
    # Each segment is resolved with `segment[pipe_attr_name]`, instantiated
    # with `pipe_attr_name => current_value` and run via `#pipe_through`.
    #
    # @return [Object] the value returned by the last segment, or the initial
    #   attribute when no segments are mounted
    # @raise [StandardError] any error raised by a segment is re-raised after
    #   the burst has been recorded (and the rescue block, if applicable, ran)
    def call
      # Forget a burst recorded by a previous run so #burst? reflects this run.
      @burst_segment = nil

      segments.reduce(pipe_attr) do |transformed_pipe_attr, segment|
        begin
          segment[pipe_attr_name].new(pipe_attr_name => transformed_pipe_attr).pipe_through
        rescue => error
          handle_segment_burst(error, segment)
        end
      end
    end

    # @return [Boolean] true if a segment raised during the most recent #call
    def burst?
      burst_segment.present?
    end

    class << self
      # Declares the name of the attribute that is piped through the segments.
      #
      # @param name [Symbol, String] key expected in the Hash passed to .new
      def pipe_attr(name)
        self.pipe_attr_name = name
      end

      # Appends a segment to this pipeline.
      #
      # A new array is assigned instead of mutating in place so that the list
      # inherited from a parent class (or the shared default) is not modified.
      #
      # @param name [Object] the segment to mount
      def mount_segment(name)
        self.segments = segments + [name]
      end

      # Registers the error classes for which the given block is invoked when
      # a segment raises. A later registration replaces an earlier one.
      #
      # @param errors [Array<Class>] error classes to react to
      # @yieldparam error [Exception] the error raised by the segment
      # @yieldparam burst_segment [String] underscored name of the segment
      def rescue_segment_burst(*errors, &block)
        self.rescueable_segment_bursts = errors
        self.rescueable_block = block
      end
    end

    private

    attr_reader :pipe_attr

    # Reads the initial pipe attribute out of the constructor arguments.
    def extract_pipe_attr_from_init_params(*attrs)
      raise ArgumentError, 'You need to pass a keyword param as argument to #new' unless attrs.first.is_a?(Hash)

      @pipe_attr = attrs.first.fetch(pipe_attr_name)
    end

    # Records which segment raised, runs the registered rescue block when the
    # error matches one of the registered classes, then re-raises the error.
    def handle_segment_burst(error, segment)
      # Anonymous classes have no name; fall back to #to_s so that the original
      # error is not masked by a NoMethodError on nil.
      @burst_segment = (segment.name || segment.to_s).split('::').last.underscore

      # `===` matches subclasses of the registered classes too, mirroring how
      # a `rescue` clause selects errors (exact class comparison missed them).
      rescueable = rescueable_segment_bursts.any? { |error_class| error_class === error }

      # The block is optional: rescue_segment_burst may be called without one.
      rescueable_block.call(error, burst_segment) if rescueable && rescueable_block

      raise error
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nxt_pipeline-0.1.0 | lib/nxt_pipeline/pipeline.rb |