lib/kpm/database.rb in kpm-0.7.2 vs lib/kpm/database.rb in kpm-0.8.0

- old
+ new

@@ -1,140 +1,116 @@ +# frozen_string_literal: true + require 'tmpdir' module KPM - class Database - class << self + # Mysql Information functions + LAST_INSERTED_ID = 'SELECT LAST_INSERT_ID();' + ROWS_UPDATED = 'SELECT ROW_COUNT();' - # Mysql Information functions - LAST_INSERTED_ID = 'SELECT LAST_INSERT_ID();' - ROWS_UPDATED = 'SELECT ROW_COUNT();' + # Destination database + DATABASE = ENV['DATABASE'] || 'killbill' + USERNAME = ENV['USERNAME'] || 'root' + PASSWORD = ENV['PASSWORD'] || 'root' + HOST = ENV['HOST'] || 'localhost' + PORT = ENV['PORT'] || '3306' - # Destination database - DATABASE = ENV['DATABASE'] || 'killbill' - USERNAME = ENV['USERNAME'] || 'root' - PASSWORD = ENV['PASSWORD'] || 'root' - HOST = ENV['HOST'] || 'localhost' - PORT = ENV['PORT'] || '3306' + COLUMN_NAME_POS = 3 - COLUMN_NAME_POS = 3 - - STATEMENT_TMP_FILE = Dir.mktmpdir('statement') + File::SEPARATOR + 'statement.sql' + STATEMENT_TMP_FILE = Dir.mktmpdir('statement') + File::SEPARATOR + 'statement.sql' - MYSQL_COMMAND_LINE = "mysql #{DATABASE} --user=#{USERNAME} --password=#{PASSWORD} " + def initialize(database_name, host, port, username, password, logger) + @database_name = database_name || DATABASE + @host = host || HOST + @port = port || PORT + @username = username || USERNAME + @password = password || PASSWORD + @mysql_command_line = "mysql --max_allowed_packet=128M #{@database_name} --host=#{@host} --port=#{@port} --user=#{@username} --password=#{@password} " - @@mysql_command_line = MYSQL_COMMAND_LINE - @@username = USERNAME - @@password = PASSWORD - @@database = DATABASE - @@host = HOST - @@port = PORT + @logger = logger + end - def set_logger(logger) - @@logger = logger - end + def execute_insert_statement(table_name, query, qty_to_insert, _table_data, record_id = nil) + query = "set #{record_id[:variable]}=#{record_id[:value]}; #{query}" unless record_id.nil? + query = "SET sql_mode = ''; SET autocommit=0; #{query} COMMIT; SHOW WARNINGS;" - def set_credentials(user = nil, password = nil) - @@username = user - @@password = password + File.open(STATEMENT_TMP_FILE, 'w') do |s| + s.puts query end - def set_host(host) - @@host = host - end + response = `#{@mysql_command_line} < "#{STATEMENT_TMP_FILE}" 2>&1` - def set_port(port) - @@port = port + if response.include? 'ERROR' + @logger.error "\e[91;1mTransaction that fails to be executed (first 1,000 chars)\e[0m" + # Queries can be really big (bulk imports) + @logger.error "\e[91m#{query[0..1000]}\e[0m" + raise Interrupt, "Importing table #{table_name}...... \e[91;1m#{response}\e[0m" end - def set_database_name(database_name = nil) - @@database = database_name - end + if response.include? 'LAST_INSERT_ID' + @logger.info "\e[32mImporting table #{table_name}...... Row 1 of #{qty_to_insert} success\e[0m" - def set_mysql_command_line - @@mysql_command_line = "mysql #{@@database} --host=#{@@host} --port=#{@@port} --user=#{@@username} --password=#{@@password} " + return response.split("\n")[1] end - def execute_insert_statement(table_name, query, qty_to_insert, table_data, record_id = nil) - - unless record_id.nil? - query = "set #{record_id[:variable]}=#{record_id[:value]}; #{query}" + if response.include? 'ROW_COUNT' + # Typically, something like: "mysql: [Warning] Using a password on the command line interface can be insecure.\nROW_COUNT()\n3\n" + # With warning: "mysql: [Warning] Using a password on the command line interface can be insecure.\nROW_COUNT()\n1743\nLevel\tCode\tMessage\nWarning\t1264\tOut of range value for column 'amount' at row 582\n" + response_msg = response.split("\n") + idx_row_count_inserted = response_msg.index('ROW_COUNT()') + 1 + row_count_inserted = response_msg[idx_row_count_inserted] + @logger.info "\e[32mImporting table #{table_name}...... Row #{row_count_inserted || 1} of #{qty_to_insert} success\e[0m" + if idx_row_count_inserted < response_msg.size - 1 + warning_msg = response_msg[response_msg.size - 1] + @logger.warn "\e[91m#{warning_msg}\e[0m" end - query = "SET autocommit=0; #{query} COMMIT;" - - File.open(STATEMENT_TMP_FILE,'w') do |s| - s.puts query - end - - response = `#{@@mysql_command_line} < "#{STATEMENT_TMP_FILE}" 2>&1` - - if response.include? 'ERROR' - @@logger.error "\e[91;1mTransaction that fails to be executed\e[0m" - @@logger.error "\e[91m#{query}\e[0m" - raise Interrupt, "Importing table #{table_name}...... \e[91;1m#{response}\e[0m" - end - - if response.include? 'LAST_INSERT_ID' - @@logger.info "\e[32mImporting table #{table_name}...... Row 1 of #{qty_to_insert} success\e[0m" - - return response.split("\n")[1] - end - - if response.include? 'ROW_COUNT' - response_msg = response.split("\n") - row_count_inserted = response_msg[response_msg.size - 1] - @@logger.info "\e[32mImporting table #{table_name}...... Row #{ row_count_inserted || 1} of #{qty_to_insert} success\e[0m" - - return true - end - - return true end - def generate_insert_statement(tables) + true + end - statements = [] - @@logger.info "\e[32mGenerating statements\e[0m" + def generate_insert_statement(tables) + statements = [] + @logger.info "\e[32mGenerating statements\e[0m" - tables.each_key do |table_name| - table = tables[table_name] - if !table[:rows].nil? && table[:rows].size > 0 - columns_names = table[:col_names].join(",").gsub(/'/,'') + tables.each_key do |table_name| + table = tables[table_name] + next unless !table[:rows].nil? && !table[:rows].empty? - rows = [] - table[:rows].each do |row| - rows << row.map do |value| - if value.is_a?(Symbol) - value.to_s - else - escaped_value = value.to_s.gsub(/['"]/, "'" => "\\'", '"' => '\\"') - .gsub('\N{LINE FEED}', "\n") - .gsub('\N{VERTICAL LINE}', "|") - "'#{escaped_value}'" - end - end.join(",") - end + columns_names = table[:col_names].join(',').gsub(/'/, '') - value_data = rows.map{|row| "(#{row})" }.join(",") - - statements << {:query => get_insert_statement(table_name,columns_names,value_data, rows.size), - :qty_to_insert => rows.size, :table_name => table_name, :table_data => table} - - end - + rows = [] + table[:rows].each do |row| + rows << row.map do |value| + if value.is_a?(Symbol) + value.to_s + elsif value.is_a?(Blob) + value.value + else + escaped_value = value.to_s.gsub(/['"]/, "'" => "\\'", '"' => '\\"') + .gsub('\N{LINE FEED}', "\n") + .gsub('\N{VERTICAL LINE}', '|') + "'#{escaped_value}'" + end + end.join(',') end - statements + # Break the insert statement into small chunks to avoid timeouts + rows.each_slice(1000).each do |subset_of_rows| + value_data = subset_of_rows.map { |row| "(#{row})" }.join(',') + statements << { query: build_insert_statement(table_name, columns_names, value_data, subset_of_rows.size), + qty_to_insert: subset_of_rows.size, table_name: table_name, table_data: table } + end end - private + statements + end - def get_insert_statement(table_name, columns_names, values, rows_qty) - return "INSERT INTO #{table_name} ( #{columns_names} ) VALUES #{values}; #{rows_qty == 1 ? LAST_INSERTED_ID : ROWS_UPDATED}" - end + private + def build_insert_statement(table_name, columns_names, values, rows_qty) + "INSERT INTO #{table_name} ( #{columns_names} ) VALUES #{values}; #{rows_qty == 1 ? LAST_INSERTED_ID : ROWS_UPDATED}" end - end - -end \ No newline at end of file +end