require 'right_aws'
require 'configliere/config_block'

#
# Local directory that holds the EMR-related configuration files. May be
# pre-defined by the caller before this file is loaded.
#
EMR_CONFIG_DIR = '~/.wukong' unless defined?(EMR_CONFIG_DIR)

#
# AWS credentials and Elastic MapReduce settings
#
# NOTE(review): the text 'See for format' below looks like it has lost a URL
# between 'See' and 'for' -- restore it from the upstream docs.
Settings.define :emr_credentials_file, :description => 'A .json file holding your AWS access credentials. See for format'
Settings.define :access_key,           :description => 'AWS Access key',        :env_var => 'AWS_ACCESS_KEY_ID'
Settings.define :secret_access_key,    :description => 'AWS Secret Access key', :env_var => 'AWS_SECRET_ACCESS_KEY'
Settings.define :emr_runner,           :description => 'Path to the elastic-mapreduce command (~ etc will be expanded)'
Settings.define :emr_root,             :description => 'S3 bucket and path to use as the base for Elastic MapReduce storage, organized by job name'
# NOTE(review): this description is truncated; fix_paths! shows that emr_data_root
# is the prefix prepended to relative input/output paths.
Settings.define :emr_data_root,        :description => 'Optional '
# NOTE(review): the default below ends in '/', i.e. it names a directory, not a
# script; a file name appears to be missing after the slash -- confirm.
Settings.define :emr_bootstrap_script, :description => 'Bootstrap actions for Elastic Map Reduce machine provisioning', :default => EMR_CONFIG_DIR+'/', :type => :filename, :finally => lambda{ Settings.emr_bootstrap_script = File.expand_path(Settings.emr_bootstrap_script) }
Settings.define :emr_extra_args,       :description => 'kludge: allows you to stuff extra args into the elastic-mapreduce invocation', :type => Array, :wukong => true
Settings.define :alive,                :description => 'Whether to keep machine running after job invocation', :type => :boolean
#
Settings.define :key_pair_file,        :description => 'AWS Key pair file', :type => :filename
Settings.define :key_pair,             :description => "AWS Key pair name. If not specified, it's taken from key_pair_file's basename", :finally => lambda{ Settings.key_pair ||= File.basename(Settings.key_pair_file.to_s, '.pem') if Settings.key_pair_file }
Settings.define :instance_type,        :description => 'AWS instance type to use', :default => 'm1.small'
Settings.define :master_instance_type, :description => 'Overrides the instance type for the master node', :finally => lambda{ Settings.master_instance_type ||= Settings.instance_type }
# The description used to be split across two physical lines inside the string
# literal; it is kept on one line so the help text carries no stray newline.
Settings.define :jobflow,              :description => "ID of an existing EMR job flow. Wukong will create a new job flow"
#
# NOTE(review): only the tail "'/emr.yaml'))" of this statement is present in the
# damaged source; the Settings.read(File.expand_path(EMR_CONFIG_DIR+ ...)) prefix
# is a reconstruction -- confirm against upstream.
Settings.read(File.expand_path(EMR_CONFIG_DIR+'/emr.yaml'))

