Sha256: b429d84ef592b7dd7385a2123f477d71c32068bf6e967d2f18cfdc9d85d13a37
Contents?: true
Size: 1.62 KB
Versions: 1
Compression:
Stored size: 1.62 KB
Contents
# Copyright 2009, Grégoire Marabout. All Rights Reserved.
#
# This is free software. Please see the LICENSE and COPYING files for details.

require 'cascading/base'
require 'yaml'

module Cascading
  # A top-level node whose children are flows. A cascade can build its flows
  # from blocks, render each one to a DOT file, dump their sink metadata as
  # YAML, and connect and run them together through Cascading's
  # CascadeConnector.
  class Cascade < Cascading::Node
    extend Registerable

    # Builds a cascade called +name+ and registers it on the class under that
    # name (via +add+, presumably supplied by Registerable -- confirm there).
    def initialize(name)
      super(name, nil) # A Cascade cannot have a parent
      self.class.add(name, self)
    end

    # Creates a child flow called +name+, attaches it to this cascade and
    # evaluates +block+ in the context of the new flow.
    #
    # Returns the new flow. Raises a RuntimeError when no block is supplied.
    def flow(name, &block)
      raise "Could not build flow '#{name}'; block required" if block.nil?

      child = Flow.new(name, self)
      add_child(child)
      child.instance_eval(&block)
      child
    end

    # Connects every child flow with +properties+ and writes its DOT
    # representation to "<dir>/<flow name>.dot".
    def draw(dir, properties = nil)
      @children.each do |flow_name, child|
        connected = child.connect(properties)
        connected.writeDOT("#{dir}/#{flow_name}.dot")
      end
    end

    # Returns a hash mapping each child flow's name to that flow's
    # +sink_metadata+.
    def sink_metadata
      metadata = {}
      @children.each do |flow_name, child|
        metadata[flow_name] = child.sink_metadata
      end
      metadata
    end

    # Serializes #sink_metadata as YAML into +file_name+, opened for writing
    # (any existing content is replaced).
    def write_sink_metadata(file_name)
      File.open(file_name, 'w') { |out| YAML.dump(sink_metadata, out) }
    end

    # Connects all child flows into a single cascade named after this node
    # and runs it to completion.
    #
    # A NativeException raised along the way is re-raised as a
    # CascadingException with the message 'Error completing cascade'.
    def complete(properties = nil)
      connector = Java::CascadingCascade::CascadeConnector.new
      cascade = connector.connect(name, make_flows(@children, properties))
      cascade.complete
    rescue NativeException => e
      raise CascadingException.new(e, 'Error completing cascade')
    end

    private

    # Connects each flow in +flows+ with +properties+, attaches the flow's
    # listeners to the connected instance, and returns the instances as a
    # Java array of Java::CascadingFlow::Flow.
    def make_flows(flows, properties)
      connected_flows = flows.map do |_flow_name, child|
        connected = child.connect(properties)
        child.listeners.each { |listener| connected.addListener(listener) }
        connected
      end
      connected_flows.to_java(Java::CascadingFlow::Flow)
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
cascading.jruby-0.0.4 | lib/cascading/cascade.rb |