require 'msgpack' require 'rest_client' require 'sys/filesystem' require 'flydata/agent' require 'flydata/command/base' require 'flydata/command/conf' require 'flydata/command/sender' require 'flydata/compatibility_check' require 'flydata/errors' require 'flydata/source/errors' require 'flydata/helpers' require 'flydata/json' require 'flydata/queueable_thread' require 'flydata/output/forwarder' require 'flydata/parser' require 'flydata/preference/data_entry_preference' require 'flydata/sync_file_manager' require 'flydata/table_attribute' require 'flydata-core/table_def' require 'flydata/table_ddl' require 'flydata/event/api_event_sender' require 'flydata-core/event/event_dictionary' require 'sigdump/setup' #require 'ruby-prof' # to enable profiling, also set the class' RUN_PROFILE module Flydata module Command class Sync < Base include Helpers RUN_PROFILE = false INSERT_PROGRESS_INTERVAL = 1000 SERVER_DATA_PROCESSING_TIMEOUT = 3600 # seconds # for dump.pos file STATUS_START = 'START' # only :source_pos is available at the begining of parse STATUS_PARSING = 'PARSING' STATUS_PARSED = 'WAITING' # the value is different from the constant name on purpose for backward compatibility. STATUS_COMPLETE = 'COMPLETE' attr_reader :full_initial_sync, # true if full initial sync :full_tables, # all tables (same as the value of `tables` data entry preference) :new_tables, # tables which is not finihed initial-sync(pos file doesn't exist) :ddl_tables, # tables generated ddl :input_tables # tables which user put #target_tables # target tables for current command(sync/reset/generate_table_ddl) #target_tables_for_api # target tables for calling api(tables parameter needs to be empty for full_initial_sync) # Command: flydata sync # - Arguments def self.slop Slop.new do on 'c', 'skip-cleanup', 'Skip server cleanup' on 'f', 'skip-flush', 'Skip server flush' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' 
on 'd', 'dump-file', 'Save dump result into a file.' on 's', 'dump-stream', 'Stream dump result to a pipe instead of saving dump file. It might cause timeout error if db size is larger than 10GB.' on 'n', 'no-flydata-start', 'Don\'t start the flydata agent after initial sync.' #TODO : This option is temp! Should remove soon. on 'ff', 'Skip checking query queue and flush' end end # Command: flydata sync # - Entry method def run(*tables) raise "Command 'flydata sync' has been deprecated. Use 'flydata start' instead." end run_exclusive :run # Public method # - Called from Sender#start/restart def try_initial_sync(options) handle_initial_sync(options) if source.sync.supported? rescue Source::UnsupportedSourceError return end # Command: flydata sync:flush # - Arguments def self.slop_flush Slop.new do on 'f', 'skip-flush', 'Skip server flush' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 'force-run', 'Run forcefully, ignoring exclusive run info' end end # Command: flydata sync:flush # - Entry method def flush(*tables) begin flush_buffer_and_stop(tables, skip_flush: opts.skip_flush?) rescue ServerDataProcessingTimeout => e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < 0 && !opts[:force] log_info_stdout < e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < #{source_pos_path}") log_info_stdout("Run 'flydata start' to start continuous sync.") end run_exclusive :skip # Command: flydata sync:generate_table_ddl # - Arguments def self.slop_generate_table_ddl Slop.new do on 'c', 'ctl-only', 'Only generate FlyData Control definitions' on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 's', 'skip-primary-key-check', 'Skip primary key check when generating DDL' on 'all-tables', 'Generate all table schema' on 'drop-append-only', 'Include queries to drop append-only tables' #no 'force-run' option. 
because stdout is often redirected to a file. end end # Command: flydata sync:generate_table_ddl # - Entry method def generate_table_ddl(*tables) # Compatibility check de = data_entry context = source.sync_generate_table_ddl(flydata.data_port.get) context.run_compatibility_check # Set instance variables set_current_tables(tables, include_all_tables: true) unless generate_and_print_table_ddl(context, de) raise "There are no valid unsynced tables, if you want to just get ddl for all tables, please run \`flydata sync:generate_table_ddl --all-tables\`" end end run_exclusive :generate_table_ddl # Command: flydata sync:repair # - Arguments def self.slop_repair Slop.new do on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.' on 'skip-start', 'Skip auto start after repair is completed.' on 'force-run', 'Run forcefully, ignoring exclusive run info' end end # Command: flydata sync:repair # - Entry method def repair need_to_start = _repair if need_to_start && !opts.skip_start? Flydata::Command::Sender.new.start end end run_exclusive :repair # Command: flydata sync:check # - Entry method def check(options = {}) context = source.source_pos status, corrupt_master_pos_files, pos_mismatch_tables, gap_tables = _check(context, options) if status.include? :OK message = "\nNo errors are found. Sync is clean.\n" else message = "\nFollowing errors are found.\n" if status.include? :STUCK_AT_PROCESS message += " - Timeout while processing data\n" end if status.include? :STUCK_AT_UPLOAD message += " - Timeout while uploading data\n" end if status.include? :ABNORMAL_SHUTDOWN message += " - Agent was not shut down correctly\n" end if status.include? 
:CORRUPT_MASTER_POS message += " - Master source position is corrupted\n" end if gap_tables message += " - Sync data is missing for the following table(s)\n" gap_tables.each do |bt| message += " table:#{bt[:table]}\n" end message += "\n" end if pos_mismatch_tables message += " - Incorrect table position. This may not be a real issue, caused by pending upload chunks. Run sync:flush and try again.\n" pos_mismatch_tables.each do |bt| message += " table:#{bt[:table]}, agent position:#{bt[:agent_seq] ? bt[:agent_seq] : '(missing)'}, server position:#{bt[:server_seq]}\n" end message += "\n" end end log_info_stdout message end run_exclusive :check # Command: flydata sync:compat_check # - Entry method def compat_check context = source.sync_generate_table_ddl(flydata.data_port.get) context.run_compatibility_check end run_exclusive :compat_check private def generate_kv_pairs(keys, values) h = {} i = 0 sz = keys.size while i < sz h[keys[i]] = values[i] i += 1 end JSON.generate(h) end def _check(source_pos_context, options = {}) options[:stop_agent] ||= false set_current_tables sender = Flydata::Command::Sender.new start_process = !options[:stop_agent] && sender.process_exist? pos_mismatch_tables = nil gap_tables = nil data_stuck_at = nil abnormal_shutdown = false begin begin # wait until no data gets processed for 3 minutes. This is long # to wait, but there *are* cases where active data processing takes # more than 3 minutes between 2 chunk processing if COPY command # takes minutes to process. flush_buffer_and_stop(@full_tables, force: false, timeout: 180, dont_wait_upload: true) rescue ServerDataProcessingTimeout => e data_stuck_at = e.state end # Agent is stopped but locked. There was an abnormal shutdown. abnormal_shutdown = sender.agent_locked? 
table_status_hash = get_table_status(@full_tables, source_pos_context) corrupt_master_pos_files = check_master_position_files pos_mismatch_tables = check_position_files(table_status_hash) gap_tables = check_gaps(table_status_hash) ensure Flydata::Command::Sender.new.start(quiet: true) if start_process end status = [] if data_stuck_at == :PROCESS status << :STUCK_AT_PROCESS end if data_stuck_at == :UPLOAD status << :STUCK_AT_UPLOAD end if corrupt_master_pos_files status << :CORRUPT_MASTER_POS end if gap_tables status << :TABLE_GAPS end if pos_mismatch_tables status << :TABLE_POS_MISMATCH end if abnormal_shutdown status << :ABNORMAL_SHUTDOWN end if status.empty? status << :OK end [status, corrupt_master_pos_files, pos_mismatch_tables, gap_tables, table_status_hash] end def _repair de = data_entry sync_fm = create_sync_file_manager(de) context = source.source_pos set_current_tables # Stop agent. Check sync and make sure the state is :STUCK_AT_UPLOAD # Get table status for the tables. status, corrupt_master_pos_files, pos_mismatch_tables, gap_tables, table_status_hash = _check(context, stop_agent:true) if status.include? :STUCK_AT_PROCESS e = AgentError.new("Timeout while processing data") e.description = < table_source_pos if oldest_source_pos && table_source_pos < oldest_source_pos unrepairable_tables << table else sent_source_pos = table_source_pos end end end end unless unrepairable_tables.empty? # Notify expired source position tables through error logs to us log_error_stderr "[error]: Failed to repair tables due to expired source position. These tables need to be re-synced - #{unrepairable_tables.join(", ")}" end # If sent_source_pos is nil, it means: # - Sync has started for none of tables # - None of tables are broken # - All of broken tables have an expired source position # No need to repair positions nor clean buffer data. 
if sent_source_pos # This logic is unreachable since sent_source_pos cannot be more than oldest_source_pos if oldest_source_pos && sent_source_pos < oldest_source_pos e = AgentError.new("Repair failed due to expired source position") e.description = < create_table.sql'\n" + " to generate SQL to run on Redshift to create the correct tables\n" + " Without running this sql on your Redshift cluster, there may be issues with your data" end log_info_stdout(unsynced_table_message) if ask_yes_no("Do you want to run initial sync on all of these tables now?") initial_sync(options.merge(sync_resumed: false)) else #If generate_table_ddl has not been run for these tables, warn user unless @no_ddl_generated_tables.empty? say(" You can generate DDL SQL for your new tables by running this command") say(" $> flydata sync:generate_table_ddl > create_table.sql") end puts "Without syncing these tables, we cannot start the flydata process" raise "Please try again" end end end def initial_sync(opt) de = data_entry # Load sync information from file aborted_during_dump = validate_initial_sync_status if aborted_during_dump # The previous init-sync failed during dump. # - Discard remainng continuous-sync data by automatically running `sync:reset --init` # and run init-sync from scratch. # - Will keep ./positions/*.generated_ddl files. # No need to re-run sync:generate_table_ddl to drop tables on Redshift because # no data was written to Redshift during the previous init-sync. $log.info "Resetting the initial sync..." recover_cmd = "flydata restart" _reset(recover_cmd, reset_client_only: false, delete_tbl_ddl: false) # Setup instance variables again sync_resumed = set_current_tables(nil, resume: true) end begin if opt[:sync_resumed] # parallel cont sync has sent buffer data by now so server buffer # data will exist. Skip server data flush. flush_buffer_and_stop(target_tables_for_api, skip_flush: true) elsif !@full_initial_sync # flush leftover data for tables being added. 
log_info_stdout("Sending the existing buffer data...") flush_buffer_and_stop(target_tables_for_api, skip_flush: opts.skip_flush?) else # flush is unnecessary for full initial sync because it's guaranteed # that agent is stopped with no leftover buffer. end perform_initial_sync(de, opt) rescue ServerDataProcessingTimeout => e ee = ServerDataProcessingTimeout.new("Delayed Data Processing") ee.description = < skip dump and parse dump_pos_info = sync_fm.load_dump_pos if dump_pos_info[:status] == STATUS_PARSED || dump_pos_info[:status] == STATUS_COMPLETE initialize_source_positions_and_call_callback( nil, options[:source_pos_ready_callback], sync_fm) return end # dump file exists -> skip dump fp = sync_fm.dump_file_path if file_dump && File.exists?(fp) && File.size(fp) > 0 initialize_source_positions_and_call_callback( nil, options[:source_pos_ready_callback], sync_fm) return call_block_or_return_io(fp, &dump_ready_callback) end log_info_stdout("Checking the data source connection and configuration...") context.run_compatibility_check(fp, sync_fm.backup_dir) log_info_stdout("Checking the database size...") db_bytesize = context.dump_size(target_tables) tables = target_tables tables ||= '' data_servers = source.sync.data_servers data_servers = data_servers ? "\n data servers: #{data_servers}" : "" confirmation_text = <<-EOM FlyData Sync will start synchronizing the following database tables EOM context.confirmation_items.each do |name, value| confirmation_text << " %-14s%s\n" % ["#{name}:", value.to_s] end confirmation_text << <<-EOM tables: #{tables.join(", ")}#{data_servers} EOM confirmation_text << <<-EOM if file_dump dump file: #{fp} approx. size: #{as_size(db_bytesize)} (#{db_bytesize} byte) Dump file saves contents of your tables temporarily. Make sure you have enough disk space. 
EOM print confirmation_text log_info confirmation_text.strip if ask_yes_no('Start Sync?') if file_dump # check free disk space free_disk_bytesize = free_disk_space(File.dirname(fp)) if (free_disk_bytesize - db_bytesize) < (1024 * 1024 * 1024) # 1GB log_warn_stderr("!!WARNING There may not be enough disk space for a DB dump. We recommend 1GB free disk space after the dump. free disk space:#{as_size(free_disk_bytesize)}(#{free_disk_bytesize} byte) /" + " db size:#{as_size(db_bytesize)}(#{db_bytesize} byte)") unless ask_yes_no('Do you want to continue?') log_warn_stderr("To change the dump file directory, delete '#' and modify the path of 'dump_dir:' in '#{Preference::DataEntryPreference.conf_path(de)}'") exit 1 end end end log_info_stdout("Setting binary log position and exporting data from the database.") log_info_stdout("This process can take hours depending on data size and load on your database. Please be patient...") sync_fm.save_sync_info(@full_initial_sync, target_tables) # This notification will be uncommented after init_sync_finish email integration is released unless options[:sync_resumed] FlydataCore::Event::ApiEventSender.instance.send_event( FlydataCore::Event::INIT_SYNC, FlydataCore::Event::STARTED, data: {"tables" => target_tables, 'no_email' => options[:no_email]}, data_entry_id: de['id'], data_port_id: de['data_port_id']) end if file_dump source_pos = nil begin context.dump(target_tables, fp) do |_io, _source_pos| source_pos = _source_pos initialize_source_positions_and_call_callback(source_pos, options[:source_pos_ready_callback], sync_fm) end log_info_stdout(" -> Database dump done") rescue Exception => e #Catch all exceptions including SystemExit and Interrupt. log_info_stdout "Quit while running dump, deleting dump file..." sync_fm.delete_dump_file log_info_stdout "Dump file deleted. 
To restart the FlyData Agent, run the following command: flydata restart" raise e end call_block_or_return_io(fp, source_pos, &dump_ready_callback) else context.dump(target_tables) do |io, source_pos| initialize_source_positions_and_call_callback(source_pos, options[:source_pos_ready_callback], sync_fm) dump_ready_callback.call(io, source_pos) end end else exit 1 end end def initialize_source_positions_and_call_callback(source_pos, callback, sync_fm) if source_pos initialize_positions(sync_fm, source_pos) else # no source_pos was given because dump was completed in the # previous init sync attempt. Position files must be there already # so no initialization is necessary. end if callback callback.call end end # return available disk size(byte) def free_disk_space(dump_path) stat = Sys::Filesystem.stat(dump_path) stat.block_size * stat.blocks_available end def call_block_or_return_io(fp, source_pos= nil, &source_pos_block) if source_pos_block f_io = open_file_io(fp) begin source_pos_block.call(f_io, source_pos) return nil ensure f_io.close rescue nil end end return open_file_io(fp) end def open_file_io(file_path) File.open(file_path, 'r', encoding: "utf-8") end # Checkpoint # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120; # <- checkpoint(after binlog) #... #CREATE TABLE `accounts` ( #... #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8; # <- checkpoint(after create table) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # <- checkpoint(when buffered data is sent to server) #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # #... #UNLOCK TABLES; # <- checkpoint #... #CREATE TABLE ... 
# Parses the dump available through +dump_io+ and forwards its records to the
# FlyData server, saving checkpoints to the dump.pos file through +sync_fm+ so
# that an interrupted run can be resumed.
#
# dump_io    - IO object the dump is read from
# dp, de     - data port / data entry, handed to build_forwarder
# sync_fm    - sync file manager used for dump.pos, stats and marshal dumps
# source_pos - source position of the dump; only required when no dump.pos
#              file exists yet
#
# Raises RuntimeError when no source position can be determined, and
# DumpParseError (re-wrapped with a support hint) when parsing fails.
def parse_dump_and_send(dump_io, dp, de, sync_fm, source_pos)
  context = source.parse_dump_and_send

  # Load dump.pos file for resume
  dump_pos_info = sync_fm.load_dump_pos
  if dump_pos_info.empty?
    # First time to parse. No dump pos file available.
    # Re-initialize dump_pos_info hash with new dump pos file.
    if source_pos.nil?
      raise "Cannot parse dump because source position is not available. Reset unfinished initial sync by command: flydata sync:reset --init"
    end
    sync_fm.save_dump_pos(STATUS_START, nil, nil, source_pos)
    dump_pos_info = sync_fm.load_dump_pos
  end
  # From here on the position recorded in dump.pos is the one that counts.
  source_pos = dump_pos_info[:source_pos]
  if source_pos.nil?
    # this should never happen but just in case...
    raise "Source position is not available. Parse dump failed. Reset unfinished initial sync by command: flydata sync:reset --init"
  end

  if dump_pos_info[:table_name] && dump_pos_info[:last_pos].to_i != -1
    log_info_stdout("Resuming... Last processed table: #{dump_pos_info[:table_name]}")
  end
  if dump_pos_info[:source_table] && dump_pos_info[:source_table].value_converters.nil?
    # Old marshal dumped source_table object may not have value_converters
    dump_pos_info[:source_table].set_value_converters(context.value_converters)
  end
  if dump_pos_info[:source_table] && dump_pos_info[:source_table].column_names.nil?
    # Old marshal dumped source_table object may not have column_names
    dump_pos_info[:source_table].set_column_names
  end

  # Prepare forwarder
  forwarder = build_forwarder(dp, de)

  log_info_stdout("Sending data to FlyData Server...")

  # initialize stats
  target_tables.each do |table_name|
    sync_fm.save_record_count_stat(table_name, 0)
  end

  bench_start_time = Time.now

  # All forwarder and checkpoint work is queued on this thread so it runs in
  # the order the parser produced it.
  output_thread = QueueableThread.new

  # Start parsing dump file
  tmp_num_inserted_record = 0
  skip_checkpoint = false
  dump_fp = sync_fm.dump_file_path
  # File.exist? instead of File.exists?: the latter was removed in Ruby 3.2.
  # The fallback of 1 keeps the progress division below away from zero.
  dump_file_size = File.exist?(dump_fp) ? File.size(dump_fp) : 1
  send_record_counts_threads = []
  begin
    RubyProf.start if RUN_PROFILE and defined?(RubyProf) and not RubyProf.running?
    context.parse_dump(
      dump_pos_info,
      dump_io,
      # create table
      Proc.new { |source_table|
        source_table.set_value_converters(context.value_converters)
        st = source_table.clone
        st.value_converters = {}
        # don't let the closure hold these objects as it causes memory leak
        source_table = nil
        output_thread.run do
          tmp_num_inserted_record = 0
          # dump source_table for resume
          #TODO: make it option
          sync_fm.save_source_table_marshal_dump(st)
          log_info_stdout(" -> Started sending data for table '#{st.table_name}'")
        end
      },
      # insert record
      Proc.new { |source_table, values_set|
        source_table_name = source_table.table_name
        records = values_set.collect do |values|
          convert_to_flydata_values(source_table, values)
          json = JSON.generate_kv_pairs(source_table.column_names, values)
          values.clear
          unless json.start_with? '{'
            raise DumpParseError.new("Broken JSON record json:#{json[0..100]}")
          end
          {table_name: source_table_name, log: json}
        end
        # release resources to prevent the closure from keeping it
        values_set.clear
        values_set = nil
        source_table = nil
        source_table_name = nil
        output_thread.run do
          forwarder.emit(records)
          tmp_num_inserted_record += 1
          # Only checkpoint once the forwarder reports a full buffer.
          skip_checkpoint = !forwarder.buffer_full?
        end
        true
      },
      # checkpoint
      Proc.new { |source_table, last_pos, bytesize, source_pos, state, substate|
        table_name = source_table.nil? ? '' : source_table.table_name
        source_table = nil
        output_thread.run do
          skip = skip_checkpoint
          skip_checkpoint = false
          unless skip
            # flush if buffer records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              ret = forwarder.flush # send buffer data to the server before checkpoint
              sync_fm.save_record_count_stat(table_name, ret[:record_count]) if ret
              # save check point
              sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, source_pos, state, substate)
            end

            # show the current progress
            if last_pos.to_i == -1 # stream dump
              log_info_stdout(" -> #{as_size(bytesize)} (#{bytesize} byte) completed...")
            else
              log_info_stdout(" -> #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed...")
            end

            # send record count for the table
            if table_name.to_s != '' && state == Flydata::Parser::State::CREATE_TABLE
              # all records for `source_table` have been sent
              send_record_counts(de, sync_fm, table_name, send_record_counts_threads)
              log_info_stdout(" -> Finished sending data for table '#{table_name}'...")
            end
          end
        end
      }
    )
  rescue DumpParseError => e
    ee = DumpParseError.new("ERROR: We encountered an error parsing this chunk:\n #{e.message}")
    ee.description = " Please contact support@flydata.com to report the issue."
    ee.set_backtrace e.backtrace
    raise ee
  ensure
    if RUN_PROFILE and defined?(RubyProf) and RubyProf.running?
      result = RubyProf.stop
      #printer = RubyProf::GraphPrinter.new(result)
      printer = RubyProf::GraphHtmlPrinter.new(result)
      #printer.print(STDOUT)
      # Block form so the report file handle is closed once it is written.
      File.open("ruby-prof-out-#{Time.now.to_i}.html", "w") do |report|
        printer.print(report, :min_percent => 3)
      end
    end
  end
  send_record_counts_threads.each { |t| t.join }
  output_thread.join
  forwarder.close
  sync_fm.save_dump_pos(STATUS_PARSED, '', dump_file_size, source_pos)
  log_info_stdout(" -> Done")
  #log_info_stdout(" -> Records sent to the server")
  #log_info_stdout(" -> #{sync_fm.load_stats}")

  if ENV['FLYDATA_BENCHMARK']
    bench_end_time = Time.now
    elapsed_time = bench_end_time.to_i - bench_start_time.to_i
    log_info_stdout("Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}")
  end
