require 'fiber' require 'flydata/util/mysql_util' module Flydata module Parser module Mysql module MysqlAccessible def mysql_conf(conf) @mysql_conf = [:host, :port, :username, :password, :database, :ssl_ca, :sslca].inject({}) {|h, sym| h[sym] = conf[sym.to_s]; h} end def mysql_cli(conf = nil) mysql_conf(conf) if conf return FlydataMysqlClient.new(@mysql_conf) if @mysql_conf nil end end module DumpStreamIO # return position # sync command doesn't resume if pos is -1 in dump position file def pos -1 end end class MysqlTable def initialize(table_name, columns = {}, primary_keys = []) @table_name = table_name @columns = columns @primary_keys = primary_keys end attr_accessor :table_name, :columns, :primary_keys def add_column(column) @columns[column[:column_name]] = column end end class MysqlDumpGenerator def initialize(conf) @conf = conf @db_opts = [:host, :port, :username, :password, :database, :ssl_ca, :sslca].inject({}) {|h, sym| h[sym] = conf[sym.to_s]; h} end def dump(file_path) raise "subclass must implement the method" end end class MysqlDumpGeneratorNoMasterData < MysqlDumpGenerator CHANGE_MASTER_TEMPLATE = <= "5.5" # FLUSH TABLES table_names,... WITH READ LOCK syntax is supported from MySQL 5.5 result = client.query(USER_TABLES_QUERY % ["mysql", client.query_options[:database].to_s].collect{|db| "'#{db}'"}.join(",")) tables = result.collect{|r| r['tables']}.join(", ") end FLUSH_TABLES_QUERY_TEMPLATE % [tables] end VERSION_QUERY = "SHOW VARIABLES LIKE 'version'" def mysql_server_version(client) result = client.query(VERSION_QUERY) result.first['Value'] end def filter_dump_stream(cmd_out, w_io) cmd_out.each_line do |line| w_io.print line w_io.puts unless line.end_with?("\n") w_io.flush end end end # Custom mysql client that sets config params (eg:-read_timeout) uniformly for all # mysql access. Also, gives access to the last query that was executed using the client # which can be helpful when handling exceptions class FlydataMysqlClient < Mysql2::Client attr_accessor :last_query def initialize(db_opts) #TODO : Pass timeout in as a setting from the data entry super(db_opts.merge(read_timeout: 1800)) end def query(sql, options = {}) @last_query = sql begin super(sql, options) rescue Mysql2::Error => e if /^Timeout waiting for a response/ === e.to_s raise "The below query timed out when running. Please check long running processes and locks in your database.\n#{last_query}" end raise e end end end class MysqlDumpParser module State START = 'START' CREATE_TABLE = 'CREATE_TABLE' CREATE_TABLE_COLUMNS = 'CREATE_TABLE_COLUMNS' CREATE_TABLE_CONSTRAINTS = 'CREATE_TABLE_CONSTRAINTS' INSERT_RECORD = 'INSERT_RECORD' PARSING_INSERT_RECORD = 'PARSING_INSERT_RECORD' end attr_accessor :binlog_pos BINLOG_INV_ERROR_CHUNK_SIZE = 250 def initialize(option = {}) @binlog_pos = option[:binlog_pos] or raise ArgumentError.new("binlog position is required") @option = option end def parse(dump_io, create_table_block, insert_record_block, check_point_block) unless dump_io.kind_of?(IO) raise ArgumentError.new("Invalid argument. The first parameter must be io.") end invalid_file = false current_state = State::START substate = nil buffered_line = nil bytesize = 0 readline_proc = Proc.new do line = nil if buffered_line line = buffered_line buffered_line = nil else rawline = dump_io.readline.encode('utf-16', :undef => :replace, :invalid => :replace).encode('utf-8') bytesize += rawline.bytesize line = rawline.strip end line end state_start = Proc.new do line = readline_proc.call # -- Server version 5.6.21-log if line.start_with?('-- Server version') current_state = State::CREATE_TABLE check_point_block.call(nil, dump_io.pos, bytesize, @binlog_pos, current_state) end end current_table = nil state_create_table = Proc.new do line = readline_proc.call # CREATE TABLE `active_admin_comments` ( m = /^CREATE TABLE `(?[^`]+)`/.match(line) if m current_table = MysqlTable.new(m[:table_name]) current_state = State::CREATE_TABLE_COLUMNS end end state_create_table_constraints = Proc.new do line = readline_proc.call # PRIMARY KEY (`id`), if line.start_with?(')') create_table_block.call(current_table) current_state = State::INSERT_RECORD check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) elsif m = /^PRIMARY KEY \((?[^\)]+)\)/.match(line) current_table.primary_keys = m[:primary_keys].split(',').collect do |pk_str| pk_str[1..-2] end end end state_create_table_columns = Proc.new do start_pos = dump_io.pos line = readline_proc.call # `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, if line.start_with?("\`") column = {} # parse column line line = line[0..-2] if line.end_with?(',') items = line.split column[:column_name] = items.shift[1..-2] column[:format_type_str] = format_type_str = items.shift pos = format_type_str.index('(') if pos ft = column[:format_type] = format_type_str[0..pos-1] if ft == 'decimal' precision, scale = format_type_str[pos+1..-2].split(',').collect{|v| v.to_i} column[:decimal_precision] = precision column[:decimal_scale] = scale else column[:format_size] = format_type_str[pos+1..-2].to_i end else column[:format_type] = format_type_str end while (item = items.shift) do case item when 'DEFAULT' value = items.shift value = value.start_with?('\'') ? value[1..-2] : value value = nil if value == 'NULL' column[:default] = value when 'NOT' if items[1] == 'NULL' items.shift column[:not_null] = true end when 'unsigned' column[:unsigned] = true else #ignore other options end end current_table.add_column(column) else current_state = State::CREATE_TABLE_CONSTRAINTS buffered_line = line state_create_table_constraints.call end end state_insert_record = Proc.new do line = readline_proc.call if line.start_with?('INSERT') buffered_line = line current_state = State::PARSING_INSERT_RECORD elsif line.start_with?('UNLOCK') current_state = State::CREATE_TABLE check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) end end state_parsing_insert_record = Proc.new do line = readline_proc.call values_set = InsertParser.new.parse(line) current_state = State::INSERT_RECORD if insert_record_block.call(current_table, values_set) check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) end end # Start reading file from top begin # resume(only when using dump file) if @option[:last_pos] && (@option[:last_pos].to_i != -1) dump_io.pos = @option[:last_pos].to_i current_state = @option[:state] substate = @option[:substate] current_table = @option[:mysql_table] bytesize = dump_io.pos end until dump_io.eof? do case current_state when State::START state_start.call when State::CREATE_TABLE state_create_table.call when State::CREATE_TABLE_COLUMNS state_create_table_columns.call when State::CREATE_TABLE_CONSTRAINTS state_create_table_constraints.call when State::INSERT_RECORD state_insert_record.call when State::PARSING_INSERT_RECORD state_parsing_insert_record.call end end end @binlog_pos end # Parse the insert line containing multiple values. (max line size is 1kb) # ex) INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2'); class InsertParser #INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2'); def start_ruby_prof RubyProf.start if defined?(RubyProf) and not RubyProf.running? end def stop_ruby_prof if defined?(RubyProf) and RubyProf.running? result = RubyProf.stop #printer = RubyProf::GraphPrinter.new(result) printer = RubyProf::GraphHtmlPrinter.new(result) #printer.print(STDOUT) printer.print(File.new("ruby-prof-out-#{Time.now.to_i}.html", "w"), :min_percent => 3) end end def parse(line) start_ruby_prof bench_start_time = Time.now _parse(line) ensure stop_ruby_prof if ENV['FLYDATA_BENCHMARK'] puts " -> time:#{Time.now.to_f - bench_start_time.to_f} size:#{target_line.size}" end end private def _parse(target_line) values = [] values_set = [] target_line = target_line.strip start_index = target_line.index('(') target_line = target_line[start_index..-2] # Split insert line text with ',' and take care of ',' inside of the values later. # # We are using the C native method that is like 'split', 'start_with?', 'regexp' # instead of 'String#each_char' and string comparision for the performance. # 'String#each_char' is twice as slow as the current strategy. begin items = target_line.split(',') rescue ArgumentError => e # If there are any non-UTF8 characters, split will fail # This will go through the chunk of data and cut the data up into chunks to make it # displayable to user. target_chunk="" chunk_size = BINLOG_INV_ERROR_CHUNK_SIZE chunk_repeat_count = target_line.length/chunk_size for i in 0..chunk_repeat_count line_chunk=target_line[(0+(i*chunk_size)..chunk_size+((i*chunk_size)-1))] unless line_chunk.valid_encoding? target_chunk=line_chunk break end end hex_string = target_chunk.bytes.map { |b| sprintf(", 0x%02X",b) }.join raise DumpParseError, "raw_text: #{target_chunk}\n hex_dump: #{hex_string}" end index = 0 cur_state = :next_values loop do case cur_state when :next_values chars = items[index] break unless chars items[index] = chars[1..-1] cur_state = :in_value when :in_value chars = items[index] index += 1 if chars.start_with?("'") chars_size = chars.size # single item (not last item) # size check added below otherwise end_with? matches the single quote which was also used by start_with? if chars_size > 1 and chars.end_with?("'") and !(chars.end_with?("\\'") and last_char_escaped?(chars)) values << (chars[1..-2]).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) # single item (last item) # size check added below otherwise end_with? matches the single quote which was also used by start_with? elsif chars_size > 2 and chars.end_with?("')") and !(chars[0..-2].end_with?("\\'") and last_char_escaped?(chars[0..-2])) values << (chars[1..-3]).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) values_set << values values = [] cur_state = :next_values # multi items else cur_value = chars[1..-1] loop do next_chars = items[index] index += 1 if next_chars.end_with?('\'') and !(next_chars.end_with?("\\'") and last_char_escaped?(next_chars)) cur_value << ',' cur_value << next_chars[0..-2] values << (cur_value).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) break elsif next_chars.end_with?("')") and !(next_chars[0..-2].end_with?("\\'") and last_char_escaped?(next_chars[0..-2])) cur_value << ',' cur_value << next_chars[0..-3] values << (cur_value).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) values_set << values values = [] cur_state = :next_values break else cur_value << ',' cur_value << next_chars end end end else if chars.end_with?(')') chars = chars[0..-2] values << (chars == 'NULL' ? nil : remove_leading_zeros(chars)) values_set << values values = [] cur_state = :next_values else values << (chars == 'NULL' ? nil : remove_leading_zeros(chars)) end end else raise "Invalid state: #{cur_state}" end end return values_set end ESCAPE_REGEXP = /\\\\|\\'|\\"|\\n|\\r/ ESCAPE_HASH_TABLE = {"\\\\" => "\\", "\\'" => "'", "\\\"" => "\"", "\\n" => "\n", "\\r" => "\r"} # For the peformance, this function is put inline. #def replace_escape_char(original) # original.gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) #end # This method assume that the last character is '(single quotation) # abcd\' -> true # abcd\\' -> false (back slash escape back slash) # abcd\\\' -> true def last_char_escaped?(text) flag = false (text.length - 2).downto(0) do |i| if text[i] == '\\' flag = !flag else break end end flag end def remove_leading_zeros(number_string) if number_string.start_with?('0') number_string.sub(/^0*([1-9][0-9]*(\.\d*)?|0(\.\d*)?)$/,'\1') else number_string end end end end class DatabaseSizeCheck include MysqlAccessible SIZE_CHECK_QUERY = <