Sha256: a2b989976be26054be3ebc2ce7c227630a74c7ae3b238b839750dd4eb3e70b3a

Contents?: true

Size: 1.77 KB

Versions: 4

Compression:

Stored size: 1.77 KB

Contents

# Copyright 2009, Grégoire Marabout. All Rights Reserved.
#
# This is free software. Please see the LICENSE and COPYING files for details.

require 'cascading/base'
require 'yaml'

module Cascading
  # Root node of the DSL tree: it has no parent and its children are the
  # flows created through #flow. The Java classes referenced here are reached
  # through JRuby's Java integration.
  class Cascade < Cascading::Node
    extend Registerable

    # Creates a cascade called +name+ and records the new instance through the
    # class-level +add+ (presumably provided by Registerable -- confirm).
    def initialize(name)
      # A Cascade cannot have a parent
      super(name, nil)
      self.class.add(name, self)
    end

    # Builds a Flow named +name+ whose parent is this cascade, adds it as a
    # child, evaluates the given block in the context of the new flow and
    # returns the flow.
    #
    # Raises a RuntimeError when no block is supplied.
    def flow(name, &block)
      raise "Could not build flow '#{name}'; block required" if block.nil?

      child_flow = Flow.new(name, self)
      add_child(child_flow)
      child_flow.instance_eval(&block)
      child_flow
    end

    # Returns a textual outline of the cascade: a "<name>:cascade" header line
    # followed by the description of every child, each asked to describe
    # itself with +offset+ extended by two spaces.
    def describe(offset = '')
      child_lines = child_names.map do |child_name|
        children[child_name].describe("#{offset}  ")
      end
      "#{offset}#{name}:cascade\n#{child_lines.join("\n")}"
    end

    # Connects every child flow with +properties+ and writes the result as a
    # DOT file named "<dir>/<flow name>.dot".
    def draw(dir, properties = nil)
      @children.each do |flow_name, child_flow|
        connected_flow = child_flow.connect(properties)
        connected_flow.writeDOT("#{dir}/#{flow_name}.dot")
      end
    end

    # Returns a Hash that maps each child flow's name to whatever that flow
    # reports from its own +sink_metadata+.
    def sink_metadata
      metadata_by_flow = {}
      @children.each do |flow_name, child_flow|
        metadata_by_flow[flow_name] = child_flow.sink_metadata
      end
      metadata_by_flow
    end

    # Dumps #sink_metadata as YAML into +file_name+, opened in write mode.
    def write_sink_metadata(file_name)
      File.open(file_name, 'w') { |out| YAML.dump(sink_metadata, out) }
    end

    # Connects all child flows into a Java cascade via CascadeConnector and
    # calls +complete+ on it.
    #
    # A NativeException raised along the way is re-raised wrapped in a
    # CascadingException.
    def complete(properties = nil)
      connector = Java::CascadingCascade::CascadeConnector.new
      connector.connect(name, make_flows(@children, properties)).complete
    rescue NativeException => e
      raise CascadingException.new(e, 'Error completing cascade')
    end

    private

    # Connects each of +flows+ with +properties+, attaches the listeners held
    # by the DSL flow to the connected flow, and returns the connected flows
    # converted with +to_java+ for the Java API.
    def make_flows(flows, properties)
      connected_flows = flows.map do |_flow_name, flow|
        connected = flow.connect(properties)
        flow.listeners.each { |listener| connected.addListener(listener) }
        connected
      end
      connected_flows.to_java(Java::CascadingFlow::Flow)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
cascading.jruby-0.0.8 lib/cascading/cascade.rb
cascading.jruby-0.0.7 lib/cascading/cascade.rb
cascading.jruby-0.0.6 lib/cascading/cascade.rb
cascading.jruby-0.0.5 lib/cascading/cascade.rb