end

# Moves the dump.pos status from STATUS_PARSED to STATUS_COMPLETE.
# Does nothing when FLYDATA_BENCHMARK is set or when the recorded status is
# anything other than STATUS_PARSED.
def complete_dump_processing(sync_fm)
  return if ENV['FLYDATA_BENCHMARK']

  # Status is not parsed -> don't complete
  dump_pos_info = sync_fm.load_dump_pos
  return unless dump_pos_info[:status] == STATUS_PARSED

  source_pos = dump_pos_info[:source_pos]
  sync_fm.save_dump_pos(STATUS_COMPLETE, '', -1, source_pos)
end

# Polls check_server_status every 10 seconds until it returns nil.
#
# option: timeout          - seconds without a change in the status message
#                            before ServerDataProcessingTimeout is raised
#                            (0 or missing disables the timeout)
#         tables           - tables passed on to check_server_status
#         dont_wait_upload - stop polling as soon as the :UPLOAD state starts
def wait_for_server_data_processing(option = {})
  timeout = option[:timeout] || 0
  tables = option[:tables] || []

  state = :PROCESS
  start_time = Time.now
  #log_info_stdout("Uploading data to Redshift...")
  log_info_stdout("Processing and uploading your data on FlyData Servers...")
  sleep 10
  status = nil
  prev_message = nil
  while (status = check_server_status(tables))
    if state == :PROCESS && status['state'] == 'uploading'
      log_info_stdout(" -> Data processing done")
      state = :UPLOAD
      log_info_stdout("Uploading remaining data chunks...")
    end

    #TODO This is based on a temporary option
    if state == :UPLOAD && opts.ff?
      log_info_stdout("Skip checking for pending uploads")
      break
    end
    if state == :UPLOAD && option[:dont_wait_upload]
      break
    end
    if status['message'] != prev_message
      # making some progress. Reset timer
      start_time = Time.now
    end
    prev_message = status['message']

    if timeout > 0 && Time.now - start_time > timeout
      raise ServerDataProcessingTimeout.new(nil, state: state)
    end

    print_progress(status)
    sleep 10
  end
  if state == :PROCESS && !option[:dont_wait_upload]
    # :UPLOAD state was skipped due to no data
    log_info_stdout(" -> Done")
    log_info_stdout("Finishing data upload...")
  end
  log_info_stdout(" -> Done")
