module Wukong
  module Hadoop

    # Provides methods for executing a map/reduce job on a Hadoop
    # cluster via {Hadoop
    # streaming}[http://hadoop.apache.org/docs/r0.15.2/streaming.html].
    module HadoopInvocation

      # Remove the output path.
      #
      # Will not actually do anything if the --dry_run option is also
      # given.
      def remove_output_path!
        cmd = %Q{#{hadoop_runner} fs -rmr '#{output_path}'}
        log.info "Removing output file #{output_path}: #{cmd}"
        puts `#{cmd}` unless settings[:dry_run]
      end

      # Return the Hadoop command used to launch this job in a Hadoop
      # cluster.
      #
      # You should be able to copy, paste, and run this command
      # unmodified when debugging.
      #
      # @return [String]
      def hadoop_commandline
        [
          hadoop_runner,
          "jar #{settings[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
          hadoop_jobconf_options,
          "-D mapred.job.name='#{job_name}'",
          hadoop_other_args,
          hadoop_files,
          "-mapper  '#{mapper_commandline}'",
          "-reducer '#{reducer_commandline}'",
          "-input   '#{input_paths}'",
          "-output  '#{output_path}'",
          io_formats,
          hadoop_recycle_env,
        ].flatten.compact.join(" \t\\\n  ")
      end

      # The job name that will be passed to Hadoop.
      #
      # Respects the --job_name option if given, otherwise constructs
      # one from the given processors, input, and output paths.
      #
      # @return [String]
      def job_name
        return settings[:job_name] if settings[:job_name]
        relevant_filename = args.compact.uniq.map { |path| File.basename(path, '.rb') }.join('-')
        "#{relevant_filename}---#{input_paths}---#{output_path}".gsub(%r{[^\w/\.\-\+]+}, '')
      end

      # The input format to use.
      #
      # Respects the value of --input_format.
      #
      # @return [String]
      def input_format
        settings[:input_format]
      end

      # The output format to use.
      #
      # Respects the value of --output_format.
      #
      # @return [String]
      def output_format
        settings[:output_format]
      end

      # :nodoc:
      def io_formats
        input  = "-inputformat  '#{input_format}'"  if input_format
        output = "-outputformat '#{output_format}'" if output_format
        [input, output]
      end

      # The name of the Hadoop binary to use.
      #
      # Respects the value of --hadoop_runner if given.
      #
      # @return [String]
      def hadoop_runner
        settings[:hadoop_runner] || File.join(settings[:hadoop_home], 'bin/hadoop')
      end

      # Return an array of jobconf (-D) options that will be passed
      # to Hadoop.
      #
      # Translates the "friendly" wu-hadoop names into the
      # less-friendly Hadoop names.
      #
      # @return [Array<String>]
      def hadoop_jobconf_options
        jobconf_options = []

        settings[:reuse_jvms]          = '-1'    if settings[:reuse_jvms] == true
        settings[:respect_exit_status] = 'false' if settings[:ignore_exit_status] == true

        # If no reducer and no reduce_command, then skip the reduce phase
        settings[:reduce_tasks] ||= 0 unless reduce?

        # Fields hadoop should use to distribute records to reducers
        unless settings[:partition_fields].blank?
          jobconf_options += [jobconf(:partition_fields), jobconf(:output_field_separator)]
        end

        jobconf_options += [
          :io_sort_mb,              :io_sort_record_percent,
          :map_speculative,         :map_tasks,
          :max_maps_per_cluster,    :max_maps_per_node,
          :max_node_map_tasks,      :max_node_reduce_tasks,
          :max_reduces_per_cluster, :max_reduces_per_node,
          :max_record_length,       :min_split_size,
          :output_field_separator,  :key_field_separator,
          :partition_fields,        :sort_fields,
          :reduce_tasks,            :respect_exit_status,
          :reuse_jvms,              :timeout,
          :max_tracker_failures,    :max_map_attempts,
          :max_reduce_attempts,     :reduce_speculative
        ].map do |opt|
          defn = settings.definition_of(opt, :description)
          val  = settings[opt]
          java_opt(defn, val)
        end

        jobconf_options.flatten.compact
      end

      # Returns other arguments used by Hadoop streaming.
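      #
      # As an illustration (the tag name and flags here are made-up
      # example values), a run with <tt>--split_on_xml_tag=doc
      # --noempty</tt> would contribute arguments like:
      #
      #   -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>' -lazyOutput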
      #
      # @return [Array<String>]
      def hadoop_other_args
        extra_str_args = parsed_java_opts
        if settings[:split_on_xml_tag]
          extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{settings[:split_on_xml_tag]}>,end=</#{settings[:split_on_xml_tag]}>'}
        end
        extra_str_args << ' -lazyOutput' if settings[:noempty] # don't create reduce file if no records
        extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless settings[:partition_fields].blank?
        extra_str_args
      end

      # :nodoc:
      #
      # http://hadoop.apache.org/docs/r0.20.2/streaming.html#Package+Files+With+Job+Submissions
      def hadoop_files
        args.find_all { |arg| arg.to_s =~ /\.rb$/ }.each do |arg|
          settings[:files] << arg
        end
        [].tap do |files_options|
          { :files => '-files ', :jars => '-libjars ', :archives => '-archives ' }.each_pair do |file_type_name, file_option_name|
            unless settings[file_type_name].nil? || settings[file_type_name].empty?
              files = settings[file_type_name].map do |file_name_or_glob|
                # Don't glob on the HDFS
                file_type_name == :archives ? file_name_or_glob : [Dir[file_name_or_glob], file_name_or_glob]
              end.flatten.compact.uniq.join(',')
              files_options << "#{file_option_name}'#{files}'"
            end
          end
        end
      end

      # :nodoc:
      def ruby_interpreter_path
        Pathname.new(File.join(RbConfig::CONFIG['bindir'], RbConfig::CONFIG['RUBY_INSTALL_NAME'] + RbConfig::CONFIG['EXEEXT'])).realpath
      end

      # :nodoc:
      def use_alternative_gemfile
        ENV['BUNDLE_GEMFILE'] = settings[:gemfile]
      end

      # :nodoc:
      def hadoop_recycle_env
        use_alternative_gemfile if settings[:gemfile]
        %w[BUNDLE_GEMFILE LANG].map { |var| %Q{-cmdenv '#{var}=#{ENV[var]}'} if ENV[var] }.compact
      end

      # :nodoc:
      def parsed_java_opts
        settings[:java_opts].map do |java_opt|
          java_opt.split('-D').reject { |opt| opt.blank? }.map { |opt| '-D ' + opt.strip }
        end.flatten
      end

      # :nodoc:
      def java_opt(option, value)
        "-D %s=%s" % [option, Shellwords.escape(value.to_s)] if value
      end

    end
  end
end
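
# A minimal sketch of how a driver might use this mixin (illustrative
# only; the Driver class, its constructor, and the --overwrite flag are
# assumptions, not part of this file):
#
#   driver = Wukong::Hadoop::Driver.new(settings, *args)   # hypothetical constructor
#   driver.remove_output_path! if settings[:overwrite]     # hypothetical flag
#   system(driver.hadoop_commandline) unless settings[:dry_run]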