require 'fiber' require 'msgpack' require 'open3' require 'mysql2' require 'flydata/sync_file_manager' require 'flydata/agent_errors' require 'flydata/compatibility_check' require 'flydata/table_def' require 'flydata/output/forwarder' require 'flydata/parser/mysql/dump_parser' #require 'ruby-prof' module Flydata module Command class Sync < Base include Helpers INSERT_PROGRESS_INTERVAL = 1000 # for dump.pos file STATUS_PARSING = 'PARSING' STATUS_WAITING = 'WAITING' STATUS_COMPLETE = 'COMPLETE' def self.slop do on 'c', 'skip-cleanup', 'Skip server cleanup' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 'd', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.' end end def run(*tables) sender = if (sender.process_exist?) if tables.empty? # full sync puts "FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first." else # per-table sync puts "Flydata Agent is already running. If you'd like to Sync the table(s), run 'flydata sync:flush' first." end exit 1 end de = retrieve_data_entry de = load_sync_info(override_tables(de, tables)) validate_initial_sync_status(de, tables) flush_buffer_and_stop unless de['mysql_data_entry_preference']['initial_sync'] sync_mysql_to_redshift(de) end def self.slop_flush do on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' end end def flush flush_buffer_and_stop puts "Buffers have been flushed and the sender process has been stopped." end def self.slop_reset do on 'c', 'client', 'Resets client only.' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' end end def reset(*tables) msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}" return unless ask_yes_no("This resets the current sync#{msg}. Are you sure?") sender = sender.flush_client_buffer # TODO We should rather delete buffer files sender.stop de = retrieve_data_entry wait_for_server_buffer cleanup_sync_server(de, tables) unless opts.client? sync_fm = delete_files = [ sync_fm.dump_file_path, sync_fm.dump_pos_path, sync_fm.mysql_table_marshal_dump_path, sync_fm.sync_info_file, sync_fm.table_position_file_paths(*tables), sync_fm.table_rev_file_paths(*tables) ] delete_files << sync_fm.binlog_path if tables.empty? delete_files.flatten.each do |path| FileUtils.rm(path) if File.exists?(path) end puts "Reset completed successfully." end def wait_for_server_buffer puts "Waiting for the server buffer to get empty" while (status = check) && (status['state'] == 'processing') print_progress(status) sleep 10 end end def wait_for_server_data_processing state = :PROCESS puts "Uploading data to Redshift..." sleep 10 status = nil while (status = check) if state == :PROCESS && status['state'] == 'uploading' puts " -> Done" state = :UPLOAD puts "Finishing data upload..." end print_progress(status) sleep 10 end if (state == :PROCESS) # :UPLOAD state was skipped due to no data puts " -> Done" puts "Finishing data upload..." end puts " -> Done" end def check de = retrieve_data_entry retry_on(RestClient::Exception) do status = do_check(de) if status['complete'] nil else status end end end # skip initial sync def skip de = retrieve_data_entry sync_fm = binlog_path = sync_fm.binlog_path `touch #{binlog_path}` puts "Created an empty binlog position file." puts "-> #{binlog_path}" puts "Run 'flydata start' to start continuous sync." end def self.slop_generate_table_ddl do on 'c', 'ctl-only', 'Only generate FlyData Control definitions' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' end end def generate_table_ddl(*tables) de = retrieve_data_entry dp = flydata.data_port.get, de['mysql_data_entry_preference']).check do_generate_table_ddl(override_tables(de, tables)) end private def validate_initial_sync_status(de, tables) sync_fm = dump_pos_info = sync_fm.load_dump_pos fp = sync_fm.dump_file_path # status is parsing but dumpfile doesn't exist due to streaming -> raise error if dump_pos_info[:status] == STATUS_PARSING && !File.exists?(fp) raise "FlyData Sync was interrupted with invalid state. Run 'flydata sync:reset#{tables.empty? ? '' : ' ' + tables.join(' ')}' first." end end def retrieve_data_entry de = retrieve_data_entries.first raise "There are no data entry." unless de case de['type'] when 'RedshiftMysqlDataEntry' mp = de['mysql_data_entry_preference'] if mp['tables_append_only'] mp['tables'] = (mp['tables'].split(",") + mp['tables_append_only'].split(",")).uniq.join(",") end else raise "No supported data entry. Only mysql-redshift sync is supported." end de end def cleanup_sync_server(de, tables = []) print "Cleaning the server." worker = do begin flydata.data_entry.cleanup_sync(de['id'], tables) rescue RestClient::RequestTimeout # server is taking time to cleanup. Try again retry end end until worker.join(5) print "." end puts puts "Done." end def do_check(de) flydata.data_entry.buffer_stat(de['id'], env_mode) end def print_progress(buffer_stat) message = buffer_stat['message'] puts message unless message.nil? || message.empty? end DDL_DUMP_CMD_TEMPLATE = "MYSQL_PWD=\"%s\" mysqldump --protocol=tcp -d -h %s -P %s -u %s %s %s" def do_generate_table_ddl(de) if `which mysqldump`.empty? raise "mysqldump is not installed. mysqldump is required to run the command" end error_list = [] schema_name = (de['schema_name'] || nil) mp = de['mysql_data_entry_preference'] params = [] params << mp['password'] if !mp['host'].empty? then params << mp['host'] else raise "MySQL `host` is neither defined in the data entry nor the local config file" end params << (mp['port'] or '3306') if !mp['username'].empty? then params << mp['username'] else raise "MySQL `username` is neither defined in the data entry nor the local config file" end if !mp['database'].empty? then params << mp['database'] else raise "`database` is neither defined in the data entry nor the local config file" end if !mp['tables'].empty? then params << mp['tables'].gsub(/,/, ' ') else raise "`tables` (or `tables_append_only`) is neither defined in the data entry nor the local config file" end command = DDL_DUMP_CMD_TEMPLATE % params Open3.popen3(command) do |stdin, stdout, stderr| stdin.close stdout.set_encoding("utf-8") # mysqldump output must be in UTF-8 create_flydata_ctl_table = mp['initial_sync'] while !stdout.eof? begin mysql_tabledef = Flydata::TableDef::MysqlTableDef.create(stdout) rescue TableDefError => e error_list << e.err_hash next end if mysql_tabledef.nil? # stream had no more create table definition break end flydata_tabledef = mysql_tabledef.to_flydata_tabledef puts Flydata::TableDef::RedshiftTableDef.from_flydata_tabledef(flydata_tabledef, flydata_ctl_table: create_flydata_ctl_table, schema_name: schema_name, ctl_only: opts.ctl_only?) create_flydata_ctl_table = false end errors = "" while !stderr.eof? line = stderr.gets.gsub('mysqldump: ', '') errors << line unless /Warning: Using a password on the command line interface can be insecure./ === line end raise errors unless errors.empty? end unless error_list.empty? $stderr.puts "We have noticed the following error(s):" group_error = error_list.group_by {|d| d[:error]} group_error.each_key do |a| $stderr.puts "The following table(s) have #{a}:" group_error[a].each do |hash| $stderr.puts " - #{hash[:table]}" if hash[:table] end end $stderr.puts "Please fix the above error(s) and try again." end end def sync_mysql_to_redshift(de) dp = flydata.data_port.get sync_fm = # Check client condition if File.exists?(sync_fm.binlog_path) and de['mysql_data_entry_preference']['initial_sync'] raise "Already synchronized. If you want to do initial sync, run 'flydata sync:reset'" end # Copy template if not exists unless Flydata::Preference::DataEntryPreference.conf_exists?(de) end generate_mysqldump(de, sync_fm, opts.dump_file?) do |mysqldump_io, db_bytesize| sync_fm.save_sync_info(de['mysql_data_entry_preference']['initial_sync'], de['mysql_data_entry_preference']['tables']) parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, db_bytesize) end wait_for_mysqldump_processed(dp, de, sync_fm) complete end def generate_mysqldump(de, sync_fm, file_dump = true, &block) # validate parameter %w(host username database).each do |k| if de['mysql_data_entry_preference'][k].to_s.empty? raise "'#{k}' is required. Set the value in the conf file " + "-> #{Flydata::Preference::DataEntryPreference.conf_path(de)}" end end # Status is waiting or complete -> skip dump and parse dump_pos_info = sync_fm.load_dump_pos return if dump_pos_info[:status] == STATUS_WAITING || dump_pos_info[:status] == STATUS_COMPLETE # mysqldump file exists -> skip dump dp = flydata.data_port.get fp = sync_fm.dump_file_path if file_dump && File.exists?(fp) && File.size(fp) > 0 puts " -> Skip" return call_block_or_return_io(fp, &block) end tables = de['mysql_data_entry_preference']['tables'] tables ||= '<all tables>' data_servers = de['mysql_data_entry_preference']['data_servers'] ? "\n data servers: #{de['mysql_data_entry_preference']['data_servers']}" : "" confirmation_text = <<-EOM FlyData Sync will start synchonizing the following database tables host: #{de['mysql_data_entry_preference']['host']} port: #{de['mysql_data_entry_preference']['port']} username: #{de['mysql_data_entry_preference']['username']} database: #{de['mysql_data_entry_preference']['database']} tables: #{tables}#{data_servers} EOM confirmation_text << <<-EOM if file_dump dump file: #{fp} Dump file saves contents of your tables temporarily. Make sure you have enough disk space. EOM print confirmation_text if ask_yes_no('Start Sync?') puts "Checking database size(not same as mysqldump data size)..." db_bytesize =['mysql_data_entry_preference']).get_db_bytesize puts " -> #{as_size(db_bytesize)} (#{db_bytesize} byte)" puts "Exporting data from database." puts "This process can take hours depending on data size and load on your database. Please be patient...", de['mysql_data_entry_preference'], fp).check if file_dump Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData. new(de['mysql_data_entry_preference']).dump(fp) puts " -> Done" call_block_or_return_io(fp, &block) else Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData. new(de['mysql_data_entry_preference']).dump {|io|, db_bytesize)} end else newline puts "You can change the dump file path with 'mysqldump_path' property in the following conf file." puts puts " #{Flydata::Preference::DataEntryPreference.conf_path(de)}" puts return nil end end def call_block_or_return_io(fp, &block) if block f_io = open_file_io(fp) begin return nil ensure f_io.close rescue nil end end return open_file_io(fp) end def open_file_io(file_path), 'r', encoding: "utf-8") end # Checkpoint # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120; # <- checkpoint(after binlog) #... #CREATE TABLE `accounts` ( #... #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8; # <- checkpoint(after create table) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # <- checkpoint(when buffered data is sent to server) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # #... #UNLOCK TABLES; # <- checkpoint #... #CREATE TABLE ... def parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, db_bytesize = nil) # Prepare forwarder de_tag_name = de["tag_name#{env_suffix}"] server_port = dp['server_port'] servers = if de['mysql_data_entry_preference']['data_servers'] de['mysql_data_entry_preference']['data_servers'].split(',') else dp["servers#{env_suffix}"].collect{|s| "#{s}:#{server_port}"} end forwarder_type = de['mysql_data_entry_preference']['forwarder'] || (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder') forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers) # Load dump.pos file for resume dump_pos_info = sync_fm.load_dump_pos option = dump_pos_info || {} if option[:table_name] && option[:last_pos].to_i != -1 puts "Resuming... Last processed table: #{option[:table_name]}" else #If its a new sync, ensure server side resources are clean cleanup_sync_server(de, de['mysql_data_entry_preference']['tables'].split(',')) unless opts.skip_cleanup? end puts "Sending data to FlyData Server..." bench_start_time = # Start parsing dump file tmp_num_inserted_record = 0 dump_fp = sync_fm.dump_file_path dump_file_size = File.exists?(dump_fp) ? File.size(dump_fp) : 1 binlog_pos = mysqldump_io, # create table { |mysql_table| tmp_num_inserted_record = 0 # dump mysql_table for resume #TODO: make it option sync_fm.save_mysql_table_marshal_dump(mysql_table) }, # insert record { |mysql_table, values_set| mysql_table_name = mysql_table.table_name records = values_set.collect do |values| json = generate_json(mysql_table, values) {table_name: mysql_table_name, log: json} end ret = forwarder.emit(records) tmp_num_inserted_record += 1 ret }, # checkpoint { |mysql_table, last_pos, bytesize, binlog_pos, state, substate| # flush if buffer records exist if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0 forwarder.flush # send buffer data to the server before checkpoint end # show the current progress if last_pos.to_i == -1 # stream dump puts " -> #{as_size(bytesize)} (#{bytesize} byte) completed..." else puts " -> #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed..." end # save check point table_name = mysql_table.nil? ? '' : mysql_table.table_name sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate) } ) forwarder.close puts " -> Done" sync_fm.save_dump_pos(STATUS_WAITING, '', dump_file_size, binlog_pos) if ENV['FLYDATA_BENCHMARK'] bench_end_time = elapsed_time = bench_end_time.to_i - bench_start_time.to_i puts "Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}" end end def wait_for_mysqldump_processed(dp, de, sync_fm) return if ENV['FLYDATA_BENCHMARK'] # Status is not waiting -> skip waiting dump_pos_info = sync_fm.load_dump_pos return unless dump_pos_info[:status] == STATUS_WAITING binlog_pos = dump_pos_info[:binlog_pos] wait_for_server_data_processing tables = de['mysql_data_entry_preference']['tables'].split(',').join(' ') sync_fm.save_table_binlog_pos(tables, binlog_pos) sync_fm.save_dump_pos(STATUS_COMPLETE, '', -1, binlog_pos) end ALL_DONE_MESSAGE_TEMPLATE = <<-EOM Congratulations! FlyData has started synchronizing your database tables. What's next? - Check data on Redshift (%s) - Check your FlyData usage on the FlyData Dashboard (%s) - To manage the FlyData Agent, use the 'flydata' command (type 'flydata' for help) - If you encounter an issue, please check our documentation ( or contact our customer support team ( Thank you for using FlyData! EOM def complete de = load_sync_info(retrieve_data_entry) sync_fm = info = sync_fm.load_dump_pos if info[:status] == STATUS_COMPLETE puts "Starting FlyData Agent..." if de['mysql_data_entry_preference']['initial_sync'] sync_fm.save_binlog(info[:binlog_pos]) end sync_fm.move_table_binlog_files(de['mysql_data_entry_preference']['tables'].split(',')) sync_fm.reset_table_position_files(de['mysql_data_entry_preference']['tables'].split(',')) sync_fm.backup_dump_dir true) puts " -> Done" dashboard_url = "#{flydata.flydata_api_host}/dashboard" redshift_console_url = "#{flydata.flydata_api_host}/redshift_clusters/query/new" last_message = ALL_DONE_MESSAGE_TEMPLATE % [redshift_console_url, dashboard_url] puts last_message else raise "Initial sync status is not complete. Try running 'flydata sync'." end end def generate_json(mysql_table, values) h = {} mysql_table.columns.each_key.with_index do |k, i| h[k] = values[i] end h.to_json end def override_tables(de, tables) de['mysql_data_entry_preference']['initial_sync'] = tables.empty? if ! de['mysql_data_entry_preference']['initial_sync'] de['mysql_data_entry_preference']['tables'] = tables.join(',') end de end def load_sync_info(de) sync_fm = mp = de['mysql_data_entry_preference'] unless (rs = sync_fm.load_sync_info).nil? mp['initial_sync'] = rs[:initial_sync] mp['tables'] = rs[:tables] end de end def flush_buffer_and_stop sender = sender.flush_client_buffer wait_for_server_data_processing sender.stop(quiet: true) end end end end