end

# Returns the server buffer status for +tables+, or nil once the server
# reports 'complete'. The request runs inside retry_on(RestClient::Exception).
def check_server_status(tables = [])
  de = data_entry
  retry_on(RestClient::Exception) do
    status = do_check_server_status(de, tables)
    if status['complete']
      nil
    else
      status
    end
  end
end

# Requests the buffer stat of data entry +de+ for +tables+.
def do_check_server_status(de, tables = [])
  flydata.data_entry.buffer_stat(de['id'], mode: env_mode, tables: tables)
end

# Logs the 'message' field of +buffer_stat+ unless it is nil or empty.
def print_progress(buffer_stat)
  message = buffer_stat['message']
  log_info_stdout(message) unless message.nil? || message.empty?
end

# Finishes an initial sync whose dump.pos status is STATUS_COMPLETE: sends the
# record counts, deletes the dump file and backs up the dump directory.
#
# Raises RuntimeError when the status is not STATUS_COMPLETE. The sync file
# manager is closed on both the success and the error path.
def complete(de)
  sync_fm = create_sync_file_manager(de)
  begin
    dump_pos_info = sync_fm.load_dump_pos
    unless dump_pos_info[:status] == STATUS_COMPLETE
      raise "Initial sync status is not complete. Try running 'flydata start' again."
    end
    send_record_counts(de, sync_fm)
    sync_fm.delete_dump_file
    sync_fm.backup_dump_dir
  ensure
    sync_fm.close
  end
