#!/usr/bin/env ruby
require "rubygems"
require "mandy"
require 'optparse'
require 'ostruct'
require 'cgi'

options = OpenStruct.new

OptionParser.new do |opts|
  opts.banner = "USAGE: mandy-hadoop script input output [options]"

  opts.on("-p", "--payload PAYLOAD", "Add a working directory to be sent to the cluster.") do |payload|
    options.payload = payload
  end

  opts.on("-c", "--conf HADOOP_CONF", "Use this cluster xml config file.") do |config|
    options.config = config
  end

  opts.on("-v", '--variables name=value', "Pass additional parameters to jobs") do |config|
    options.cmdenv = config
  end

  opts.on("-j", '--json {"key":"1 value"}', "Pass JSON encoded parameters to jobs") do |config|
    options.cmdenv = "json=#{CGI.escape(config)}"
  end

  opts.on("-g", '--gemfile filepath', "Path to your jobs Gemfile (defaults to ./Gemfile)") do |config|
    options.gemfile = config
  end

  opts.on_tail("-h", "--help", "Show this message") do
    puts opts
    exit
  end
end.parse!

# Require the three positional arguments (script, input, output); otherwise show usage and exit.
exec('mandy-hadoop -h') unless ARGV.size >= 3

# Expand a relative path against the current working directory.
def absolute_path(path)
  path =~ /^\// ? path : File.join(Dir.pwd, path)
end

# Use the given Gemfile (or ./Gemfile) if it exists, otherwise fall back to the gem's own Gemfile.
def gemfile(file)
  path = absolute_path(file || 'Gemfile')
  File.exist?(path) ? path : File.expand_path(File.join(File.dirname(__FILE__), '..', 'Gemfile'))
end

file          = ARGV[0]
filename      = File.basename(file)
input         = ARGV[1]
output_folder = ARGV[2]
config        = options.config || 'cluster.xml'

puts "Packaging Gems for distribution..."
payload = Mandy::Packer.pack(file, options.payload || ARGV[0], gemfile(options.gemfile))
cmdenv  = options.cmdenv

# Remove the packaged payload when the script exits, whether or not the jobs succeeded.
at_exit do
  puts "Cleaning up..."
  Mandy::Packer.cleanup!(payload)
  puts "All done!"
end

puts "Loading Mandy scripts..."
require absolute_path(file)

output = nil

puts "Sending jobs to Hadoop..."
# Run each defined job in sequence, chaining one job's output folder into the next job's input.
Mandy::Job.jobs.each_with_index do |job, i|
  jobconf = job.settings.map { |key, value| %(-D #{key}='#{value}') }.join(' ')
  output = File.join(output_folder, "#{i+1}-#{job.name.downcase.gsub(/\W/, '-')}")
  bootstrap_file = File.expand_path(File.join(File.dirname(__FILE__), '..', 'bootstrap.rb'))
  inputreader = job.input_format == :xml ? "StreamXmlRecordReader,begin=<#{job.input_format_options[:xml_tag]} ,end=</#{job.input_format_options[:xml_tag]}>" : nil

  # Build and run the Hadoop Streaming command, using bootstrap.rb as the mapper/reducer wrapper.
  command = %($HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar #{jobconf} \
    -files "#{payload}","#{bootstrap_file}" \
    -conf '#{config}' \
    -input "#{input}" \
    #{ inputreader.nil? ? '' : "-inputreader \"#{inputreader}\"" } \
    -mapper "ruby bootstrap.rb #{File.basename(payload)} map #{filename} '#{job.name}'" \
    -reducer "ruby bootstrap.rb #{File.basename(payload)} reduce #{filename} '#{job.name}'" \
    #{ cmdenv.nil? ? '' : "-cmdenv #{cmdenv}" } \
    -output "#{output}")
  `#{command}`
  # puts "#{command}"
  input = output
end

# Print out the final output location so the caller knows where to get the results from.
puts output