Sha256: ed1094da45b0d270f739d1122a4e51862d363437aad872828778c9f0d1025632
Contents?: true
Size: 865 Bytes
Versions: 6
Compression:
Stored size: 865 Bytes
Contents
module Stages
  # Stage that feeds each incoming value through a sub-pipeline and emits the
  # sub-pipeline's results in one of three shapes:
  #
  #   :hash  (default) - one output per input: { input_value => results }
  #   :array           - one output per input: results
  #   :each            - every sub-pipeline result is emitted individually
  #
  # When the :aggregated flag is given, the collected results are reduced to
  # their first element before being emitted (for :hash and :array styles).
  class Wrap < Stage
    # Output styles that can be requested explicitly, in priority order
    # (:array wins over :each if both are passed).
    SELECTABLE_STYLES = [:array, :each].freeze

    # pipeline - the sub-pipeline run once per input value.
    # args     - optional flags: :array or :each to pick the output style,
    #            and/or :aggregated to keep only the first collected result.
    def initialize(pipeline, *args)
      @pipeline = pipeline
      @output_style = SELECTABLE_STYLES.find { |style| args.include?(style) } || :hash
      # Left unset (nil) unless requested, so it simply reads as falsy later.
      @aggregated = true if args.include?(:aggregated)
      super()
    end

    # Pulls values from `input` until it returns a falsy value. For each one,
    # an Emit stage for that value is piped into the sub-pipeline, the
    # combined pipe is drained, and the results are emitted according to the
    # configured output style.
    def process
      while (item = input)
        runner = Emit.new(item) | @pipeline
        collected = []

        # Drain the sub-pipeline; a falsy result from `run` ends the drain.
        while (produced = runner.run)
          if @output_style == :each
            output(produced)
          else
            collected << produced
          end
        end

        collected = collected.first if @aggregated

        case @output_style
        when :array then output(collected)
        when :hash  then output({ item => collected })
        end

        # NOTE(review): presumably this detaches the Emit stage attached
        # above and rewinds the sub-pipeline for the next input - confirm
        # against the pipeline implementation.
        @pipeline.drop_leftmost!
        @pipeline.reset!
      end
    end
  end
end
Version data entries
6 entries across 6 versions & 1 rubygems