Sha256: b429d84ef592b7dd7385a2123f477d71c32068bf6e967d2f18cfdc9d85d13a37

Contents?: true

Size: 1.62 KB

Versions: 1

Compression:

Stored size: 1.62 KB

Contents

# Copyright 2009, Grégoire Marabout. All Rights Reserved.
#
# This is free software. Please see the LICENSE and COPYING files for details.

require 'cascading/base'
require 'yaml'

module Cascading
  class Cascade < Cascading::Node
    extend Registerable

    def initialize(name)
      super(name, nil) # A Cascade cannot have a parent
      self.class.add(name, self)
    end

    def flow(name, &block)
      raise "Could not build flow '#{name}'; block required" unless block_given?
      flow = Flow.new(name, self)
      add_child(flow)
      flow.instance_eval(&block)
      flow
    end

    def draw(dir, properties = nil)
      @children.each do |name, flow|
        flow.connect(properties).writeDOT("#{dir}/#{name}.dot")
      end
    end

    def sink_metadata
      @children.inject({}) do |sink_fields, (name, flow)|
        sink_fields[name] = flow.sink_metadata
        sink_fields
      end
    end

    def write_sink_metadata(file_name)
      File.open(file_name, 'w') do |file|
        YAML.dump(sink_metadata, file)
      end
    end

    def complete(properties = nil)
      begin
        Java::CascadingCascade::CascadeConnector.new.connect(name, make_flows(@children, properties)).complete
      rescue NativeException => e
        raise CascadingException.new(e, 'Error completing cascade')
      end
    end

    private

    def make_flows(flows, properties)
      flow_instances = flows.map do |name, flow|
        cascading_flow = flow.connect(properties)
        flow.listeners.each { |l| cascading_flow.addListener(l) }
        cascading_flow
      end
      flow_instances.to_java(Java::CascadingFlow::Flow)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cascading.jruby-0.0.4 lib/cascading/cascade.rb