# -*- coding: utf-8 -*- module Wukong module HadoopCommand # =========================================================================== # # Hadoop Environment # # =========================================================================== # # Hadoop Options # # # Translate the simplified args to their hairy-assed hadoop equivalents # Settings.define :max_node_map_tasks, :jobconf => true, :description => 'mapred.tasktracker.map.tasks.maximum', :wukong => true Settings.define :max_node_reduce_tasks, :jobconf => true, :description => 'mapred.tasktracker.reduce.tasks.maximum', :wukong => true Settings.define :map_tasks, :jobconf => true, :description => 'mapred.map.tasks', :wukong => true Settings.define :reduce_tasks, :jobconf => true, :description => 'mapred.reduce.tasks', :wukong => true Settings.define :sort_fields, :jobconf => true, :description => 'stream.num.map.output.key.fields', :wukong => true Settings.define :key_field_separator, :jobconf => true, :description => 'map.output.key.field.separator', :wukong => true Settings.define :partition_fields, :jobconf => true, :description => 'num.key.fields.for.partition', :wukong => true Settings.define :output_field_separator, :jobconf => true, :description => 'stream.map.output.field.separator', :wukong => true Settings.define :map_speculative, :jobconf => true, :description => 'mapred.map.tasks.speculative.execution', :wukong => true Settings.define :timeout, :jobconf => true, :description => 'mapred.task.timeout', :wukong => true Settings.define :reuse_jvms, :jobconf => true, :description => 'mapred.job.reuse.jvm.num.tasks', :wukong => true Settings.define :respect_exit_status, :jobconf => true, :description => 'stream.non.zero.exit.is.failure', :wukong => true Settings.define :noempty, :description => "don't create zero-byte reduce files (hadoop mode only)", :wukong => true Settings.define :job_name, :jobconf => true, :description => 'mapred.job.name', :wukong => true # mapred.linerecordreader.maxlength :description => "Safeguards against corrupted data: lines longer than this (in bytes) are treated as bad records." # emit a -jobconf hadoop option if the simplified command line arg is present # if not, the resulting nil will be elided later def jobconf option if options[option] "-jobconf %s=%s" % [options.description_for(option), options[option]] end end # Define what fields hadoop should treat as the keys def hadoop_sort_args [ jobconf(:key_field_separator), jobconf(:sort_fields), ] end # Define what fields hadoop should use to distribute records to reducers def hadoop_partition_args unless options[:partition_fields].blank? [ '-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner', jobconf(:output_field_separator), jobconf(:partition_fields), ] end end # Emit options for setting the number of mappers and reducers. def hadoop_num_tasks_args [ jobconf(:max_node_map_tasks), jobconf(:max_node_reduce_tasks), jobconf(:map_tasks), jobconf(:reduce_tasks) ] end def hadoop_other_args input_path, output_path extra_str_args = [ options[:extra_args] ] extra_str_args += ' -lazyOutput' if options[:noempty] # don't create reduce file if no records options[:reuse_jvms] = '-1' if (options[:reuse_jvms] == true) options[:respect_exit_status] = 'false' if (options[:ignore_exit_status] == true) options[:job_name] ||= "#{File.basename(this_script_filename)}---#{input_path}---#{output_path}".gsub(%r{[^\w/\.\-\+]+}, '') extra_hsh_args = [:job_name, :map_speculative, :timeout, :reuse_jvms, :respect_exit_status].map{|opt| jobconf(opt) } extra_str_args + extra_hsh_args end def hadoop_recycle_env %w[RUBYLIB].map do |var| %Q{-cmdenv '#{var}=#{ENV[var]}'} if ENV[var] end.compact end # The path to the hadoop runner script def hadoop_runner options[:hadoop_runner] || (options[:hadoop_home]+'/bin/hadoop') end # # Assemble the hadoop command to execute # def hadoop_command input_path, output_path # If this is wrong, create a config/wukong-site.rb or # otherwise set Settings[:hadoop_home] to the # root of your config install. [ hadoop_runner, "jar #{Settings[:hadoop_home]}/contrib/streaming/hadoop-*-streaming.jar", hadoop_partition_args, hadoop_sort_args, hadoop_num_tasks_args, "-mapper '#{map_command}'", "-reducer '#{reduce_command}'", "-input '#{input_path}'", "-output '#{output_path}'", hadoop_recycle_env, hadoop_other_args(input_path, output_path), ].flatten.compact.join(" \t\\\n ") end module ClassMethods # # Via @pskomoroch via @tlipcon, # # "there is a little known Hadoop Streaming trick buried in this Python # script. You will notice that the date is not actually in the raw log # data itself, but is part of the filename. It turns out that Hadoop makes # job parameters you would fetch in Java with something like # job.get("mapred.input.file") available as environment variables for # streaming jobs, with periods replaced with underscores: # # filepath = os.environ["map_input_file"] # filename = os.path.split(filepath)[-1] # Thanks to Todd Lipcon for directing me to that hack. # # "HADOOP_HOME" =>"/usr/lib/hadoop-0.20/bin/..", # "HADOOP_IDENT_STRING" =>"hadoop", # "HADOOP_LOGFILE" =>"hadoop-hadoop-tasktracker-ip-10-242-14-223.log", # "HADOOP_LOG_DIR" =>"/usr/lib/hadoop-0.20/bin/../logs", # "HOME" =>"/var/run/hadoop-0.20", # "JAVA_HOME" =>"/usr/lib/jvm/java-6-sun", # "LD_LIBRARY_PATH" =>"/usr/lib/jvm/java-6-sun-", # "PATH" =>"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games", # "USER" =>"hadoop", # # "dfs_block_size" =>"134217728", # "map_input_start" =>"0", # "map_input_length" =>"125726898", # "mapred_output_key_class" =>"org.apache.hadoop.io.Text", # "mapred_output_value_class" =>"org.apache.hadoop.io.Text", # "mapred_output_format_class" =>"org.apache.hadoop.mapred.TextOutputFormat", # "mapred_output_compression_codec" =>"org.apache.hadoop.io.compress.DefaultCodec", # "mapred_output_compression_type" =>"BLOCK", # "mapred_task_partition" =>"0", # "mapred_tasktracker_map_tasks_maximum" =>"4", # "mapred_tasktracker_reduce_tasks_maximum" =>"2", # "mapred_tip_id" =>"task_200910221152_0023_m_000000", # "mapred_task_id" =>"attempt_200910221152_0023_m_000000_0", # "mapred_job_tracker" =>"ec2-174-129-141-78.compute-1.amazonaws.com:8021", # # "mapred_input_dir" =>"hdfs://ec2-174-129-141-78.compute-1.amazonaws.com/user/flip/ripd/com.tw/com.twitter.search/20090809", # "map_input_file" =>"hdfs://ec2-174-129-141-78.compute-1.amazonaws.com/user/flip/ripd/com.tw/com.twitter.search/20090809/com.twitter.search+20090809233441-56735-womper.tsv.bz2", # "mapred_working_dir" =>"hdfs://ec2-174-129-141-78.compute-1.amazonaws.com/user/flip", # "mapred_work_output_dir" =>"hdfs://ec2-174-129-141-78.compute-1.amazonaws.com/user/flip/tmp/twsearch-20090809/_temporary/_attempt_200910221152_0023_m_000000_0", # "mapred_output_dir" =>"hdfs://ec2-174-129-141-78.compute-1.amazonaws.com/user/flip/tmp/twsearch-20090809", # "mapred_temp_dir" =>"/mnt/tmp/hadoop-hadoop/mapred/temp", # "PWD" =>"/mnt/hadoop/mapred/local/taskTracker/jobcache/job_200910221152_0023/attempt_200910221152_0023_m_000000_0/work", # "TMPDIR" =>"/mnt/hadoop/mapred/local/taskTracker/jobcache/job_200910221152_0023/attempt_200910221152_0023_m_000000_0/work/tmp", # "stream_map_streamprocessor" =>"%2Fusr%2Fbin%2Fruby1.8+%2Fmnt%2Fhome%2Fflip%2Fics%2Fwuclan%2Fexamples%2Ftwitter%2Fparse%2Fparse_twitter_search_requests.rb+--map+--rm", # "user_name" =>"flip", # HDFS pathname to the input file currently being processed. def input_file ENV['map_input_file'] end # Directory of the input file def input_dir ENV['mapred_input_dir'] end # Offset of this chunk within the input file def map_input_start_offset ENV['map_input_start'] end # length of the mapper's input chunk def map_input_length ENV['map_input_length'] end def attempt_id ENV['mapred_task_id'] end def curr_task_id ENV['mapred_tip_id'] end def script_cmdline_urlenc ENV['stream_map_streamprocessor'] end end # Standard ClassMethods-on-include trick def self.included base base.class_eval do extend ClassMethods end end end end # -inputformat <name of inputformat (class)> (“auto” by default) # -input <additional DFS input path> # -python <python command to use on nodes> (“python” by default) # -name <job name> (“program.py” by default) # -numMapTasks <number> # -numReduceTasks <number> (no sorting or reducing will take place if this is 0) # -priority <priority value> (“NORMAL” by default) # -libjar <path to jar> (this jar gets put in the class path) # -libegg <path to egg> (this egg gets put in the Python path) # -file <local file> (this file will be put in the dir where the python program gets executed) # -cacheFile hdfs://<host>:<fs_port>/<path to file>#<link name> (a link ”<link name>” to the given file will be in the dir) # -cacheArchive hdfs://<host>:<fs_port>/<path to jar>#<link name> (link points to dir that contains files from given jar) # -cmdenv <env var name>=<value> # -jobconf <property name>=<value> # -addpath yes (replace each input key by a tuple consisting of the path of the corresponding input file and the original key) # -fake yes (fake run, only prints the underlying shell commands but does not actually execute them) # -memlimit <number of bytes> (set an upper limit on the amount of memory that can be used)