lib/kpm/database.rb in kpm-0.7.2 vs lib/kpm/database.rb in kpm-0.8.0
- old
+ new
@@ -1,140 +1,116 @@
+# frozen_string_literal: true
+
require 'tmpdir'
module KPM
-
class Database
- class << self
+ # Mysql Information functions
+ LAST_INSERTED_ID = 'SELECT LAST_INSERT_ID();'
+ ROWS_UPDATED = 'SELECT ROW_COUNT();'
- # Mysql Information functions
- LAST_INSERTED_ID = 'SELECT LAST_INSERT_ID();'
- ROWS_UPDATED = 'SELECT ROW_COUNT();'
+ # Destination database
+ DATABASE = ENV['DATABASE'] || 'killbill'
+ USERNAME = ENV['USERNAME'] || 'root'
+ PASSWORD = ENV['PASSWORD'] || 'root'
+ HOST = ENV['HOST'] || 'localhost'
+ PORT = ENV['PORT'] || '3306'
- # Destination database
- DATABASE = ENV['DATABASE'] || 'killbill'
- USERNAME = ENV['USERNAME'] || 'root'
- PASSWORD = ENV['PASSWORD'] || 'root'
- HOST = ENV['HOST'] || 'localhost'
- PORT = ENV['PORT'] || '3306'
+ COLUMN_NAME_POS = 3
- COLUMN_NAME_POS = 3
-
- STATEMENT_TMP_FILE = Dir.mktmpdir('statement') + File::SEPARATOR + 'statement.sql'
+ STATEMENT_TMP_FILE = Dir.mktmpdir('statement') + File::SEPARATOR + 'statement.sql'
- MYSQL_COMMAND_LINE = "mysql #{DATABASE} --user=#{USERNAME} --password=#{PASSWORD} "
+ def initialize(database_name, host, port, username, password, logger)
+ @database_name = database_name || DATABASE
+ @host = host || HOST
+ @port = port || PORT
+ @username = username || USERNAME
+ @password = password || PASSWORD
+ @mysql_command_line = "mysql --max_allowed_packet=128M #{@database_name} --host=#{@host} --port=#{@port} --user=#{@username} --password=#{@password} "
- @@mysql_command_line = MYSQL_COMMAND_LINE
- @@username = USERNAME
- @@password = PASSWORD
- @@database = DATABASE
- @@host = HOST
- @@port = PORT
+ @logger = logger
+ end
- def set_logger(logger)
- @@logger = logger
- end
+ def execute_insert_statement(table_name, query, qty_to_insert, _table_data, record_id = nil)
+ query = "set #{record_id[:variable]}=#{record_id[:value]}; #{query}" unless record_id.nil?
+ query = "SET sql_mode = ''; SET autocommit=0; #{query} COMMIT; SHOW WARNINGS;"
- def set_credentials(user = nil, password = nil)
- @@username = user
- @@password = password
+ File.open(STATEMENT_TMP_FILE, 'w') do |s|
+ s.puts query
end
- def set_host(host)
- @@host = host
- end
+ response = `#{@mysql_command_line} < "#{STATEMENT_TMP_FILE}" 2>&1`
- def set_port(port)
- @@port = port
+ if response.include? 'ERROR'
+ @logger.error "\e[91;1mTransaction that fails to be executed (first 1,000 chars)\e[0m"
+ # Queries can be really big (bulk imports)
+ @logger.error "\e[91m#{query[0..1000]}\e[0m"
+ raise Interrupt, "Importing table #{table_name}...... \e[91;1m#{response}\e[0m"
end
- def set_database_name(database_name = nil)
- @@database = database_name
- end
+ if response.include? 'LAST_INSERT_ID'
+ @logger.info "\e[32mImporting table #{table_name}...... Row 1 of #{qty_to_insert} success\e[0m"
- def set_mysql_command_line
- @@mysql_command_line = "mysql #{@@database} --host=#{@@host} --port=#{@@port} --user=#{@@username} --password=#{@@password} "
+ return response.split("\n")[1]
end
- def execute_insert_statement(table_name, query, qty_to_insert, table_data, record_id = nil)
-
- unless record_id.nil?
- query = "set #{record_id[:variable]}=#{record_id[:value]}; #{query}"
+ if response.include? 'ROW_COUNT'
+ # Typically, something like: "mysql: [Warning] Using a password on the command line interface can be insecure.\nROW_COUNT()\n3\n"
+ # With warning: "mysql: [Warning] Using a password on the command line interface can be insecure.\nROW_COUNT()\n1743\nLevel\tCode\tMessage\nWarning\t1264\tOut of range value for column 'amount' at row 582\n"
+ response_msg = response.split("\n")
+ idx_row_count_inserted = response_msg.index('ROW_COUNT()') + 1
+ row_count_inserted = response_msg[idx_row_count_inserted]
+ @logger.info "\e[32mImporting table #{table_name}...... Row #{row_count_inserted || 1} of #{qty_to_insert} success\e[0m"
+ if idx_row_count_inserted < response_msg.size - 1
+ warning_msg = response_msg[response_msg.size - 1]
+ @logger.warn "\e[91m#{warning_msg}\e[0m"
end
- query = "SET autocommit=0; #{query} COMMIT;"
-
- File.open(STATEMENT_TMP_FILE,'w') do |s|
- s.puts query
- end
-
- response = `#{@@mysql_command_line} < "#{STATEMENT_TMP_FILE}" 2>&1`
-
- if response.include? 'ERROR'
- @@logger.error "\e[91;1mTransaction that fails to be executed\e[0m"
- @@logger.error "\e[91m#{query}\e[0m"
- raise Interrupt, "Importing table #{table_name}...... \e[91;1m#{response}\e[0m"
- end
-
- if response.include? 'LAST_INSERT_ID'
- @@logger.info "\e[32mImporting table #{table_name}...... Row 1 of #{qty_to_insert} success\e[0m"
-
- return response.split("\n")[1]
- end
-
- if response.include? 'ROW_COUNT'
- response_msg = response.split("\n")
- row_count_inserted = response_msg[response_msg.size - 1]
- @@logger.info "\e[32mImporting table #{table_name}...... Row #{ row_count_inserted || 1} of #{qty_to_insert} success\e[0m"
-
- return true
- end
-
- return true
end
- def generate_insert_statement(tables)
+ true
+ end
- statements = []
- @@logger.info "\e[32mGenerating statements\e[0m"
+ def generate_insert_statement(tables)
+ statements = []
+ @logger.info "\e[32mGenerating statements\e[0m"
- tables.each_key do |table_name|
- table = tables[table_name]
- if !table[:rows].nil? && table[:rows].size > 0
- columns_names = table[:col_names].join(",").gsub(/'/,'')
+ tables.each_key do |table_name|
+ table = tables[table_name]
+ next unless !table[:rows].nil? && !table[:rows].empty?
- rows = []
- table[:rows].each do |row|
- rows << row.map do |value|
- if value.is_a?(Symbol)
- value.to_s
- else
- escaped_value = value.to_s.gsub(/['"]/, "'" => "\\'", '"' => '\\"')
- .gsub('\N{LINE FEED}', "\n")
- .gsub('\N{VERTICAL LINE}', "|")
- "'#{escaped_value}'"
- end
- end.join(",")
- end
+ columns_names = table[:col_names].join(',').gsub(/'/, '')
- value_data = rows.map{|row| "(#{row})" }.join(",")
-
- statements << {:query => get_insert_statement(table_name,columns_names,value_data, rows.size),
- :qty_to_insert => rows.size, :table_name => table_name, :table_data => table}
-
- end
-
+ rows = []
+ table[:rows].each do |row|
+ rows << row.map do |value|
+ if value.is_a?(Symbol)
+ value.to_s
+ elsif value.is_a?(Blob)
+ value.value
+ else
+ escaped_value = value.to_s.gsub(/['"]/, "'" => "\\'", '"' => '\\"')
+ .gsub('\N{LINE FEED}', "\n")
+ .gsub('\N{VERTICAL LINE}', '|')
+ "'#{escaped_value}'"
+ end
+ end.join(',')
end
- statements
+ # Break the insert statement into small chunks to avoid timeouts
+ rows.each_slice(1000).each do |subset_of_rows|
+ value_data = subset_of_rows.map { |row| "(#{row})" }.join(',')
+ statements << { query: build_insert_statement(table_name, columns_names, value_data, subset_of_rows.size),
+ qty_to_insert: subset_of_rows.size, table_name: table_name, table_data: table }
+ end
end
- private
+ statements
+ end
- def get_insert_statement(table_name, columns_names, values, rows_qty)
- return "INSERT INTO #{table_name} ( #{columns_names} ) VALUES #{values}; #{rows_qty == 1 ? LAST_INSERTED_ID : ROWS_UPDATED}"
- end
+ private
+ def build_insert_statement(table_name, columns_names, values, rows_qty)
+ "INSERT INTO #{table_name} ( #{columns_names} ) VALUES #{values}; #{rows_qty == 1 ? LAST_INSERTED_ID : ROWS_UPDATED}"
end
-
end
-
-end
\ No newline at end of file
+end