module Wukong
  #
  # EMR Options
  #
  # Mixin (included into Script at the bottom of this file) that uploads the
  # running script to S3 and launches it through the elastic-mapreduce
  # command-line runner.
  #
  module EmrCommand

    # Upload the script and bootstrap file to S3, then invoke the EMR runner.
    def execute_emr_workflow
      copy_script_to_cloud
      execute_emr_runner
    end

    # Stores three objects on S3: one at mapper_s3_uri, one at reducer_s3_uri
    # and one at bootstrap_s3_uri.
    #
    # NOTE(review): the receiver and first argument of each call below were
    # lost from the source (only ", mapper_s3_uri)" etc. remained).
    # `Log.info`, `S3Util.store`, `this_script_filename` and the bootstrap
    # script path are reconstructions -- confirm against upstream.
    def copy_script_to_cloud
      Log.info " Copying this script to the cloud."
      S3Util.store(this_script_filename, mapper_s3_uri)
      S3Util.store(this_script_filename, reducer_s3_uri)
      S3Util.store(File.expand_path(Settings.emr_bootstrap_script), bootstrap_s3_uri)
    end

    # Stores /tmp/wukong-libs.jar at wukong_libs_s3_uri.
    #
    # NOTE(review): the `S3Util.store(File.expand_path(` prefix is a
    # reconstruction; only "'/tmp/wukong-libs.jar'), wukong_libs_s3_uri)"
    # was present in the source.
    def copy_jars_to_cloud
      S3Util.store(File.expand_path('/tmp/wukong-libs.jar'), wukong_libs_s3_uri)
      # "--cache-archive=#{wukong_libs_s3_uri}#vendor",
    end

    # Splits every hadoop option on single spaces and wraps each piece as
    # "--arg '<piece>'" for the elastic-mapreduce runner.
    #
    # NOTE(review): the call chain between the array literal and the block was
    # lost from the source; `.flatten.compact.map` is a reconstruction.
    #
    # @return [Array<String>] flat list of "--arg '...'" strings
    def hadoop_options_for_emr_runner
      [hadoop_jobconf_options, hadoop_other_args].flatten.compact.map do |hdp_opt|
        hdp_opt.split(' ').map {|part| "--arg '#{part}'"}
      end.flatten
    end

    # Assembles the argument list for the elastic-mapreduce command and runs
    # it through execute_command!.
    #
    # With Settings.jobflow set, only the jobflow flag is passed; otherwise the
    # flags for creating a new job flow (name, instances, key pair, bootstrap
    # action) are added.
    def execute_emr_runner
      # fix_paths!
      command_args = []
      if Settings.jobflow
        command_args << Settings.dashed_flag_for(:jobflow)
      else
        command_args << "--create --name=#{job_name}"
        command_args << Settings.dashed_flag_for(:alive)
        command_args << Settings.dashed_flags(:num_instances, [:instance_type, :slave_instance_type], :master_instance_type, :hadoop_version).join(' ')
        command_args << Settings.dashed_flags(:availability_zone, :key_pair, :key_pair_file).join(' ')
        command_args << "--bootstrap-action=#{bootstrap_s3_uri}"
      end
      command_args << Settings.dashed_flags(:enable_debugging, :step_action, [:emr_runner_verbose, :verbose], [:emr_runner_debug, :debug]).join(' ')
      command_args += emr_credentials
      command_args += [
        "--log-uri=#{log_s3_uri}",
        "--stream",
        "--mapper=#{mapper_s3_uri} ",
        "--reducer=#{reducer_s3_uri} ",
        "--input=#{input_paths.join(",")} --output=#{output_path}",
      ]
      # eg to specify zero reducers:
      # Settings[:emr_extra_args] = "--arg '-D mapred.reduce.tasks=0'"
      command_args += Settings[:emr_extra_args] unless Settings[:emr_extra_args].blank?
      command_args += hadoop_options_for_emr_runner
      # NOTE(review): the receiver of this message was lost from the source;
      # `Log.info` is a reconstruction (Log is the logger used by S3Util below).
      Log.info 'Follow along at http://localhost:9000/job'
      execute_command!( File.expand_path(Settings.emr_runner), *command_args )
    end

    # Credential flags for the runner: a --credentials file when
    # Settings.emr_credentials_file is set, otherwise the access key pair.
    #
    # NOTE(review): in the second form the secret key is placed on the command
    # line as plain text.
    #
    # @return [Array<String>] a one-element list of command-line flags
    def emr_credentials
      command_args = []
      if Settings.emr_credentials_file
        command_args << "--credentials #{File.expand_path(Settings.emr_credentials_file)}"
      else
        command_args << %Q{--access-id #{Settings.access_key} --private-key #{Settings.secret_access_key} }
      end
      command_args
    end

    # A short name for this job
    def job_handle
      File.basename($0,'.rb')
    end

    # Produces an s3 URI within the Wukong emr sandbox from a set of path
    # segments
    #
    # @example
    #   Settings.emr_root = 's3://my-bucket/wukong'
    #   emr_s3_path('log', 'my_happy_job', 'run-97.log')
    #   # => "s3://my-bucket/wukong/log/my_happy_job/run-97.log"
    #
    def emr_s3_path(*path_segs)
      File.join(Settings.emr_root, path_segs.flatten.compact)
    end

    def mapper_s3_uri
      emr_s3_path(job_handle, 'code', job_handle+'-mapper.rb')
    end

    def reducer_s3_uri
      emr_s3_path(job_handle, 'code', job_handle+'-reducer.rb')
    end

    def log_s3_uri
      emr_s3_path(job_handle, 'log', 'emr_jobs')
    end

    # NOTE(review): the final segment is an empty string, so this yields the
    # 'bin/' prefix rather than an object name; a file name appears to be
    # missing -- confirm against upstream.
    def bootstrap_s3_uri
      emr_s3_path(job_handle, 'bin', "")
    end

    def wukong_libs_s3_uri
      emr_s3_path(job_handle, 'code', "wukong-libs.jar")
    end

    ABSOLUTE_URI = %r{^/|^\w+://}

    #
    # Walk through the input paths and the output path. Prepends
    # Settings.emr_data_root to any that does NOT look like
    # an absolute path ("/foo") or a URI ("s3://yourmom/data")
    #
    # Does nothing when Settings.emr_data_root is blank. @output_path is set
    # to a single path (not a one-element Array), because it is interpolated
    # as a scalar in "--output=#{output_path}".
    #
    # NOTE(review): the `input_paths.map` receiver was lost from the source
    # and is a reconstruction.
    #
    def fix_paths!
      return if Settings.emr_data_root.blank?
      unless input_paths.blank?
        @input_paths = input_paths.map{|path| emr_data_path(path) }
      end
      unless output_path.blank?
        @output_path = emr_data_path(output_path)
      end
    end

    # Returns +path+ unchanged when it matches ABSOLUTE_URI, otherwise joins
    # it onto Settings.emr_data_root.
    def emr_data_path(path)
      (path =~ ABSOLUTE_URI) ? path : File.join(Settings.emr_data_root, path)
    end
    private :emr_data_path

    #
    # Simple class to coordinate s3 operations
    #
    class S3Util
      # class methods
      class << self
        # Memoized S3 connection built from the configured access keys.
        #
        # NOTE(review): the constructor name was lost from the source;
        # RightAws::S3Interface is a reconstruction, chosen because
        # #store_object (called in .store) is an S3Interface method.
        # NOTE(review): the connection is configured for plain http on port 80.
        def s3
          @s3 ||= RightAws::S3Interface.new(
            Settings.access_key, Settings.secret_access_key,
            {:multi_thread => true, :logger => Log, :port => 80, :protocol => 'http' })
        end

        # Splits an s3 URI into bucket and key.
        #
        # @return [Array(String, String), nil] [bucket, key], or nil when +uri+
        #   does not match the s3 URI pattern
        def bucket_and_path_from_uri(uri)
          [$1, $2] if uri =~ %r{^s3\w*://([\w\.\-]+)\W*(.*)}
        end

        # Uploads the contents of the local file +filename+ to the S3 location
        # named by +uri+.
        #
        # The file is read through a block-form, binary-mode File.open so the
        # handle is always closed and the bytes are not altered.
        #
        # NOTE(review): the expression that produced `contents` was lost from
        # the source; reading the local file is a reconstruction.
        #
        # @raise [ArgumentError] when +uri+ cannot be split into bucket and key
        def store(filename, uri)
          dest_bucket, dest_key = bucket_and_path_from_uri(uri)
          raise ArgumentError, "Not an s3 URI: #{uri.inspect}" if dest_bucket.nil?
          Log.debug " #{filename} => #{dest_bucket} / #{dest_key}"
          contents = File.open(filename, 'rb') {|file| file.read }
          s3.store_object(:bucket => dest_bucket, :key => dest_key, :data => contents)
        end
      end
    end
  end

  Script.class_eval do
    include EmrCommand
  end
end