require 'pathname'
require 'wukong/script/hadoop_command'
require 'wukong/script/local_command'
require 'rbconfig'

module Wukong
  # == How to run a Wukong script
  #
  #   your/script.rb --run path/to/input_files path/to/output_dir
  #
  # All of the file paths are HDFS paths; your script path, of course, is on
  # the local filesystem.
  #
  # == Command-line options
  #
  # If you'd like to listen for any command-line options, specify them at the
  # command line:
  #
  #   your/script.rb --my_bool_opt --my_val_taking_opt=val \
  #     --run path/to/input_files path/to/output_dir
  #
  # In this case the options hash for both Mapper and Reducer will contain
  #
  #   :my_bool_opt       => true,
  #   :my_val_taking_opt => 'val'
  #
  # == Complicated input paths
  #
  # To use more than one file as input, you can use normal * ? [] wildcards or
  # give a comma-separated list -- see the hadoop documentation for syntax.
  #
  # == Run locally (--run=local)
  #
  # To run your script locally, use --run=local
  #
  #   your/script.rb --run=local path/to/input_files path/to/output_dir
  #
  # This will pipe the contents of path/to/input_files through first your
  # mapper, then sort, then the reducer, storing the results in the given output
  # directory.
  #
  # All paths refer to the /local/ filesystem -- hadoop is never involved and in
  # fact doesn't even have to be installed.
  #
  # == How to test your scripts
  #
  # You can supply the --map argument in place of --run to run the mapper on its
  # own (and similarly, --reduce to run the reducer standalone):
  #
  #   cat ./local/test/input.tsv | ./examples/word_count.rb --map | more
  #
  # or, if your test data lies on the HDFS,
  #
  #   hdp-cat test/input.tsv | ./examples/word_count.rb --map | more
  #
  class Script
    include Wukong::HadoopCommand
    include Wukong::LocalCommand
    attr_accessor :mapper_klass, :reducer_klass, :options

    #
    # Instantiate the Script with the Mapper and the Reducer class (each a
    # Wukong::Streamer) it should call back.
    #
    # Options are built from default_options, overlaid with +extra_options+,
    # and then overlaid with whatever flags process_argv! finds in ARGV.
    #
    # == Identity or External program as map or reduce
    #
    # To use the identity reducer ('cat'), instantiate your Script class with
    # +nil+ as the reducer class.  (And similarly to use an identity mapper,
    # supply +nil+ for the mapper class.)
    #
    # To use an external program as your reducer (mapper), override the
    # reduce_command (map_command) method to return the full command line
    # expression to call.
    #
    #   class MyMapper < Wukong::Streamer::Base
    #     # ... awesome stuff ...
    #   end
    #
    #   class MyScript < Wukong::Script
    #     # prefix each unique line with the count of its occurrences.
    #     def reduce_command
    #       '/usr/bin/uniq -c'
    #     end
    #   end
    #   MyScript.new(MyMapper, nil).run
    #
    def initialize(mapper_klass, reducer_klass, extra_options = {})
      self.options = default_options.merge(extra_options)
      process_argv!
      self.mapper_klass  = mapper_klass
      self.reducer_klass = reducer_klass
      # If no reducer_klass and no reduce_command, then skip the reduce phase
      # (unless the caller explicitly asked for a number of reduce tasks).
      skip_reduce = !reducer_klass && !options[:reduce_command] && !options[:reduce_tasks]
      options[:reduce_tasks] = 0 if skip_reduce
    end

    #
    # Gives default options.  Command line parameters take precedence.
    #
    # MAKE SURE YOU CALL SUPER: write your script according to the pattern
    #
    #   super.merge :my_option => :val
    #
    def default_options
      Wukong::CONFIG[:runner_defaults] || {}
    end

    # Options that don't need to go in the :all_args hash.
    # HADOOP_OPTIONS_MAP is not defined in this file -- presumably it comes
    # from Wukong::HadoopCommand; verify against that module.
    def std_options
      @std_options ||= [:run, :map, :reduce] + HADOOP_OPTIONS_MAP.keys
    end

    #
    # Parse the command-line args into the options hash.
    #
    # I should not reinvent the wheel.
    # Yet: here we are.
    #
    # '--foo=foo_val' produces :foo => 'foo_val' in the options hash.
    # '--foo'         produces :foo => true in the options hash.
    # '--'            After seeing a non-'--' flag, or a '--' on its own, no
    #                 further flags are parsed
    #
    # options[:all_args] contains all arguments that are not in std_options,
    #                    joined by single spaces into one string
    # options[:rest]     contains all arguments following the first non-flag
    #                    (or the '--')
    #
    def process_argv!
      options[:all_args] = []
      args = ARGV.dup
      # Loop until the arguments run out: an Array is always truthy, so
      # testing `args` itself would shift a nil off the empty list and leave
      # options[:rest] as [nil] rather than [].
      until args.empty?
        arg = args.shift
        case arg
        when '--'
          break
        when /\A--(\w+)(?:=(.+))?\z/
          opt = $1.to_sym
          val = $2 || true
          options[opt] = val
          options[:all_args] << arg unless std_options.include?(opt)
        else
          # First non-flag argument: put it back so it lands in :rest.
          args.unshift(arg)
          break
        end
      end
      options[:all_args] = options[:all_args].join(" ")
      options[:rest]     = args
    end

    # Fully resolved path of the currently executing script ($0).
    def this_script_filename
      Pathname.new($0).realpath
    end

    # Fully resolved path of the running ruby interpreter, assembled from the
    # bindir, install name and executable extension recorded in RbConfig.
    # (RbConfig replaces the deprecated top-level Config constant, which
    # modern Ruby no longer defines.)
    def ruby_interpreter_path
      ruby_exe = RbConfig::CONFIG["RUBY_INSTALL_NAME"] + RbConfig::CONFIG["EXEEXT"]
      Pathname.new(File.join(RbConfig::CONFIG["bindir"], ruby_exe)).realpath
    end

    #
    # Shell command for map phase
    # by default, call this script in --map mode
    #
    # With no mapper_klass, falls back to options[:map_command] and then to
    # Wukong::CONFIG[:default_mapper].
    #
    def map_command
      if mapper_klass
        "#{ruby_interpreter_path} #{this_script_filename} --map " + options[:all_args]
      else
        options[:map_command] || Wukong::CONFIG[:default_mapper]
      end
    end

    #
    # Shell command for reduce phase
    # by default, call this script in --reduce mode
    #
    # With no reducer_klass, falls back to options[:reduce_command] and then
    # to Wukong::CONFIG[:default_reducer].
    #
    def reduce_command
      if reducer_klass
        "#{ruby_interpreter_path} #{this_script_filename} --reduce " + options[:all_args]
      else
        options[:reduce_command] || Wukong::CONFIG[:default_reducer]
      end
    end

    #
    # Shell command to re-run in mapreduce mode using --map and --reduce
    #
    # Returns the command built by local_command or hadoop_command (neither
    # is defined in this file) depending on run_mode; raises RuntimeError for
    # any other run mode.
    #
    def runner_command(input_path, output_path)
      # run as either local or hadoop
      case run_mode
      when 'local'
        $stderr.puts " Reading STDIN / Writing STDOUT"
        local_command input_path, output_path
      when 'hadoop', 'mapred'
        $stderr.puts " Launching hadoop as"
        hadoop_command input_path, output_path
      else
        raise "Need to use --run=local or --run=hadoop; or to use the :default_run_mode in config.yaml just say --run "
      end
    end

    # The run mode as a string.  A bare --run is stored as +true+ by
    # process_argv!; in that case it is replaced (in the options hash) by the
    # configured :default_run_mode.
    def run_mode
      # if only --run is given, assume default run mode
      options[:run] = Wukong::CONFIG[:default_run_mode] if options[:run] == true
      options[:run].to_s
    end

    # The first two leftover command-line arguments, as
    # [input_path, output_path].  Raises unless both are present, except when
    # --fake was given.
    def input_output_paths
      # input / output paths
      input_path, output_path = options[:rest][0..1]
      # NOTE(review): #blank? is not core Ruby; it must be supplied by an
      # extension loaded elsewhere in the project.
      if !options[:fake] && (input_path.blank? || output_path.blank?)
        raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}"
      end
      [input_path, output_path]
    end

    # When --overwrite or --rm was given and we are not running locally,
    # remove +output_path+ by shelling out to hdp-rm before the job starts.
    def maybe_overwrite_output_paths!(output_path)
      if (options[:overwrite] || options[:rm]) && (run_mode != 'local')
        $stderr.puts "Removing output file #{output_path}"
        # NOTE(review): output_path is interpolated into a shell command
        # without escaping; a path containing a single quote would break out
        # of the quoting.
        `hdp-rm -r '#{output_path}'`
      end
    end

    #
    # Execute the runner phase
    #
    # Echoes the runner command to stderr; unless --fake was given, runs it
    # and copies its captured output to stdout.
    #
    def exec_hadoop_streaming
      $stderr.puts "Streaming on self"
      input_path, output_path = input_output_paths
      maybe_overwrite_output_paths! output_path
      command = runner_command(input_path, output_path)
      $stderr.puts command
      $stdout.puts `#{command}` unless options[:fake]
    end

    #
    # If --map or --reduce, dispatch to the mapper or reducer.
    # If --run, launch the job via exec_hadoop_streaming.
    # Otherwise, print the usage message.
    #
    def run
      if options[:map]
        mapper_klass.new(options).stream
      elsif options[:reduce]
        reducer_klass.new(options).stream
      elsif options[:run]
        exec_hadoop_streaming
      else
        help # Norman Vincent Peale is proud of you
      end
    end

    #
    # Command line usage
    #
    def help
      $stderr.puts "#{self.class} script"
      $stderr.puts %Q{
        #{$0} --run=hadoop input_hdfs_path output_hdfs_dir # run the script with hadoop streaming
        #{$0} --run=local  input_hdfs_path output_hdfs_dir # run the script on local filesystem using unix pipes
        #{$0} --run        input_hdfs_path output_hdfs_dir # run the script with the mode given in config/wukong*.yaml
        #{$0} --map
        #{$0} --reduce                                     # dispatch to the mapper or reducer

      You can specify as well arbitrary script-specific command line flags; they are added to your options[] hash.
      }
    end
  end
end