end

# Maximum number of tables whose record counts go into one API request.
NUM_TABLES_IN_CHUNK = 30

# Sends the record-count stats loaded from +sync_fm+ to the server in chunks
# of NUM_TABLES_IN_CHUNK tables.
#
# table        - when given, only that table's count is sent
# thread_array - when given, each chunk is sent from a new Thread that is
#                appended to this array; otherwise chunks are sent inline
def send_record_counts(de, sync_fm, table = nil, thread_array = nil)
  stats = sync_fm.load_stats
  stats = { table => stats[table] } if table # single table stats
  stats.each_slice(NUM_TABLES_IN_CHUNK) do |slice|
    send_record_counts_chunk(de, Hash[slice], thread_array)
  end
end

# Sends one chunk of record counts, in a new Thread when +thread_array+ is
# given (the thread is appended to it) and inline otherwise.
def send_record_counts_chunk(de, stats, thread_array)
  if thread_array
    thread_array << Thread.new { do_send_record_counts_chunk(de, stats) }
  else
    do_send_record_counts_chunk(de, stats)
  end
end

# Reports +stats+ through complete_init_sync, retrying up to 3 times with a
# doubling pause (3, 6, 12 seconds). The error of the 4th failed attempt is
# re-raised.
def do_send_record_counts_chunk(de, stats)
  retry_count = 0
  retry_interval = 3
  begin
    flydata.data_entry.complete_init_sync(de['id'], {init_sync_stats: {record_counts: stats}})
  rescue
    retry_count += 1
    raise if retry_count > 3
    sleep retry_interval
    retry_interval *= 2
    # Without this the request was never re-sent and the failure was dropped.
    retry
  end
