Sha256: b9ab3aaae2a81c1f272234bf10439d7692735cea7c6efbb414a5a445b8b60ca5

Contents?: true

Size: 1.49 KB

Versions: 1

Compression:

Stored size: 1.49 KB

Contents

module Wukong
  class DataflowBuilder < Hanuman::GraphBuilder

    def description desc=nil
      @description = desc if desc
      @description
    end
    
    def namespace() Wukong::Dataflow ; end

    def handle_dsl_arguments_for(stage, *args, &action)
      options = args.extract_options!
      stage.merge!(options.merge(action: action).compact)
      stage      
    end
  
    def linkable_name(direction) 
      case direction
      when :in  then directed_sort.first
      when :out then directed_sort.last
      end
    end

    def method_missing(name, *args, &blk)
      if stages[name]
        handle_dsl_arguments_for(stages[name], *args, &blk)
      else
        super
      end
    end
    
  end
  
  class Dataflow < Hanuman::Graph

    def self.description desc=nil
      @description = desc if desc
      @description
    end
    
    def has_input?(stage)
      links.any?{ |link| link.into == stage }
    end
    
    def has_output?(stage)
      links.any?{ |link| link.from == stage }
    end

    def connected?(stage)
      input  = has_input?(stage)  || stages[stage].is_a?(Wukong::Source)
      output = has_output?(stage) || stages[stage].is_a?(Wukong::Sink)
      input && output
    end

    def complete?
      stages.all?{ |(name, stage)| connected? name }
    end

    def setup
      directed_sort.each{ |name| stages[name].setup }
    end

    def run
      stages[directed_sort.first].run
    end

    def stop
      directed_sort.each{ |name| stages[name].stop }
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
wukong-3.0.1 lib/wukong/dataflow.rb