Sha256: 3d2937617d9b4f8182db8f0a5c457276419ec64792dab48ea0dca76710760b9d

Contents?: true

Size: 1.63 KB

Versions: 7

Compression:

Stored size: 1.63 KB

Contents

# frozen_string_literal: true

require 'thread'
require 'plumb/composable'

module Plumb
  # A stream that validates each element.
  # Example:
  #   row = Types::Tuple[String, Types::Lax::Integer]
  #   csv_stream = Types::Stream[row]
  #
  #   stream = csv_stream.parse(CSV.new(File.new('data.csv')).to_enum)
  #   stream.each do |result|
  #     result.valid? # => true
  #     result.value # => ['name', 10]
  #   end
  class StreamClass
    include Composable

    # @return [Array] frozen, single-element list holding the wrapped element type
    attr_reader :children

    # Build a stream definition. The instance is frozen once its state is set.
    # @option element_type [Composable] the type of the elements in the stream
    def initialize(element_type: Types::Any)
      @element_type = Composable.wrap(element_type)
      @children = [@element_type].freeze
      freeze
    end

    # Return a new Stream definition of the same class for the given element type.
    # The receiver is left untouched.
    # @param element_type [Composable] the type of the elements in the stream
    # @return [StreamClass]
    def [](element_type)
      self.class.new(element_type:)
    end

    # The [Step] interface
    # Rejects values that do not respond to #each. Otherwise wraps the value in
    # an Enumerator that passes each element through the element type's #resolve
    # as it is iterated; nothing is resolved until the Enumerator is consumed.
    # @param result [Result::Valid]
    # @return [Result::Valid, Result::Invalid]
    def call(result)
      return result.invalid(errors: 'is not an Enumerable') unless result.value.respond_to?(:each)

      enum = Enumerator.new do |y|
        result.value.each do |e|
          y << @element_type.resolve(e)
        end
      end

      result.valid(enum)
    end

    # Valid elements are unwrapped to their #value; falsy values (nil, false)
    # of valid elements are preserved.
    # @return [Step] a step that resolves to a lazy Enumerator that filters out invalid elements
    def filtered
      self >> Step.new(nil, 'filtered') do |result|
        # select + map instead of filter_map: filter_map discards every falsy
        # block result, which would also drop valid elements whose value is
        # nil or false.
        set = result.value.lazy.select(&:valid?).map(&:value)
        result.valid(set)
      end
    end

    private

    # String representation of this definition, e.g. "Stream[<element type>]".
    def _inspect = "Stream[#{@element_type.inspect}]"
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
plumb-0.0.10 lib/plumb/stream_class.rb
plumb-0.0.9 lib/plumb/stream_class.rb
plumb-0.0.8 lib/plumb/stream_class.rb
plumb-0.0.7 lib/plumb/stream_class.rb
plumb-0.0.6 lib/plumb/stream_class.rb
plumb-0.0.5 lib/plumb/stream_class.rb
plumb-0.0.4 lib/plumb/stream_class.rb