end

# Saves +source_pos+ for the target tables (and as the master position when
# running a full initial sync), then installs the table source position files
# and resets the table position files.
def initialize_positions(sync_fm, source_pos)
  sync_fm.save_table_source_pos(target_tables, source_pos)

  if @full_initial_sync
    sync_fm.save_source_pos(source_pos)
  end
  sync_fm.install_table_source_pos_files(target_tables)
  sync_fm.reset_table_position_files(target_tables)
end

# Applies the converters of +source_table+ to +values+ in place; each
# converter is keyed by the index of the value it handles.
def convert_to_flydata_values(source_table, values)
  vc = source_table.value_converters
  return if vc.empty?

  vc.each_pair do |index, converter|
    values[index] = converter.call(values[index])
  end
end

# Returns a JSON object string pairing column_names[i] with values[i].
# Columns without a matching value are emitted as null.
def generate_json(column_names, values)
  row = {}
  column_names.each_with_index { |name, i| row[name] = values[i] }
  row.to_json
end

# Sync reset
# Polls check_server_status every 10 seconds while the server state is nil or
# 'processing'.
#
# option: timeout - seconds after which ServerDataProcessingTimeout is raised
#                   (0 or missing disables the timeout)
#         tables  - tables passed on to check_server_status
def wait_for_server_buffer(option = {})
  timeout = option[:timeout] || 0
  tables = option[:tables] || []

  start_time = Time.now
  log_info_stdout("Waiting for the server buffer to get empty.")

  while (status = check_server_status(tables)) &&
        (status['state'].nil? || status['state'] == 'processing')
    if timeout > 0 && Time.now - start_time > timeout
      raise ServerDataProcessingTimeout.new("Delayed Data Processing")
    end
    print_progress(status)
    sleep 10
  end
