#!/usr/bin/env ruby
require "rubygems"
require "mandy"
require 'optparse'
require 'ostruct'
require 'cgi'

# Holder for everything the command-line switches collect.
options = OpenStruct.new

# Build the parser step by step, then consume the recognised switches from
# ARGV in place so that only the positional arguments remain afterwards.
parser = OptionParser.new
parser.banner = "USAGE: mandy-hadoop script input output [options]"

parser.on("-p", "--payload PAYLOAD", "Add a working directory to be sent to the cluster.") do |dir|
  options.payload = dir
end

parser.on("-c", "--conf HADOOP_CONF", "Use this cluster xml config file.") do |xml|
  options.config = xml
end

# NOTE: -v and -j both store into options.cmdenv, so whichever switch is
# parsed last replaces any earlier value.
parser.on("-v", '--variables name=value', "Pass additional parameters to jobs") do |pair|
  options.cmdenv = pair
end

parser.on("-j", '--json {"key":"1 value"}', "Pass JSON encoded parameters to jobs") do |json|
  # The JSON text is URL-escaped and carried under the single name "json".
  options.cmdenv = "json=#{CGI.escape(json)}"
end

parser.on("-g", '--gemfile filepath', "Path to your jobs Gemfile (defaults to ./Gemfile)") do |path|
  options.gemfile = path
end

parser.on_tail("-h", "--help", "Show this message") do
  puts parser
  exit
end

parser.parse!

# Three positional arguments (script, input, output) are required; with fewer,
# replace this process with `mandy-hadoop -h` to display the help text.
exec('mandy-hadoop -h') if ARGV.size < 3

# Resolves a path against the current working directory unless it is already
# absolute (that is, it begins with "/").
#
# path - String file path, absolute or relative.
#
# Returns the path unchanged when it is absolute, otherwise the path joined
# onto Dir.pwd.
def absolute_path(path)
  # \A anchors to the start of the whole string. The previous ^ anchor also
  # matched right after any embedded newline, so a relative path containing
  # "\n/" was wrongly treated as absolute.
  path =~ %r{\A/} ? path : File.join(Dir.pwd, path)
end

# Picks the Gemfile to hand to the packer.
#
# file - String path to a Gemfile, or nil to try "Gemfile" in the current
#        working directory.
#
# Returns the absolute path of the requested Gemfile when that file exists;
# otherwise the expanded path of the Gemfile one directory above this script.
def gemfile(file)
  candidate = absolute_path(file || 'Gemfile')
  return candidate if File.exist?(candidate)

  # Fall back to the Gemfile that sits next to this script's parent directory.
  File.expand_path(File.join(File.dirname(__FILE__), '..', 'Gemfile'))
end

# Positional arguments left after option parsing: job script, input location
# and the folder that receives each job's output.
file, input, output_folder = ARGV
filename = File.basename(file)
config = options.config || 'cluster.xml'

puts "Packaging Gems for distribution..."
# When no explicit payload was given, the job script itself is passed as the payload.
payload = Mandy::Packer.pack(file, options.payload || file, gemfile(options.gemfile))
cmdenv = options.cmdenv

# Registered after packing so the payload is handed back to the packer for
# cleanup whenever the process exits.
at_exit do
  puts "Cleaning up..."
  Mandy::Packer.cleanup!(payload)
  puts "All done!"
end

puts "Loading Mandy scripts..."
# Loading the user's script is what populates Mandy::Job.jobs.
require absolute_path(file)

output = nil

# Identical for every job, so it is resolved once instead of on each iteration.
bootstrap_file = File.expand_path(File.join(File.dirname(__FILE__), '..', 'bootstrap.rb'))

puts "Sending jobs to Hadoop..."

# Jobs run in sequence: each job reads the previous job's output directory.
Mandy::Job.jobs.each_with_index do |job, i|
  # Every job setting becomes a -D key='value' argument.
  jobconf = job.settings.map { |key, value| %(-D #{key}='#{value}') }.join(' ')
  # Output directory is "<position>-<job name with non-word characters replaced by '-'>".
  output = File.join(output_folder, "#{i+1}-#{job.name.downcase.gsub(/\W/, '-')}")

  # XML input gets a StreamXmlRecordReader spec delimited by the configured tag.
  inputreader = job.input_format == :xml ? "StreamXmlRecordReader,begin=<#{job.input_format_options[:xml_tag]} ,end=</#{job.input_format_options[:xml_tag]}>" : nil

  command = %($HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar #{jobconf}\
                  -files "#{payload}","#{bootstrap_file}" \
                  -conf '#{config}' \
                  -input "#{input}"  \
                  #{ inputreader.nil? ? '' : "-inputreader \"#{inputreader}\"" } \
                  -mapper "ruby bootstrap.rb #{File.basename(payload)} map #{filename} '#{job.name}'"  \
                  -reducer "ruby bootstrap.rb #{File.basename(payload)} reduce #{filename} '#{job.name}'"  \
                  #{ cmdenv.nil? ? '' : "-cmdenv #{cmdenv}" }\
                  -output "#{output}")

  `#{command}`

  # Stop at the first failing job: the next job would read an output directory
  # that was never produced, and the caller would still be handed a result
  # path together with a zero exit status. abort exits non-zero and still
  # lets at_exit handlers run.
  abort "Hadoop job '#{job.name}' failed (#{$?}); aborting." unless $?.success?

  # puts "#{command}"
  # Chain the jobs: this job's output is the next job's input.
  input = output
end

# print out the output location so caller can know where to get the results from
puts output