#!/usr/bin/env ruby
require "rubygems"
require "mandy"
require 'optparse'
require 'ostruct'
require 'cgi'

# Failure flag; the exit handler further down reads it to choose the final message.
errors = false
# Holds every setting collected from the command-line switches below.
options = OpenStruct.new

parser = OptionParser.new do |opts|
  opts.banner = "USAGE: mandy-hadoop script input output [options]"

  opts.on("-p", "--payload PAYLOAD", "Add a working directory to be sent to the cluster.") { |dir| options.payload = dir }

  opts.on("-c", "--conf HADOOP_CONF", "Use this cluster xml config file.") { |xml| options.config = xml }

  # -v and -j both write to options.cmdenv, so whichever comes last on the
  # command line replaces the other.
  opts.on("-v", '--variables name=value', "Pass additional parameters to jobs") { |pairs| options.cmdenv = pairs }

  # The JSON text is CGI-escaped and handed over as a single "json" variable.
  opts.on("-j", '--json {"key":"1 value"}', "Pass JSON encoded parameters to jobs") do |json|
    options.cmdenv = "json=#{CGI.escape(json)}"
  end

  opts.on("-g", '--gemfile filepath', "Path to your jobs gem yml file (defaults to ./gems.yml)") { |path| options.gemfile = path }

  opts.on_tail("-h", "--help", "Show this message") do
    puts opts
    exit
  end
end

# parse! strips the recognised switches from ARGV, leaving the positional arguments.
parser.parse!

# Script, input and output are all required; with fewer than three arguments
# hand over to the help screen instead.
exec('mandy-hadoop -h') if ARGV.size < 3

# Resolves +path+ against the current working directory unless it is already
# absolute (begins with "/").
#
# path - String filesystem path.
#
# Returns +path+ unchanged when absolute, otherwise Dir.pwd joined with it.
def absolute_path(path)
  # \A anchors at the start of the whole string. The former ^ anchor also
  # matched after an embedded newline, so "a\n/b" was mistaken for absolute.
  path =~ %r{\A/} ? path : File.join(Dir.pwd, path)
end

# Looks up the jobs' gem yml file.
#
# file - path supplied by the caller, or nil to fall back to "gems.yml".
#
# Returns the absolute path when something exists there, otherwise nil.
def gemfile(file)
  candidate = absolute_path(file || 'gems.yml')
  return candidate if File.exist?(candidate)

  nil
end

# Copies space-separated name=value pairs into this process's environment.
#
# opts_string - String such as "a=1 b=2", or nil when no variables were given.
#
# Returns the Array of pairs that were processed (empty for nil input).
def set_env(opts_string)
  # to_s turns a nil argument into an empty string, making the call a no-op
  # instead of a NoMethodError.
  opts_string.to_s.split(' ').each do |pair|
    # A limit of 2 keeps any further "=" characters inside the value.
    key, value = pair.split('=', 2)
    ENV[key] = value
  end
end

# Positional arguments: job script, comma-separated input paths, output folder.
file = ARGV[0]
filename = File.basename(file)
inputs = ARGV[1].split(",")
# Every input path becomes its own quoted -input flag for the hadoop command.
input = inputs.map { |path| "-input \"#{path}\"" }.join(" ")

output_folder = ARGV[2]
config = absolute_path(options.config || 'cluster.xml')
puts "Packaging code for distribution..."
# Without -p the job script path itself is passed as the payload argument.
payload = Mandy::Packer.pack(file, options.payload || ARGV[0], gemfile(options.gemfile))
cmdenv = options.cmdenv
# cmdenv is nil when neither -v nor -j was passed; there is nothing to export then.
set_env(cmdenv) if cmdenv

# Exit hook: always clean up the packed payload, then report the overall outcome.
at_exit do
  puts
  puts "Cleaning up..."
  Mandy::Packer.cleanup!(payload)
  # $! is the exception terminating the process, if any. Anything other than
  # a successful SystemExit counts as a failure, so an uncaught error or an
  # interrupt is no longer reported as success.
  aborted = $! && !($!.is_a?(SystemExit) && $!.success?)
  failed = errors || aborted
  puts failed ? "Completed with errors!" : "Completed Successfully!"
end

puts "Loading Mandy scripts..."
# Load the user's script; presumably this is what populates Mandy::Job.jobs.
require absolute_path(file)

# Identical for every job, so resolve it once: bootstrap.rb one level above
# this script's directory.
bootstrap_file = File.expand_path(File.join(File.dirname(__FILE__), '..', 'bootstrap.rb'))

output = nil
puts
begin
  # Jobs run in order; each job's output folder becomes the next job's input.
  Mandy::Job.jobs.each_with_index do |job, i|
    puts "Submitting Job: [#{i+1}] #{job.name}..."

    jobconf = job.settings.map { |key, value| %(-D #{key}='#{value}') }.join(' ')
    # Numbered sub-folder per job; non-word characters in the name become '-'.
    output = File.join(output_folder, "#{i+1}-#{job.name.downcase.gsub(/\W/, '-')}")

    # Only XML input gets a custom record reader.
    # NOTE(review): the space after the begin tag looks deliberate (matching
    # tags that carry attributes) — confirm.
    inputreader = if job.input_format == :xml
      xml_tag = job.input_format_options[:xml_tag]
      "StreamXmlRecordReader,begin=<#{xml_tag} ,end=</#{xml_tag}>"
    end

    # stderr is merged into stdout (2>&1) so the loop below sees all output.
    command = %($HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar #{jobconf}\
                    -files "#{payload}","#{bootstrap_file}" \
                    -conf '#{config}' \
                    #{input} \
                    #{ inputreader.nil? ? '' : "-inputreader \"#{inputreader}\"" } \
                    -mapper "ruby bootstrap.rb #{File.basename(payload)} map #{filename} '#{job.name}'"  \
                    -reducer "ruby bootstrap.rb #{File.basename(payload)} reduce #{filename} '#{job.name}'"  \
                    #{ cmdenv.nil? ? '' : "-cmdenv #{cmdenv}" }\
                    -output "#{output}" 2>&1)

    # Keep every line so the full log can be attached to a failure.
    result = []
    IO.popen(command, 'r') do |subprocess|
      subprocess.each_line do |line|
        if line.include?('Running job:')
          job_id = line.split(' ').last.strip
          puts "Job ID: #{job_id}"
          puts "Kill Command: mandy-kill #{job_id} -c #{config}"
        end
        puts "Tracking URL: #{line.split(' ').last.strip}" if line.include?('Tracking URL:')
        result << line
      end
    end

    # The block form of IO.popen sets $? once the subprocess has finished.
    raise(Mandy::HadoopJobFailure.new(job, result.join("\n"))) unless $?.success?
    puts
    # puts "#{command}"
    # Chain the jobs: quoted like the initial -input flags and -output, so an
    # output folder containing spaces does not split into several shell words.
    input = %(-input "#{output}")
  end
  # print out the output location so caller can know where to get the results from
  puts output
rescue Mandy::HadoopJobFailure => e
  # Flag the failure for the exit handler before terminating with a non-zero status.
  errors = true
  STDERR.puts e.to_s
  exit(1)
end