end

# Requests a cleanup of the queued items for +tables+ on the server and
# prints a dot every 5 seconds until the request has finished.
def cleanup_sync_server(de, tables = [], options = {})
  print("Cleaning the queued items on the FlyData Servers.")
  log_info("Cleaning the queued items on the FlyData Servers.")
  worker = Thread.new do
    begin
      flydata.data_entry.cleanup_sync(de['id'], tables, options)
    rescue RestClient::RequestTimeout, RestClient::GatewayTimeout
      # server is taking time to cleanup. Try again
      retry
    end
  end
  until worker.join(5)
    print "."
  end
  puts
  log_info_stdout("Done.")
end

# Generate and print table ddl
# Prints the Redshift table definition built from every flydata tabledef that
# generate_flydata_tabledefs yields, and returns that method's result.
def generate_and_print_table_ddl(context, de)
  generate_flydata_tabledefs(context, de) do |flydata_tabledef|
    puts FlydataCore::TableDef::SyncRedshiftTableDef.from_flydata_tabledef(flydata_tabledef, flydata_tabledef[:ddl_options])
  end
end
# Tables the current command works on: every registered table for a full
# initial sync, otherwise the tables given by the user, falling back to the
# unsynced tables when none were given.
def target_tables
  return @full_tables if @full_initial_sync

  @input_tables.empty? ? @unsynced_tables : @input_tables
