require 'vertica_utils/types'
require 'vertica_utils/source_db'
require 'vertica_utils/vertica_sql'

module VerticaUtils
  # Helpers for recreating source (MySQL INFORMATION_SCHEMA-described) tables
  # in Vertica and bulk-loading flat files into them with the vsql client.
  #
  # NOTE(review): relies on ActiveSupport (extract_options!, blank?, presence,
  # reverse_merge), VerticaDb::Base and Net::SSH being loaded elsewhere.
  class VerticaLoader
    class << self
      # Builds a Vertica CREATE TABLE statement for one source table.
      #
      # @param args [Array] trailing options hash; :table and :db select the
      #   source table, and every option is forwarded (plus :columns) to
      #   VerticaSql.create_table_stmt.
      # @return whatever VerticaSql.create_table_stmt returns; callers execute
      #   it as SQL.
      def create_table *args
        options = args.extract_options!
        columns = []
        # assumes the SourceDb result supports each(:as => :hash) like
        # Mysql2::Result -- TODO confirm
        table_definition(options).each(:as => :hash) { |row| columns << row }
        # merge (not []=) so the caller's options hash is not modified
        VerticaUtils::VerticaSql.create_table_stmt(options.merge(:columns => columns))
      end

      # Drops and recreates, in Vertica schema +db+, every table that
      # SourceDb.get_tables reports for source database +db+.
      #
      # @param db [String] source database name, reused as the Vertica schema
      # @param vertica_db [String] value passed as :vertica_db to create_table
      #   (defaults to the previously hard-coded "bidw")
      # @return the result of tables.each (the table list itself for an Array)
      def create_all_tables db, vertica_db = "bidw"
        tables = VerticaUtils::SourceDb.get_tables(db)
        tables.each do |table|
          puts "Creating #{db}.#{table}"
          VerticaDb::Base.connection.execute "DROP TABLE IF EXISTS #{db}.#{table} CASCADE;"
          sql = VerticaUtils::VerticaLoader.create_table(:vertica_db => vertica_db, :vertica_table => table,
                                                         :vertica_schema => db, :table => table, :db => db)
          VerticaDb::Base.connection.execute sql
        end
      end

      # Returns a copy of the trailing options hash with defaults filled in for
      # every connection/load option the caller omitted. The caller's hash is
      # left untouched.
      #
      # TODO(review): the host and credentials are hard-coded defaults; consider
      # sourcing them from configuration or ENV.
      def prepare_options *args
        args.extract_options!.reverse_merge(
          :host => "sfo-load-dw-01",
          :user => "vertica",
          :pass => "test",
          :db => "bidw",
          :schema => "test",
          :table => "",
          :file => "",
          :delimiter => "\t",
          :null_value => "NULL",
          :enclosed => ""
        )
      end

      # Rewrites placeholder values in a local file and COPYs it into Vertica.
      #
      # Example:
      #   VerticaUtils::VerticaLoader.load_to_vertica({:schema => "king", :table => "category_overview_data",
      #     :file => "tmp/vertica/category_overview_data.tsv", :null_value => "NULL"})
      #
      # @param args [Array] trailing options hash (see prepare_options); the
      #   optional :list_of_nulls overrides the values rewritten before loading
      #   (default ["0000-00-00"]).
      # @return the Kernel#system result of the vsql invocation
      # @raise [RuntimeError] when :file is blank or any step fails
      def load_to_vertica *args
        options = args.extract_options!
        prepared_options = prepare_options options
        Kernel.p prepared_options[:file]
        raise "No input file" if prepared_options[:file].blank?

        list_of_nulls = prepared_options[:list_of_nulls] || ["0000-00-00"]
        begin
          process_file(:file => prepared_options[:file], :list_of_nulls => list_of_nulls,
                       :null_value => prepared_options[:null_value])
          cmd = get_vsql_command(prepared_options)
          Kernel.p redact_password(cmd, prepared_options[:pass])
          system(cmd)
        rescue StandardError => e
          # keep the RuntimeError type callers saw before, but keep the backtrace
          raise RuntimeError, e.message, e.backtrace
        end
      end

      # Builds the shell command that runs a Vertica "COPY ... FROM LOCAL" of
      # prepared_options[:file] into :schema.:table.
      #
      # Files whose last extension is "gz" get the GZIP modifier. Rejected rows
      # go to tmp/vertica/load_exceptions.log.
      #
      # NOTE(review): values are interpolated unescaped into both the SQL and
      # the shell text, and the password is passed on the command line via -w.
      def get_vsql_command prepared_options
        file = prepared_options[:file]
        file_handler = file.split('.').last == "gz" ? "GZIP" : ""
        sql = "COPY #{prepared_options[:schema]}.#{prepared_options[:table]} FROM LOCAL '#{file}' #{file_handler} " \
              "DELIMITER E'#{prepared_options[:delimiter]}' NULL as '#{prepared_options[:null_value]}' " \
              "ENCLOSED BY '#{prepared_options[:enclosed]}' EXCEPTIONS 'tmp/vertica/load_exceptions.log';"
        "/opt/vertica/bin/vsql -h #{prepared_options[:host]} -U #{prepared_options[:user]} " \
          "-w #{prepared_options[:pass]} -d #{prepared_options[:db]} -c \"#{sql}\""
      end

      # Replaces placeholder values in the input file in place.
      #
      # @param args [Array] trailing options hash with :file, :list_of_nulls
      #   (default []) and :null_value (default "NULL")
      # @return nil when :file is blank
      # @raise [RuntimeError] for extensions other than tsv, csv or gz
      def process_file *args
        options = args.extract_options!
        file = options[:file]
        return if file.blank?

        list_of_nulls = options[:list_of_nulls].presence || []
        null_value = options[:null_value].presence || "NULL"
        file_extension = file.split('.').last
        case file_extension
        when "tsv", "csv" then process_flat_file(file, list_of_nulls, null_value)
        when "gz" then process_gzip_file(file, list_of_nulls, null_value)
        else raise "Unsupported file extension: #{file_extension}"
        end
      end

      # Runs an in-place `sed -i` substitution on +file+ for each entry of
      # +list_of_nulls+.
      #
      # '0000-00-00' (a MySQL zero date) becomes 1900-01-01 instead of
      # +null_value+, because the target date/datetime column may be declared
      # NOT NULL. Every other entry becomes +null_value+.
      #
      # NOTE(review): values are used as raw sed regexes; entries containing
      # '/' or regex metacharacters would need escaping.
      def replace_null(file, list_of_nulls, null_value = "NULL")
        list_of_nulls.each do |value|
          replacement = value == '0000-00-00' ? '1900-01-01' : null_value
          cmd = "sed -i 's/#{value}/#{replacement}/g' #{file}"
          Kernel.p cmd
          system(cmd)
        end
      end

      # Applies replace_null directly to an uncompressed tsv/csv file.
      def process_flat_file file, list_of_nulls, null_value
        replace_null(file, list_of_nulls, null_value)
      end

      # Decompresses +file+ to a scratch copy, rewrites null placeholders in
      # that copy, then recompresses it over +file+.
      #
      # The original is replaced only after both gunzip and gzip succeed. This
      # stops a failed step from overwriting it with an empty archive.
      #
      # NOTE(review): the scratch path tmp/temp.txt is fixed, so concurrent
      # loads in the same working directory would collide.
      #
      # @raise [RuntimeError] if decompression or recompression fails
      def process_gzip_file file, list_of_nulls, null_value
        temp_file = "tmp/temp.txt"
        recompressed = "#{file}.tmp"
        raise "Failed to decompress #{file}" unless system("gunzip -f #{file} -c > #{temp_file}")

        # sed must operate on the decompressed text, not the .gz archive
        replace_null(temp_file, list_of_nulls, null_value)
        raise "Failed to recompress #{file}" unless system("gzip -c #{temp_file} > #{recompressed}")

        File.rename(recompressed, file)
      ensure
        [temp_file, recompressed].each { |path| File.delete(path) if path && File.exist?(path) }
      end

      # Lists a Vertica table's columns by parsing vsql's "\d schema.table" output.
      #
      # Expects the first 3 output lines to be the title, header and separator,
      # and the last line to be a footer (as sliced below). Data rows are split
      # on "|", with field 2 read as the column name and field 3 as its type.
      # Rows with fewer than 4 fields are skipped.
      #
      # @param args [Array] trailing options hash; :host, :user, :pass, :db,
      #   :schema and :table are all required
      # @return [Hash{Integer => Hash{String => String}}] 1-based position =>
      #   { column_name => column_type }; empty when vsql printed no rows
      # @raise [RuntimeError] if a required option is blank or vsql/parsing fails
      def get_table_columns * args
        options = args.extract_options!
        required = [:host, :user, :pass, :db, :schema, :table]
        if required.any? { |key| options[key].blank? }
          raise "Unspecified host, user, pass, db, schema or table"
        end

        result = {}
        count = 0
        begin
          cmd = "/opt/vertica/bin/vsql -h #{options[:host]} -U #{options[:user]} -w #{options[:pass]} " \
                "-d #{options[:db]} -c '\\d #{options[:schema]}.#{options[:table]}';"
          puts redact_password(cmd, options[:pass])
          lines = `#{cmd}`.split("\n")
          # nil when the output is too short (e.g. vsql failed); treat as no rows
          data = lines[3..-2] || []
          data.each do |item|
            fields = item.split("|")
            next if fields.size < 4

            # strip (not lstrip!/rstrip!) -- the bang forms return nil when
            # nothing is removed
            column_name = fields[2].strip
            column_type = fields[3].strip
            puts column_name + " " + column_type
            count += 1
            result[count] = {column_name => column_type}
          end
        rescue StandardError => e
          raise RuntimeError, e.message, e.backtrace
        end
        result
      end

      # Opens an SSH session using :ssh_host, :ssh_user and :ssh_password.
      #
      # @return the object returned by Net::SSH.start
      def ssh_connection options
        Net::SSH.start(options[:ssh_host], options[:ssh_user], :password => options[:ssh_password])
      end

      # Queries INFORMATION_SCHEMA.COLUMNS in the source database for the
      # columns of options[:table] in schema options[:db].
      #
      # NOTE(review): names are interpolated into the SQL unescaped; this is
      # safe only for trusted table/schema names.
      #
      # @return the result of SourceDb.exec_sql
      def table_definition options
        sql = "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key " \
              "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' " \
              "and table_schema = '#{options[:db]}';"
        puts sql
        SourceDb.exec_sql(options[:db], sql)
      end

      private

      # Masks the "-w <password>" argument of a vsql command line so that
      # credentials are not echoed to stdout or logs.
      def redact_password cmd, password
        return cmd if password.blank?

        cmd.gsub("-w #{password}", "-w ******")
      end
    end
  end
end