Sha256: f2e5edfb0f4db33b426a47b703f8961c0a8f1de769f62efaf01f3f1211177967

Contents?: true

Size: 1.98 KB

Versions: 3

Compression:

Stored size: 1.98 KB

Contents

module Wukong

  # Walks records through a dataflow on behalf of a driver by chaining
  # procs: each stage is handed a block which forwards whatever the
  # stage yields to the stages downstream of it.
  class Wiring

    # The driver whose #process method receives records once they have
    # passed through the last stage of a branch of the dataflow.
    attr_accessor :driver

    # The dataflow being wired; asked for the #descendents of a stage.
    attr_accessor :dataflow

    # Build a Wiring around the given `driver` and `dataflow`.
    #
    # @param [#process] driver
    # @param [Wukong::Dataflow] dataflow
    def initialize(driver, dataflow)
      @driver, @dataflow = driver, dataflow
    end

    # Build a proc which accepts a single record and feeds it to each
    # of the given `stages`, and from there on through the stages
    # downstream of them.
    #
    # @param [Array<Wukong::Stage>] stages
    # @return [Proc]
    def start_with(*stages)
      # Bind `stages` now; the record is supplied when the proc is called.
      to_proc.curry[stages]
    end

    # Build the proc (as produced by #start_with) that handles records
    # emitted by `stage`, i.e. one that feeds them to the stages
    # downstream of it.
    #
    # @param [Wukong::Stage] stage
    # @return [Proc]
    #
    # @see #start_with
    def advance(stage)
      # The driver is the final "stage": past it (or past a missing
      # stage) there is nothing left to run, so the chain ends with a
      # proc over no stages at all.
      return start_with if stage.nil? || stage == driver

      # A stage without descendents is a leaf of the dataflow, so its
      # output is handed to the driver; otherwise it goes on to the
      # stage's descendents.
      targets = dataflow.descendents(stage)
      targets = [driver] if targets.empty?
      start_with(*targets)
    end

    # The memoized two-argument proc behind #start_with. Given a list of
    # stages and a record, it asks every non-nil stage to process the
    # record, passing along the block returned by #advance for that
    # stage.
    #
    # @return [Proc]
    def to_proc
      @wiring ||= Proc.new do |stages, record|
        stages.each do |stage|
          next unless stage
          stage.process(record, &advance(stage))
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 2 rubygems

Version Path
ul-wukong-4.1.1 lib/wukong/driver/wiring.rb
ul-wukong-4.1.0 lib/wukong/driver/wiring.rb
wukong-4.0.0 lib/wukong/driver/wiring.rb