module Wukong
module Hadoop
# Implements logic for figuring out the correct reducer
# commandline given wu-hadoop's arguments and whether or not to
# run a map-only (no-reduce) job.
module ReduceLogic
# Return the actual commandline used by the reducer, whether
# running in local or Hadoop mode.
#
# You should be able to copy, paste, and run this command
# unmodified to debug the reducer.
#
# @return [String]
def reducer_commandline
return '' unless reduce?
return settings[:reduce_command] if explicit_reduce_command?
arg = (mode == :hadoop ? File.basename(reducer_arg) : reducer_arg)
[command_prefix, 'wu-local', arg].tap do |cmd|
cmd << "--run=#{reducer_name}" if reducer_needs_run_arg?
cmd << non_wukong_hadoop_params_string
end.compact.map(&:to_s).reject(&:empty?).join(' ')
end
# Were we given an explicit reduce command (like 'uniq -c') or
# are we to introspect and construct the command?
#
# @return [true, false]
def explicit_reduce_command?
settings[:reduce_command]
end
# Were we given a processor to use as our reducer explicitly by
# name or are we to introspect to discover the correct
# processor?
#
# @return [true, false]
def explicit_reduce_processor?
settings[:reducer]
end
# Were we given an explicit reducer (either as a command or as a
# processor) or should we introspect to find one?
#
# @return [true, false]
def explicit_reducer?
explicit_reduce_processor? || explicit_reduce_command?
end
# The argument that we should introspect on to turn into our
# reducer.
#
# @return [String]
def reducer_arg
args.last
end
# Should we perform a reduce or is this a map-only job?
#
# We will definitely reduce if
#
# - given an explicit --reduce_command
# - we discovered a reducer
#
# We will not reduce if:
#
# - --reduce_tasks was explicitly set to 0
#
# @return [true, false]
def reduce?
return false if settings[:reduce_tasks] && settings[:reduce_tasks].to_i == 0
return true if settings[:reduce_command]
return true if reducer_name
false
end
# Is this a map-only job?
#
# @see #reduce?
#
# @return [true, false]
def map_only?
(! reduce?)
end
# Does the reducer commandline need an explicit --run argument?
#
# Will not be used if the processor name is the same as the name
# of the script.
#
# @return [true, false]
def reducer_needs_run_arg?
return false if reducer_arg.to_s == reducer_name.to_s
return false if File.basename(reducer_arg.to_s, '.rb') == reducer_name
true
end
# Return the name of the processor to use as the reducer.
#
# Will raise a Wukong::Error if a given reducer is
# invalid. Will return nil if no reducer can be guessed.
#
# Most of the logic that examines explicit command line
# arguments and checks for the existence of named processors or
# files is here.
#
# @return [String]
def reducer_name
case
when explicit_reducer?
if processor_registered?(settings[:reducer])
settings[:reducer]
else
raise Error.new("No such processor: '#{settings[:reducer]}'")
end
when single_job_arg? && explicit_mapper? && processor_registered?(reducer_arg)
reducer_arg
when separate_map_and_reduce_args? && processor_registered?(reducer_arg)
reducer_arg
when separate_map_and_reduce_args? && file_is_processor?(reducer_arg)
processor_name_from_file(reducer_arg)
when processor_registered?('reducer')
'reducer'
end
end
end
end
end