require 'shellwords'
require_relative("runner/overwritables")
require_relative("runner/map_logic")
require_relative("runner/reduce_logic")
require_relative("runner/local_invocation")
require_relative("runner/hadoop_invocation")
module Wukong
module Hadoop
# The Wukong::Hadoop::HadoopRunner class contains the logic to examine
# arguments and construct command lines which it will execute to
# create the desired behavior.
#
# The HadoopRunner will introspect on its arguments to guess (if
# not given) the processors to use as mapper and reducer in a
# map/reduce job. It will also decide whether to run that job in
# local or Hadoop mode. These decisions result in a command which
# it will ultimately execute.
class HadoopRunner < Wukong::Runner
usage "PROCESSOR|FLOW [PROCESSOR|FLOW]"
description < 2
if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
end
true
end
# Run this command.
#
# In local mode, the command built by `local_commandline` is executed
# directly. In Hadoop mode, the output path is first removed when the
# `:rm` or `:overwrite` setting is truthy. The command built by
# `hadoop_commandline` is then executed.
#
# @return whatever `execute_command!` returns
def run
  if mode == :local
    log.info "Launching local!"
    execute_command!(local_commandline)
  else
    remove_output_path! if settings[:rm] || settings[:overwrite]
    # Build the command line exactly once. Building it before logging
    # means any error raised while constructing it surfaces before
    # "Launching Hadoop!" is logged.
    command = hadoop_commandline
    log.info "Launching Hadoop!"
    execute_command!(command)
  end
end
# Which mode this runner operates in.
#
# Only a `:mode` setting whose string form is exactly "local" (so
# either the String or the Symbol) selects local mode. Any other
# value, including an unset one, means Hadoop mode.
#
# @return [:hadoop, :local]
def mode
  return :local if settings[:mode].to_s == 'local'
  :hadoop
end
# Did the user name the mapper and/or reducer with exactly one
# positional argument?
#
# @return [true, false]
def single_job_arg?
  arg_count = args.size
  arg_count == 1
end
# Did the user name the mapper and the reducer with two separate
# positional arguments?
#
# @return [true, false]
def separate_map_and_reduce_args?
  arg_count = args.size
  arg_count == 2
end
# Is a processor registered under the given `name`?
#
# The name is converted to a Symbol before the registry lookup, so a
# String and a Symbol with the same text are treated alike.
#
# @param [#to_s] name
# @return [true, false]
def processor_registered?(name)
  key = name.to_s.to_sym
  Wukong.registry.registered?(key)
end
# Guess the name of the processor defined in the file at `path`.
#
# The guess is the file's basename with a trailing `.rb` extension
# removed.
#
# @param [String] path
# @return [String]
def processor_name_from_file(path)
  ruby_extension = '.rb'
  File.basename(path, ruby_extension)
end
# Does the file at `path` define a processor named after itself?
#
# Concretely: is a processor registered under the name guessed from
# the file's basename?
#
# @param [String, nil] path
# @return [true, false] false when no `path` is given
def file_is_processor?(path)
  if path
    processor_registered?(processor_name_from_file(path))
  else
    false
  end
end
# The prefix to insert before every invocation of the wu-local
# runner.
#
# The value is read straight from the `:command_prefix` setting.
#
# @return [String]
def command_prefix
  key = :command_prefix
  settings[key]
end
# Returns the parameters to pass to an invocation of wu-local,
# formatted as one space-separated string of `--name=value` options.
#
# Parameters that have a `:wukong_hadoop` definition (like
# --reduce_tasks) are interpreted by Wukong-Hadoop itself and are
# *not* passed. All other parameters are passed through, each value
# shell-escaped with Shellwords.escape.
#
# @return [String]
def non_wukong_hadoop_params_string
  # Fetch the parameters once. The original re-called params_to_pass
  # inside the reject block for every parameter.
  params = params_to_pass
  params.reject do |param, _val|
    params.definition_of(param, :wukong_hadoop)
  end.map do |param, val|
    "--#{param}=#{Shellwords.escape(val.to_s)}"
  end.join(" ")
end
# Execute a command composed of the given parts.
#
# The parts are flattened and blank parts are dropped. The remaining
# parts are joined with backslash-newline continuations, so each
# part sits on its own line.
#
# When the --dry_run option was given, "Dry run:" is logged and the
# command is printed instead of being executed. Otherwise the command
# is executed with backticks, and its captured standard output is
# printed once the command finishes.
#
# @param [Array] argv the parts of the command (may be nested)
# @raise [Error] if the executed command exits unsuccessfully
# @return [nil]
def execute_command!(*argv)
  pieces  = argv.flatten.reject(&:blank?)
  command = pieces.join(" \\\n ")

  if settings[:dry_run]
    log.info("Dry run:")
    return puts(command)
  end

  output = `#{command}`
  puts output
  raise Error.new("Command failed!") unless $?.success?
end
end
end
end