# -*- coding: utf-8 -*-
module Wukong
  #
  # Builds the command line used to launch a Hadoop streaming job.
  #
  # The methods here call +options+, +map_command+ and +reduce_command+,
  # none of which are defined in this module: whatever object mixes it in
  # must provide them. +options+ is indexed with Symbol keys, and
  # Wukong::CONFIG[:hadoop_home] is read to locate the hadoop install.
  #
  module HadoopCommand

    # ===========================================================================
    #
    # Hadoop Options
    #

    #
    # Translate the simplified args to their hairy-assed hadoop equivalents
    #
    HADOOP_OPTIONS_MAP = {
      :max_node_map_tasks     => 'mapred.tasktracker.map.tasks.maximum',
      :max_node_reduce_tasks  => 'mapred.tasktracker.reduce.tasks.maximum',
      :map_tasks              => 'mapred.map.tasks',
      :reduce_tasks           => 'mapred.reduce.tasks',
      :sort_fields            => 'stream.num.map.output.key.fields',
      :key_field_separator    => 'map.output.key.field.separator',
      :partition_fields       => 'num.key.fields.for.partition',
      :output_field_separator => 'stream.map.output.field.separator',
      :map_speculative        => 'mapred.map.tasks.speculative.execution',
      :timeout                => 'mapred.task.timeout',
    }

    #
    # Emit a -jobconf hadoop option if the simplified command line arg is
    # present.
    #
    # Only a nil value counts as "not present": an explicit +false+ (for
    # instance :map_speculative => false) is a real setting and is passed on
    # to hadoop as "...=false" rather than being silently dropped.
    #
    # @param option [Symbol] a key of HADOOP_OPTIONS_MAP
    # @return [String, nil] "-jobconf <hadoop property>=<value>", or nil when
    #   the option was not given (callers strip nils with +compact+)
    #
    def jobconf(option)
      value = options[option]
      return if value.nil?
      "-jobconf %s=%s" % [HADOOP_OPTIONS_MAP[option], value]
    end

    #
    # Define what fields hadoop should treat as the keys.
    #
    # @return [Array<String, nil>] one entry per option; nil where unset
    #
    def hadoop_sort_args
      [
        jobconf(:key_field_separator),
        jobconf(:sort_fields),
      ]
    end

    #
    # Define what fields hadoop should use to distribute records to reducers.
    #
    # @return [Array<String, nil>, nil] the partitioner flag followed by its
    #   jobconf settings, or nil when options[:partition_fields] is not set
    #
    def hadoop_partition_args
      return unless options[:partition_fields]
      [
        '-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner',
        jobconf(:output_field_separator),
        jobconf(:partition_fields),
      ]
    end

    #
    # Emit options for setting the number of mappers and reducers.
    #
    # @return [Array<String, nil>] one entry per option; nil where unset
    #
    def hadoop_num_tasks_args
      [
        jobconf(:max_node_map_tasks),
        jobconf(:max_node_reduce_tasks),
        jobconf(:map_tasks),
        jobconf(:reduce_tasks),
      ]
    end

    #
    # Everything else: the raw options[:extra_args] value, passed through
    # untouched, followed by the speculative-execution and timeout settings.
    #
    # @return [Array] options[:extra_args] (possibly nil) and then one
    #   String-or-nil entry for each of :map_speculative and :timeout
    #
    def hadoop_other_args
      extra_str_args = [options[:extra_args]]
      extra_hsh_args = [:map_speculative, :timeout].map { |opt| jobconf(opt) }
      extra_str_args + extra_hsh_args
    end

    #
    # Assemble the hadoop command to execute.
    #
    # The argument groups are flattened, nils are dropped, and the pieces are
    # joined with space-tab-backslash-newline-space, so the result is a single
    # String using backslash line continuations between arguments.
    #
    # NOTE(review): the mapper/reducer commands and the paths are wrapped in
    # single quotes without escaping; a value that itself contains a single
    # quote would presumably break the quoting -- confirm against callers.
    #
    # @param input_path  [String] value placed after -input
    # @param output_path [String] value placed after -output
    # @return [String] the full command line
    #
    def hadoop_command(input_path, output_path)
      # If this is wrong, create a config/wukong-site.rb or
      # otherwise set Wukong::CONFIG[:hadoop_home] to the
      # root of your config install.
      hadoop_home    = Wukong::CONFIG[:hadoop_home]
      hadoop_program = hadoop_home + '/bin/hadoop'
      [
        hadoop_program,
        "jar #{hadoop_home}/contrib/streaming/hadoop-*-streaming.jar",
        hadoop_partition_args,
        hadoop_sort_args,
        hadoop_num_tasks_args,
        "-mapper '#{map_command}'",
        "-reducer '#{reduce_command}'",
        "-input '#{input_path}'",
        "-output '#{output_path}'",
        hadoop_other_args,
      ].flatten.compact.join(" \t\\\n ")
    end

  end
end

# -inputformat <name of inputformat (class)> (“auto” by default)
# -input <additional DFS input path>
# -python <python command to use on nodes> (“python” by default)
# -name <job name> (“program.py” by default)
# -numMapTasks <number>
# -numReduceTasks <number> (no sorting or reducing will take place if this is 0)
# -priority <priority value> (“NORMAL” by default)
# -libjar <path to jar> (this jar gets put in the class path)
# -libegg <path to egg> (this egg gets put in the Python path)
# -file <local file> (this file will be put in the dir where the python program gets executed)
# -cacheFile hdfs://<host>:<fs_port>/<path to file>#<link name> (a link “<link name>” to the given file will be in the dir)
# -cacheArchive hdfs://<host>:<fs_port>/<path to jar>#<link name> (link points to dir that contains files from given jar)
# -cmdenv <env var name>=<value>
# -jobconf <property name>=<value>
# -addpath yes (replace each input key by a tuple consisting of the path of the corresponding input file and the original key)
# -fake yes (fake run, only prints the underlying shell commands but does not actually execute them)
# -memlimit <number of bytes> (set an upper limit on the amount of memory that can be used)