#!/usr/bin/env ruby
#
# Runs every job defined in a Mandy script on a Hadoop cluster via Hadoop
# streaming. Jobs run in definition order; the output folder of each job is
# used as the input of the next one. The output folder of the last job is
# printed on stdout so the caller knows where to fetch the results from.
#
# USAGE: mandy-hadoop script input output [options]

require "rubygems"
require "mandy"
require 'optparse'
require 'ostruct'
require 'shellwords'
require 'uri'

options = OpenStruct.new
# Collected as a list so that -v and -j (or repeated -v) do not overwrite
# each other; every entry becomes its own -cmdenv argument.
options.cmdenv = []

parser = OptionParser.new do |opts|
  opts.banner = "USAGE: mandy-hadoop script input output [options]"

  opts.on("-p", "--payload PAYLOAD", "Add a working directory to be sent to the cluster.") do |payload|
    options.payload = payload
  end

  opts.on("-c", "--conf HADOOP_CONF", "Use this cluster xml config file.") do |config|
    options.config = config
  end

  opts.on("-v", '--variables name=value', "Pass additional parameters to jobs") do |variable|
    options.cmdenv << variable
  end

  opts.on("-j", '--json {"key":"1 value"}', "Pass JSON encoded parameters to jobs") do |json|
    # URI.encode was removed in Ruby 3.0; the default parser's #escape
    # performs the same RFC 2396 escaping.
    options.cmdenv << "json=#{URI::DEFAULT_PARSER.escape(json)}"
  end

  opts.on_tail("-h", "--help", "Show this message") do
    puts opts
    exit
  end
end
parser.parse!

# All three positional arguments are required; without this check a missing
# argument surfaces as a TypeError from File.basename(nil).
abort parser.to_s if ARGV.size < 3

# Returns +path+ unchanged when it is already absolute, otherwise resolves it
# against the current working directory.
def absolute_path(path)
  path.start_with?('/') ? path : File.join(Dir.pwd, path)
end

file          = ARGV[0]
filename      = File.basename(file)
input         = ARGV[1]
output_folder = ARGV[2]
config        = options.config || 'cluster.xml'
# With --payload the directory is packed by Mandy::Packer and the result is
# shipped to the cluster; otherwise only the script itself is shipped.
payload       = options.payload ? Mandy::Packer.pack(options.payload) : ARGV[0]

# Shell-escape each value: it is interpolated into a shell command line below
# and may contain characters (&, ;, $, spaces) the shell would interpret.
cmdenv_args = options.cmdenv.map { |variable| "-cmdenv #{Shellwords.escape(variable)}" }

at_exit { Mandy::Packer.cleanup!(payload) }

# Loading the script registers its jobs in Mandy::Job.jobs.
require absolute_path(file)

output = nil

Mandy::Job.jobs.each_with_index do |job, i|
  jobconf = job.settings.map { |key, value| %(-D #{key}='#{value}') }.join(' ')
  # Each job writes to a numbered sub-folder named after the job.
  output = File.join(output_folder, "#{i+1}-#{job.name.downcase.gsub(/\W/, '-')}")

  streaming_args = [
    jobconf,
    "-conf '#{config}'",
    %(-input "#{input}"),
    %(-mapper "mandy-map #{filename} '#{job.name}' #{File.basename(payload)}"),
    %(-reducer "mandy-reduce #{filename} '#{job.name}' #{File.basename(payload)}"),
    %(-file "#{payload}"),
    *cmdenv_args, # omitted entirely when no -v / -j was given
    %(-output "#{output}")
  ].reject(&:empty?)

  command = "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar " +
            streaming_args.join(' ')

  # Backticks keep hadoop's stdout away from our own stdout, which must only
  # carry the final output location.
  `#{command}`

  # The next job reads this job's output, so there is no point in continuing
  # after a failure.
  unless $?.success?
    abort "mandy-hadoop: job '#{job.name}' failed (exit status #{$?.exitstatus}); aborting"
  end

  input = output
end

# print out the output location so caller can know where to get the results from
puts output