require 'fiber'
require 'json'
require 'msgpack'
require 'open3'
require 'mysql2'
require 'rest_client'
require 'sys/filesystem'
require 'flydata/command/base'
require 'flydata/command/conf'
require 'flydata/command/sender'
require 'flydata/compatibility_check'
require 'flydata/errors'
require 'flydata/helpers'
require 'flydata/output/forwarder'
require 'flydata/parser/mysql/dump_parser'
require 'flydata/preference/data_entry_preference'
require 'flydata/sync_file_manager'
require 'flydata/util/mysql_util'
require 'flydata-core/table_def'
#require 'ruby-prof'

module Flydata
  module Command
    # Implements the `flydata sync`, `flydata sync:flush` and
    # `flydata sync:generate_table_ddl` commands: the initial MySQL dump is
    # parsed and forwarded to the FlyData server, then the agent is started
    # for continuous sync.
    #
    # NOTE(review): this file was recovered from a whitespace-collapsed copy
    # in which two regions were truncated.  The affected spots are flagged
    # with "SOURCE DAMAGED" notes below and must be restored from version
    # control before this class is used.
    class Sync < Base
      include Helpers

      INSERT_PROGRESS_INTERVAL = 1000
      SERVER_DATA_PROCESSING_TIMEOUT = 600 # seconds

      # for dump.pos file
      STATUS_PARSING = 'PARSING'
      STATUS_WAITING = 'WAITING'
      STATUS_COMPLETE = 'COMPLETE'

      attr_reader :full_initial_sync, :full_tables, :new_tables, :ddl_tables, :input_tables

      # Raised when the data entry is not of a type this command can sync.
      class SyncDataEntryError < StandardError
      end

      # Command: flydata sync
      # - Arguments
      def self.slop
        Slop.new do
          on 'c', 'skip-cleanup', 'Skip server cleanup'
          on 'f', 'skip-flush', 'Skip server flush'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
          on 'd', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.' # dummy for compatibility
          on 's', 'dump-stream', 'Dump mysqldump stream instead of saving dump file. It might cause timeout error if db size is larger than 10GB.'
          on 'n', 'no-flydata-start', 'Don\'t start the flydata agent after initial sync.'
          #TODO : This option is temp! Should remove soon.
          on 'ff', 'Skip checking query queue and flush'
        end
      end

      # Command: flydata sync
      # - Entry method
      #
      # Refuses to run (exit 1) while the agent process exists, runs the
      # initial sync for the given tables (all tables when none are given),
      # starts the agent unless --no-flydata-start was passed, and prints the
      # closing message.
      def run(*tables)
        # Process check
        sender = Flydata::Command::Sender.new
        if (sender.process_exist?)
          if tables.empty?
            # full sync
            log_warn_stderr("FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first.")
          else
            # per-table sync
            log_warn_stderr("Flydata Agent is already running. If you'd like to Sync the table(s), run 'flydata sync:flush' first.")
          end
          exit 1
        end

        # Setup instance variables
        set_current_tables(tables)

        # Start initial sync with check
        handle_mysql_sync

        # Start continuous sync by starting fluentd process
        unless opts.no_flydata_start?
          log_info_stdout("Starting FlyData Agent...")
          Flydata::Command::Sender.new.start(quiet: true)
          log_info_stdout(" -> Done")
        end

        # Show message
        dashboard_url = "#{flydata.flydata_api_host}/dashboard"
        redshift_console_url = "#{flydata.flydata_api_host}/redshift_clusters/query/new"
        last_message = ALL_DONE_MESSAGE_TEMPLATE % [redshift_console_url, dashboard_url]
        log_info_stdout(last_message)
      end

      # Public method
      # - Called from Sender#start/restart
      #
      # Same as the initial-sync part of #run, but an unsupported data entry
      # (SyncDataEntryError) is silently ignored.
      def try_mysql_sync
        # Setup instance variables
        set_current_tables

        # Start initial sync
        handle_mysql_sync
      rescue SyncDataEntryError
        return
      end

      # Command: flydata sync:flush
      # - Arguments
      def self.slop_flush
        Slop.new do
          on 'f', 'skip-flush', 'Skip server flush'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
        end
      end

      # Command: flydata sync:flush
      # - Entry method
      #
      # NOTE(review): SOURCE DAMAGED.  Only the start of this method survived.
      # The heredoc assigned to ee.description and whatever followed the
      # rescue clause are missing; `raise ee` is an assumption (an error
      # object is built, so it is presumably raised) -- TODO confirm against
      # version control.
      def flush(*tables)
        begin
          flush_buffer_and_stop(tables)
        rescue ServerDataProcessingTimeout => e
          ee = ServerDataProcessingTimeout.new("Delayed Data Processing")
          # (lost) ee.description = <heredoc text>
          raise ee
        end
      end

      # NOTE(review): SOURCE DAMAGED.  The definitions that originally sat
      # between #flush and .slop_generate_table_ddl are missing.  All that
      # survives is the text below, kept verbatim so nothing is dropped:
      #
      #   ee.description = < e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < #{binlog_path}")
      #   log_info_stdout("Run 'flydata start' to start continuous sync.")
      #   end

      # Command: flydata sync:generate_table_ddl
      # - Arguments
      def self.slop_generate_table_ddl
        Slop.new do
          on 'c', 'ctl-only', 'Only generate FlyData Control definitions'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
          on 's', 'skip-primary-key-check', 'Skip primary key check when generating DDL'
          on 'all-tables', 'Generate all table schema'
        end
      end

      # Command: flydata sync:generate_table_ddl
      # - Entry method
      def generate_table_ddl(*tables)
        # Compatibility check
        de = data_entry
        dp = flydata.data_port.get
        Flydata::MysqlCompatibilityCheck.new(dp, de['mysql_data_entry_preference']).check

        # Set instance variables
        set_current_tables(tables)

        do_generate_table_ddl(de)
      end

      private

      # Initial sync
      #
      # When there are tables without a position file, asks the user whether
      # to run the initial sync now.  Declining raises, because the agent
      # cannot be started without syncing those tables.
      def handle_mysql_sync
        de = data_entry
        return if @new_tables.empty?

        say("We've noticed that these tables have not been synced yet: #{@new_tables.join(", ")}")
        unless @ddl_tables.empty?
          say(" WARNING: We've noticed that at least one of these tables have not had their DDL generated yet.")
          say(" We recommend you run our 'flydata sync:generate_table_ddl > create_table.sql'")
          say(" to generate SQL to run on Redshift to create the correct tables")
          say(" Without running this sql on your Redshift cluster, there may be issues with your data")
        end

        if ask_yes_no("Do you want to run initial sync on all of these tables now?")
          initial_sync(de)
        else
          #If generate_table_ddl has not been run for these tables, warn user
          unless @ddl_tables.empty?
            say(" You can generate DDL SQL for your new tables by running this command")
            say(" $> flydata sync:generate_table_ddl > create_table.sql")
          end
          puts "Without syncing these tables, we cannot start the flydata process"
          raise "Please try again"
        end
      end

      # Runs the initial sync for the data entry.
      #
      # NOTE(review): SOURCE DAMAGED.  Only the start of this method survived.
      # The heredoc assigned to ee.description and the statements after the
      # rescue clause are missing; `raise ee` is an assumption -- TODO confirm.
      # #sync_mysql_to_redshift, called below, was defined in the lost region.
      # #complete has no visible caller and was presumably invoked from here
      # -- TODO confirm.
      def initial_sync(de)
        # Load sync information from file
        load_sync_info(de)
        validate_initial_sync_status
        begin
          flush_buffer_and_stop(target_tables_for_api) unless @full_initial_sync
          sync_mysql_to_redshift(de)
        rescue ServerDataProcessingTimeout => e
          ee = ServerDataProcessingTimeout.new("Delayed Data Processing")
          # (lost) ee.description = <heredoc text>
          raise ee
        end
      end

      # NOTE(review): SOURCE DAMAGED.  The text below is the surviving tail of
      # the mysqldump-generation method.  Its `def` line and opening
      # statements are missing, so it is preserved as a comment (tokens
      # unchanged, only re-indented) until the head is restored.  It reads
      # `de`, `sync_fm`, `file_dump` and `block` from the lost signature.
      #
      #       #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
      #     end
      #   end
      #
      #   # Status is waiting or complete -> skip dump and parse
      #   dump_pos_info = sync_fm.load_dump_pos
      #   return if dump_pos_info[:status] == STATUS_WAITING || dump_pos_info[:status] == STATUS_COMPLETE
      #
      #   # mysqldump file exists -> skip dump
      #   dp = flydata.data_port.get
      #   fp = sync_fm.dump_file_path
      #   if file_dump && File.exists?(fp) && File.size(fp) > 0
      #     log_info_stdout(" -> Skip")
      #     return call_block_or_return_io(fp, &block)
      #   end
      #
      #   tables = target_tables
      #   tables ||= ''
      #   data_servers = de['mysql_data_entry_preference']['data_servers'] ? "\n data servers: #{de['mysql_data_entry_preference']['data_servers']}" : ""
      #
      #   confirmation_text = <<-EOM
      #   FlyData Sync will start synchronizing the following database tables
      #   host: #{de['mysql_data_entry_preference']['host']}
      #   port: #{de['mysql_data_entry_preference']['port']}
      #   username: #{de['mysql_data_entry_preference']['username']}
      #   database: #{de['mysql_data_entry_preference']['database']}
      #   tables: #{tables.join(", ")}#{data_servers}
      #   EOM
      #
      #   confirmation_text << <<-EOM if file_dump
      #   dump file: #{fp}
      #   Dump file saves contents of your tables temporarily. Make sure you have enough disk space.
      #   EOM
      #
      #   print confirmation_text
      #   log_info confirmation_text.strip
      #
      #   if ask_yes_no('Start Sync?')
      #     log_info_stdout("Checking MySQL server connection and configuration...")
      #     Flydata::MysqlCompatibilityCheck.new(dp, de['mysql_data_entry_preference'], dump_dir: fp, backup_dir: sync_fm.backup_dir).check
      #
      #     log_info_stdout("Checking database size...")
      #     db_bytesize = Flydata::Parser::Mysql::DatabaseSizeCheck.new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).get_db_bytesize
      #     log_info_stdout(" -> #{as_size(db_bytesize)} (#{db_bytesize} byte)")
      #
      #     if file_dump
      #       # check free disk space
      #       free_disk_bytesize = free_disk_space(File.dirname(fp))
      #       if (free_disk_bytesize - db_bytesize) < (1024 * 1024 * 1024) # 1GB
      #         log_warn_stderr("!!WARNING There may not be enough disk space for a DB dump. We recommend 1GB free disk space after the dump. free disk space:#{as_size(free_disk_bytesize)}(#{free_disk_bytesize} byte) /" +
      #                         " db size:#{as_size(db_bytesize)}(#{db_bytesize} byte)")
      #         unless ask_yes_no('Do you want to continue?')
      #           log_warn_stderr("To change the dump file directory, delete '#' and modify the path of 'mysqldump_dir:' in '#{Preference::DataEntryPreference.conf_path(de)}'")
      #           exit 1
      #         end
      #       end
      #     end
      #
      #     log_info_stdout("Exporting data from database.")
      #     log_info_stdout("This process can take hours depending on data size and load on your database. Please be patient...")
      #     if file_dump
      #       Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData.
      #         new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).dump(fp)
      #       log_info_stdout(" -> Done")
      #       call_block_or_return_io(fp, &block)
      #     else
      #       Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData.
      #         new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).dump {|io| block.call(io, db_bytesize)}
      #     end
      #   else
      #     exit 1
      #   end
      # end

      # return available disk size(byte)
      def free_disk_space(dump_path)
        stat = Sys::Filesystem.stat(dump_path)
        stat.block_size * stat.blocks_available
      end

      # With a block: opens the file, yields the IO, closes it and returns nil.
      # Without a block: returns the open IO; the caller must close it.
      def call_block_or_return_io(fp, &block)
        return open_file_io(fp) unless block

        f_io = open_file_io(fp)
        begin
          block.call(f_io)
          nil
        ensure
          begin
            f_io.close
          rescue IOError, SystemCallError
            # best effort: a failure to close must not mask the block's result
          end
        end
      end

      # Opens the file read-only with UTF-8 external encoding.
      def open_file_io(file_path)
        File.open(file_path, 'r', encoding: "utf-8")
      end

      # Checkpoint
      # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120;
      #                                                   <- checkpoint(after binlog)
      #...
      #CREATE TABLE `accounts` (
      #...
      #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8;
      #                                                   <- checkpoint(after create table)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #                                                   <- checkpoint(when buffered data is sent to server)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #
      #...
      #UNLOCK TABLES;
      #                                                   <- checkpoint
      #...
      #CREATE TABLE ...
      #
      # Parses the dump stream, forwards every record to the FlyData server
      # and saves a resume position at each checkpoint.  When parsing is
      # finished the dump position status is set to WAITING.
      def parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, db_bytesize = nil)
        # Prepare forwarder
        de_tag_name = de["tag_name#{env_suffix}"]
        server_port = dp['server_port']
        servers = if de['mysql_data_entry_preference']['data_servers']
                    de['mysql_data_entry_preference']['data_servers'].split(',')
                  else
                    dp["servers#{env_suffix}"].collect{|s| "#{s}:#{server_port}"}
                  end
        forwarder_type = de['mysql_data_entry_preference']['forwarder'] ||
                         (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder')
        forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers)

        # Load dump.pos file for resume
        dump_pos_info = sync_fm.load_dump_pos
        option = dump_pos_info || {}
        if option[:table_name] && option[:last_pos].to_i != -1
          log_info_stdout("Resuming... Last processed table: #{option[:table_name]}")
        else
          #If its a new sync, ensure server side resources are clean
          cleanup_sync_server(de, target_tables_for_api) unless opts.skip_cleanup?
        end
        log_info_stdout("Sending data to FlyData Server...")

        bench_start_time = Time.now

        # Start parsing dump file
        tmp_num_inserted_record = 0
        dump_fp = sync_fm.dump_file_path
        # File.exist? -- File.exists? was removed in Ruby 3.2.
        # 1 is used when there is no dump file (stream dump) to avoid dividing by zero below.
        dump_file_size = File.exist?(dump_fp) ? File.size(dump_fp) : 1
        binlog_pos = Flydata::Parser::Mysql::MysqlDumpParser.new(option).parse(
          mysqldump_io,
          # create table
          Proc.new { |mysql_table|
            tmp_num_inserted_record = 0
            # dump mysql_table for resume
            #TODO: make it option
            sync_fm.save_mysql_table_marshal_dump(mysql_table)
          },
          # insert record
          Proc.new { |mysql_table, values_set|
            mysql_table_name = mysql_table.table_name
            records = values_set.collect do |values|
              json = generate_json(mysql_table, values)
              {table_name: mysql_table_name, log: json}
            end
            ret = forwarder.emit(records)
            tmp_num_inserted_record += 1
            ret
          },
          # checkpoint
          Proc.new { |mysql_table, last_pos, bytesize, binlog_pos, state, substate|
            # flush if buffer records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              forwarder.flush # send buffer data to the server before checkpoint
            end

            # show the current progress
            if last_pos.to_i == -1 # stream dump
              log_info_stdout(" -> #{as_size(bytesize)} (#{bytesize} byte) completed...")
            else
              log_info_stdout(" -> #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed...")
            end

            # save check point
            table_name = mysql_table.nil? ? '' : mysql_table.table_name
            sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate)
          }
        )
        forwarder.close
        log_info_stdout(" -> Done")
        sync_fm.save_dump_pos(STATUS_WAITING, '', dump_file_size, binlog_pos)

        if ENV['FLYDATA_BENCHMARK']
          bench_end_time = Time.now
          elapsed_time = bench_end_time.to_i - bench_start_time.to_i
          log_info_stdout("Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}")
        end
      end

      # Waits until the server has processed the uploaded dump, then records
      # the binlog position per table and marks the dump COMPLETE.  Does
      # nothing in benchmark mode or when the dump status is not WAITING.
      def wait_for_mysqldump_processed(dp, de, sync_fm)
        return if ENV['FLYDATA_BENCHMARK']

        # Status is not waiting -> skip waiting
        dump_pos_info = sync_fm.load_dump_pos
        return unless dump_pos_info[:status] == STATUS_WAITING

        binlog_pos = dump_pos_info[:binlog_pos]

        wait_for_server_data_processing(
          timeout: SERVER_DATA_PROCESSING_TIMEOUT, tables: target_tables_for_api)
        sync_fm.save_table_binlog_pos(target_tables, binlog_pos)
        sync_fm.save_dump_pos(STATUS_COMPLETE, '', -1, binlog_pos)
      end

      # option: timeout, tables
      #
      # Polls the server status every 10 seconds until it reports completion.
      # Raises ServerDataProcessingTimeout when the status message has not
      # changed for more than `timeout` seconds (0 disables the timeout).
      def wait_for_server_data_processing(option = {})
        timeout = option[:timeout] || 0
        tables = option[:tables] || []

        state = :PROCESS
        start_time = Time.now
        log_info_stdout("Uploading data to Redshift...")
        sleep 10

        status = nil
        prev_message = nil
        while (status = check_server_status(tables))
          if state == :PROCESS && status['state'] == 'uploading'
            log_info_stdout(" -> Done")
            state = :UPLOAD
            log_info_stdout("Finishing data upload...")
          end

          #TODO This is based on a temporary option
          if state == :UPLOAD && opts.ff?
            log_info_stdout("Skip checking for pending uploads")
            break
          end

          if status['message'] != prev_message
            # making some progress. Reset timer
            start_time = Time.now
          end
          prev_message = status['message']

          if timeout > 0 && Time.now - start_time > timeout
            raise ServerDataProcessingTimeout.new
          end

          print_progress(status)
          sleep 10
        end

        if (state == :PROCESS)
          # :UPLOAD state was skipped due to no data
          log_info_stdout(" -> Done")
          log_info_stdout("Finishing data upload...")
        end
        log_info_stdout(" -> Done")
      end

      # Returns the server buffer status, or nil once it reports 'complete'.
      def check_server_status(tables = [])
        de = data_entry
        retry_on(RestClient::Exception) do
          status = do_check_server_status(de, tables)
          if status['complete']
            nil
          else
            status
          end
        end
      end

      def do_check_server_status(de, tables = [])
        flydata.data_entry.buffer_stat(de['id'], mode: env_mode, tables: tables)
      end

      # Logs the status message, if there is one.
      def print_progress(buffer_stat)
        message = buffer_stat['message']
        log_info_stdout(message) unless message.nil? || message.empty?
      end

      # Finalizes a COMPLETE initial sync: installs binlog/position files and
      # removes the dump.  Raises when the dump status is not COMPLETE.
      def complete(de)
        sync_fm = create_sync_file_manager(de)
        info = sync_fm.load_dump_pos
        if info[:status] == STATUS_COMPLETE
          if @full_initial_sync
            sync_fm.save_binlog(info[:binlog_pos])
          end
          sync_fm.install_table_binlog_files(target_tables)
          sync_fm.reset_table_position_files(target_tables)
          sync_fm.delete_dump_file
          sync_fm.backup_dump_dir
        else
          raise "Initial sync status is not complete. Try running 'flydata sync'."
        end
        sync_fm.close
      end

      # Builds a JSON object mapping each column name of mysql_table to the
      # value at the same position in `values`.
      def generate_json(mysql_table, values)
        h = {}
        mysql_table.columns.each_key.with_index do |k, i|
          h[k] = values[i]
        end
        h.to_json
      end

      # Sync reset
      #
      # Polls every 10 seconds while the server state is nil or 'processing'.
      # Raises ServerDataProcessingTimeout after `timeout` seconds (0 disables
      # the timeout).
      def wait_for_server_buffer(option = {})
        timeout = option[:timeout] || 0
        tables = option[:tables] || []

        start_time = Time.now
        log_info_stdout("Waiting for the server buffer to get empty.")

        while (status = check_server_status(tables)) &&
              (status['state'].nil? || status['state'] == 'processing')
          if timeout > 0 && Time.now - start_time > timeout
            raise ServerDataProcessingTimeout.new
          end
          print_progress(status)
          sleep 10
        end
      end

      # Asks the server to clean up sync resources, printing a dot every five
      # seconds while the request is running.
      def cleanup_sync_server(de, tables = [])
        print "Cleaning the server."
        log_info("Cleaning the server.")
        worker = Thread.new do
          begin
            flydata.data_entry.cleanup_sync(de['id'], tables)
          rescue RestClient::RequestTimeout, RestClient::GatewayTimeout
            # server is taking time to cleanup. Try again
            # NOTE(review): this retry is unbounded by design of the original code.
            retry
          end
        end
        until worker.join(5)
          print "."
        end
        puts
        log_info_stdout("Done.")
      end

      # Generate table ddl
      #
      # Runs mysqldump for the selected tables, prints the Redshift DDL for
      # each table definition found, reports per-table errors on stderr and
      # marks the tables without errors as generated.
      def do_generate_table_ddl(de)
        if `which mysqldump`.empty?
          raise "mysqldump is not installed. mysqldump is required to run the command"
        end

        error_list = []
        schema_name = (de['schema_name'] || nil)
        mp = de['mysql_data_entry_preference']

        tables = opts.all_tables? ? @full_tables : (@input_tables.empty? ? @new_tables : @input_tables)
        # A second, identical emptiness check with a different message used to
        # follow the config validation below; it could never fire.
        raise "There are no valid unsynced tables, if you want to just get ddl for all tables, please run \`flydata sync:generate_table_ddl --all-tables\`" if tables.empty?

        %w(host username database).each do |conf_name|
          raise "MySQL `#{conf_name}` is neither defined in the data entry nor the local config file" if mp[conf_name].to_s.empty?
        end

        command = Util::MysqlUtil.generate_mysql_ddl_dump_cmd(mp.merge(tables: tables))

        Open3.popen3(command) do |stdin, stdout, stderr|
          stdin.close
          stdout.set_encoding("utf-8") # mysqldump output must be in UTF-8
          create_flydata_ctl_table = @full_initial_sync
          while !stdout.eof?
            begin
              mysql_tabledef = FlydataCore::TableDef::MysqlTableDef.create(stdout, skip_primary_key_check: opts.skip_primary_key_check?)
              if mysql_tabledef.nil?
                # stream had no more create table definition
                break
              end
              flydata_tabledef = mysql_tabledef.to_flydata_tabledef
              puts FlydataCore::TableDef::RedshiftTableDef.from_flydata_tabledef(flydata_tabledef, flydata_ctl_table: create_flydata_ctl_table, schema_name: schema_name, ctl_only: opts.ctl_only?)
            rescue FlydataCore::TableDefError=> e
              error_list << e.err_hash
              next
            end
            create_flydata_ctl_table = false
          end
          errors = ""
          while !stderr.eof?
            line = stderr.gets.gsub('mysqldump: ', '')
            errors << line unless /Warning: Using a password on the command line interface can be insecure./ === line
          end
          raise errors unless errors.empty?
        end

        unless error_list.empty?
          log_error_stderr("We have noticed the following error(s):")
          group_error = error_list.group_by {|d| d[:error]}
          group_error.each_key do |a|
            log_error_stderr("The following table(s) have #{a}:")
            group_error[a].each do |hash|
              log_error_stderr(" - #{hash[:table]}") if hash[:table]
            end
          end
          log_error_stderr("Please fix the above error(s) to try to sync those table(s) or contact us for further help.")
        end

        # The previous inject-based version returned nil from its block when an
        # error hash had no :table, which broke the accumulator.
        failed_tables = error_list.map { |err| err[:table] }.compact
        tables_without_error = tables - failed_tables

        sync_fm = create_sync_file_manager(de)
        sync_fm.mark_generated_tables(tables_without_error)
        sync_fm.close
      end

      # NOTE(review): the line breaks inside this message were lost in the
      # damaged source and are reconstructed here; the wording is unchanged.
      ALL_DONE_MESSAGE_TEMPLATE = <<-EOM

