Sha256: 6ec5e460eaf92ccfd0175c99c14a757158d1fec787376dcb58d666278aa4a418
Contents?: true
Size: 1.18 KB
Versions: 2
Compression:
Stored size: 1.18 KB
Contents
require 'core'
require 'java'
require 'mapred_factory'

import 'org.apache.hadoop.io.IntWritable'
import 'org.apache.hadoop.io.Text'

include HadoopDsl

# Hadoop IO types, re-exported under the HadoopDsl namespace.
HadoopDsl::Text = Text
HadoopDsl::IntWritable = IntWritable

# Map entry point.
# Builds a mapper for +script+ from the stringified key and value, runs it,
# and forwards everything it emitted to +output+.
# NOTE(review): presumably invoked from the Java side of the job, and
# +reporter+ is accepted only to match that calling convention — confirm.
def map(key, value, output, reporter, script)
  mapper = MapperFactory.create(script, key.to_string, value.to_string)
  mapper.run

  write(output, mapper)
end

# Reduce entry point.
# Converts every incoming value to its Ruby form, builds a reducer for
# +script+ from the stringified key and those values, runs it, and forwards
# everything it emitted to +output+. +reporter+ is not used here.
def reduce(key, values, output, reporter, script)
  converted = values.map { |item| to_ruby(item) }
  reducer = ReducerFactory.create(script, key.to_string, converted)
  reducer.run

  write(output, reducer)
end

# Setup entry point.
# Builds the setup object for +script+ and +conf+, runs it, and returns its
# paths converted with +to_java+.
def setup(conf, script)
  job_setup = SetupFactory.create(script, conf)
  job_setup.run
  job_setup.paths.to_java
end

private

# Sends every key/value pair found in +controller.emitted+ to +output+,
# converting both sides to Hadoop types first.
# NOTE(review): each emitted entry is iterated as key/value pairs, so it is
# presumably a Hash (or a list of pairs) — verify against the controllers.
def write(output, controller)
  controller.emitted.each do |record|
    record.each do |out_key, out_value|
      output.collect(to_hadoop(out_key), to_hadoop(out_value))
    end
  end
end

# Unwraps a Hadoop IO value: IntWritable via +get+, Text via +to_string+.
# Raises a RuntimeError for any other class.
def to_ruby(value)
  case value
  when IntWritable
    value.get
  when Text
    value.to_string
  else
    raise "no match class: #{value.class}"
  end
end

# Wraps a Ruby value in a Hadoop IO type: Integer becomes an IntWritable,
# String becomes a Text. Raises a RuntimeError for any other class.
def to_hadoop(value)
  case value
  when Integer
    IntWritable.new(value)
  when String
    text = Text.new
    text.set(value)
    text
  else
    raise "no match class: #{value.class}"
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
hadoop-rubydsl-0.0.2 | lib/init.rb |
hadoop-rubydsl-0.0.1 | lib/init.rb |