module Flydata module FileUtil class SyncFileManager DUMP_DIR = ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump') BACKUP_DIR = ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup') TABLE_POSITIONS_DIR = ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions') def initialize(data_entry) @data_entry = data_entry end def dump_file_path File.join(dump_dir, @data_entry['name']) + ".dump" end # dump pos file for resume def dump_pos_path dump_file_path + ".pos" end def save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) File.open(dump_pos_path, 'w') do |f| f.write(dump_pos_content(status, table_name, last_pos, binlog_pos, state, substate)) end end def load_dump_pos path = dump_pos_path return {} unless File.exists?(path) items = File.open(path, 'r').readline.split("\t") raise "Invalid dump.pos file: #{path}" unless items.length >= 5 && items.length <= 7 mysql_table = load_mysql_table_marshal_dump { status: items[0], table_name: items[1], last_pos: items[2].to_i, binlog_pos: {binfile: items[3], pos: items[4].to_i}, state: items[5], substate: items[6], mysql_table: mysql_table} end # MysqlTable marshal file def mysql_table_marshal_dump_path dump_file_path + ".mysql_table" end def save_mysql_table_marshal_dump(mysql_table) File.open(mysql_table_marshal_dump_path, 'w') do |f| f.write Marshal.dump(mysql_table) end end # binlog.pos file def save_binlog(binlog_pos) path = binlog_path File.open(path, 'w') do |f| f.write(binlog_content(binlog_pos)) end end def binlog_path File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos") end def reset_table_position_files(tables) tables.each do |table_name| file = File.join(table_positions_dir_path, table_name + ".pos") File.open(file, "w") {|f| f.write('0') } end end def table_positions_dir_path TABLE_POSITIONS_DIR end def table_position_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.pos')} end # Read a sequence number from the table's position file, # increment the number and pass the number to a block. # After executing the block, saves the value to the position # file. def increment_and_save_table_position(table_name) file = File.join(table_positions_dir_path, table_name + ".pos") retry_count = 0 begin File.open(file, "r+") do |f| seq = f.read seq = seq.to_i + 1 yield(seq) f.rewind f.truncate(0) f.write(seq) end rescue Errno::ENOENT raise if retry_count > 0 # Already retried. Must be a differentfile causing the error # File not exist. Create one with initial value of '0' File.open(file, "w") {|f| f.write('0') } retry_count += 1 retry end end def sync_info_file File.join(dump_dir, "sync.info") end def save_sync_info(initial_sync, tables) File.open(sync_info_file, "w") do |f| f.write([initial_sync, tables].join("\t")) end end def load_sync_info return nil unless File.exists?(sync_info_file) items = File.open(sync_info_file, 'r').readline.split("\t") { initial_sync: (items[0] == 'true'), tables: items[1] } end def get_table_binlog_pos(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos") return nil unless File.exists?(file) File.open(file, 'r').readline end def table_rev_file_path(table_name) File.join(table_positions_dir_path, table_name + ".rev") end def table_rev_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev")) : tables.map{|table| table_rev_file_path(table)} end def table_rev(table_name) file = table_rev_file_path(table_name) return 1 unless File.exists?(file) #default revision is 1 File.open(file, "r+") do |f| seq = f.read if seq.empty? return 1 else return seq.to_i end end end def increment_table_rev(table_name, base_rev) file = table_rev_file_path(table_name) new_rev = base_rev + 1 File.open(file, "w") do |f| f.write(new_rev) end new_rev end def delete_table_binlog_pos(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos") if File.exists?(file) FileUtils.rm(file, :force => true) else puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?" end end def save_table_binlog_pos(tables, binlog_pos) tables.split(" ").each do |table_name| file = File.join(dump_dir, table_name + ".binlog.pos") File.open(file, "w") do |f| f.write(binlog_content(binlog_pos)) end end end def move_table_binlog_files(tables) FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exists?(table_positions_dir_path) tables.each do |table_name| file = File.join(dump_dir, table_name + ".binlog.pos") if ! File.exists?(file) raise "#{file} does not exist. Error!!" end FileUtils.mv(file, table_positions_dir_path) end end def backup_dump_dir backup_dir = BACKUP_DIR.dup FileUtils.mkdir_p(backup_dir) unless Dir.exists?(backup_dir) dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S")) FileUtils.mkdir(dest_dir) FileUtils.mv(dump_dir, dest_dir) end private def dump_pos_content(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) [status, table_name, last_pos, binlog_content(binlog_pos), state, substate].join("\t") end def binlog_content(binlog_pos) [binlog_pos[:binfile], binlog_pos[:pos]].join("\t") end def load_mysql_table_marshal_dump path = mysql_table_marshal_dump_path return nil unless File.exists?(path) Marshal.load(File.open(path, 'r')) end def dump_dir pref = @data_entry['mysql_data_entry_preference'] dump_dir = if pref and pref['mysqldump_dir'] pref['mysqldump_dir'] else nil end if dump_dir dump_dir = dump_dir.dup dump_dir[0] = ENV['HOME'] if dump_dir.match(/^~$|^~\//) else dump_dir = DUMP_DIR.dup end if File.exists?(dump_dir) and not Dir.exists?(dump_dir) raise "'mysqldump_dir'(#{dump_dir}) must be a directory." end FileUtils.mkdir_p(dump_dir) unless Dir.exists?(dump_dir) dump_dir end end end end