Congratulations! FlyData has started synchronizing your database tables.

What's next?

- Check data on Redshift (%s)
- Check your FlyData usage on the FlyData Dashboard (%s)
- To manage the FlyData Agent, use the 'flydata' command (type 'flydata' for help)
- If you encounter an issue,
  please check our documentation (https://www.flydata.com/docs/) or
  contact our customer support team (support@flydata.com)

Thank you for using FlyData!
      EOM

      # Sync flush
      #
      # Flushes the client buffer, waits for the server to process the data
      # (unless --skip-flush) and stops the agent.
      def flush_buffer_and_stop(tables = [])
        sender = Flydata::Command::Sender.new
        sender.flush_client_buffer
        if opts.skip_flush?
          log_info_stdout("Skip waiting for server data processing.")
        else
          wait_for_server_data_processing(
            timeout: SERVER_DATA_PROCESSING_TIMEOUT, tables: tables)
        end
        sender.stop(quiet: true)
      end

      # Utility methods
      #
      # Populates @input_tables, @full_tables, @new_tables, @ddl_tables and
      # @full_initial_sync, then checks that every input table is registered.
      def set_current_tables(input_tables = nil)
        de = data_entry
        sync_fm = create_sync_file_manager(de)
        @input_tables = input_tables || []
        @full_tables = de['mysql_data_entry_preference']['tables']
        @new_tables = sync_fm.get_new_table_list(@full_tables, "pos")
        @ddl_tables = sync_fm.get_new_table_list(@full_tables, "generated_ddl")

        @full_initial_sync = (@new_tables == @full_tables)

        sync_fm.close
        verify_input_tables(@input_tables, @full_tables)
      end

      # Raises when a previous sync stopped while parsing a streamed dump
      # (status PARSING but no dump file), which cannot be resumed.
      def validate_initial_sync_status
        sync_fm = create_sync_file_manager
        dump_pos_info = sync_fm.load_dump_pos
        fp = sync_fm.dump_file_path
        sync_fm.close

        # status is parsing but dumpfile doesn't exist due to streaming -> raise error
        if dump_pos_info[:status] == STATUS_PARSING && !File.exist?(fp)
          # Separate the table list from the command name; they used to be fused
          # together ("sync:resett1,t2").
          table_args = @input_tables.empty? ? '' : " #{@input_tables.join(',')}"
          raise "FlyData Sync was interrupted with invalid state. Run 'flydata sync:reset#{table_args}' first."
        end
      end

      # Restores @full_initial_sync and @input_tables from the saved sync
      # info, when there is one.  Returns de.
      def load_sync_info(de)
        # for debug
        raise "!AssertionError. set_current_tables needs to be called in advance" if @full_tables.nil?

        sync_fm = create_sync_file_manager(de)
        if (rs = sync_fm.load_sync_info)
          @full_initial_sync = rs[:initial_sync]
          @input_tables = rs[:tables]
        end
        sync_fm.close
        de
      end

      # Tables this run operates on.
      def target_tables
        if @full_initial_sync
          @full_tables
        elsif !@input_tables.empty?
          @input_tables
        else
          @new_tables
        end
      end

      # return empty array if full sync
      def target_tables_for_api
        if @full_initial_sync
          []
        elsif !@input_tables.empty?
          @input_tables
        else
          @new_tables
        end
      end

      # Memoized data entry.
      def data_entry
        @de ||= retrieve_sync_data_entry
      end

      # Fetches the first data entry and replaces its comma-separated
      # 'tables' preference with a de-duplicated array that also includes
      # 'tables_append_only'.
      #
      # Raises RuntimeError when there is no data entry and SyncDataEntryError
      # when its type is not 'RedshiftMysqlDataEntry'.
      def retrieve_sync_data_entry
        de = retrieve_data_entries.first
        raise "There are no data entries." unless de

        case de['type']
        when 'RedshiftMysqlDataEntry'
          mp = de['mysql_data_entry_preference']
          # 'tables' may be absent when only 'tables_append_only' is configured.
          table_names = (mp['tables'] || '').split(",")
          table_names += mp['tables_append_only'].split(",") if mp['tables_append_only']
          mp['tables'] = table_names.uniq
        else
          raise SyncDataEntryError, "No supported data entry. Only mysql-redshift sync is supported."
        end
        de
      end

      def create_sync_file_manager(de = data_entry)
        SyncFileManager.new(de)
      end

      # Raises when input_tables contains a name that is not in all_tables.
      def verify_input_tables(input_tables, all_tables)
        return unless input_tables

        inval_table = input_tables.reject { |tab| all_tables.include?(tab) }
        raise "These tables are not registered tables: #{inval_table.join(", ")}" unless inval_table.empty?
      end
    end
  end
end