require 'fiber' require 'io/wait' require 'mysql2' require 'flydata/parser' require 'flydata/parser/source_table' require 'flydata-core/mysql/config' require 'flydata-core/mysql/command_generator' require 'flydata-core/mysql/binlog_pos' #require 'ruby-prof' # to enable profiling, also set the class' RUN_PROFILE module Flydata module SourceMysql module Parser module MysqlAccessible def mysql_conf(conf) @mysql_conf = FlydataCore::Mysql::Config.build_mysql_db_opts(conf) end def mysql_cli(conf = nil) mysql_conf(conf) if conf return FlydataMysqlClient.new(@mysql_conf) if @mysql_conf nil end end module DumpStreamIO # return position # sync command doesn't resume if pos is -1 in dump position file def pos -1 end end class MysqlDumpGenerator def initialize(conf) @conf = conf @db_opts = FlydataCore::Mysql::Config.build_mysql_db_opts(conf) end def dump(file_path) raise "subclass must implement the method" end end class MysqlDumpGeneratorNoMasterData < MysqlDumpGenerator def dump(file_path = nil, &block) unless file_path || block raise ArgumentError.new("file_path or block must be given.") end dump_cmd = generate_dump_cmd(@conf, file_path) # RDS doesn't allow obtaining binlog position using mysqldump. Get it separately and insert it into the dump file. table_locker = create_table_locker(@conf["database"], @conf["tables"]) table_locker.resume # Lock tables begin # create pipe for callback function rd_io, wr_io = IO.pipe("utf-8", "utf-8") wr_io.sync = true wr_io.set_encoding("utf-8", "utf-8") rd_io.extend(DumpStreamIO) binlog_pos = nil # start mysqldump Open3.popen3 dump_cmd do |cmd_in, cmd_out, cmd_err, wait_thr| cmd_in.close_write cmd_out.set_encoding("utf-8", "utf-8") # mysqldump output must be in UTF-8 # wait until the command starts dumping. Two different ways to # check if file_path # mysqldump dumps to a file `file_path`. Check if the file # exists and not empty. loop do if File.size? file_path break end sleep 1 end else # mysqldump dumps to an IO `cmd_out`. Wait until it has # readable contents. cmd_out.wait_readable # wait until first line comes end binfile, pos = table_locker.resume binlog_pos = {binfile: binfile, pos: pos} threads = [] # filter dump stream and write data to pipe threads << Thread.new do begin filter_dump_stream(cmd_out, wr_io) ensure wr_io.close rescue nil end end # show err message errors = "" threads << Thread.new do cmd_err.each_line do |line| errors << line unless /^Warning:/ === line end end if block # call callback function with io if block is given block.call(rd_io, binlog_pos) end threads.each(&:join) unless wait_thr.value == 0 #Raise error if the process exited with status != 0 #(Even if there was no message on stderr stream) errors = "Failed to run mysqldump command." if errors.empty? end raise errors unless errors.empty? end binlog_pos rescue # Cleanup FileUtils.rm(file_path) if file_path && File.exists?(file_path) raise ensure # Let table_locker finish its task even if an exception happened table_locker.resume if table_locker.alive? rd_io.close rescue nil wr_io.close rescue nil end end def generate_dump_cmd(conf, file_path = nil) FlydataCore::Mysql::CommandGenerator.generate_mysqldump_without_master_data_cmd(conf.merge(result_file: file_path)) end private # This query generates a query which flushes user tables with read lock FLUSH_TABLES_QUERY_TEMPLATE = "FLUSH TABLES %s WITH READ LOCK;" USER_TABLES_QUERY = <= 360 # should take an hour to get here raise "Failed to complete FLUSH TABLES WITH READ LOCK after #{retry_count} attemps" end client.query "KILL QUERY #{thread_id}" thread_id = client.thread_id end thread_id = nil ensure if thread_id client.query "KILL QUERY #{thread_id}" end end $log.debug "lock acquired" begin Fiber.yield # Lock is acquired. Wait until it can be unlocked. # obtain binlog pos $log.debug "obtaining the master binlog pos" result = client.query "SHOW MASTER STATUS;" row = result.first if row.nil? raise "MySQL DB has no replication master status. Check if the DB is set up as a replication master. In case of RDS, make sure that Backup Retention Period is set to more than 0." end $log.debug "master binlog pos obtained: #{row['File']}\t#{row['Position']}" rescue => e $log.error e.to_s raise ensure # unlock tables begin client.query "UNLOCK TABLES;" $log.debug "lock released" client.close rescue # table will be unlocked when the MySQL session is closed, # which happens when `client` object becomes out of scope. # So, we can ignore the error here. end end [row["File"], row['Position']] end end def flush_tables_with_read_lock_query(client, database, tbls) tbls ||= [] tables = "" if mysql_server_version(client).to_f >= 5.5 # FLUSH TABLES table_names,... WITH READ LOCK syntax is supported from MySQL 5.5 tables = tbls.collect{|t| "`#{database}`.`#{t}`"}.join(",") end FLUSH_TABLES_QUERY_TEMPLATE % [tables] end VERSION_QUERY = "SHOW VARIABLES LIKE 'version'" def mysql_server_version(client) result = client.query(VERSION_QUERY) result.first['Value'] end def filter_dump_stream(cmd_out, w_io) cmd_out.each_line do |line| w_io.print line w_io.puts unless line.end_with?("\n") w_io.flush end end end # Custom mysql client that gives access to the last query that was # executed using the client which can be helpful when handling exceptions class FlydataMysqlClient < Mysql2::Client attr_accessor :last_query def query(sql, options = {}) @last_query = sql max_retry = options[:retry_count] || 0 retry_interval = options[:retry_interval] || 3 count = 0 result = nil begin result = super(sql, options) rescue => e count += 1 raise e if max_retry != -1 && count > max_retry yield(count, e) if block_given? sleep retry_interval retry end result end end class MysqlDumpParser attr_accessor :binlog_pos BINLOG_INV_ERROR_CHUNK_SIZE = 250 def initialize(dump_pos_info = {}) binlog_pos_object = dump_pos_info[:source_pos] raise ArgumentError.new("source position is required") unless binlog_pos_object @binlog_pos = { binfile: binlog_pos_object.filename, pos: binlog_pos_object.pos } @dump_pos_info = dump_pos_info end def parse(dmpio, create_table_block, insert_record_block, check_point_block) unless dmpio.kind_of?(IO) raise ArgumentError.new("Invalid argument. The first parameter must be io.") end dump_io = nil invalid_file = false current_state = Flydata::Parser::State::START substate = nil buffered_line = nil bytesize = 0 readline_proc = Proc.new do line = nil if buffered_line line = buffered_line buffered_line = nil else rawline = dump_io.readline bytesize += rawline.bytesize line = rawline.strip end line end state_start = Proc.new do line = readline_proc.call # -- Server version 5.6.21-log if line.start_with?('-- Server version') current_state = Flydata::Parser::State::CREATE_TABLE check_point_block.call(nil, dump_io.pos, bytesize, @binlog_pos, current_state) end end current_table = nil state_create_table = Proc.new do line = readline_proc.call # CREATE TABLE `active_admin_comments` ( m = /^CREATE TABLE `(?[^`]+)`/.match(line) if m current_table = Flydata::Parser::SourceTable.new(m[:table_name]) current_state = Flydata::Parser::State::CREATE_TABLE_COLUMNS end end state_create_table_constraints = Proc.new do line = readline_proc.call # PRIMARY KEY (`id`), if line.start_with?(')') create_table_block.call(current_table) current_state = Flydata::Parser::State::INSERT_RECORD check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) end end state_create_table_columns = Proc.new do start_pos = dump_io.pos line = readline_proc.call line = FlydataCore::StringUtils.replace_invalid_utf8_char(line) # `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, if line.start_with?("\`") column = {} # parse column line line = line[0..-2] if line.end_with?(',') colname_end_index = line.index('`', 1) - 1 column[:column_name] = line[1..colname_end_index] line = line[colname_end_index + 3..-1] items = line.split column[:format_type_str] = format_type_str = items.shift pos = format_type_str.index('(') if pos ft = column[:format_type] = format_type_str[0..pos-1] if ft == 'decimal' precision, scale = format_type_str[pos+1..-2].split(',').collect{|v| v.to_i} column[:decimal_precision] = precision column[:decimal_scale] = scale else column[:format_size] = format_type_str[pos+1..-2].to_i end else column[:format_type] = format_type_str end while (item = items.shift) do case item when 'DEFAULT' value = items.shift value = value.start_with?('\'') ? value[1..-2] : value value = nil if value == 'NULL' column[:default] = value when 'NOT' if items[1] == 'NULL' items.shift column[:not_null] = true end when 'unsigned' column[:unsigned] = true else #ignore other options end end current_table.add_column(column) else current_state = Flydata::Parser::State::CREATE_TABLE_CONSTRAINTS buffered_line = line state_create_table_constraints.call end end state_insert_record = Proc.new do line = readline_proc.call if line.start_with?('INSERT') buffered_line = line current_state = Flydata::Parser::State::PARSING_INSERT_RECORD elsif line.start_with?('UNLOCK') current_state = Flydata::Parser::State::CREATE_TABLE check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) end end state_parsing_insert_record = Proc.new do line = readline_proc.call values_set = nil begin values_set = InsertParser.new.parse(line) rescue => e newe = e.class.new(e.message + "line:#{line}") newe.set_backtrace(e.backtrace) raise newe end current_state = Flydata::Parser::State::INSERT_RECORD if insert_record_block.call(current_table, values_set) values_set = nil check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state) end end # Start reading file from top begin # resume(only when using dump file) if @dump_pos_info[:last_pos] && (@dump_pos_info[:last_pos].to_i != -1) dmpio.pos = @dump_pos_info[:last_pos].to_i current_state = @dump_pos_info[:state] substate = @dump_pos_info[:substate] current_table = @dump_pos_info[:source_table] bytesize = dmpio.pos end dump_io = AsyncIO.new(dmpio) until dump_io.eof? do case current_state when Flydata::Parser::State::START state_start.call when Flydata::Parser::State::CREATE_TABLE state_create_table.call when Flydata::Parser::State::CREATE_TABLE_COLUMNS state_create_table_columns.call when Flydata::Parser::State::CREATE_TABLE_CONSTRAINTS state_create_table_constraints.call when Flydata::Parser::State::INSERT_RECORD state_insert_record.call when Flydata::Parser::State::PARSING_INSERT_RECORD state_parsing_insert_record.call end end ensure dump_io.close end @binlog_pos end # Parse the insert line containing multiple values. (max line size is 1kb) # ex) INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2'); class InsertParser #INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2'); RUN_PROFILE = false def start_ruby_prof RubyProf.start if RUN_PROFILE and defined?(RubyProf) and not RubyProf.running? end def stop_ruby_prof if RUN_PROFILE and defined?(RubyProf) and RubyProf.running? result = RubyProf.stop #printer = RubyProf::GraphPrinter.new(result) printer = RubyProf::GraphHtmlPrinter.new(result) #printer.print(STDOUT) printer.print(File.new("ruby-prof-out-#{Time.now.to_i}.html", "w"), :min_percent => 3) end end def parse(line) start_ruby_prof bench_start_time = Time.now begin _parse(line) rescue DumpParseError => e $log.error "Encountered an error while parsing this chunk:\n #{e.message}. backtrace: #{e.backtrace}" raise ensure stop_ruby_prof if ENV['FLYDATA_BENCHMARK'] puts " -> time:#{Time.now.to_f - bench_start_time.to_f} size:#{target_line.size}" end end end private def _parse(target_line) values = [] values_set = [] target_line = target_line.strip start_index = target_line.index('(') target_line = target_line[start_index..-2] # Split insert line text with ',' and take care of ',' inside of the values later. # # We are using the C native method that is like 'split', 'start_with?', 'regexp' # instead of 'String#each_char' and string comparision for the performance. # 'String#each_char' is twice as slow as the current strategy. begin items = target_line.split(',') rescue ArgumentError => e # If there are any non-UTF8 characters, split will fail # This will go through the chunk of data and cut the data up into chunks to make it # displayable to user. target_chunk="" chunk_size = BINLOG_INV_ERROR_CHUNK_SIZE chunk_repeat_count = target_line.length/chunk_size for i in 0..chunk_repeat_count line_chunk=target_line[(0+(i*chunk_size)..chunk_size+((i*chunk_size)-1))] unless line_chunk.valid_encoding? target_chunk=line_chunk break end end hex_string = target_chunk.bytes.map { |b| sprintf(", 0x%02X",b) }.join raise DumpParseError, "raw_text: #{target_chunk}\n hex_dump: #{hex_string}" end index = 0 cur_state = :next_values loop do case cur_state when :next_values chars = items[index] break unless chars items[index] = chars[1..-1] cur_state = :in_value when :in_value chars = items[index] index += 1 if chars.start_with?("'") chars_size = chars.size # single item (not last item) # size check added below otherwise end_with? matches the single quote which was also used by start_with? if chars_size > 1 and chars.end_with?("'") and !(chars.end_with?("\\'") and last_char_escaped?(chars)) values << (chars[1..-2]).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) # single item (last item) # size check added below otherwise end_with? matches the single quote which was also used by start_with? elsif chars_size > 2 and chars.end_with?("')") and !(chars[0..-2].end_with?("\\'") and last_char_escaped?(chars[0..-2])) values << (chars[1..-3]).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) values_set << values values = [] cur_state = :next_values # multi items else cur_value = chars[1..-1] loop do next_chars = items[index] index += 1 if next_chars.end_with?('\'') and !(next_chars.end_with?("\\'") and last_char_escaped?(next_chars)) cur_value << ',' cur_value << next_chars[0..-2] values << (cur_value).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) break elsif next_chars.end_with?("')") and !(next_chars[0..-2].end_with?("\\'") and last_char_escaped?(next_chars[0..-2])) cur_value << ',' cur_value << next_chars[0..-3] values << (cur_value).gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) values_set << values values = [] cur_state = :next_values break else cur_value << ',' cur_value << next_chars end end end else if chars.end_with?(')') chars = chars[0..-2] values << (chars == 'NULL' ? nil : remove_leading_zeros(chars)) values_set << values values = [] cur_state = :next_values else values << (chars == 'NULL' ? nil : remove_leading_zeros(chars)) end end else raise "Invalid state: #{cur_state}" end end return values_set end ESCAPE_REGEXP = /\\\\|\\'|\\"|\\n|\\r/ ESCAPE_HASH_TABLE = {"\\\\" => "\\", "\\'" => "'", "\\\"" => "\"", "\\n" => "\n", "\\r" => "\r"} # For the peformance, this function is put inline. #def replace_escape_char(original) # original.gsub(ESCAPE_REGEXP, ESCAPE_HASH_TABLE) #end # This method assume that the last character is '(single quotation) # abcd\' -> true # abcd\\' -> false (back slash escape back slash) # abcd\\\' -> true def last_char_escaped?(text) flag = false (text.length - 2).downto(0) do |i| if text[i] == '\\' flag = !flag else break end end flag end def remove_leading_zeros(number_string) if number_string.start_with?('0') number_string.sub(/^0*([1-9][0-9]*(\.\d*)?|0(\.\d*)?)$/,'\1') else number_string end end end end class DatabaseSizeCheck include MysqlAccessible SIZE_CHECK_QUERY = <