#!/usr/bin/env ruby require "rubygems" require "mandy" require 'optparse' require 'ostruct' require 'cgi' errors = false options = OpenStruct.new OptionParser.new do |opts| opts.banner = "USAGE: mandy-hadoop script input output [options]" opts.on("-p", "--payload PAYLOAD", "Add a working directory to be sent to the cluster.") do |payload| options.payload = payload end opts.on("-c", "--conf HADOOP_CONF", "Use this cluster xml config file.") do |config| options.config = config end opts.on("-v", '--variables name=value', "Pass additional parameters to jobs") do |config| options.cmdenv = config end opts.on("-j", '--json {"key":"1 value"}', "Pass JSON encoded parameters to jobs") do |config| options.cmdenv = "json=#{CGI.escape(config)}" end opts.on("-g", '--gemfile filepath', "Path to your jobs gem yml file (defaults to ./gems.yml)") do |config| options.gemfile = config end opts.on_tail("-h", "--help", "Show this message") do puts opts exit end end.parse! exec('mandy-hadoop -h') unless ARGV.size >= 3 def absolute_path(path) path =~ /^\// ? path : File.join(Dir.pwd, path) end def gemfile(file) path = absolute_path(file || 'gems.yml') File.exist?(path) ? path : nil end def set_env(opts_string) opts_string.split(' ').each do |pair| key, value = pair.split("=") ENV[key] = value end end file = ARGV[0] filename = File.basename(file) inputs = ARGV[1].split(",") input = inputs.map {|path| "-input \"#{path}\""}.join(" ") output_folder = ARGV[2] config = absolute_path(options.config || 'cluster.xml') puts "Packaging code for distribution..." payload = Mandy::Packer.pack(file, options.payload || ARGV[0], gemfile(options.gemfile)) cmdenv = options.cmdenv set_env(cmdenv) at_exit do puts puts "Cleaning up..." Mandy::Packer.cleanup!(payload) puts errors ? "Completed with errors!" : "Completed Successfully!" end puts "Loading Mandy scripts..." require absolute_path(file) output = nil puts begin Mandy::Job.jobs.each_with_index do |job, i| puts "Submitting Job: [#{i+1}] #{job.name}..." jobconf = job.settings.map { |key, value| %(-D #{key}='#{value}') }.join(' ') output = File.join(output_folder, "#{i+1}-#{job.name.downcase.gsub(/\W/, '-')}") bootstrap_file = File.expand_path(File.join(File.dirname(__FILE__), '..', 'bootstrap.rb')) inputreader = job.input_format == :xml ? "StreamXmlRecordReader,begin=<#{job.input_format_options[:xml_tag]} ,end=</#{job.input_format_options[:xml_tag]}>" : nil command = %($HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar #{jobconf}\ -files "#{payload}","#{bootstrap_file}" \ -conf '#{config}' \ #{input} \ #{ inputreader.nil? ? '' : "-inputreader \"#{inputreader}\"" } \ -mapper "ruby bootstrap.rb #{File.basename(payload)} map #{filename} '#{job.name}'" \ -reducer "ruby bootstrap.rb #{File.basename(payload)} reduce #{filename} '#{job.name}'" \ #{ cmdenv.nil? ? '' : "-cmdenv #{cmdenv}" }\ -output "#{output}" 2>&1) result = [] IO.popen(command, 'r') do |subprocess| while line = subprocess.gets if line.include?('Running job:') job_id = line.split(' ').last.strip puts "Job ID: #{job_id}" puts "Kill Command: mandy-kill #{job_id} -c #{config}" end puts "Tracking URL: #{line.split(' ').last.strip}" if line.include?('Tracking URL:') result << line end end raise(Mandy::HadoopJobFailure.new(job, result.join("\n"))) unless $?.to_i==0 puts # puts "#{command}" input = "-input #{output}" end # print out the output location so caller can know where to get the results from puts output rescue Mandy::HadoopJobFailure => e errors = true STDERR.puts e.to_s exit(1) end