Class Sip::Sipper
In: lib/sip/sipper.rb
Parent: Object

Methods

Attributes

config  [R] 

Public Class methods

[Source]

# File lib/sip/sipper.rb, line 5
    # Creates a sipper for +config+.
    #
    # Reads the slave host list from $HADOOP_HOME/conf/slaves, one host per
    # line. '#' comments, surrounding whitespace and blank lines are ignored so
    # they never turn into empty or bogus host names. Finally runs
    # Utils::sanity_check on the config.
    #
    # Raises HadoopException when HADOOP_HOME is unset or empty, or when the
    # slaves file cannot be read.
    def initialize(config)
      @config = config
      hadoop_home = ENV['HADOOP_HOME']
      if hadoop_home.nil? || hadoop_home.empty?
        raise HadoopException, "The HADOOP_HOME environment variable is not set."
      end

      slavefile = File.join(hadoop_home, 'conf', 'slaves')
      log "Reading slaves from file #{slavefile}..."
      begin
        # File.read (unlike Kernel#open) never interprets the path as a command.
        contents = File.read(slavefile)
      rescue SystemCallError, IOError
        raise HadoopException, "Could not read \"#{slavefile}\".  Is your HADOOP_HOME environment variable correct?"
      end
      @slaves = contents.split("\n").map { |line| line.sub(/#.*/, '').strip }.reject { |line| line.empty? }
      Utils::sanity_check(@config)
    end

Public Instance methods

[Source]

# File lib/sip/sipper.rb, line 68
    # Writes the import script for one slave (+slavename+, position +index+ in
    # @slaves), covering that slave's share of the incremental-index range.
    #
    # In :append mode the range starts just past tableconf['incremental_index_value']
    # (a missing value counts as 0, so the first run starts at 1). Otherwise it
    # starts at 1. In both modes the range up to +max+ is split evenly across
    # all slaves with get_even_split, so no two slaves import the same rows.
    #
    # Returns the value of Utils::write_script.
    def create_script(slavename, index, dbconf, tableconf, db, max, method)
      min = method == :append ? tableconf['incremental_index_value'].to_i + 1 : 1
      # Split in both modes; previously overwrite gave every slave 1..max,
      # so each slave imported the entire table.
      first, last = get_even_split(min, max, index, @slaves.length)

      log "Importing #{first} <= #{tableconf['incremental_index']} <= #{last} from table #{dbconf['dbname']}.#{tableconf['tablename']}"
      select = db.generate_command tableconf, first, last
      transpart_opts = generate_transpart_options(tableconf, db)
      Utils::write_script self, slavename, select, dbconf['dbname'], tableconf['tablename'], transpart_opts
    end

Handles the case where the table has no incremental index (for example, no primary key), so the whole table must be imported in a single script.

[Source]

# File lib/sip/sipper.rb, line 40
    # Handles a table without an incremental index. It (re)creates the Hive
    # table in :overwrite mode and appends to @scripts a single script,
    # assigned to the first slave, that imports every row.
    #
    # The database connection +db+ is closed when this method finishes,
    # including when it raises.
    def create_script_without_index(dbconf, tableconf, db)
      Hive::run_hsql_create_table self, db, tableconf, :overwrite

      log "Importing all rows from table #{dbconf['dbname']}.#{tableconf['tablename']}"
      select = db.generate_command tableconf
      transpart_opts = generate_transpart_options(tableconf, db)
      @scripts << Utils::write_script(self, @slaves.first, select, dbconf['dbname'], tableconf['tablename'], transpart_opts)
    ensure
      db.close
    end

Returns the number of scripts created.

[Source]

# File lib/sip/sipper.rb, line 22
    # Generates the import scripts for one table and returns how many were
    # produced.
    #
    # If tableconf['columns'] is nil, it is filled with the first element of
    # each entry from db.columns. Tables with no 'incremental_index' go to
    # create_script_without_index; all others go to create_scripts_with_index.
    def create_scripts(dbconf, tableconf)
      @scripts = []
      db = DBBase.make_interface(dbconf['type'], dbconf, self)

      # default to every column the database reports
      if tableconf['columns'].nil?
        tableconf['columns'] = db.columns(tableconf['tablename']).map(&:first)
      end

      indexed = !tableconf['incremental_index'].nil?
      if indexed
        create_scripts_with_index(dbconf, tableconf, db)
      else
        create_script_without_index(dbconf, tableconf, db)
      end

      @scripts.size
    end

[Source]

# File lib/sip/sipper.rb, line 49
    # Handles a table with an incremental index. It appends one script per
    # slave to @scripts; each script imports that slave's slice of the index
    # range up to the column's current maximum.
    #
    # The mode is :append only when tableconf['method'] is "append" and
    # @config[:overwrite] is not set; otherwise it is :overwrite. An :append
    # run is skipped when the maximum equals the stored incremental_index_value.
    # The database connection is always closed, on the skip path and when
    # script generation raises.
    def create_scripts_with_index(dbconf, tableconf, db)
      max = db.get_column_max tableconf['tablename'], tableconf['incremental_index']

      method = (tableconf['method'] == "append" && !@config[:overwrite]) ? :append : :overwrite
      if method == :append && max == tableconf['incremental_index_value']
        log "Ignoring #{dbconf['dbname']}.#{tableconf['tablename']} - already up to date"
      else
        Hive::run_hsql_create_table self, db, tableconf, method
        @slaves.each_with_index do |slavename, index|
          @scripts << create_script(slavename, index, dbconf, tableconf, db, max, method)
        end
      end

      # update incremental index value if method in conf is append, regardless of whether or
      # not this is a forced overwrite
      tableconf['incremental_index_value'] = max if tableconf['method'] == "append"
    ensure
      db.close
    end

[Source]

# File lib/sip/sipper.rb, line 81
    # Builds the CmdOpts that callers pass to Utils::write_script as the
    # transpart options:
    #   c - columns as ordered by db.order_column_list, comma-joined
    #   p - tableconf['partition_by'], only when that key is present
    #   t - comma-joined "key:value" transformation pairs, only when non-empty
    #   H - tableconf['hive_table_name']
    #   o - the 'partitions' directory under @config['tmpdir']
    #   d - a flag set only when @config[:debug] is truthy
    def generate_transpart_options(tableconf, db)
      options = CmdOpts.new
      ordered = db.order_column_list(tableconf['tablename'], tableconf['columns'])
      options['c'] = ordered.join(',')
      options['p'] = tableconf['partition_by'] if tableconf.key?('partition_by')

      if tableconf.key?('transformations')
        pairs = tableconf['transformations']
        options['t'] = pairs.map { |name, value| "#{name}:#{value}" }.join(',') unless pairs.empty?
      end

      options['H'] = tableconf['hive_table_name']
      options['o'] = File.join(@config['tmpdir'], 'partitions')
      options.set('d') if @config[:debug]
      options
    end

[Source]

# File lib/sip/sipper.rb, line 94
    # Splits the inclusive range min..max into +count+ consecutive chunks of
    # equal size and returns [first, last] for the chunk at 0-based +index+.
    # The last chunk may be shorter. +last+ is capped at +max+, so when there
    # are more chunks than values the trailing chunks come back empty
    # (first > last).
    #
    # Raises ArgumentError if +count+ is less than 1.
    def get_even_split(min, max, index, count)
      raise ArgumentError, "count must be at least 1 (got #{count})" if count < 1
      # Ceiling division via floor division of the negated span. It stays in
      # integer arithmetic, so large index values do not lose precision the
      # way a to_f-based ceil does.
      size = -((min - max - 1).div(count))
      first = (size * index) + min
      last = (size * (index + 1)) + min - 1
      [first, [max, last].min]
    end

[Source]

# File lib/sip/sipper.rb, line 17
    # Prints +msg+ to stdout, prefixed with Utils::hostname and a local
    # "YYYY-mm-dd HH:MM:SS" timestamp. It prints nothing unless
    # @config[:debug] is set.
    def log(msg)
      return unless @config[:debug]
      host = Utils::hostname
      stamp = Time.now.strftime('%Y-%m-%d %H:%M:%S')
      puts "#{host} #{stamp}: #{msg}"
    end

[Source]

# File lib/sip/sipper.rb, line 114
    # Hands the Hive import for +tableconf+ to Hive.run_import, passing this
    # sipper along, and returns whatever that call returns.
    def run_hive_import(tableconf)
      sipper = self
      Hive::run_import(sipper, tableconf)
    end

[Source]

# File lib/sip/sipper.rb, line 101
    # Runs every script in @scripts with sh, each in its own child process.
    # Launches are one second apart, and the children run concurrently.
    # After all of them finish, raises ImportScriptExecutionError naming a
    # script that did not exit successfully.
    def run_scripts
      pids = {}
      @scripts.each do |script|
        log "Running #{script}..."
        pid = fork do
          # exec replaces the child process, so nothing inherited from the
          # parent (at_exit handlers, finalizers) runs in it. Passing the path
          # as a separate argument keeps paths with spaces intact.
          begin
            exec('sh', script)
          rescue SystemCallError
            exit!(127)
          end
        end
        pids[pid] = script
        sleep 1
      end

      # Wait only for the children started above. Process.waitall would also
      # reap, and mis-report, any unrelated child of this process.
      statuses = pids.keys.map { |pid| Process.wait2(pid) }
      statuses.each do |pid, status|
        raise ImportScriptExecutionError, "Error running script '#{pids[pid]}'" unless status.success?
      end
    end

[Validate]