require 'fileutils'
require 'mysql2'
require 'flydata-core/query_job'

module Flydata
  # Manages all on-disk state files used by a sync data entry:
  # mysqldump output, resume positions, binlog positions, per-table
  # sequence/revision files, generated DDL markers and record-count stats.
  # NOTE: expects FLYDATA_HOME to be defined by the including application.
  class SyncFileManager
    DUMP_DIR = ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
    BACKUP_DIR = ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup')
    TABLE_POSITIONS_DIR = ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')

    # data_entry - Hash describing the sync target; 'name' and (optionally)
    # 'mysql_data_entry_preference' keys are read here.
    def initialize(data_entry)
      @data_entry = data_entry
      @table_position_files = {} # File objects keyed by table name
    end

    # Close any per-table position files opened by
    # #increment_and_save_table_position.
    def close
      @table_position_files.values.each {|f| f.close }
      @table_position_files = {}
    end

    def dump_file_path
      File.join(dump_dir, @data_entry['name']) + ".dump"
    end

    # dump pos file for resume
    def dump_pos_path
      dump_file_path + ".pos"
    end

    def save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil)
      File.open(dump_pos_path, 'w') do |f|
        f.write(dump_pos_content(status, table_name, last_pos, binlog_pos, state, substate))
      end
    end

    # Returns a Hash of the saved dump position, or {} when no pos file exists.
    # Raises when the file does not contain 5-7 tab-separated fields.
    def load_dump_pos
      path = dump_pos_path
      return {} unless File.exist?(path)
      # Block form closes the handle (the old code leaked an open File).
      items = File.open(path, 'r') {|f| f.readline }.split("\t")
      raise "Invalid dump.pos file: #{path}" unless items.length >= 5 && items.length <= 7
      mysql_table = load_mysql_table_marshal_dump
      { status: items[0], table_name: items[1], last_pos: items[2].to_i,
        binlog_pos: {binfile: items[3], pos: items[4].to_i},
        state: items[5], substate: items[6], mysql_table: mysql_table}
    end

    # Returns an Array of generated-DDL file contents for the given table(s);
    # nil entries for tables whose ddl file does not exist.
    def load_generated_ddl(tables)
      tables = [ tables ] unless tables.kind_of?(Array)
      paths = table_ddl_file_paths(*tables)
      paths.collect{|path|
        begin
          File.open(path) {|f| f.read }
        rescue Errno::ENOENT
          nil
        end
      }
    end

    def save_generated_ddl(tables, contents = "1")
      tables = [ tables ] unless tables.kind_of?(Array)
      # Use the shared constant rather than re-deriving the path from ENV.
      table_positions_dir_path = TABLE_POSITIONS_DIR
      # Create positions dir if it does not exist
      unless File.directory?(table_positions_dir_path)
        FileUtils.mkdir_p(table_positions_dir_path)
      end
      tables.each do |tab|
        File.open(File.join(table_positions_dir_path, "#{tab}.generated_ddl"), 'w') {|f|
          f.write(contents)
        }
      end
    end

    # Returns the subset of tables that have no "<table>.<file_type>" marker
    # file in the positions directory.
    def get_new_table_list(tables, file_type)
      table_positions_dir_path = TABLE_POSITIONS_DIR
      new_tables = []
      tables.each do |table|
        new_tables << table unless File.exist?(File.join(table_positions_dir_path, "#{table}.#{file_type}"))
      end
      new_tables
    end

    # MysqlTable marshal file
    def mysql_table_marshal_dump_path
      dump_file_path + ".mysql_table"
    end

    def save_mysql_table_marshal_dump(mysql_table)
      # Marshal output is binary data; write in binary mode.
      File.open(mysql_table_marshal_dump_path, 'wb') do |f|
        f.write Marshal.dump(mysql_table)
      end
    end

    # master binlog.pos file
    def save_binlog(binlog_pos)
      path = binlog_path
      File.open(path, 'w') do |f|
        f.write(binlog_content(binlog_pos))
      end
    end

    # Returns {binfile:, pos:} or nil when the file is missing or malformed.
    def load_binlog(file_path = binlog_path)
      return nil unless File.exist?(file_path)
      f, pos = IO.read(file_path).strip.split("\t")
      return nil if f.nil? || f.empty? || pos.nil?
      { binfile: f, pos: pos.to_i }
    end

    def binlog_path
      File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos")
    end

    # sent binlog.pos file
    def save_sent_binlog(binlog_pos)
      File.open(sent_binlog_path, 'w') do |f|
        f.write(binlog_content(binlog_pos))
      end
    end

    def sent_binlog_path(master_binlog_path = binlog_path)
      unless master_binlog_path && master_binlog_path.end_with?('binlog.pos')
        raise ArgumentError.new("Invalid binlog path. binlog path needs to end with 'binlog.pos'")
      end
      # "...binlog.pos" -> "...binlog.sent.pos" (strip trailing ".pos")
      "#{master_binlog_path[0..-5]}.sent.pos"
    end

    # ssl_ca file path
    def ssl_ca_path(master_binlog_path = binlog_path)
      unless master_binlog_path && master_binlog_path.end_with?('binlog.pos')
        raise ArgumentError.new("Invalid binlog path. binlog path needs to end with 'binlog.pos'")
      end
      # "...name.binlog.pos" -> "...name.ssl_ca.pem" (strip trailing ".binlog.pos")
      "#{master_binlog_path[0..-12]}.ssl_ca.pem"
    end

    def save_ssl_ca(ssl_ca_content, path = ssl_ca_path)
      File.open(path, 'w') do |f|
        f.write(ssl_ca_content)
      end
    end

    # table files

    # Reset each table's sequence position file to '0'.
    def reset_table_position_files(tables)
      tables.each do |table_name|
        file = File.join(table_positions_dir_path, table_name + ".pos")
        File.open(file, "w") {|f| f.write('0') }
      end
    end

    def table_positions_dir_path
      TABLE_POSITIONS_DIR
    end

    def table_position_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos'))
                    : tables.map{|table| File.join(table_positions_dir_path, table + '.pos')}
    end

    def table_ddl_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl'))
                    : tables.map{|table| File.join(table_positions_dir_path, table + '.generated_ddl')}
    end

    def table_binlog_pos_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos'))
                    : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos')}
    end

    def table_binlog_pos_init_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init'))
                    : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos.init')}
    end

    # Read a sequence number from the table's position file,
    # increment the number and pass the number to a block.
    # After executing the block, saves the value to the position
    # file.
    def increment_and_save_table_position(table_name)
      file = File.join(table_positions_dir_path, table_name + ".pos")
      retry_count = 0
      begin
        @table_position_files[table_name] ||= File.open(file, "r+")
      rescue Errno::ENOENT
        raise if retry_count > 0 # Already retried. Must be a different file causing the error
        # File does not exist. Create one with an initial value of '0'
        File.open(file, "w") {|f| f.write('0') }
        retry_count += 1
        retry
      end
      f = @table_position_files[table_name]
      seq = f.read
      seq = seq.to_i + 1
      seq = FlydataCore::QueryJob::SYNC_FIRST_SEQ if seq == 1
      begin
        yield(seq)
      ensure
        # when an error happened in yield, the sequence number should remain
        # as is. For the next call to read the value correctly, the position
        # must be rewound.
        f.rewind
      end
      f.truncate(0)
      f.write(seq)
      f.flush
      f.rewind
    end

    def sync_info_file
      File.join(dump_dir, "sync.info")
    end

    def save_sync_info(initial_sync, tables)
      File.open(sync_info_file, "w") do |f|
        f.write([initial_sync, tables.join(" ")].join("\t"))
      end
    end

    # Returns {initial_sync:, tables:} or nil when no sync.info file exists.
    def load_sync_info
      return nil unless File.exist?(sync_info_file)
      # Block form closes the handle (the old code leaked an open File).
      items = File.open(sync_info_file, 'r') {|f| f.readline }.split("\t")
      { initial_sync: (items[0] == 'true'), tables: items[1].split(" ") }
    end

    def get_table_binlog_pos(table_name)
      file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
      return nil unless File.exist?(file)
      # Block form closes the handle (the old code leaked an open File).
      File.open(file, 'r') {|f| f.readline }
    end

    def table_rev_file_path(table_name)
      File.join(table_positions_dir_path, table_name + ".rev")
    end

    def table_rev_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev"))
                    : tables.map{|table| table_rev_file_path(table)}
    end

    def table_rev(table_name)
      file = table_rev_file_path(table_name)
      return 1 unless File.exist?(file) # default revision is 1
      File.open(file, "r+") do |f|
        seq = f.read
        if seq.empty?
          return 1
        else
          return seq.to_i
        end
      end
    end

    def increment_table_rev(table_name, base_rev)
      file = table_rev_file_path(table_name)
      new_rev = base_rev + 1
      File.open(file, "w") do |f|
        f.write(new_rev)
      end
      new_rev
    end

    def delete_table_binlog_pos(table_name)
      file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
      if File.exist?(file)
        FileUtils.rm(file, :force => true)
      else
        puts "#{file} does not exist.  Something is wrong.  Did you delete the file manually when flydata was running?"
      end
    end

    def save_table_binlog_pos(tables, binlog_pos)
      tables.each do |table_name|
        file = File.join(dump_dir, table_name + ".binlog.pos")
        File.open(file, "w") do |f|
          f.write(binlog_content(binlog_pos))
        end
      end
    end

    # Move each table's binlog.pos file from the dump dir into the positions
    # dir, keeping a ".init" copy of the initial-sync position for repair.
    def install_table_binlog_files(tables)
      FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exist?(table_positions_dir_path)
      tables.each do |table_name|
        file_name = table_name + ".binlog.pos"
        src_file = File.join(dump_dir, file_name)
        if ! File.exist?(src_file)
          raise "#{src_file} does not exist. Error!!"
        end
        FileUtils.mv(src_file, table_positions_dir_path)

        # save the position at initial sync.  this is used for repair if
        # necessary.
        FileUtils.cp(File.join(table_positions_dir_path, file_name),
                     File.join(table_positions_dir_path, file_name + ".init"))
      end
    end

    def delete_dump_file
      FileUtils.rm(dump_file_path) if File.exist?(dump_file_path)
    end

    # Move everything in the dump dir into a timestamped backup dir.
    def backup_dump_dir
      backup_dir = BACKUP_DIR.dup
      FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
      dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S"))
      FileUtils.mkdir(dest_dir)
      FileUtils.mv(Dir.glob("#{dump_dir}/*"), dest_dir)
    end

    def backup_dir
      BACKUP_DIR
    end

    def stats_path
      File.join(dump_dir, @data_entry['name']) + ".stats"
    end

    # Add record_count to the running per-table count in the stats file.
    def save_record_count_stat(table, record_count)
      stats = load_stats || Hash.new
      stats[table] = stats[table] ? stats[table].to_i + record_count : record_count
      save_stats(stats)
    end

    # Returns the stats file as a Hash, or nil when it does not exist.
    def load_stats
      return nil unless File.exist?(stats_path)
      Hash[*File.read(stats_path).split(/\t/)]
    end

    private

    def save_stats(stats)
      File.open(stats_path, 'w') do |f|
        f.write(stats.flatten.join("\t"))
      end
    end

    def dump_pos_content(status, table_name, last_pos, binlog_pos, state = nil, substate = nil)
      [status, table_name, last_pos, binlog_content(binlog_pos), state, substate].join("\t")
    end

    def binlog_content(binlog_pos)
      [binlog_pos[:binfile], binlog_pos[:pos]].join("\t")
    end

    # NOTE: Marshal.load must only be used on trusted, locally-written files
    # (this file is written by #save_mysql_table_marshal_dump).
    def load_mysql_table_marshal_dump
      path = mysql_table_marshal_dump_path
      return nil unless File.exist?(path)
      # binread avoids text-mode corruption and leaking an open handle.
      Marshal.load(File.binread(path))
    end

    # Resolve the dump directory from the data entry preference (falling back
    # to DUMP_DIR), expanding a leading '~' and creating the dir if needed.
    def dump_dir
      pref = @data_entry['mysql_data_entry_preference']
      dump_dir = if pref and pref['mysqldump_dir']
                   pref['mysqldump_dir']
                 else
                   nil
                 end
      if dump_dir
        dump_dir = dump_dir.dup
        dump_dir[0] = ENV['HOME'] if dump_dir.match(/^~$|^~\//)
      else
        dump_dir = DUMP_DIR.dup
      end
      if File.exist?(dump_dir) and not Dir.exist?(dump_dir)
        raise "'mysqldump_dir'(#{dump_dir}) must be a directory."
      end
      FileUtils.mkdir_p(dump_dir) unless Dir.exist?(dump_dir)
      dump_dir
    end
  end
end