require 'msgpack' require 'mysql2' require 'rest_client' require 'sys/filesystem' require 'flydata/agent' require 'flydata/command/base' require 'flydata/command/conf' require 'flydata/command/sender' require 'flydata/compatibility_check' require 'flydata/errors' require 'flydata/helpers' require 'flydata/output/forwarder' require 'flydata/parser/mysql/dump_parser' require 'flydata/preference/data_entry_preference' require 'flydata/sync_file_manager' require 'flydata-core/table_def' require 'flydata-core/mysql/binlog_pos' require 'flydata/mysql/table_ddl' require 'flydata-core/mysql/command_generator' #require 'ruby-prof' module Flydata module Command class Sync < Base include Helpers INSERT_PROGRESS_INTERVAL = 1000 SERVER_DATA_PROCESSING_TIMEOUT = 3600 # seconds # for dump.pos file STATUS_PARSING = 'PARSING' STATUS_PARSED = 'WAITING' # the value is different from the constant name on purpose for backward compatibility. STATUS_COMPLETE = 'COMPLETE' attr_reader :full_initial_sync, # true if full initial sync :full_tables, # all tables (same as data_entry['mysql_data_entry_preference']['tables']) :new_tables, # tables which is not finihed initial-sync(pos file doesn't exist) :ddl_tables, # tables generated ddl :input_tables # tables which user put #target_tables # target tables for current command(sync/reset/generate_table_ddl) #target_tables_for_api # target tables for calling api(tables parameter needs to be empty for full_initial_sync) class SyncDataEntryError < StandardError end # Command: flydata sync # - Arguments def self.slop Slop.new do on 'c', 'skip-cleanup', 'Skip server cleanup' on 'f', 'skip-flush', 'Skip server flush' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 'd', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.' # dummy for compatibility on 's', 'dump-stream', 'Dump mysqldump stream instead of saving dump file. It might cause timeout error if db size is larger than 10GB.' on 'n', 'no-flydata-start', 'Don\'t start the flydata agent after initial sync.' #TODO : This option is temp! Should remove soon. on 'ff', 'Skip checking query queue and flush' end end # Command: flydata sync # - Entry method def run(*tables) # Process check sender = Flydata::Command::Sender.new if (sender.process_exist?) if tables.empty? # full sync log_warn_stderr("FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first.") else # per-table sync log_warn_stderr("Flydata Agent is already running. If you'd like to Sync the table(s), run 'flydata sync:flush' first.") end exit 1 end fluentd_started = false quiet_option = false start_fluentd = Proc.new do |binlog_pos| # Start continuous sync by starting fluentd process unless opts.no_flydata_start? log_info_stdout("Starting FlyData Agent...") unless quiet_option Flydata::Command::Sender.new.start(quiet: true) log_info_stdout(" -> Done") unless quiet_option end fluentd_started = true end quiet_option = true # Start initial sync with check handle_mysql_sync(tables, binlog_ready_callback: start_fluentd) quiet_option = false start_fluentd.call unless fluentd_started # Show message dashboard_url = "#{flydata.flydata_api_host}/dashboard" redshift_console_url = "#{flydata.flydata_api_host}/redshift_clusters/query/new" last_message = ALL_DONE_MESSAGE_TEMPLATE % [redshift_console_url, dashboard_url] log_info_stdout(last_message) end run_exclusive :run # Public method # - Called from Sender#start/restart def try_mysql_sync(options) # Start initial sync handle_mysql_sync(nil, options) rescue SyncDataEntryError return end # Command: flydata sync:flush # - Arguments def self.slop_flush Slop.new do on 'f', 'skip-flush', 'Skip server flush' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' end end # Command: flydata sync:flush # - Entry method def flush(*tables) begin flush_buffer_and_stop(tables, skip_flush: opts.skip_flush?) rescue ServerDataProcessingTimeout => e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < #{binlog_path}") log_info_stdout("Run 'flydata start' to start continuous sync.") end run_exclusive :skip # Command: flydata sync:generate_table_ddl # - Arguments def self.slop_generate_table_ddl Slop.new do on 'c', 'ctl-only', 'Only generate FlyData Control definitions' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 's', 'skip-primary-key-check', 'Skip primary key check when generating DDL' on 'all-tables', 'Generate all table schema' end end # Command: flydata sync:generate_table_ddl # - Entry method def generate_table_ddl(*tables) # Compatibility check de = data_entry dp = flydata.data_port.get Flydata::MysqlCompatibilityCheck.new(dp, de['mysql_data_entry_preference']).check # Set instance variables set_current_tables(tables, include_all_tables: true) do_generate_table_ddl(de) end run_exclusive :generate_table_ddl # Command: flydata sync:fix_binlogpos # - Arguments def self.slop_fix_binlogpos Slop.new do on 'f', 'force', 'update sent binlog position file forcibly' end end # Command: flydata sync:fix_binlogpos # Set binlog path # - Entry method def fix_binlogpos de = data_entry sync_fm = create_sync_file_manager(de) if File.exists?(sync_fm.sent_binlog_path) && !opts.force? log_info_stdout("Skip creating sent binlogpos because sent position file is exist already. (#{sync_fm.sent_binlog_path})") return end if Flydata::Command::Sender.new.process_exist? log_warn_stderr("flydata is running. flydata process needs to be stopped with 'flydata stop'.") return end binlog_info = sync_fm.load_binlog if binlog_info.nil? log_info_stdout("Skip creating sent binlogpos because binlog position file is empty or invalid. (#{sync_fm.sent_binlog_path})") return end say("Updating binlog position files...") log_info("Updating binlog position files... Original binlog_info:#{binlog_info}") # Update binlog.sent.pos file # -1 is because the position in binlog.pos is the next event's position. # on the other hand the position in sent position indicates already processed. binlog_info[:pos] -= 1 log_info("Updating sent position file. #{binlog_info} -> #{sync_fm.sent_binlog_path}") sync_fm.save_sent_binlog(binlog_info) # Update binlog.pos file to start from head of the current binlog file new_binlog_info = binlog_info.dup.tap{|h| h[:pos] = 4} # 4 is the first position of binlog file. log_info("Updating original position file. #{new_binlog_info} -> #{sync_fm.binlog_path}") sync_fm.save_binlog(new_binlog_info) log_info_stdout("Done!") end run_exclusive :fix_binlogpos def repair de = data_entry set_current_tables # Stop agent. Check sync and make sure the state is :STUCK_AT_UPLOAD # Get table status for the tables. status, pos_mismatch_tables, gap_tables, table_status_hash = _check(stop_agent:true) if status.include? :STUCK_AT_PROCESS e = AgentError.new("Data is stuck while processing") e.description = < binlog_pos if oldest_binlog && binlog_pos < oldest_binlog unrepairable_tables << table else sent_binlog_pos = binlog_pos end end end end end if oldest_binlog && sent_binlog_pos < oldest_binlog e = AgentError.new("Repair failed due to expired binlog") e.description = < #{pos}" end end log_info_stdout "Fixing the master position files..." save_master_binlog_positions(master_binlog_pos, sent_binlog_pos) # Remove the lock file if exists. File.delete(FLYDATA_LOCK) if File.exists?(FLYDATA_LOCK) log_info_stdout "Repair is done. Start Agent with `flydata start` command." end run_exclusive :repair def check(options = {}) status, pos_mismatch_tables, gap_tables = _check(options) if status.include? :OK message = "\nNo errors are found. Sync is clean.\n" else message = "\nFollowing errors are found.\n" if status.include? :STUCK_AT_PROCESS message += " - Data is stuck while processing\n" end if status.include? :STUCK_AT_UPLOAD message += " - Data is stuck while uploading\n" end if status.include? :ABNORMAL_SHUTDOWN message += " - Agent was not shut down correctly\n" end if gap_tables message += " - Sync data is missing for the following table(s)\n" gap_tables.each do |bt| message += " table:#{bt[:table]}\n" end message += "\n" end if pos_mismatch_tables message += " - Incorrect table position(s)\n" pos_mismatch_tables.each do |bt| message += " table:#{bt[:table]}, agent position:#{bt[:agent_seq] ? bt[:agent_seq] : '(missing)'}, server position:#{bt[:server_seq]}\n" end message += "\n" end end log_info_stdout message end run_exclusive :check def _check(options = {}) options[:stop_agent] ||= false set_current_tables sender = Flydata::Command::Sender.new start_process = !options[:stop_agent] && sender.process_exist? pos_mismatch_tables = nil gap_tables = nil data_stuck_at = nil abnormal_shutdown = false begin begin flush_buffer_and_stop(@full_tables, force: false, timeout: 55) rescue ServerDataProcessingTimeout => e data_stuck_at = e.state end # Agent is stopped but locked. There was an abnormal shutdown. abnormal_shutdown = sender.agent_locked? table_status_hash = get_table_status(@full_tables) pos_mismatch_tables = check_position_files(table_status_hash) gap_tables = check_gaps(table_status_hash) ensure Flydata::Command::Sender.new.start(quiet: true) if start_process end status = [] if data_stuck_at == :PROCESS status << :STUCK_AT_PROCESS end if data_stuck_at == :UPLOAD status << :STUCK_AT_UPLOAD end if gap_tables status << :TABLE_GAPS end if pos_mismatch_tables status << :TABLE_POS_MISMATCH end if abnormal_shutdown status << :ABNORMAL_SHUTDOWN end if status.empty? status << :OK end [status, pos_mismatch_tables, gap_tables, table_status_hash] end private # Initial sync def handle_mysql_sync(tables = nil, options = {}) de = data_entry # Setup instance variables sync_resumed = set_current_tables(tables, resume: true) if sync_resumed # skip confirmation prompts and resume sync right away. # #initial_sync knows where to resume from. log_info_stdout("Resuming the initial sync...") initial_sync(de, options.merge(sync_resumed: true)) elsif !@unsynced_tables.empty? show_purpose_name unsynced_table_message = "We've noticed that these tables have not been synced yet: #{@unsynced_tables.join(", ")}\n" unless @ddl_tables.empty? unsynced_table_message << " WARNING: We've noticed that at least one of these tables have not had their DDL generated yet.\n" + " We recommend you run our 'flydata sync:generate_table_ddl > create_table.sql'\n" + " to generate SQL to run on Redshift to create the correct tables\n" + " Without running this sql on your Redshift cluster, there may be issues with your data" end log_info_stdout(unsynced_table_message) if ask_yes_no("Do you want to run initial sync on all of these tables now?") initial_sync(de, options.merge(sync_resumed: false)) else #If generate_table_ddl has not been run for these tables, warn user unless @ddl_tables.empty? say(" You can generate DDL SQL for your new tables by running this command") say(" $> flydata sync:generate_table_ddl > create_table.sql") end puts "Without syncing these tables, we cannot start the flydata process" raise "Please try again" end end end def initial_sync(de, opt) # Load sync information from file validate_initial_sync_status begin if opt[:sync_resumed] # parallel cont sync has sent buffer data by now so server buffer # data will exist. Skip server data flush. flush_buffer_and_stop(target_tables_for_api, skip_flush: true) elsif !@full_initial_sync # flush leftover data for tables being added. log_info_stdout("Sending the existing buffer data...") flush_buffer_and_stop(target_tables_for_api, skip_flush: opts.skip_flush?) else # flush is unnecessary for full initial sync because it's guaranteed # that agent is stopped with no leftover buffer. end sync_mysql_to_redshift(de, opt) rescue ServerDataProcessingTimeout => e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < #{Flydata::Preference::DataEntryPreference.conf_path(de)}" end end # Status is parsed or complete -> skip dump and parse dump_pos_info = sync_fm.load_dump_pos if dump_pos_info[:status] == STATUS_PARSED || dump_pos_info[:status] == STATUS_COMPLETE initialize_binlog_positions_and_call_callback( nil, options[:binlog_ready_callback], sync_fm) end # mysqldump file exists -> skip dump dp = flydata.data_port.get fp = sync_fm.dump_file_path if file_dump && File.exists?(fp) && File.size(fp) > 0 initialize_binlog_positions_and_call_callback( nil, options[:binlog_ready_callback], sync_fm) return call_block_or_return_io(fp, &block) end log_info_stdout("Checking MySQL server connection and configuration...") Flydata::MysqlCompatibilityCheck.new(dp, de['mysql_data_entry_preference'], dump_dir: fp, backup_dir: sync_fm.backup_dir).check log_info_stdout("Checking the database size...") db_bytesize = Flydata::Parser::Mysql::DatabaseSizeCheck.new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).get_db_bytesize tables = target_tables tables ||= '' data_servers = de['mysql_data_entry_preference']['data_servers'] ? "\n data servers: #{de['mysql_data_entry_preference']['data_servers']}" : "" confirmation_text = <<-EOM FlyData Sync will start synchronizing the following database tables host: #{de['mysql_data_entry_preference']['host']} port: #{de['mysql_data_entry_preference']['port']} username: #{de['mysql_data_entry_preference']['username']} database: #{de['mysql_data_entry_preference']['database']} tables: #{tables.join(", ")}#{data_servers} EOM confirmation_text << <<-EOM if de['mysql_data_entry_preference']['ssl_ca'] ssl: Yes EOM confirmation_text << <<-EOM if file_dump dump file: #{fp} approx. size: #{as_size(db_bytesize)} (#{db_bytesize} byte) Dump file saves contents of your tables temporarily. Make sure you have enough disk space. EOM print confirmation_text log_info confirmation_text.strip if ask_yes_no('Start Sync?') if file_dump # check free disk space free_disk_bytesize = free_disk_space(File.dirname(fp)) if (free_disk_bytesize - db_bytesize) < (1024 * 1024 * 1024) # 1GB log_warn_stderr("!!WARNING There may not be enough disk space for a DB dump. We recommend 1GB free disk space after the dump. free disk space:#{as_size(free_disk_bytesize)}(#{free_disk_bytesize} byte) /" + " db size:#{as_size(db_bytesize)}(#{db_bytesize} byte)") unless ask_yes_no('Do you want to continue?') log_warn_stderr("To change the dump file directory, delete '#' and modify the path of 'mysqldump_dir:' in '#{Preference::DataEntryPreference.conf_path(de)}'") exit 1 end end end log_info_stdout("Setting binary log position and exporting data from the database.") log_info_stdout("This process can take hours depending on data size and load on your database. Please be patient...") sync_fm.save_sync_info(@full_initial_sync, target_tables) if file_dump binlog_pos = nil begin Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData. new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).dump(fp) do |_io, _binlog_pos| binlog_pos = _binlog_pos initialize_binlog_positions_and_call_callback( binlog_pos, options[:binlog_ready_callback], sync_fm) end log_info_stdout(" -> Database dump done") rescue Exception => e #Catch all exceptions including SystemExit and Interrupt. log_info_stdout "Quit while running mysqldump, deleting dump file..." sync_fm.delete_dump_file log_info_stdout "Dump file deleted, to restart the FlyData Agent, please run the 'flydata start' command " raise e end call_block_or_return_io(fp, binlog_pos, &block) else Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData. new(de['mysql_data_entry_preference'].merge('tables' => target_tables)).dump do |io, binlog_pos| initialize_binlog_positions_and_call_callback( binlog_pos, options[:binlog_ready_callback], sync_fm) block.call(io, binlog_pos, db_bytesize) end end else exit 1 end end def initialize_binlog_positions_and_call_callback(binlog_pos, callback, sync_fm) if binlog_pos initialize_positions(sync_fm, binlog_pos) else # no binlog_pos was given because dump was completed in the # previous init sync attempt. Position files must be there already # so no initialization is necessary. end if callback callback.call(binlog_pos) end end # return available disk size(byte) def free_disk_space(dump_path) stat = Sys::Filesystem.stat(dump_path) stat.block_size * stat.blocks_available end def call_block_or_return_io(fp, binlog_pos = nil, &block) if block f_io = open_file_io(fp) begin block.call(f_io, binlog_pos) return nil ensure f_io.close rescue nil end end return open_file_io(fp) end def open_file_io(file_path) File.open(file_path, 'r', encoding: "utf-8") end # Checkpoint # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120; # <- checkpoint(after binlog) #... #CREATE TABLE `accounts` ( #... #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8; # <- checkpoint(after create table) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # <- checkpoint(when buffered data is sent to server) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # #... #UNLOCK TABLES; # <- checkpoint #... #CREATE TABLE ... def parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, binlog_pos, db_bytesize = nil) # Prepare forwarder de_tag_name = de["tag_name#{env_suffix}"] server_port = dp['server_port'] servers = if de['mysql_data_entry_preference']['data_servers'] de['mysql_data_entry_preference']['data_servers'].split(',') else dp["servers#{env_suffix}"].collect{|s| "#{s}:#{server_port}"} end forwarder_type = de['mysql_data_entry_preference']['forwarder'] || (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder') forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers) # Load dump.pos file for resume dump_pos_info = sync_fm.load_dump_pos option = dump_pos_info || {} if option[:table_name] && option[:last_pos].to_i != -1 binlog_pos = option[:binlog_pos] log_info_stdout("Resuming... Last processed table: #{option[:table_name]}") else option[:binlog_pos] = binlog_pos end log_info_stdout("Sending data to FlyData Server...") bench_start_time = Time.now # Start parsing dump file tmp_num_inserted_record = 0 dump_fp = sync_fm.dump_file_path dump_file_size = File.exists?(dump_fp) ? File.size(dump_fp) : 1 begin Flydata::Parser::Mysql::MysqlDumpParser.new(option).parse( mysqldump_io, # create table Proc.new { |mysql_table| tmp_num_inserted_record = 0 # dump mysql_table for resume #TODO: make it option sync_fm.save_mysql_table_marshal_dump(mysql_table) }, # insert record Proc.new { |mysql_table, values_set| mysql_table_name = mysql_table.table_name records = values_set.collect do |values| values = convert_to_flydata_values(mysql_table, values) json = generate_json(mysql_table, values) {table_name: mysql_table_name, log: json} end ret = forwarder.emit(records) sync_fm.save_record_count_stat(mysql_table_name, ret[:record_count]) if ret tmp_num_inserted_record += 1 ret }, # checkpoint Proc.new { |mysql_table, last_pos, bytesize, binlog_pos, state, substate| # flush if buffer records exist table_name = mysql_table.nil? ? '' : mysql_table.table_name if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0 ret = forwarder.flush # send buffer data to the server before checkpoint sync_fm.save_record_count_stat(table_name, ret[:record_count]) if ret end # show the current progress if last_pos.to_i == -1 # stream dump log_info_stdout(" -> #{as_size(bytesize)} (#{bytesize} byte) completed...") else log_info_stdout(" -> #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed...") end # save check point sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate) # send record count for the table if mysql_table && state == Flydata::Parser::Mysql::MysqlDumpParser::State::CREATE_TABLE # all records for `mysql_table` have been sent send_record_counts(de, sync_fm, mysql_table.table_name) end } ) rescue DumpParseError =>e ee = DumpParseError.new("ERROR: We encountered an error parsing this chunk:\n #{e.message}") ee.description = " Please contact support@flydata.com to report the issue." ee.set_backtrace e.backtrace raise ee end forwarder.close log_info_stdout(" -> Done") #log_info_stdout(" -> Records sent to the server") #log_info_stdout(" -> #{sync_fm.load_stats}") sync_fm.save_dump_pos(STATUS_PARSED, '', dump_file_size, binlog_pos) if ENV['FLYDATA_BENCHMARK'] bench_end_time = Time.now elapsed_time = bench_end_time.to_i - bench_start_time.to_i log_info_stdout("Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}") end end def complete_mysqldump_processing(dp, de, sync_fm) return if ENV['FLYDATA_BENCHMARK'] # Status is not parsed -> don't complete dump_pos_info = sync_fm.load_dump_pos return unless dump_pos_info[:status] == STATUS_PARSED binlog_pos = dump_pos_info[:binlog_pos] sync_fm.save_dump_pos(STATUS_COMPLETE, '', -1, binlog_pos) end # option: timeout, tables def wait_for_server_data_processing(option = {}) timeout = option[:timeout] || 0 tables = option[:tables] || [] state = :PROCESS start_time = Time.now #log_info_stdout("Uploading data to Redshift...") log_info_stdout("Processing and uploading your data on FlyData Servers...") sleep 10 status = nil prev_message =nil while (status = check_server_status(tables)) if state == :PROCESS && status['state'] == 'uploading' log_info_stdout(" -> Data processing done") state = :UPLOAD log_info_stdout("Uploading remaining data chunks...") end #TODO This is based on a temporary option if state == :UPLOAD && opts.ff? log_info_stdout("Skip checking for pending uploads") break end if status['message'] != prev_message # making some progress. Reset timer start_time = Time.now end prev_message = status['message'] if timeout > 0 && Time.now - start_time > timeout raise ServerDataProcessingTimeout.new(nil, state: state) end print_progress(status) sleep 10 end if (state == :PROCESS) # :UPLOAD state was skipped due to no data log_info_stdout(" -> Done") log_info_stdout("Finishing data upload...") end log_info_stdout(" -> Done") end def check_server_status(tables = []) de = data_entry retry_on(RestClient::Exception) do status = do_check_server_status(de, tables) if status['complete'] nil else status end end end def do_check_server_status(de, tables = []) flydata.data_entry.buffer_stat(de['id'], mode: env_mode, tables: tables) end def print_progress(buffer_stat) message = buffer_stat['message'] log_info_stdout(message) unless message.nil? || message.empty? end def complete(de) sync_fm = create_sync_file_manager(de) info = sync_fm.load_dump_pos if info[:status] == STATUS_COMPLETE send_record_counts(de, sync_fm) sync_fm.delete_dump_file sync_fm.backup_dump_dir else raise "Initial sync status is not complete. Try running 'flydata start' again." end sync_fm.close end NUM_TABLES_IN_CHUNK = 30 def send_record_counts(de, sync_fm, table = nil) stats = sync_fm.load_stats stats = { table => stats[table] } if table # single table stats stats.each_slice(NUM_TABLES_IN_CHUNK) do |slice| h = Hash[slice] send_record_counts_chunk(de, h) end end def send_record_counts_chunk(de, stats) retry_count = 0 retry_interval = 3 begin flydata.data_entry.complete_init_sync(de['id'], {init_sync_stats: {record_counts:stats}}) rescue retry_count += 1 raise if retry_count > 3 sleep retry_interval retry_interval *= 2 end end def initialize_positions(sync_fm, binlog_pos) sync_fm.save_table_binlog_pos(target_tables, binlog_pos) if @full_initial_sync sync_fm.save_binlog(binlog_pos) end sync_fm.install_table_binlog_files(target_tables) sync_fm.reset_table_position_files(target_tables) end def convert_to_flydata_values(mysql_table, values) types = mysql_table.columns.each_value.collect{|col_attrs| col_attrs[:format_type]} types.size.times.collect{|i| FlydataCore::TableDef::MysqlTableDef.convert_to_flydata_value(values[i], types[i]) } end def generate_json(mysql_table, values) h = {} mysql_table.columns.each_key.with_index do |k, i| h[k] = values[i] end h.to_json end # Sync reset def wait_for_server_buffer(option = {}) timeout = option[:timeout] || 0 tables = option[:tables] || [] start_time = Time.now log_info_stdout("Waiting for the server buffer to get empty.") prev_message =nil while (status = check_server_status(tables)) && (status['state'].nil? || status['state'] == 'processing') prev_message = status['message'] if timeout > 0 && Time.now - start_time > timeout raise ServerDataProcessingTimeout.new end print_progress(status) sleep 10 end end def cleanup_sync_server(de, tables = [], options = {}) print("Cleaning the queued items on the FlyData Servers.") log_info("Cleaning the queued items on the FlyData Servers.") worker = Thread.new do begin flydata.data_entry.cleanup_sync(de['id'], tables, options) rescue RestClient::RequestTimeout, RestClient::GatewayTimeout # server is taking time to cleanup. Try again retry end end until worker.join(5) print "." end puts log_info_stdout("Done.") end # Generate table ddl def do_generate_table_ddl(de) if `which mysqldump`.empty? raise "mysqldump is not installed. mysqldump is required to run the command" end error_list = [] schema_name = (de['schema_name'] || nil) mp = de['mysql_data_entry_preference'] tables = opts.all_tables? ? @full_tables : (@input_tables.empty? ? @unsynced_tables : @input_tables) raise "There are no valid unsynced tables, if you want to just get ddl for all tables, please run \`flydata sync:generate_table_ddl --all-tables\`" if tables.empty? %w(host username database).each do |conf_name| raise "MySQL `#{conf_name}` is neither defined in the data entry nor the local config file" if mp[conf_name].to_s.empty? end if tables.empty? raise "`tables` (or `tables_append_only`) is neither defined in the data entry nor the local config file" end create_flydata_ctl_table = true option = {skip_parimay_key_check: opts.skip_primary_key_check?}.merge(mp) missing_tables = FlydataCore::Mysql::CommandGenerator.each_mysql_tabledef(tables, option) do |mysql_tabledef, error| if error error_list << error.err_hash next end flydata_tabledef = mysql_tabledef.to_flydata_tabledef puts FlydataCore::TableDef::SyncRedshiftTableDef.from_flydata_tabledef(flydata_tabledef, flydata_ctl_table: create_flydata_ctl_table, schema_name: schema_name, ctl_only: opts.ctl_only?) create_flydata_ctl_table = false end if missing_tables missing_tables.each {|missing_table| error_list << { error: 'table does not exist in the MySQL database', table: missing_table } } end table_validity_hash = {} tables_without_error = tables unless error_list.empty? log_error_stderr("\n\nERROR: FlyData Sync will not sync the following table(s) due to an error.") group_error = error_list.group_by {|d| d[:error]} group_error.each_key do |error| group_error[error].each do |hash| if table = hash[:table] log_error_stderr(" - #{table} (#{error})") table_validity_hash[table] = error end end end log_error_stderr(< raise error if dump_pos_info[:status] == STATUS_PARSING && !File.exists?(fp) raise "FlyData Sync was interrupted with invalid state. Run 'flydata sync:reset#{@input_tables.join(',')}' first." end end def target_tables if @full_initial_sync @full_tables elsif !@input_tables.empty? @input_tables else @unsynced_tables end end # return empty array if full sync def target_tables_for_api if @full_initial_sync [] elsif !@input_tables.empty? @input_tables else @unsynced_tables end end def data_entry @de ||= retrieve_sync_data_entry end def retrieve_sync_data_entry de = retrieve_data_entries.first raise "There are no data entries." unless de case de['type'] when 'RedshiftMysqlDataEntry' mp = de['mysql_data_entry_preference'] if mp['tables_append_only'] mp['tables'] = (mp['tables'].split(",") + mp['tables_append_only'].split(",")).uniq else mp['tables'] = mp['tables'].split(",").uniq end mp['invalid_tables'] = mp['invalid_tables'].kind_of?(String) ? mp['invalid_tables'].split(",").uniq : [] mp['new_tables'] = mp['new_tables'].kind_of?(String) ? mp['new_tables'].split(",").uniq : [] unless mp['ssl_ca_content'].to_s.strip.empty? sync_fm = create_sync_file_manager(de) sync_fm.save_ssl_ca(mp['ssl_ca_content']) mp['ssl_ca'] = sync_fm.ssl_ca_path mp['sslca'] = mp['ssl_ca'] end else raise SyncDataEntryError, "No supported data entry. Only mysql-redshift sync is supported." end de end def create_sync_file_manager(de = data_entry) SyncFileManager.new(de) end def verify_input_tables(input_tables, all_tables) return unless input_tables inval_table = [] input_tables.each do |tab| inval_table << tab unless all_tables.include?(tab) end raise "These tables are not registered tables: #{inval_table.join(", ")}" unless inval_table.empty? end def check_position_files(table_status_hash) de = data_entry sync_fm = create_sync_file_manager(de) pos_mismatch_tables = [] table_status_hash.keys.collect do |table| table_status = table_status_hash[table] server_sequence = table_status["seq"] agent_sequence = sync_fm.get_table_position(table) agent_sequence = agent_sequence.to_i if agent_sequence if server_sequence != agent_sequence pos_mismatch_tables << {table: table, agent_seq: agent_sequence, server_seq: server_sequence} end end pos_mismatch_tables.empty? ? nil : pos_mismatch_tables end def check_gaps(table_status_hash) gap_tables = table_status_hash.values.select {|table_status| table_status["num_items"] > 0 && table_status["next_item"] == false }.collect{|table_status| {table: table_status["table_name"]}} gap_tables.empty? ? nil : gap_tables end def save_table_positions(table, binlog_pos, pos) de = data_entry sync_fm = create_sync_file_manager(de) s = sync_fm.get_table_binlog_pos(table) old_binlog_pos = s ? FlydataCore::Mysql::BinlogPos.new(s) : nil old_pos = sync_fm.get_table_position(table) if pos.to_i != old_pos.to_i sync_fm.save_table_position(table, pos) $log.debug "table pos updated. table:#{table} pos:#{old_pos} -> #{pos}" end if binlog_pos != old_binlog_pos sync_fm.save_table_binlog_pos(table, binlog_pos.to_s, destination: :positions) $log.debug "table binlog updated. table:#{table} binlog:`#{old_binlog_pos}` -> `#{binlog_pos}`" end [old_binlog_pos, old_pos] end def save_master_binlog_positions(master_binlog_pos, sent_binlog_pos) de = data_entry sync_fm = create_sync_file_manager(de) s = sync_fm.load_binlog old_master_binlog_pos = s ? FlydataCore::Mysql::BinlogPos.new(s) : nil s = sync_fm.load_sent_binlog old_sent_binlog_pos = s ? FlydataCore::Mysql::BinlogPos.new(s) : nil if master_binlog_pos != old_master_binlog_pos sync_fm.save_binlog(master_binlog_pos.to_s) $log.debug "master binlog positions updated. `#{old_master_binlog_pos}` -> `#{master_binlog_pos}`" end if sent_binlog_pos != old_sent_binlog_pos sync_fm.save_sent_binlog(sent_binlog_pos.to_s) $log.debug "sent binlog positions updated. `#{old_sent_binlog_pos}` -> `#{sent_binlog_pos}`" end [old_master_binlog_pos, old_sent_binlog_pos] end def get_table_status(tables) de = data_entry sync_fm = create_sync_file_manager(de) result = flydata.data_entry.table_status(de['id'], mode: env_mode, tables: tables) result = result["table_status"] table_status_hash = result.inject({}){|h, ts| h[ts["table_name"]] = ts; h} missing_tables = tables - table_status_hash.keys unless missing_tables.empty? raise "table status is not available for these table(s): #{missing_tables.join(",")}" end populate_initial_binlog_positions(table_status_hash, sync_fm) table_status_hash end # table_status has no binlog position for sequence "0". Populate the info # from 'table.binlog.pos.init' file. def populate_initial_binlog_positions(table_status_hash, sync_fm) table_status_hash.keys.each do |table| if table_status_hash[table]["src_pos"] == "-" init_binlog_pos = sync_fm.get_table_binlog_pos_init(table) unless init_binlog_pos raise "File `#{table}.binlog.pos.init` is missing" end table_status_hash[table]["src_pos"] = init_binlog_pos end end end # TODO implement def get_oldest_available_binlog nil end end end end