require 'shellwords'
require_relative("runner/overwritables")
require_relative("runner/map_logic")
require_relative("runner/reduce_logic")
require_relative("runner/local_invocation")
require_relative("runner/hadoop_invocation")

module Wukong
  module Hadoop

    # The Hadoop::HadoopRunner class contains the logic to examine
    # arguments and construct command lines which it will execute to
    # create the desired behavior.
    #
    # The runner will introspect on its arguments to guess (if not
    # given) the processors to use as mapper and reducer in a
    # map/reduce job. It will also decide whether to run that job in
    # local or Hadoop mode. These decisions result in a command which
    # it will ultimately execute.
    class HadoopRunner < Wukong::Runner

      usage "PROCESSOR|FLOW [PROCESSOR|FLOW]"

      # NOTE(review): this copy of the file is damaged here. The text
      # between `description <` and `2 if mode == :hadoop` is missing; it
      # looks as if everything between a `<` and a later `>` was stripped.
      # The surviving tail (`true` followed by a method-closing `end`)
      # suggests the lost span held:
      #   * the `description` heredoc,
      #   * any setup lines, and
      #   * the head of a validation method whose last guard compared
      #     something against 2 (perhaps `args.size > 2`).
      # TODO: restore from version control. The tokens below are
      # reproduced exactly as found. The `end` after `true` has no
      # matching `def` in this copy, so the file will not parse as-is.
      description < 2
        # Hadoop mode requires both an input and an output location;
        # this guard does not check them in local mode.
        if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
          raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
        end
        true
      end

      # Run this command.
      #
      # In local mode, executes the command line returned by
      # `local_commandline`. Otherwise, calls `remove_output_path!` when
      # either the `rm` or the `overwrite` setting is truthy, then
      # executes the command line returned by `hadoop_commandline`.
      def run
        if mode == :local
          log.info "Launching local!"
          execute_command!(local_commandline)
        else
          remove_output_path! if settings[:rm] || settings[:overwrite]
          # NOTE(review): the return value of this call is discarded, and
          # the method is called again two lines below. Unless it is needed
          # for a side effect (e.g. raising on bad settings before the log
          # line), it looks redundant. Confirm before removing.
          hadoop_commandline
          log.info "Launching Hadoop!"
          execute_command!(hadoop_commandline)
        end
      end

      # What mode is this runner in?
      #
      # Only a `mode` setting whose string form is exactly 'local' selects
      # local mode. Any other value, including nil, selects Hadoop mode.
      #
      # @return [:hadoop, :local]
      def mode
        settings[:mode].to_s == 'local' ? :local : :hadoop
      end

      # Were mapper and/or reducer named by a single argument?
      #
      # @return [true, false] true when exactly one argument was given
      def single_job_arg?
        args.size == 1
      end

      # Were mapper and/or reducer named by separate arguments?
      #
      # @return [true, false] true when exactly two arguments were given
      def separate_map_and_reduce_args?
        args.size == 2
      end

      # Is there a processor registered with the given `name`?
      #
      # The name is converted to a Symbol (via `to_s.to_sym`) before the
      # registry lookup, so Strings and Symbols are treated alike.
      #
      # @param [#to_s] name
      # @return [true, false]
      def processor_registered? name
        Wukong.registry.registered?(name.to_s.to_sym)
      end

      # Return the guessed name of a processor at the given `path`.
      #
      # The guess is the file's basename with a trailing `.rb` removed,
      # e.g. "flows/tokenizer.rb" => "tokenizer".
      #
      # @param [String] path
      # @return [String]
      def processor_name_from_file(path)
        File.basename(path, '.rb')
      end

      # Does the given `path` contain a processor named after itself?
      #
      # Returns false for a nil/false `path`. Otherwise it checks whether a
      # processor named after the file (see #processor_name_from_file) is
      # registered. The file itself is not opened or loaded here.
      #
      # @param [String, nil] path
      # @return [true, false]
      def file_is_processor?(path)
        return false unless path
        processor_registered?(processor_name_from_file(path))
      end

      # The prefix to insert before all invocations of the
      # wu-local runner.
      #
      # Read directly from the `command_prefix` setting.
      # NOTE(review): presumably nil when that setting is absent; confirm.
      #
      # @return [String]
      def command_prefix
        settings[:command_prefix]
      end

      # Returns parameters to pass to an invocation of
      # wu-local.
      #
      # Parameters like --reduce_tasks which are relevant to
      # Wukong-Hadoop will be interpreted and *not* passed. Others
      # are passed through with their values shell-escaped.
      #
      # A parameter is dropped when
      # `params_to_pass.definition_of(param, :wukong_hadoop)` is truthy.
      # Presumably that marks parameters declared as wukong-hadoop
      # options; verify against the settings definitions. Each remaining
      # value is converted with `to_s` and escaped with
      # `Shellwords.escape`. The resulting `--name=value` pairs are joined
      # with single spaces.
      #
      # @return [String]
      def non_wukong_hadoop_params_string
        params_to_pass.reject do |param, val|
          params_to_pass.definition_of(param, :wukong_hadoop)
        end.map do |param,val|
          "--#{param}=#{Shellwords.escape(val.to_s)}"
        end.join(" ")
      end

      # Execute a command composed of the given parts.
      #
      # Will print the command instead of executing it if the --dry_run
      # option was given.
      #
      # The parts are flattened and blank parts are dropped. The rest are
      # joined with a backslash-newline, so the printed command spans
      # several lines while remaining a single shell command. `blank?` is
      # not core Ruby; it presumably comes from a support library loaded
      # elsewhere.
      #
      # When not a dry run, the command is run through the shell using
      # backticks. Only its stdout is captured, and it is printed after the
      # command exits. `$?` then holds the child's exit status.
      #
      # @param [Array] argv
      # @raise [Error] if the executed command exits unsuccessfully
      def execute_command!(*argv)
        command = argv.flatten.reject(&:blank?).join(" \\\n ")
        if settings[:dry_run]
          log.info("Dry run:")
          puts command
        else
          puts `#{command}`
          raise Error.new("Command failed!") unless $?.success?
        end
      end
    end
  end
end