#!/usr/bin/env ruby

require 'rubygems'
require 'configliere' ; Configliere.use(:commandline, :env_var, :define)

#
# Settings for the job launcher. Each one can be given on the command line;
# es_home and hadoop_home can also come from the named environment variable.
#
Settings.define :index_name,     :required => true,                                 :description => "Index to write data to"
Settings.define :object_type,    :default => "tweet",                               :description => "Type of object we're indexing"
Settings.define :field_names,    :default => "rsrc,tweet_id,created_at,user_id,screen_name,search_id,in_reply_to_user_id,in_reply_to_screen_name,in_reply_to_search_id,in_reply_to_status_id,text,source,lang,lat,lng,retweeted_count,rt_of_user_id,rt_of_screen_name,rt_of_tweet_id,contributors", :description => "Comma separated list of field names"
Settings.define :id_field,       :default => "1",                                   :description => "Index of field to use as object id (counting from 0; default 1), use -1 if no id field"
Settings.define :bulk_size,      :default => "1000",                                :description => "Number of records per bulk request"
Settings.define :es_home,        :default => "/usr/local/share/elasticsearch",      :description => "Path to elasticsearch installation", :env_var => "ES_HOME"
Settings.define :es_config,      :default => "/etc/elasticsearch/elasticsearch.yml", :description => "Path to elasticsearch config"
Settings.define :rm,             :default => false,                                 :description => "Remove existing output?"
Settings.define :hadoop_home,    :default => "/usr/lib/hadoop",                     :description => "Path to hadoop installation", :env_var => "HADOOP_HOME"
Settings.define :min_split_size, :default => "5000000000",                          :description => "Min split size for maps"
Settings.define :test_outputfmt, :default => false,                                 :description => "Use this flag to run job that test the ElasticSearchOutputFormat"
Settings.resolve!

# The first positional argument is used as the input path and the last as the
# output path.
raise "No input file specified." if Settings.rest.first.blank?
# With exactly one positional argument `first` and `last` are the same element,
# so the count has to be checked too; otherwise a missing output path would
# silently reuse the input path.
raise "No output file specified." if Settings.rest.size < 2 || Settings.rest.last.blank?
#
# Builds and runs the `hadoop jar` command line that launches the wonderdog
# job, using a copy of the resolved Settings as its options.
#
class Wonderdog
  attr_accessor :options

  def initialize
    @options = Settings.dup
  end

  # Optionally removes the existing output path, echoes the hadoop command and
  # then runs it.
  #
  # @return [true, false, nil] result of Kernel#system for the hadoop command
  #   (true on zero exit status, false on non-zero, nil if it could not be run)
  def execute
    output = options.rest.last
    remove_output(output) if options.rm
    system %Q{ echo #{hdp_cmd} }
    system %Q{ #{hdp_cmd} }
  end

  # @return [String] the full shell command; the pieces are joined with a
  #   backslash-newline so the shell reads it as one continued line
  def hdp_cmd
    [
      "HADOOP_CLASSPATH=#{hadoop_classpath}",
      "#{options.hadoop_home}/bin/hadoop jar #{run_jar}",
      mainclass,
      "-Dmapred.map.tasks.speculative.execution=false",
      "-Dmapred.min.split.size=#{options.min_split_size}",
      "-Dwonderdog.index.name=#{options.index_name}",
      "-Dwonderdog.object.type=#{options.object_type}",
      "-Dwonderdog.id.field=#{options.id_field}",
      "-Dwonderdog.field.names=#{options.field_names}",
      "-Dwonderdog.bulk.size=#{options.bulk_size}",
      "-Dwonderdog.config=#{options.es_config}",
      "-Dwonderdog.plugins.dir=#{options.es_home}/plugins",
      "-libjars #{libjars}",
      "#{options.rest.first}",
      "#{options.rest.last}"
    ].flatten.compact.join(" \t\\\n ")
  end

  # @return [String] name of the Java main class to run; the test class is
  #   chosen when the test_outputfmt option is set
  def mainclass
    # Read from this instance's options, like every other method here, rather
    # than from the global Settings object.
    return "com.infochimps.elasticsearch.ElasticTest" if options.test_outputfmt
    "com.infochimps.elasticsearch.wonderdog.WonderDog"
  end

  # @return [String] colon-separated value for HADOOP_CLASSPATH: ".", the
  #   elasticsearch config file and jars, plus the sigar jars
  def hadoop_classpath
    (["."] + es_resources + Dir["#{options.es_home}/lib/sigar/*.jar"]).join(':')
  end

  # @return [String] path of the job jar, resolved relative to this script
  def run_jar
    File.dirname(File.expand_path(__FILE__))+'/../build/wonderdog.jar'
  end

  # @return [String] comma-separated value for the -libjars argument
  def libjars
    es_resources.join(',')
  end

  # Removes the given output path with `hdp-rm -r`.
  #
  # @param output [String] path to remove
  # @return [true, false, nil] result of Kernel#system
  def remove_output(output)
    # Argument-list form: the path is passed as a single argument and is never
    # interpreted by a shell.
    system('hdp-rm', '-r', output)
  end

  private

  # Files shared by the classpath and -libjars: the configured elasticsearch
  # config file (only if it exists, since it goes through Dir[]) followed by
  # the plugin jars and the lib jars.
  def es_resources
    Dir[
      options.es_config,
      "#{options.es_home}/plugins/*/*.jar",
      "#{options.es_home}/lib/*.jar"
    ]
  end
end

runner = Wonderdog.new
# Report a failed (or unlaunchable) hadoop command through the exit status
# instead of always exiting 0.
exit(1) unless runner.execute