end

# Same selection as target_tables, except that a full initial sync yields an
# empty array.
def target_tables_for_api
  return [] if @full_initial_sync

  @input_tables.empty? ? @unsynced_tables : @input_tables
end

# Memoized data entry. On first use the entry comes from the superclass and
# source.sync.setup is run, which updates the data entry contents.
def data_entry
  return @sync_de if @sync_de

  @sync_de = super
  source.sync.setup
  @sync_de
end

# Builds a SyncFileManager for +de+ (the current data entry by default).
def create_sync_file_manager(de = data_entry)
  SyncFileManager.new(de, source)
end

# Raises RuntimeError listing every entry of +input_tables+ that is missing
# from +all_tables+. A nil +input_tables+ is accepted without checking.
def verify_input_tables(input_tables, all_tables)
  return unless input_tables

  unknown = input_tables.reject { |name| all_tables.include?(name) }
  return if unknown.empty?

  raise "These tables are not registered tables: #{unknown.join(", ")}"
end

# Returns the paths of the master position files whose content could not be
# loaded (load returned nil), or nil when both loaded.
def check_master_position_files
  fm = create_sync_file_manager(data_entry)
  master_pos = fm.load_source_pos
  sent_pos = fm.load_sent_source_pos

  damaged = []
  damaged.push(fm.source_pos_path) if master_pos.nil?
  damaged.push(fm.sent_source_pos_path) if sent_pos.nil?
  damaged unless damaged.empty?
end

