require 'fiber'
require 'msgpack'
require 'open3'
require 'mysql2'
require 'flydata/sync_file_manager'
require 'flydata/errors'
require 'flydata/table_def'
#require 'ruby-prof'

module Flydata
  module Command
    class Sync < Base
      include Helpers

      CREATE_TABLE_OPTION = !!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
      INSERT_PROGRESS_INTERVAL = 1000

      # for dump.pos file
      STATUS_PARSING = 'PARSING'
      STATUS_COMPLETE = 'COMPLETE'

      def self.slop
        Slop.new do
          on 'c', 'skip-cleanup', 'Skip server cleanup'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
        end
      end

      def run(*tables)
        sender = Flydata::Command::Sender.new
        if sender.process_exist?
          if tables.empty?
            # full sync
            puts "FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
          else
            # per-table sync
            puts "FlyData Agent is already running. If you'd like to sync the table(s), run 'flydata sync:flush' first."
          end
          exit 1
        end
        de = retrieve_data_entry
        de = load_sync_info(override_tables(de, tables))
        flush_buffer_and_stop unless de['mysql_data_entry_preference']['initial_sync']
        sync_mysql_to_redshift(de)
      end

      def self.slop_flush
        Slop.new do
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
        end
      end

      def flush
        flush_buffer_and_stop
        puts "Buffers have been flushed and the sender process has been stopped."
      end

      def self.slop_reset
        Slop.new do
          on 'c', 'client', 'Resets client only.'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
        end
      end

      def reset(*tables)
        msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
        return unless ask_yes_no("This resets the current sync#{msg}. Are you sure?")
        sender = Flydata::Command::Sender.new
        sender.flush_client_buffer # TODO We should rather delete buffer files
        sender.stop
        de = retrieve_data_entry
        wait_for_server_buffer
        cleanup_sync_server(de, tables) unless opts.client?
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        delete_files = [
          sync_fm.dump_file_path,
          sync_fm.dump_pos_path,
          sync_fm.mysql_table_marshal_dump_path,
          sync_fm.sync_info_file,
          sync_fm.table_position_file_paths(*tables),
          sync_fm.table_rev_file_paths(*tables)
        ]
        delete_files << sync_fm.binlog_path if tables.empty?
        delete_files.flatten.each do |path|
          FileUtils.rm(path) if File.exist?(path)
        end
      end

      def wait_for_server_buffer
        puts "Waiting for the server buffer to become empty"
        while (status = check) && (status['state'] == 'processing')
          print_progress(status)
          sleep 10
        end
      end

      def wait_for_server_data_processing
        state = :PROCESS
        puts "Uploading data to Redshift..."
        sleep 10
        status = nil
        while (status = check)
          if state == :PROCESS && status['state'] == 'uploading'
            puts " -> Done"
            state = :UPLOAD
            puts "Finishing data upload..."
          end
          print_progress(status)
          sleep 10
        end
        if state == :PROCESS
          # :UPLOAD state was skipped because there was no data
          puts " -> Done"
          puts "Finishing data upload..."
        end
        puts " -> Done"
      end

      def check
        de = retrieve_data_entry
        retry_on(RestClient::Exception) do
          status = do_check(de)
          if status['complete']
            nil
          else
            status
          end
        end
      end

      # skip initial sync
      def skip
        de = retrieve_data_entry
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        binlog_path = sync_fm.binlog_path
        `touch #{binlog_path}`
        puts "Created an empty binlog position file."
        puts "-> #{binlog_path}"
        puts "Run 'flydata start' to start continuous sync."
      end
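
      # Illustrative CLI usage for the subcommands defined above (a sketch; the
      # exact command names depend on how the flydata CLI maps class methods to
      # subcommands):
      #
      #   flydata sync               # full initial sync, then continuous sync
      #   flydata sync table1 ...    # per-table sync
      #   flydata sync:flush         # flush buffers and stop the sender process
      #   flydata sync:reset [-c]    # reset sync state (-c resets the client only)
      #   flydata sync:skip          # skip the initial sync by creating an empty
      #                              # binlog position file (see #skip above)
      #   flydata start              # start continuous sync after the initial sync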
      def self.slop_generate_table_ddl
        Slop.new do
          on 'c', 'ctl-only', 'Only generate FlyData Control definitions'
          on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
        end
      end

      def generate_table_ddl(*tables)
        de = retrieve_data_entry
        Flydata::Mysql::CompatibilityCheck.new(de['mysql_data_entry_preference']).check
        do_generate_table_ddl(override_tables(de, tables))
      end

      private

      def retrieve_data_entry
        de = retrieve_data_entries.first
        raise "There is no data entry." unless de
        case de['type']
        when 'RedshiftMysqlDataEntry'
          mp = de['mysql_data_entry_preference']
          if mp['tables_append_only']
            mp['tables'] = (mp['tables'].split(",") + mp['tables_append_only'].split(",")).uniq.join(",")
          end
        else
          raise "Unsupported data entry. Only MySQL-Redshift sync is supported."
        end
        de
      end

      def cleanup_sync_server(de, tables = [])
        puts "Cleaning up the server"
        flydata.data_entry.cleanup_sync(de['id'], tables)
      end

      def do_check(de)
        flydata.data_entry.buffer_stat(de['id'], env_mode)
      end

      def print_progress(buffer_stat)
        message = buffer_stat['message']
        puts message unless message.nil? || message.empty?
      end

      # host, port, username, password option, database, tables
      DDL_DUMP_CMD_TEMPLATE = "mysqldump --protocol=tcp -d -h %s -P %s -u %s %s %s %s"

      def do_generate_table_ddl(de)
        if `which mysqldump`.empty?
          raise "mysqldump is not installed. mysqldump is required to run this command."
        end

        error_list = []
        schema_name = (de['schema_name'] || nil)
        mp = de['mysql_data_entry_preference']

        params = []
        if !mp['host'].empty?
          params << mp['host']
        else
          raise "MySQL `host` is neither defined in the data entry nor the local config file"
        end
        params << (mp['port'] || '3306')
        if !mp['username'].empty?
          params << mp['username']
        else
          raise "MySQL `username` is neither defined in the data entry nor the local config file"
        end
        params << (mp['password'].to_s.empty? ? "" : "-p#{mp['password']}")
        if !mp['database'].empty?
          params << mp['database']
        else
          raise "`database` is neither defined in the data entry nor the local config file"
        end
        if !mp['tables'].empty?
          params << mp['tables'].gsub(/,/, ' ')
        else
          raise "`tables` (or `tables_append_only`) is neither defined in the data entry nor the local config file"
        end

        command = DDL_DUMP_CMD_TEMPLATE % params
        Open3.popen3(command) do |stdin, stdout, stderr|
          stdin.close
          stdout.set_encoding("utf-8") # mysqldump output must be in UTF-8
          create_flydata_ctl_table = mp['initial_sync']
          until stdout.eof?
            begin
              mysql_tabledef = Flydata::TableDef::MysqlTableDef.create(stdout)
            rescue TableDefError => e
              error_list << e.err_hash
              next
            end
            if mysql_tabledef.nil?
              # the stream has no more CREATE TABLE definitions
              break
            end
            flydata_tabledef = mysql_tabledef.to_flydata_tabledef
            puts Flydata::TableDef::RedshiftTableDef.from_flydata_tabledef(
              flydata_tabledef,
              flydata_ctl_table: create_flydata_ctl_table,
              schema_name: schema_name,
              ctl_only: opts.ctl_only?
            )
            create_flydata_ctl_table = false
          end
          errors = ""
          until stderr.eof?
            line = stderr.gets.gsub('mysqldump: ', '')
            errors << line unless /Warning: Using a password on the command line interface can be insecure./ === line
          end
          raise errors unless errors.empty?
        end
        unless error_list.empty?
          $stderr.puts "We have noticed the following error(s):"
          group_error = error_list.group_by {|d| d[:error]}
          group_error.each_key do |a|
            $stderr.puts "The following table(s) have #{a}:"
            group_error[a].each do |hash|
              $stderr.puts " - #{hash[:table]}" if hash[:table]
            end
          end
          $stderr.puts "Please fix the above error(s) and try again."
        end
      end
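
      # For reference, with host "db.example.com", port 3306, user "flydata", a
      # password, database "app_db" and tables "users,orders" (values made up),
      # the template above expands to a command like:
      #
      #   mysqldump --protocol=tcp -d -h db.example.com -P 3306 -u flydata \
      #     -psecret app_db users orders
      #
      # -d dumps the schema only (no rows), which is all generate_table_ddl needs.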
      def sync_mysql_to_redshift(de)
        dp = flydata.data_port.get
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)

        # Check the client condition
        if File.exist?(sync_fm.binlog_path) && de['mysql_data_entry_preference']['initial_sync']
          raise "Already synchronized. If you want to do an initial sync, run 'flydata sync:reset'"
        end

        # Copy the templates if they do not exist yet
        unless Flydata::Preference::DataEntryPreference.conf_exists?(de)
          Flydata::Command::Conf.new.copy_templates
        end

        if generate_mysqldump(de, sync_fm)
          sync_fm.save_sync_info(de['mysql_data_entry_preference']['initial_sync'],
                                 de['mysql_data_entry_preference']['tables'])
          parse_mysqldump_and_send(dp, de, sync_fm)
          complete
        end
      end

      def generate_mysqldump(de, sync_fm, overwrite = false)
        # Validate parameters
        %w(host username database).each do |k|
          if de['mysql_data_entry_preference'][k].to_s.empty?
            raise "'#{k}' is required. Set the value in the conf file " +
                  "-> #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          end
        end

        fp = sync_fm.dump_file_path
        if File.exist?(fp) && File.size(fp) > 0 && !overwrite
          puts " -> Skip"
          return fp
        end

        tables = de['mysql_data_entry_preference']['tables']
        tables ||= ''
        data_servers = de['mysql_data_entry_preference']['data_servers'] ?
          "\n  data servers: #{de['mysql_data_entry_preference']['data_servers']}" : ""
        confirmation_text = <<-EOM
FlyData Sync will start synchronizing the following database tables
  host:      #{de['mysql_data_entry_preference']['host']}
  port:      #{de['mysql_data_entry_preference']['port']}
  username:  #{de['mysql_data_entry_preference']['username']}
  database:  #{de['mysql_data_entry_preference']['database']}
  tables:    #{tables}#{data_servers}
  dump file: #{fp}

The dump file temporarily saves the contents of your tables. Make sure you have enough disk space.
        EOM
        print confirmation_text

        if ask_yes_no('Start Sync?')
          Flydata::Mysql::CompatibilityCheck.new(de['mysql_data_entry_preference'], fp).check
          puts "Exporting data from the database..."
          Flydata::Mysql::MysqlDumpGeneratorNoMasterData.new(de['mysql_data_entry_preference']).dump(fp)
        else
          newline
          puts "You can change the dump file path with the 'mysqldump_path' property in the following conf file."
          puts
          puts "  #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          puts
          return nil
        end
        puts " -> Done"
        fp
      end

      # Checkpoints in a mysqldump file:
      #
      # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120;
      #                               <- checkpoint (after binlog position)
      # ...
      # CREATE TABLE `accounts` (
      # ...
      # ) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8;
      #                               <- checkpoint (after create table)
      # INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      # INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      # INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #                               <- checkpoint (when buffered data is sent to the server)
      # INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      # INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #
      # ...
      # UNLOCK TABLES;
      #                               <- checkpoint
      # ...
      # CREATE TABLE ...
      def parse_mysqldump_and_send(dp, de, sync_fm)
        # Prepare the forwarder
        de_tag_name = de["tag_name#{env_suffix}"]
        server_port = dp['server_port']
        servers =
          if de['mysql_data_entry_preference']['data_servers']
            de['mysql_data_entry_preference']['data_servers'].split(',')
          else
            dp["servers#{env_suffix}"].collect {|s| "#{s}:#{server_port}"}
          end
        forwarder_type = de['mysql_data_entry_preference']['forwarder'] ||
                         (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder')
        forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers)
        # Load the dump.pos file so a previous run can be resumed
        dump_pos_info = sync_fm.load_dump_pos
        option = dump_pos_info || {}
        if option[:table_name]
          puts "Resuming... Last processed table: #{option[:table_name]}"
        else
          # For a new sync, make sure the server-side resources are clean
          cleanup_sync_server(de, de['mysql_data_entry_preference']['tables'].split(',')) unless opts.skip_cleanup?
        end

        puts "Sending data to FlyData Server..."
        bench_start_time = Time.now

        # Start parsing the dump file
        tmp_num_inserted_record = 0
        dump_fp = sync_fm.dump_file_path
        dump_file_size = File.size(dump_fp)
        binlog_pos = Flydata::Mysql::MysqlDumpParser.new(dump_fp, option).parse(
          # create table
          Proc.new { |mysql_table|
            redshift_table = Flydata::Mysql::RedshiftTableAdapter.new(mysql_table)
            mysql_table.set_adapter(:redshift, redshift_table)
            tmp_num_inserted_record = 0
            if CREATE_TABLE_OPTION
              print "- Creating table: #{redshift_table.table_name}"
              sql = redshift_table.create_table_sql
              ret = flydata.redshift_cluster.run_query(sql)
              if ret['message'].index('ERROR:')
                if ret['message'].index('already exists')
                  puts " -> Skip"
                else
                  raise "Failed to create table. error=#{ret['message']}"
                end
              else
                puts " -> OK"
              end
            end
            # Marshal the mysql_table object so a later run can resume
            sync_fm.save_mysql_table_marshal_dump(mysql_table)
          },
          # insert record
          Proc.new { |mysql_table, values_set|
            mysql_table_name = mysql_table.table_name
            records = values_set.collect do |values|
              json = generate_json(mysql_table, values)
              {table_name: mysql_table_name, log: json}
            end
            ret = forwarder.emit(records)
            tmp_num_inserted_record += 1
            ret
          },
          # checkpoint
          Proc.new { |mysql_table, last_pos, binlog_pos, state, substate|
            # Flush if buffered records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              forwarder.flush # send buffered data to the server before the checkpoint
            end

            # Show the current progress
            puts " -> #{(last_pos.to_f / dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed..."

            # Save the checkpoint
            table_name = mysql_table.nil? ? '' : mysql_table.table_name
            sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate)
          }
        )
        forwarder.close
        puts " -> Done"

        if ENV['FLYDATA_BENCHMARK']
          bench_end_time = Time.now
          elapsed_time = bench_end_time.to_i - bench_start_time.to_i
          puts "Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}"
          return true
        end

        # Wait until the server finishes processing
        wait_for_server_data_processing
        sync_fm.save_dump_pos(STATUS_COMPLETE, '', dump_file_size, binlog_pos)
        tables = de['mysql_data_entry_preference']['tables'].split(',').join(' ')
        sync_fm.save_table_binlog_pos(tables, binlog_pos)
      end
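
      # For reference, each record emitted to the forwarder above is a Hash of
      # the form below (values are made up; the JSON keys come from the MySQL
      # column names via #generate_json):
      #
      #   { table_name: "accounts",
      #     log: '{"id":"71","name":"test account","created_at":"2014-01-01 00:00:00"}' }
      #
      # A checkpoint saved through sync_fm.save_dump_pos records the parse status,
      # the table being processed, the byte offset in the dump file, the binlog
      # position and the parser state, which is what makes resuming possible.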
      ALL_DONE_MESSAGE_TEMPLATE = <<-EOM
Congratulations! FlyData has started synchronizing your database tables.

What's next?

  - Check data on Redshift (%s)
  - Check your FlyData usage on the FlyData Dashboard (%s)
  - To manage the FlyData Agent, use the 'flydata' command (type 'flydata' for help)
  - If you encounter an issue, please check our documentation (https://www.flydata.com/docs/)
    or contact our customer support team (support@flydata.com)

Thank you for using FlyData!
      EOM

      def complete
        de = load_sync_info(retrieve_data_entry)
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        info = sync_fm.load_dump_pos
        if info[:status] == STATUS_COMPLETE
          puts "Starting FlyData Agent..."
          if de['mysql_data_entry_preference']['initial_sync']
            sync_fm.save_binlog(info[:binlog_pos])
          end
          sync_fm.move_table_binlog_files(de['mysql_data_entry_preference']['tables'].split(','))
          sync_fm.reset_table_position_files(de['mysql_data_entry_preference']['tables'].split(','))
          sync_fm.backup_dump_dir
          Flydata::Command::Sender.new.start(quiet: true)
          puts " -> Done"

          data_port = flydata.data_port.get
          dashboard_url = "#{flydata.flydata_api_host}/data_ports/#{data_port['id']}"
          redshift_console_url = "#{flydata.flydata_api_host}/redshift_clusters/query/new"
          last_message = ALL_DONE_MESSAGE_TEMPLATE % [redshift_console_url, dashboard_url]
          puts last_message
        else
          raise "Initial sync status is not complete. Try running 'flydata sync'."
        end
      end

      def generate_json(mysql_table, values)
        h = {}
        mysql_table.columns.each_key.with_index do |k, i|
          h[k] = values[i]
        end
        h.to_json
      end

      def override_tables(de, tables)
        de['mysql_data_entry_preference']['initial_sync'] = tables.empty?
        unless de['mysql_data_entry_preference']['initial_sync']
          de['mysql_data_entry_preference']['tables'] = tables.join(',')
        end
        de
      end

      def load_sync_info(de)
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        mp = de['mysql_data_entry_preference']
        unless (rs = sync_fm.load_sync_info).nil?
          mp['initial_sync'] = rs[:initial_sync]
          mp['tables'] = rs[:tables]
        end
        de
      end

      def flush_buffer_and_stop
        sender = Flydata::Command::Sender.new
        sender.flush_client_buffer
        wait_for_server_data_processing
        sender.stop(quiet: true)
      end
    end
  end

  module Output
    class ForwarderFactory
      def self.create(forwarder_key, tag, servers, options = {})
        case forwarder_key
        when nil, "tcpforwarder"
          puts "Creating TCP connection" if FLYDATA_DEBUG
          forward = TcpForwarder.new(tag, servers, options)
        when "sslforwarder"
          puts "Creating SSL connection" if FLYDATA_DEBUG
          forward = SslForwarder.new(tag, servers, options)
        else
          raise "Unsupported forwarder type #{forwarder_key}"
        end
        forward
      end
    end

    class TcpForwarder
      FORWARD_HEADER = [0x92].pack('C')
      BUFFER_SIZE = 1024 * 1024 * 32 # 32MB
      DEFAULT_SEND_TIMEOUT = 60 # 1 minute
      RETRY_INTERVAL = 2
      RETRY_LIMIT = 10

      def initialize(tag, servers, options = {})
        @tag = tag
        unless servers && servers.kind_of?(Array) && !servers.empty?
          raise "Servers must not be empty."
        end
        @servers = servers
        @server_index = 0
        set_options(options)
        reset
      end

      def set_options(options)
        if options[:buffer_size_limit]
          @buffer_size_limit = options[:buffer_size_limit]
        else
          @buffer_size_limit = BUFFER_SIZE
        end
      end

      attr_reader :buffer_record_count, :buffer_size

      def emit(records, time = Time.now.to_i)
        records = [records] unless records.kind_of?(Array)
        records.each do |record|
          event_data = [time, record].to_msgpack
          @buffer_records << event_data
          @buffer_record_count += 1
          @buffer_size += event_data.bytesize
        end
        if @buffer_size > @buffer_size_limit
          send
        else
          false
        end
      end
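
      # What #send below writes to the socket is, in effect, one msgpack-framed
      # message. A sketch of the layout, based on the constants used here:
      #
      #   0x92                      # msgpack header for a 2-element array
      #   @tag.to_msgpack           # element 1: the tag string
      #   0xdb + 4-byte big-endian  # element 2: msgpack raw/str32 header carrying
      #     payload length          #   the byte length of the buffered event data
      #   @buffer_records           # concatenated [timestamp, record].to_msgpack
      #                             #   entries accumulated by #emit
      #
      # This mirrors fluentd-style forward framing; the receiving side's exact
      # protocol handling is outside this file.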
      #TODO retry logic
      def send
        return false unless @buffer_size > 0
        if ENV['FLYDATA_BENCHMARK']
          reset
          return true
        end
        sock = nil
        retry_count = 0
        begin
          sock = connect(pickup_server)

          # Write the header
          sock.write FORWARD_HEADER
          # Write the tag
          sock.write @tag.to_msgpack
          # Write the records
          sock.write [0xdb, @buffer_records.bytesize].pack('CN')
          StringIO.open(@buffer_records) do |i|
            FileUtils.copy_stream(i, sock)
          end
        rescue => e
          retry_count += 1
          if retry_count > RETRY_LIMIT
            puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
            raise e
          end
          puts "! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}"
          wait_time = RETRY_INTERVAL ** retry_count
          puts "  Now waiting for the next retry. time=#{wait_time}sec"
          sleep wait_time
          retry
        ensure
          if sock
            sock.close rescue nil
          end
        end
        reset
        true
      end

      #TODO: Check server status
      def pickup_server
        ret_server = @servers[@server_index]
        @server_index += 1
        if @server_index >= @servers.count
          @server_index = 0
        end
        ret_server
      end

      def connect(server)
        host, port = server.split(':')
        sock = TCPSocket.new(host, port.to_i)

        # Set socket options
        opt = [1, DEFAULT_SEND_TIMEOUT].pack('I!I!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
        opt = [DEFAULT_SEND_TIMEOUT, 0].pack('L!L!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
        sock
      end

      def reset
        @buffer_records = ''
        @buffer_record_count = 0
        @buffer_size = 0
      end

      def flush
        send
      end

      def close
        flush
      end
    end

    class SslForwarder < TcpForwarder
      def connect(server)
        tcp_sock = super
        ssl_ctx = ssl_ctx_with_verification
        ssl_sock = OpenSSL::SSL::SSLSocket.new(tcp_sock, ssl_ctx)
        ssl_sock.sync_close = true
        ssl_sock.connect
        ssl_sock
      end

      private

      def ssl_ctx_with_verification
        cert_store = OpenSSL::X509::Store.new
        cert_store.set_default_paths
        ssl_ctx = OpenSSL::SSL::SSLContext.new
        ssl_ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        ssl_ctx.cert_store = cert_store
        ssl_ctx
      end
    end
  end

  module Redshift
    module Util
      MAX_TABLENAME_LENGTH = 127
      REDSHIFT_RESERVED_WORDS = %w[
        aes128 aes256 all allowoverwrite analyse analyze and any array as asc
        authorization backup between binary blanksasnull both bytedict case cast
        check collate column constraint create credentials cross current_date
        current_time current_timestamp current_user current_user_id default
        deferrable deflate defrag delta delta32k desc disable distinct do else
        emptyasnull enable encode encrypt encryption end except explicit false
        for foreign freeze from full globaldict256 globaldict64k grant group
        gzip having identity ignore ilike in initially inner intersect into is
        isnull join leading left like limit localtime localtimestamp lun luns
        minus mostly13 mostly32 mostly8 natural new not notnull null nulls off
        offline offset old on only open or order outer overlaps parallel
        partition percent placing primary raw readratio recover references
        rejectlog resort restore right select session_user similar some sysdate
        system table tag tdes text255 text32k then to top trailing true
        truncatecolumns union unique user using verbose wallet when where with
        without
      ]
      # Create a symbol-keyed hash for performance
      REDSHIFT_RESERVED_WORDS_HASH =
        REDSHIFT_RESERVED_WORDS.inject({}) {|h, word| h[word.to_sym] = true; h}

      REDSHIFT_SYSTEM_COLUMNS = %w[oid tableoid xmin cmin xmax cmax ctid]
      REDSHIFT_SYSTEM_COLUMNS_HASH =
        REDSHIFT_SYSTEM_COLUMNS.inject({}) {|h, word| h[word.to_sym] = true; h}

      def convert_to_valid_name(key, type = :table)
        @memo ||= { table: {}, column: {} }
        key_sym = key.to_sym
        return @memo[type][key_sym] if @memo[type][key_sym]

        name = key.downcase.gsub(/[^a-z0-9_$]/, '_')
        name = "_#{name}" if is_redshift_reserved_word?(name, type) || name =~ /^[0-9$]/
        if name.length > MAX_TABLENAME_LENGTH
          name = nil
        end
        @memo[type][key_sym] = name
        name
      end

      def is_redshift_reserved_word?(name, type = :table)
        return false unless name
        return true if REDSHIFT_RESERVED_WORDS_HASH[name.to_sym] == true
        case type
        when :table
          false
        when :column
          REDSHIFT_SYSTEM_COLUMNS_HASH[name.to_sym] == true
        else
          false
        end
      end
    end
  end

  module Mysql
    class MysqlTable
      def initialize(table_name, columns = {}, primary_keys = [])
        @table_name = table_name
        @columns = columns
        @primary_keys = primary_keys
        @adapters = {}
      end

      attr_accessor :table_name, :columns, :primary_keys

      def add_column(column)
        @columns[column[:column_name]] = column
      end
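
      # For reference, the column hashes stored here are built by
      # MysqlDumpParser below while it parses CREATE TABLE statements. A varchar
      # column, for example, ends up roughly as (illustrative values):
      #
      #   { column_name: "author_type",
      #     format_type_str: "varchar(255)",
      #     format_type: "varchar",
      #     format_size: 255 }
      #
      # :default, :not_null, :unsigned, :decimal_precision and :decimal_scale
      # are added only when the corresponding attribute appears in the dump.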
      def set_adapter(key, adapter)
        @adapters[key] = adapter
      end

      def adapter(key)
        @adapters[key]
      end
    end

    class RedshiftTableAdapter
      include Flydata::Redshift::Util

      def initialize(mysql_table)
        @table_name = convert_to_valid_name(mysql_table.table_name)
        set_columns(mysql_table.columns)
        @primary_keys = mysql_table.primary_keys
      end

      attr_reader :table_name, :columns, :primary_keys

      def create_table_sql
        col_def = @columns.inject([]) { |list, (cn, column)|
          list << build_column_def(column)
          list
        }
        if @primary_keys.count > 0
          col_def << "primary key (#{@primary_keys.join(',')})"
        end
        # NOTE: the remainder of this method, the column conversion helpers
        # (set_columns/build_column_def) and the beginning of the dump generator
        # class were lost in the source. The return value below is an assumption
        # based on the method name and the column definitions assembled above.
        "CREATE TABLE #{@table_name} (#{col_def.join(',')});"
      end
    end

    # The class definition that owned the following methods was lost in the
    # source; a plain wrapper class is assumed here. The dump generator classes
    # referenced elsewhere (e.g. MysqlDumpGeneratorNoMasterData) and the
    # FLUSH_TABLES_QUERY_TEMPLATE / USER_TABLES_QUERY constants used below were
    # defined in the missing portion.
    class MysqlDumpGenerator
      def flush_tables_query(client) # method name is an assumption
        tables = ''
        unless mysql_server_version(client) <= "5.5"
          # FLUSH TABLES table_names,... WITH READ LOCK syntax is supported from MySQL 5.5
          result = client.query(USER_TABLES_QUERY)
          tables = result.collect {|r| r['tables']}.join(", ")
        end
        FLUSH_TABLES_QUERY_TEMPLATE % [tables]
      end

      VERSION_QUERY = "SHOW VARIABLES LIKE 'version'"

      def mysql_server_version(client)
        result = client.query(VERSION_QUERY)
        result.first['Value']
      end
    end

    class MysqlDumpParser
      module State
        START = 'START'
        CREATE_TABLE = 'CREATE_TABLE'
        CREATE_TABLE_COLUMNS = 'CREATE_TABLE_COLUMNS'
        CREATE_TABLE_CONSTRAINTS = 'CREATE_TABLE_CONSTRAINTS'
        INSERT_RECORD = 'INSERT_RECORD'
        PARSING_INSERT_RECORD = 'PARSING_INSERT_RECORD'
      end

      attr_accessor :binlog_pos

      def initialize(file_path, option = {})
        @file_path = file_path
        raise "Dump file does not exist. file_path:#{file_path}" unless File.exist?(file_path)
        @binlog_pos = option[:binlog_pos]
        @option = option
      end

      def parse(create_table_block, insert_record_block, check_point_block)
        invalid_file = false
        current_state = State::START
        substate = nil

        state_start = Proc.new do |f|
          line = f.readline.strip
          # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000002', MASTER_LOG_POS=120;
          m = /^\-\- CHANGE MASTER TO MASTER_LOG_FILE='(?<binfile>[^']+)', MASTER_LOG_POS=(?<pos>\d+)/.match(line)
          if m
            @binlog_pos = {binfile: m[:binfile], pos: m[:pos].to_i}
            current_state = State::CREATE_TABLE
            check_point_block.call(nil, f.pos, @binlog_pos, current_state)
          end
        end

        current_table = nil
        state_create_table = Proc.new do |f|
          line = f.readline.strip
          # CREATE TABLE `active_admin_comments` (
          m = /^CREATE TABLE `(?<table_name>[^`]+)`/.match(line)
          if m
            current_table = MysqlTable.new(m[:table_name])
            current_state = State::CREATE_TABLE_COLUMNS
          end
        end

        state_create_table_constraints = Proc.new do |f|
          line = f.readline.strip
          #   PRIMARY KEY (`id`),
          if line.start_with?(')')
            create_table_block.call(current_table)
            current_state = State::INSERT_RECORD
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          elsif m = /^PRIMARY KEY \((?<primary_keys>[^\)]+)\)/.match(line)
            current_table.primary_keys = m[:primary_keys].split(',').collect do |pk_str|
              pk_str[1..-2]
            end
          end
        end
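
        # The next state parses one column definition line of a CREATE TABLE
        # block into a column hash. For example, a line such as
        #
        #   `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
        #
        # produces (illustrative):
        #
        #   {column_name: "author_type", format_type_str: "varchar(255)",
        #    format_type: "varchar", format_size: 255, default: nil}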
        state_create_table_columns = Proc.new do |f|
          start_pos = f.pos
          line = f.readline.strip
          #   `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
          if line.start_with?("\`")
            column = {}

            # Parse the column line
            line = line[0..-2] if line.end_with?(',')
            items = line.split
            column[:column_name] = items.shift[1..-2]
            column[:format_type_str] = format_type_str = items.shift
            pos = format_type_str.index('(')
            if pos
              ft = column[:format_type] = format_type_str[0..pos-1]
              if ft == 'decimal'
                precision, scale = format_type_str[pos+1..-2].split(',').collect {|v| v.to_i}
                column[:decimal_precision] = precision
                column[:decimal_scale] = scale
              else
                column[:format_size] = format_type_str[pos+1..-2].to_i
              end
            else
              column[:format_type] = format_type_str
            end
            while (item = items.shift) do
              case item
              when 'DEFAULT'
                value = items.shift
                value = value.start_with?('\'') ? value[1..-2] : value
                value = nil if value == 'NULL'
                column[:default] = value
              when 'NOT'
                # 'NOT NULL' - the next token must be NULL
                if items[0] == 'NULL'
                  items.shift
                  column[:not_null] = true
                end
              when 'unsigned'
                column[:unsigned] = true
              else
                # ignore other options
              end
            end
            current_table.add_column(column)
          else
            current_state = State::CREATE_TABLE_CONSTRAINTS
            f.pos = start_pos
            state_create_table_constraints.call(f)
          end
        end

        state_insert_record = Proc.new do |f|
          original_pos = f.pos
          command = f.read(6)
          if command == 'INSERT'
            current_state = State::PARSING_INSERT_RECORD
          else
            f.pos = original_pos
            f.readline
            if command == 'UNLOCK'
              current_state = State::CREATE_TABLE
              check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
            end
          end
        end

        state_parsing_insert_record = Proc.new do |f|
          values_set = InsertParser.new(f).parse
          current_state = State::INSERT_RECORD
          if insert_record_block.call(current_table, values_set)
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          end
        end

        # Start reading the file from the top
        File.open(@file_path, 'r', encoding: "utf-8") do |f|
          last_saved_pos = 0

          # Resume from the last saved position if available
          if @option[:last_pos]
            f.pos = @option[:last_pos].to_i
            current_state = @option[:state]
            substate = @option[:substate]
            current_table = @option[:mysql_table]
          end

          until f.eof? do
            case current_state
            when State::START
              state_start.call(f)
            when State::CREATE_TABLE
              state_create_table.call(f)
            when State::CREATE_TABLE_COLUMNS
              state_create_table_columns.call(f)
            when State::CREATE_TABLE_CONSTRAINTS
              state_create_table_constraints.call(f)
            when State::INSERT_RECORD
              state_insert_record.call(f)
            when State::PARSING_INSERT_RECORD
              state_parsing_insert_record.call(f)
            end
          end
        end
        @binlog_pos
      end

      # Parses an INSERT line containing multiple value tuples. (max line size is 1kb)
      # e.g.) INSERT INTO `data_entries` VALUES (2,2,'access_log'),(2,3,'access_log2');
      class InsertParser
        #INSERT INTO `data_entries` VALUES (2,2,'access_log'),(2,3,'access_log2');
        module State
          IN_VALUE = 'IN_VALUE'
          NEXT_VALUES = 'NEXT_VALUES'
        end

        def initialize(file)
          @file = file
          @values = []
          @values_set = []
        end

        def start_ruby_prof
          RubyProf.start if defined?(RubyProf) && !RubyProf.running?
        end

        def stop_ruby_prof
          if defined?(RubyProf) && RubyProf.running?
            result = RubyProf.stop
            #printer = RubyProf::GraphPrinter.new(result)
            printer = RubyProf::GraphHtmlPrinter.new(result)
            #printer.print(STDOUT)
            printer.print(File.new("ruby-prof-out-#{Time.now.to_i}.html", "w"), :min_percent => 3)
          end
        end

        def parse
          start_ruby_prof
          bench_start_time = Time.now
          target_line = @file.readline
          _parse(target_line)
        ensure
          stop_ruby_prof
          if ENV['FLYDATA_BENCHMARK']
            puts " -> time:#{Time.now.to_f - bench_start_time.to_f} size:#{target_line.size}"
          end
        end
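
        # A sketch of what #parse returns for one INSERT line (based on the
        # example in the class comment above):
        #
        #   parser = InsertParser.new(file)   # file positioned at an INSERT line
        #   parser.parse
        #   # => [["2", "2", "access_log"], ["2", "3", "access_log2"]]
        #
        # NULL becomes nil, quoted values lose their quotes and have escape
        # sequences resolved, and unquoted values keep their textual form (with
        # leading zeros trimmed by #remove_leading_zeros).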
        private

        def _parse(target_line)
          target_line = target_line.strip
          start_index = target_line.index('(')
          target_line = target_line[start_index..-2]

          # Split the INSERT line on ',' and take care of commas inside values later.
          #
          # We use C-native methods such as 'split', 'start_with?' and regexps
          # instead of 'String#each_char' plus string comparison for performance.
          # 'String#each_char' is twice as slow as the current strategy.
          items = target_line.split(',')
          index = 0
          cur_state = State::NEXT_VALUES

          loop do
            case cur_state
            when State::NEXT_VALUES
              chars = items[index]
              break unless chars
              items[index] = chars[1..-1]
              cur_state = State::IN_VALUE
            when State::IN_VALUE
              chars = items[index]
              index += 1
              if chars.start_with?("'")
                # single item (not the last item)
                # size check added below; otherwise end_with? would match the single
                # quote that start_with? already matched
                if chars.size > 1 && chars.end_with?("'") && !last_char_escaped?(chars)
                  @values << replace_escape_char(chars[1..-2])
                # single item (the last item)
                # size check added below; otherwise end_with? would match the single
                # quote that start_with? already matched
                elsif chars.size > 2 && chars.end_with?("')") && !last_char_escaped?(chars[0..-2])
                  @values << replace_escape_char(chars[1..-3])
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                # multiple items
                else
                  cur_value = chars[1..-1]
                  loop do
                    next_chars = items[index]
                    index += 1
                    if next_chars.end_with?('\'') && !last_char_escaped?(next_chars)
                      cur_value << ','
                      cur_value << next_chars[0..-2]
                      @values << replace_escape_char(cur_value)
                      break
                    elsif next_chars.end_with?("')") && !last_char_escaped?(next_chars[0..-2])
                      cur_value << ','
                      cur_value << next_chars[0..-3]
                      @values << replace_escape_char(cur_value)
                      @values_set << @values
                      @values = []
                      cur_state = State::NEXT_VALUES
                      break
                    else
                      cur_value << ','
                      cur_value << next_chars
                    end
                  end
                end
              else
                if chars.end_with?(')')
                  chars = chars[0..-2]
                  @values << (chars == 'NULL' ? nil : remove_leading_zeros(chars))
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                else
                  @values << (chars == 'NULL' ? nil : remove_leading_zeros(chars))
                end
              end
            else
              raise "Invalid state: #{cur_state}"
            end
          end
          return @values_set
        end

        ESCAPE_HASH_TABLE = {"\\\\" => "\\", "\\'" => "'", "\\\"" => "\"", "\\n" => "\n", "\\r" => "\r"}

        def replace_escape_char(original)
          original.gsub(/\\\\|\\'|\\"|\\n|\\r/, ESCAPE_HASH_TABLE)
        end

        # This method assumes that the last character is ' (a single quote).
        #   abcd\'   -> true
        #   abcd\\'  -> false (the backslash escapes the backslash)
        #   abcd\\\' -> true
        def last_char_escaped?(text)
          flag = false
          (text.length - 2).downto(0) do |i|
            if text[i] == '\\'
              flag = !flag
            else
              break
            end
          end
          flag
        end

        def remove_leading_zeros(number_string)
          if number_string.start_with?('0')
            number_string.sub(/^0*([1-9][0-9]*(\.\d*)?|0(\.\d*)?)$/, '\1')
          else
            number_string
          end
        end
      end
    end

    class CompatibilityCheck
      class CompatibilityError < StandardError
      end

      SELECT_QUERY_TMPLT = "SELECT %s"

      def initialize(de_hash, dump_dir = nil)
        @db_opts = [:host, :port, :username, :password, :database].inject({}) {|h, sym| h[sym] = de_hash[sym.to_s]; h}
        @dump_dir = dump_dir
        @errors = []
      end

      def check
        self.methods.grep(/^check_/).each do |m|
          begin
            send(m)
          rescue CompatibilityError => e
            @errors << e
          end
        end
        print_errors
      end

      def print_errors
        return if @errors.empty?
        puts "There may be some compatibility issues with your MySQL credentials:"
        @errors.each do |error|
          puts " * #{error.message}"
        end
        raise "Please correct these errors if you wish to run FlyData Sync"
      end
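
      # The check below looks for the privileges FlyData Sync needs. A grant
      # that satisfies it would look roughly like this (illustrative database,
      # user and host):
      #
      #   GRANT SELECT, RELOAD, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT
      #     ON app_db.* TO 'flydata'@'%';
      #
      # A "GRANT ALL PRIVILEGES ON ..." statement also passes the check.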
      def check_mysql_user_compat
        client = Mysql2::Client.new(@db_opts)
        grants_sql = "SHOW GRANTS"
        correct_db = ["ON (\\*|#{@db_opts[:database]})", "TO '#{@db_opts[:username]}"]
        necessary_permission_fields = ["SELECT", "RELOAD", "LOCK TABLES", "REPLICATION SLAVE", "REPLICATION CLIENT"]
        all_privileges_field = ["ALL PRIVILEGES"]
        result = client.query(grants_sql)
        # Do not rescue MySQL connection errors here; the check should stop if no
        # MySQL connection can be made.
        client.close
        missing_priv = []
        result.each do |res|
          # SHOW GRANTS should return only one column
          res_value = res.values.first
          if correct_db.all? {|perm| res_value.match(perm)}
            necessary_permission_fields.each do |priv|
              missing_priv << priv unless res_value.match(priv)
            end
            return true if missing_priv.empty? || all_privileges_field.all? {|d| res_value.match(d)}
          end
        end
        raise CompatibilityError, "The user '#{@db_opts[:username]}' does not have the correct permissions to run FlyData Sync\n * These privileges are missing: #{missing_priv.join(", ")}"
      end

      def check_mysql_protocol_tcp_compat
        query = "mysql -u #{@db_opts[:username]} -h #{@db_opts[:host]} -P #{@db_opts[:port]} #{@db_opts[:database]} -e \"SHOW GRANTS;\" --protocol=tcp"
        query << " -p#{@db_opts[:password]}" unless @db_opts[:password].to_s.empty?
        Open3.popen3(query) do |stdin, stdout, stderr|
          stdin.close
          until stderr.eof?
            line = stderr.gets
            unless /Warning: Using a password on the command line interface can be insecure./ === line
              raise CompatibilityError, "Cannot connect to the MySQL database. Please make sure you can connect with this command:\n  $ mysql -u #{@db_opts[:username]} -h #{@db_opts[:host]} -P #{@db_opts[:port]} #{@db_opts[:database]} --protocol=tcp -p"
            end
          end
        end
      end

      def check_mysql_row_mode_compat
        sys_var_to_check = {'@@binlog_format' => 'ROW', '@@binlog_checksum' => 'NONE', '@@log_bin_use_v1_row_events' => 1}
        errors = {}
        client = Mysql2::Client.new(@db_opts)
        begin
          sys_var_to_check.each_key do |sys_var|
            sel_query = SELECT_QUERY_TMPLT % sys_var
            begin
              result = client.query(sel_query)
              unless result.first[sys_var] == sys_var_to_check[sys_var]
                errors[sys_var] = result.first[sys_var]
              end
            rescue Mysql2::Error => e
              if e.message =~ /Unknown system variable/
                # Older MySQL versions do not have binlog_checksum or
                # log_bin_use_v1_row_events; that is acceptable.
                unless e.message =~ /(binlog_checksum|log_bin_use_v1_row_events)/
                  errors[sys_var] = false
                end
              else
                raise e
              end
            end
          end
        ensure
          client.close
        end
        unless errors.empty?
          error_explanation = ""
          errors.each_key do |err_key|
            error_explanation << "\n * #{err_key} is #{errors[err_key]} but should be #{sys_var_to_check[err_key]}"
          end
          raise CompatibilityError, "These system variable(s) do not have the correct value: #{error_explanation}\n Please change these system variables for FlyData Sync to run correctly"
        end
      end

      def check_writing_permissions
        write_errors = []
        paths_to_check = ["~/.flydata"]
        paths_to_check << @dump_dir unless @dump_dir.to_s.empty?
        paths_to_check.each do |path|
          full_path = File.expand_path(path)
          full_path = File.dirname(full_path) unless File.directory?(full_path)
          write_errors << full_path unless File.writable?(full_path)
        end
        unless write_errors.empty?
          error_dir = write_errors.join(", ")
          raise CompatibilityError, "We cannot write to the directories: #{error_dir}"
        end
      end
    end
  end
end