require 'flydata-core/query_job'
require 'flydata/parser/source_table' # Required for Marshal.load
require 'flydata/parser'
require 'fileutils'
require 'set'
require 'flydata/source'
require 'flydata/source/source_pos'

module Flydata
  class SyncFileManager
    DUMP_DIR = ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
    BACKUP_DIR = ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup')
    TABLE_POSITIONS_DIR = FLYDATA_TABLE_POSITIONS_DIR

    def initialize(data_entry, source = nil)
      @data_entry = data_entry
      @source = source # for Source-dependent objects
      @table_position_files = {} # File objects keyed by table name
    end

    def source
      @source
    end

    def close
      @table_position_files.values.each {|f| f.close }
      @table_position_files = {}
    end

    def dump_file_path
      File.join(dump_dir, @data_entry['name']) + ".dump"
    end

    # dump pos file for resume
    def dump_pos_path
      dump_file_path + ".pos"
    end

    def save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil)
      raise "Cannot create dump pos file because source position is unavailable." unless source_pos
      File.open(dump_pos_path, 'w') do |f|
        f.write(dump_pos_content(status, table_name, last_pos, source_pos, state, substate))
      end
    end

    def load_dump_pos
      path = dump_pos_path
      return {} unless File.exists?(path)
      content = File.open(path, 'r').readline
      source_table = load_source_table_marshal_dump
      dump_pos_content_to_hash(content).merge({ source_table: source_table })
    end

    def load_generated_ddl(tables)
      tables = [ tables ] unless tables.kind_of?(Array)
      paths = table_ddl_file_paths(*tables)
      paths.collect{|path|
        begin
          File.open(path) {|f| f.read }
        rescue Errno::ENOENT
          nil
        end
      }
    end

    def save_generated_ddl(tables, contents = "1")
      tables = [ tables ] unless tables.kind_of?(Array)
      # Create the positions dir if it does not exist
      unless File.directory?(table_positions_dir_path)
        FileUtils.mkdir_p(table_positions_dir_path)
      end
      tables.each do |tab|
        File.open(File.join(table_positions_dir_path, "#{tab}.generated_ddl"), 'w') {|f| f.write(contents) }
      end
    end

    def get_new_table_list(tables, file_type)
      new_tables = []
      tables.each do |table|
        new_tables << table unless File.exists?(File.join(table_positions_dir_path, "#{table}.#{file_type}"))
      end
      new_tables
    end

    SOURCE_TABLE_EXT = "mysql_table"

    # SourceTable marshal file
    def source_table_marshal_dump_path
      dump_file_path + ".#{SOURCE_TABLE_EXT}"
    end

    def save_source_table_marshal_dump(source_table)
      File.open(source_table_marshal_dump_path, 'w') do |f|
        f.write Marshal.dump(source_table)
      end
    end

    # master binlog.pos file
    def save_source_pos(source_pos)
      path = source_pos_path
      File.open(path, 'w') do |f|
        f.write(source_pos.to_s)
      end
    end

    def load_source_pos(file_path = source_pos_path)
      return nil unless File.exists?(file_path)
      source_pos_str = IO.read(file_path).strip
      begin
        context = source.source_pos
        source_pos = context.create_source_pos(source_pos_str)
      rescue RuntimeError
        return nil
      end
      source_pos
    end

    def source_pos_path
      File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos")
    end

    # sent source pos file (binlog.pos)
    def save_sent_source_pos(source_pos)
      File.open(sent_source_pos_path, 'w') do |f|
        f.write(source_pos.to_s)
      end
    end

    def load_sent_source_pos(file_path = sent_source_pos_path)
      return nil unless File.exists?(file_path)
      source_pos_str = IO.read(file_path).strip
      begin
        source_pos = source.source_pos.create_source_pos(source_pos_str)
      rescue RuntimeError
        return nil
      end
      source_pos
    end

    def sent_source_pos_path(master_source_pos_path = source_pos_path)
      validate_master_source_pos_path(master_source_pos_path)
      # strip the trailing ".pos" -> "<name>.binlog.sent.pos"
      "#{master_source_pos_path[0..-5]}.sent.pos"
    end
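
    # Illustrative round trip for the master position files. This is a
    # sketch, not part of the API; `entry`, `my_source` and `pos` are
    # hypothetical stand-ins for a data entry hash from the FlyData
    # config, a Source object, and a source position object:
    #
    #   sfm = Flydata::SyncFileManager.new(entry, my_source)
    #   sfm.save_source_pos(pos)   # writes "<name>.binlog.pos" under FLYDATA_HOME
    #   sfm.load_source_pos        # => source position object, or nil if the
    #                              #    file is missing or unparsable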

    # ssl_ca file path
    def ssl_ca_path(master_source_pos_path = source_pos_path)
      validate_master_source_pos_path(master_source_pos_path)
      # strip the trailing ".binlog.pos" -> "<name>.ssl_ca.pem"
      "#{master_source_pos_path[0..-12]}.ssl_ca.pem"
    end

    def save_ssl_ca(ssl_ca_content, path = ssl_ca_path)
      File.open(path, 'w') do |f|
        f.write(ssl_ca_content)
      end
    end

    # ssl_cipher file path
    def ssl_cipher_path(master_source_pos_path = source_pos_path)
      validate_master_source_pos_path(master_source_pos_path)
      # strip the trailing ".binlog.pos" -> "<name>.ssl_cipher"
      "#{master_source_pos_path[0..-12]}.ssl_cipher"
    end

    def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path)
      File.open(path, 'w') do |f|
        f.write(ssl_cipher_content)
      end
    end

    # table files
    def reset_table_position_files(tables)
      tables.each do |table_name|
        file = File.join(table_positions_dir_path, table_name + ".pos")
        File.open(file, "w") {|f| f.write('0') }
      end
    end

    def table_positions_dir_path
      TABLE_POSITIONS_DIR
    end

    def table_position_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.pos')}
    end

    def table_ddl_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl')) : tables.map{|table| File.join(table_positions_dir_path, table + '.generated_ddl')}
    end

    def table_source_pos_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos')}
    end

    def table_source_pos_init_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos.init')}
    end

    def increment_table_position(seq)
      seq = seq.to_i + 1
      seq = FlydataCore::QueryJob::SYNC_FIRST_SEQ if seq == 1
      seq
    end

    # Read a sequence number from the table's position file,
    # increment the number and pass the number to a block.
    # After executing the block, saves the value to the position
    # file.
    def increment_and_save_table_position(table_name)
      seq = get_table_position(table_name)
      seq = increment_table_position(seq)
      # logical transaction starts
      yield(seq)
      save_table_position(table_name, seq)
      # logical transaction ends
    end
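
    # Usage sketch for the logical transaction above (`upload_batch` is a
    # hypothetical caller-side method). The new sequence number is
    # persisted only if the block returns without raising:
    #
    #   sfm.increment_and_save_table_position('users') do |seq|
    #     upload_batch(records, seq)
    #   end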

    def open_table_position_file(table_name)
      file = File.join(table_positions_dir_path, table_name + ".pos")
      retry_count = 0
      begin
        @table_position_files[table_name] ||= (f = File.open(file, File::RDWR); f.sync = true; f)
      rescue Errno::ENOENT
        raise if retry_count > 0 # Already retried. Must be a different file causing the error
        # File does not exist. Create one with an initial value of '0'
        File.open(file, "w") {|f| f.write('0') }
        retry_count += 1
        retry
      end
      @table_position_files[table_name]
    end

    def save_table_position(table_name, seq)
      f = open_table_position_file(table_name)
      prev_seq_len = f.size
      seq_to_write = seq.to_s
      new_seq_len = seq_to_write.size
      # Pad with spaces so the write fully overwrites a longer previous
      # value, then shrink the file back down to the new length
      if new_seq_len < prev_seq_len
        seq_to_write += " " * (prev_seq_len - new_seq_len)
      end
      f.write(seq_to_write)
      f.truncate(new_seq_len) if new_seq_len < prev_seq_len
      f.rewind
    end

    def get_table_position(table_name)
      f = open_table_position_file(table_name)
      seq = f.read
      f.rewind
      seq
    end

    def get_next_table_position(table_name)
      seq = get_table_position(table_name)
      increment_table_position(seq)
    end

    def lock_pid_file
      FLYDATA_LOCK
    end

    def sync_info_file
      File.join(dump_dir, "sync.info")
    end

    def save_sync_info(initial_sync, tables)
      File.open(sync_info_file, "w") do |f|
        f.write([initial_sync, tables.join(" ")].join("\t"))
      end
    end

    def load_sync_info
      return nil unless File.exists?(sync_info_file)
      items = File.open(sync_info_file, 'r').readline.split("\t")
      { initial_sync: (items[0] == 'true'), tables: items[1].split(" ") }
    end

    def get_table_source_pos_init(table_name)
      file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init")
      return nil unless File.exists?(file)
      source.source_pos.create_source_pos( File.open(file, 'r').readline )
    end

    def table_rev_file_path(table_name)
      File.join(table_positions_dir_path, table_name + ".rev")
    end

    def table_rev_file_paths(*tables)
      tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev")) : tables.map{|table| table_rev_file_path(table)}
    end
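
    # Sketch of the revision bookkeeping below (illustrative values):
    #
    #   sfm.table_rev('users')              # => 1 when no users.rev file exists
    #   sfm.increment_table_rev('users', 1) # => 2, and persists "2" to users.rev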

    def table_rev(table_name)
      file = table_rev_file_path(table_name)
      return 1 unless File.exists?(file) # default revision is 1
      File.open(file, "r+") do |f|
        seq = f.read
        if seq.empty?
          return 1
        else
          return seq.to_i
        end
      end
    end

    def increment_table_rev(table_name, base_rev)
      file = table_rev_file_path(table_name)
      new_rev = base_rev + 1
      File.open(file, "w") do |f|
        f.write(new_rev)
      end
      new_rev
    end

    def delete_table_position_files(*tables)
      files_to_delete = [
        table_position_file_paths(*tables),
        table_source_pos_paths(*tables),
        table_source_pos_init_paths(*tables),
      ]
      files_to_delete.flatten.each do |path|
        FileUtils.rm(path) if File.exists?(path)
      end
    end

    def delete_table_rev_files(*tables)
      files_to_delete = table_rev_file_paths(*tables)
      files_to_delete.each do |path|
        FileUtils.rm(path) if File.exists?(path)
      end
    end

    def delete_table_ddl_files(*tables)
      files_to_delete = table_ddl_file_paths(*tables)
      files_to_delete.each do |path|
        FileUtils.rm(path) if File.exists?(path)
      end
    end

    def tables_from_positions_dir
      all_table_control_files = Dir.glob(File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}'))
      tables = Set.new
      all_table_control_files.each do |control_file|
        file_name = File.basename(control_file)
        file_name = file_name.slice(0...(file_name.index('.')))
        tables << file_name
      end
      tables.to_a
    end

    def delete_dump_files
      files_to_delete = [ dump_file_path, dump_pos_path, source_table_marshal_dump_path, sync_info_file, stats_path ]
      files_to_delete.flatten.each do |file_to_delete|
        FileUtils.rm(file_to_delete) if File.exists?(file_to_delete)
      end
    end

    def delete_master_position_files
      files_to_delete = [
        source_pos_path,
        sent_source_pos_path,
        lock_pid_file,
      ]
      files_to_delete.flatten.each do |file_to_delete|
        FileUtils.rm(file_to_delete) if File.exists?(file_to_delete)
      end
    end

    def delete_table_source_pos(table_name)
      file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
      if File.exists?(file)
        FileUtils.rm(file, :force => true)
      else
        puts "#{file} does not exist. Something is wrong. Did you delete the file manually while flydata was running?"
      end
    end

    def save_table_source_pos(tables, source_pos, options = {})
      dest_dir = case options[:destination]
                 when :positions; table_positions_dir_path
                 when :dump; dump_dir
                 else dump_dir
                 end
      tables = [ tables ] unless tables.kind_of?(Array)
      tables.each do |table_name|
        file = File.join(dest_dir, table_name + ".binlog.pos")
        File.open(file, "w") do |f|
          f.write(source_pos.to_s)
        end
      end
    end

    def get_table_source_pos(table_name)
      source_pos_str = get_table_source_raw_pos(table_name)
      return nil unless source_pos_str
      source.source_pos.create_source_pos( source_pos_str )
    end

    # Returns a String. Interface for fluentd.
    def get_table_source_raw_pos(table_name)
      file = table_source_pos_file_path(table_name)
      return nil unless File.exists?(file)
      File.open(file, 'r').readline
    end

    def table_source_pos_file_path(table_name)
      File.join(table_positions_dir_path, table_name + ".binlog.pos")
    end
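
    # Sketch of the per-table position lifecycle (`sfm` is a
    # SyncFileManager instance; `pos` is a hypothetical source position
    # object responding to #to_s):
    #
    #   sfm.save_table_source_pos(['users'], pos)     # writes users.binlog.pos to the dump dir
    #   sfm.install_table_source_pos_files(['users']) # moves it to the positions dir, plus a .init copy
    #   sfm.get_table_source_pos('users')             # => parsed position, or nil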

    def install_table_source_pos_files(tables)
      FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exists?(table_positions_dir_path)
      tables.each do |table_name|
        file_name = table_name + ".binlog.pos"
        src_file = File.join(dump_dir, file_name)
        if ! File.exists?(src_file)
          raise "#{src_file} does not exist. Error!!"
        end
        FileUtils.mv(src_file, table_positions_dir_path)
        # save the position at initial sync. this is used for repair if
        # necessary.
        FileUtils.cp(File.join(table_positions_dir_path, file_name),
                     File.join(table_positions_dir_path, file_name + ".init"))
      end
    end

    def delete_dump_file
      FileUtils.rm(dump_file_path) if File.exists?(dump_file_path)
    end

    def backup_dump_dir
      backup_dir = BACKUP_DIR.dup
      FileUtils.mkdir_p(backup_dir) unless Dir.exists?(backup_dir)
      dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S"))
      FileUtils.mkdir(dest_dir)
      ['info', 'pos', 'stats', SOURCE_TABLE_EXT].each do |ext|
        FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir)
      end
    end

    def backup_dir
      BACKUP_DIR
    end

    def stats_path
      File.join(dump_dir, @data_entry['name']) + ".stats"
    end

    def save_record_count_stat(table, record_count)
      stats = load_stats || Hash.new
      stats[table] = stats[table] ? stats[table].to_i + record_count : record_count
      save_stats(stats)
    end

    def load_stats
      return nil unless File.exists?(stats_path)
      Hash[*File.read(stats_path).split(/\t/)]
    end

    private

    def save_stats(stats)
      File.open(stats_path, 'w') do |f|
        f.write(stats.flatten.join("\t"))
      end
    end

    def dump_pos_content(status, table_name, last_pos, source_pos, state = nil, substate = nil)
      [status, table_name, last_pos, source_pos.to_s, state, substate].join("\t")
    end

    def dump_pos_content_to_hash(content)
      # -1 means (1) there is no limit to the number of fields returned
      # and (2) trailing null fields are not suppressed
      items = content.split("\t", -1)
      raise "Invalid dump.pos file: #{dump_pos_path}" unless items.length >= 6
      status = items[0]
      table_name = items[1]
      last_pos = items[2].to_i
      if status == Command::Sync::STATUS_START
        # Use only the source_pos information. Discard and initialize the other items
        table_name = nil
        last_pos = nil
      end
      source_pos_str = items[3..-3].join("\t")
      context = source.source_pos
      source_pos = context.create_source_pos(source_pos_str)
      state = get_parse_state(items[-2])
      substate = get_parse_state(items[-1])
      { status: status, table_name: table_name, last_pos: last_pos,
        source_pos: source_pos, state: state, substate: substate }
    end

    def get_parse_state(state)
      state.empty? ? nil : state
    end

    def load_source_table_marshal_dump
      path = source_table_marshal_dump_path
      return nil unless File.exists?(path)
      Marshal.load(File.open(path, 'r'))
    end

    OLD_DUMP_DIR_PROPERTY = 'mysqldump_dir'
    DUMP_DIR_PROPERTY = 'dump_dir'

    def dump_dir
      pref = @data_entry['mysql_data_entry_preference'] || @data_entry['postgresql_data_entry_preference']
      dump_dir = nil
      dump_dir_property = DUMP_DIR_PROPERTY
      if pref
        # check the old property for backward compatibility
        [DUMP_DIR_PROPERTY, OLD_DUMP_DIR_PROPERTY].each do |prop|
          if pref[prop]
            dump_dir_property = prop
            dump_dir = pref[prop]
            break
          end
        end
      end
      if dump_dir
        dump_dir = dump_dir.dup
        dump_dir[0] = ENV['HOME'] if dump_dir.match(/^~$|^~\//)
      else
        dump_dir = DUMP_DIR.dup
      end
      if File.exists?(dump_dir) and not Dir.exists?(dump_dir)
        raise "'#{dump_dir_property}'(#{dump_dir}) must be a directory."
      end
      FileUtils.mkdir_p(dump_dir) unless Dir.exists?(dump_dir)
      dump_dir
    end

    def validate_master_source_pos_path(master_source_pos_path)
      unless master_source_pos_path && master_source_pos_path.end_with?('binlog.pos')
        raise ArgumentError.new("Invalid source position path. Source position path needs to end with 'binlog.pos'")
      end
    end
  end
end
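
# Note on the dump position file format (see dump_pos_content /
# dump_pos_content_to_hash above): the file holds one tab-separated line of
#
#   status, table_name, last_pos, source_pos.to_s, state, substate
#
# source_pos.to_s may itself contain tabs, which is why parsing joins
# items[3..-3] back together instead of reading a single field.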