# Compares the server sequence ("seq") of every table in +table_status_hash+
# with the agent's table position. Returns an array of
# {table:, agent_seq:, server_seq:} hashes for the tables that differ, or nil
# when all of them match.
def check_position_files(table_status_hash)
  fm = create_sync_file_manager(data_entry)

  mismatches = table_status_hash.each_with_object([]) do |(table, table_status), found|
    server_seq = table_status["seq"]
    agent_seq = fm.get_table_position(table).to_i # nil is considered 0
    if server_seq != agent_seq
      found << { table: table, agent_seq: agent_seq, server_seq: server_seq }
    end
  end
  mismatches unless mismatches.empty?
end

# Returns [{table: name}, ...] for every table status that has items
# ("num_items" > 0) while "next_item" is false, or nil when there is none.
def check_gaps(table_status_hash)
  gaps = []
  table_status_hash.each_value do |table_status|
    next unless table_status["num_items"] > 0 && table_status["next_item"] == false

    gaps << { table: table_status["table_name"] }
  end
  gaps.empty? ? nil : gaps
end

# Stores the position and the source position of +table+.
#
# A nil +source_pos+ falls back to the table's stored source position and
# then to the master sent source position; a nil +pos+ falls back to the
# stored table position. Raises RuntimeError when either value is still
# missing after the fallbacks.
#
# Returns [previous source position, previous position].
def save_table_positions(table, source_pos, pos, context)
  fm = create_sync_file_manager(data_entry)
  sent_master_pos = fm.load_sent_source_pos
  prev_source_pos = fm.get_table_source_pos(table)
  prev_pos = fm.get_table_position(table)

  source_pos = source_pos || prev_source_pos || sent_master_pos
  raise "master sent source pos is missing" unless source_pos

  pos = pos || prev_pos
  raise "table pos is missing for table '#{table}'" unless pos

  if pos.to_i != prev_pos.to_i
    fm.save_table_position(table, pos)
    $log.debug "table pos updated. table:#{table} pos:#{prev_pos} -> #{pos}"
  end

  if source_pos != prev_source_pos
    if !prev_source_pos || !prev_source_pos.infinity?
      fm.save_table_source_pos(table, source_pos, destination: :positions)
      $log.debug "table source_pos updated. table:#{table} source_pos:`#{prev_source_pos}` -> `#{source_pos}`"
    else
      # keep binlog pos file if source pos is infinity until reset happens
      $log.debug "table source_pos update skipped. table:#{table} source_pos:`#{prev_source_pos}` -> `#{prev_source_pos}`"
    end
  end

  [prev_source_pos, prev_pos]
end

# Stores the master and the sent source position when they differ from the
# stored values. Returns [previous master position, previous sent position].
def save_master_source_positions(master_source_pos, sent_source_pos, context)
  fm = create_sync_file_manager(data_entry)
  prev_master_pos = fm.load_source_pos
  prev_sent_pos = fm.load_sent_source_pos

  if master_source_pos != prev_master_pos
    fm.save_source_pos(master_source_pos)
    $log.debug "master source positions updated. `#{prev_master_pos}` -> `#{master_source_pos}`"
  end

  if sent_source_pos != prev_sent_pos
    fm.save_sent_source_pos(sent_source_pos.to_s)
    $log.debug "sent source positions updated. `#{prev_sent_pos}` -> `#{sent_source_pos}`"
  end

  [prev_master_pos, prev_sent_pos]
end

# Fetches the server-side status of +tables+ and returns it as a hash keyed
# by table name. Each entry's "src_pos" string is replaced by a "source_pos"
# object built with source_pos_context.create_source_pos (nil when the string
# is absent); missing positions are then filled from the init files.
def get_table_status(tables, source_pos_context)
  de = data_entry
  fm = create_sync_file_manager(de)
  response = flydata.data_entry.table_status(de['id'], mode: env_mode, tables: tables)

  by_table = response["table_status"].each_with_object({}) do |entry, acc|
    acc[entry["table_name"]] = entry
  end

  by_table.each_value do |entry|
    raw_pos = entry.delete("src_pos")
    entry["source_pos"] = raw_pos ? source_pos_context.create_source_pos(raw_pos) : nil
  end

  populate_initial_source_positions(by_table, fm)
  by_table
end

# table_status has no source position for sequence "0". Populate the info
# from 'table.binlog.pos.init' file.
def populate_initial_source_positions(table_status_hash, sync_fm)
  table_status_hash.keys.each do |table|
    entry = table_status_hash[table]
    current = entry["source_pos"]
    next unless current.nil? || current.empty?

    initial = sync_fm.get_table_source_pos_init(table)
    entry["source_pos"] = initial if initial
  end
end

# Asks the source's repair context for the oldest source position that is
# still available.
def get_oldest_available_source_pos
  source.sync_repair.get_oldest_available_source_pos
end