module Sip
  # Coordinates importing database tables into Hadoop/Hive: reads the slave
  # host list, generates one import script per slave, runs them in parallel
  # child processes, and kicks off the Hive-side import.
  class Sipper
    attr_reader :config

    # config: options hash (reads :debug, :overwrite and 'tmpdir' here).
    # Loads the slave hostnames from $HADOOP_HOME/conf/slaves.
    # Raises HadoopException when the slaves file cannot be read.
    def initialize(config)
      @config = config
      slavefile = File.join(ENV['HADOOP_HOME'], 'conf', 'slaves')
      log "Reading slaves from file #{slavefile}..."
      begin
        # File.open, not Kernel#open: Kernel#open treats a leading "|" as a
        # command pipe, which would let a hostile HADOOP_HOME execute code.
        File.open(slavefile, 'r') { |f| @slaves = f.read.split("\n") }
      rescue
        raise HadoopException, "Could not read \"#{slavefile}\". Is your HADOOP_HOME environment variable correct?"
      end
      Utils::sanity_check(@config)
    end

    # Print a host-prefixed, timestamped message, but only when :debug is set.
    def log(msg)
      puts "#{Utils::hostname} #{Time.now.strftime '%Y-%m-%d %H:%M:%S'}: #{msg}" if @config[:debug]
    end

    # Build the import script(s) for one table and return how many were created.
    def create_scripts(dbconf, tableconf)
      @scripts = []
      db = DBBase.make_interface dbconf['type'], dbconf, self
      # set default columns if necessary
      tableconf['columns'] = db.columns(tableconf['tablename']).map { |c| c.first } if tableconf['columns'].nil?
      if tableconf['incremental_index'].nil?
        create_script_without_index(dbconf, tableconf, db)
      else
        create_scripts_with_index(dbconf, tableconf, db)
      end
      @scripts.length
    end

    # this is the case where there is no primary key index, so the whole
    # table will need to be imported (a single script on the first slave)
    def create_script_without_index(dbconf, tableconf, db)
      Hive::run_hsql_create_table self, db, tableconf, :overwrite
      log "Importing all rows from table #{dbconf['dbname']}.#{tableconf['tablename']}"
      select = db.generate_command tableconf
      transpart_opts = generate_transpart_options(tableconf, db)
      @scripts << Utils::write_script(self, @slaves.first, select, dbconf['dbname'], tableconf['tablename'], transpart_opts)
    end

    # Incremental-index case: split the key range across all slaves, one
    # script each. Skips entirely when an append import is already current.
    def create_scripts_with_index(dbconf, tableconf, db)
      max = db.get_column_max tableconf['tablename'], tableconf['incremental_index']
      # Append only when the table conf asks for it AND this run is not a
      # forced overwrite. (&&/! instead of and/not: same result here, but
      # and/not have precedence traps and are discouraged for boolean logic.)
      method = (tableconf['method'] == "append" && !@config[:overwrite]) ? :append : :overwrite
      if method == :append && max == tableconf['incremental_index_value']
        log "Ignoring #{dbconf['dbname']}.#{tableconf['tablename']} - already up to date"
      else
        Hive::run_hsql_create_table self, db, tableconf, method
        @slaves.each_with_index { |slavename, index|
          @scripts << create_script(slavename, index, dbconf, tableconf, db, max, method)
        }
        db.close
      end
      # update incremental index value if method in conf is append, regardless of whether or
      # not this is a forced overwrite
      tableconf['incremental_index_value'] = max if tableconf['method'] == "append"
    end

    # Write the import script for one slave. For :append, slave `index` gets
    # an even share of (last_imported, max]; for :overwrite, every slave
    # shares the full range 1..max.
    def create_script(slavename, index, dbconf, tableconf, db, max, method)
      if method == :append
        first, last = get_even_split(tableconf['incremental_index_value']+1, max, index, @slaves.length)
      else
        first, last = 1, max
      end
      log "Importing #{first} <= #{tableconf['incremental_index']} <= #{last} from table #{dbconf['dbname']}.#{tableconf['tablename']}"
      select = db.generate_command tableconf, first, last
      transpart_opts = generate_transpart_options(tableconf, db)
      Utils::write_script self, slavename, select, dbconf['dbname'], tableconf['tablename'], transpart_opts
    end

    # Assemble the CmdOpts passed to the transpart step: column order,
    # optional partition column, optional k:v transformations, Hive table
    # name, output dir under tmpdir, and the debug flag.
    def generate_transpart_options(tableconf, db)
      opts = CmdOpts.new
      opts['c'] = db.order_column_list(tableconf['tablename'], tableconf['columns']).join(',')
      opts['p'] = tableconf['partition_by'] if tableconf.has_key? "partition_by"
      if tableconf.has_key? 'transformations' and tableconf['transformations'].length > 0
        opts['t'] = tableconf['transformations'].map { |k,v| "#{k}:#{v}" }.join(',')
      end
      opts['H'] = tableconf['hive_table_name']
      opts['o'] = File.join(@config['tmpdir'], 'partitions')
      opts.set('d') if @config[:debug]
      opts
    end

    # Split the inclusive range [min, max] into `count` contiguous chunks of
    # equal (ceil) size and return [first, last] for chunk `index` (0-based).
    # The final chunk is clamped so last never exceeds max.
    def get_even_split(min, max, index, count)
      size = ((max - min + 1).to_f / count.to_f).ceil
      first = (size * index) + min
      last = (size * (index+1)) + min - 1
      [first, [max, last].min]
    end

    # Fork one shell per generated script and wait for all of them.
    # Raises ImportScriptExecutionError naming the first script whose child
    # exited non-zero.
    def run_scripts
      pids = {}
      @scripts.each { |script|
        log "Running #{script}..."
        # NOTE(review): system returns nil if sh itself cannot be spawned;
        # exit(nil) then raises in the child, which still yields a non-zero
        # status, so the failure is caught below either way.
        pid = fork { Kernel.exit system("sh #{script}") }
        pids[pid] = script
        sleep 1 # presumably staggers launches to avoid a thundering herd — TODO confirm
      }
      Process.waitall.map { |r| r.last }.each { |status|
        raise ImportScriptExecutionError, "Error running script '#{pids[status.pid]}'" if status.exitstatus != 0
      }
    end

    # Run the Hive-side import for this table.
    def run_hive_import(tableconf)
      Hive.run_import self, tableconf
    end
  end
end