require 'msgpack'
require 'open3'
require 'flydata/sync_file_manager'
#require 'ruby-prof'

module Flydata
  module Command
    # CLI command implementing the initial MySQL -> Redshift sync.
    #
    # Lifecycle (driven by the flydata CLI):
    #   run      - generate a mysqldump of the source DB and stream it to FlyData
    #   check    - poll FlyData for remaining buffered data
    #   complete - persist the binlog position and start the continuous Sender
    #   reset    - delete all local sync state files so sync can start over
    #   skip     - skip the initial sync by creating an empty binlog position file
    class Sync < Base
      include Helpers

      # When truthy (set via the FLYDATA_CREATE_TABLE_OPTION environment
      # variable), tables are created on Redshift while the dump is parsed.
      CREATE_TABLE_OPTION = !!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
      # NOTE(review): not referenced anywhere in this file's visible code -
      # confirm it is used elsewhere before removing.
      INSERT_PROGRESS_INTERVAL = 1000

      # for dump.pos file
      STATUS_PARSING = 'PARSING'
      STATUS_COMPLETE = 'COMPLETE'

      # Entry point. Fetches the first registered data entry and dispatches on
      # its type; only 'RedshiftMysqlDataEntry' is supported.
      def run
        de = retrieve_data_entries.first
        raise "There are no data entry." unless de
        case de['type']
        when 'RedshiftMysqlDataEntry'
          sync_mysql_to_redshift(de)
        else
          raise "No supported data entry. Only mysql-redshift sync is supported."
        end
      end

      # Deletes every local sync state file (dump file, dump position, binlog
      # position, marshalled table definitions, per-table positions) so the
      # next `run` starts a fresh initial sync.
      def reset
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        [sync_fm.dump_file_path, sync_fm.dump_pos_path, sync_fm.binlog_path,
         sync_fm.mysql_table_marshal_dump_path,
         sync_fm.table_position_file_paths].flatten.each do |path|
          # NOTE(review): File.exists? is deprecated in modern Ruby;
          # File.exist? is the supported spelling (same behavior here).
          FileUtils.rm(path) if File.exists?(path)
        end
      end

      # Asks the FlyData service whether all uploaded data has been processed.
      # Prints the raw status hash and returns true when processing is
      # complete, false otherwise.
      def check
        de = retrieve_data_entries.first
        ret = do_check(de)
        if ret['complete']
          puts "No buffer data on FlyData. #{ret.inspect}"
          true
        else
          puts "Now processing data on FlyData. #{ret.inspect}"
          false
        end
      end

      # Finalizes the initial sync: saves the binlog position captured from
      # the dump and starts the continuous replication Sender. Requires that
      # dump.pos recorded STATUS_COMPLETE; raises otherwise.
      def complete
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        info = sync_fm.load_dump_pos
        if info[:status] == STATUS_COMPLETE
          sync_fm.save_binlog(info[:binlog_pos])
          Flydata::Command::Sender.new.start
        else
          raise "Initial sync status is not complete. Try running 'flydata sync'."
        end
      end

      # skip initial sync
      # Creates an empty binlog position file so continuous sync can start
      # without performing the initial dump.
      def skip
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        binlog_path = sync_fm.binlog_path
        # NOTE(review): the path is interpolated into a shell command
        # unquoted; FileUtils.touch(binlog_path) would avoid the shell.
        `touch #{binlog_path}`
        puts "Created an empty binlog position file."
        puts "-> #{binlog_path}"
        puts "Run 'flydata start' to start continuous sync."
      end

      private

      # Queries the FlyData API for the buffer status of the given data entry.
      def do_check(de)
        flydata.data_entry.buffer_stat(de['id'], env_mode)
      end

      # Orchestrates the initial sync: refuses to run if a binlog position
      # file already exists, copies conf templates on first use, generates the
      # mysqldump and, if a dump file was produced, parses and uploads it.
      def sync_mysql_to_redshift(de)
        dp = flydata.data_port.get
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)

        # Check client condition
        if File.exists?(sync_fm.binlog_path)
          raise "Already synchronized. If you want to do initial sync, delete #{sync_fm.binlog_path}."
        end

        # Copy template if not exists
        unless Flydata::Preference::DataEntryPreference.conf_exists?(de)
          Flydata::Command::Conf.new.copy_templates
        end

        if generate_mysqldump(de, sync_fm)
          parse_mysqldump(dp, de, sync_fm)
        end
      end

      # Runs mysqldump for the configured database, writing to the dump file
      # path managed by sync_fm.
      #
      # Returns the dump file path on success, or nil if the user aborted at
      # the confirmation prompt. An existing non-empty dump file is reused
      # unless `overwrite` is true (resume support).
      def generate_mysqldump(de, sync_fm, overwrite = false)
        # validate parameter
        %w(host username database).each do |k|
          if de['mysql_data_entry_preference'][k].to_s.empty?
            raise "'#{k}' is required. Set the value in the conf file " +
                  "-> #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          end
        end

        puts "Running mysqldump... host:#{de['mysql_data_entry_preference']['host']} " +
             "username:#{de['mysql_data_entry_preference']['username']} " +
             "database:#{de['mysql_data_entry_preference']['database']}"
        if de['mysql_data_entry_preference']['data_servers']
          puts "Send to Custom Data Servers: #{de['mysql_data_entry_preference']['data_servers']}"
        end
        if de['mysql_data_entry_preference']['tables']
          puts " target tables: #{de['mysql_data_entry_preference']['tables']}"
        else
          puts " target tables: <all-tables>"
        end

        fp = sync_fm.dump_file_path
        # Reuse a previous dump unless explicitly overwriting (resume support).
        if File.exists?(fp) and File.size(fp) > 0 and not overwrite
          puts " -> Skip"
          return fp
        end
        puts "[Confirm] mysqldump path: #{fp}"
        if ask_yes_no('OK?')
          Flydata::Mysql::MysqlDumpGenerator.new(de['mysql_data_entry_preference']).dump(fp)
        else
          newline
          puts "You can change the mysqldump path with 'mysqldump_path' in the conf file."
          puts "Edit '#{Flydata::Preference::DataEntryPreference.conf_path(de)}'"
          return nil
        end
        puts " -> Done"
        fp
      end

      # Checkpoint
      # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120; # <- checkpoint(after binlog)
      #...
      #CREATE TABLE `accounts` (
      #...
#) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8; # <- checkpoint(after create table)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....(); # <- checkpoint(when buffered data is sent to server)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #
      #...
      #UNLOCK TABLES; # <- checkpoint
      #...
      #CREATE TABLE ...

      # Streams the mysqldump file to the FlyData servers.
      #
      # Builds a forwarder (TCP or SSL) and drives MysqlDumpParser with three
      # callbacks:
      #   create table  - optionally creates the table on Redshift
      #                   (CREATE_TABLE_OPTION) and marshals the table
      #                   definition for resume
      #   insert record - converts each row to JSON and emits it to the
      #                   forwarder
      #   checkpoint    - flushes buffered records, prints progress and saves
      #                   the parse position to dump.pos
      # After parsing it waits until the server reports all data processed,
      # then records STATUS_COMPLETE with the dump's binlog position.
      def parse_mysqldump(dp, de, sync_fm)
        puts "Parsing mysqldump file..."

        # Prepare forwarder
        de_tag_name = de["tag_name#{env_suffix}"]
        server_port = dp['server_port']
        servers = if de['mysql_data_entry_preference']['data_servers']
                    de['mysql_data_entry_preference']['data_servers'].split(',')
                  else
                    dp["servers#{env_suffix}"].collect{|s| "#{s}:#{server_port}"}
                  end
        # SSL is used when the data port enables it, unless the conf
        # explicitly selects a forwarder type.
        forwarder_type = de['mysql_data_entry_preference']['forwarder'] ||
                         (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder')
        forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers)

        # Load dump.pos file for resume
        dump_pos_info = sync_fm.load_dump_pos
        option = dump_pos_info || {}
        if option[:table_name]
          puts "Resuming... Last processed table: #{option[:table_name]}"
        end

        bench_start_time = Time.now

        # Start parsing dump file
        tmp_num_inserted_record = 0
        dump_fp = sync_fm.dump_file_path
        dump_file_size = File.size(dump_fp)
        binlog_pos = Flydata::Mysql::MysqlDumpParser.new(dump_fp, option).parse(
          # create table
          Proc.new { |mysql_table|
            redshift_table = Flydata::Mysql::RedshiftTableAdapter.new(mysql_table)
            mysql_table.set_adapter(:redshift, redshift_table)
            tmp_num_inserted_record = 0
            if CREATE_TABLE_OPTION
              print "- Creating table: #{redshift_table.table_name}"
              sql = redshift_table.create_table_sql
              ret = flydata.redshift_cluster.run_query(sql)
              if ret['message'].index('ERROR:')
                if ret['message'].index('already exists')
                  puts " -> Skip"
                else
                  raise "Failed to create table. error=#{ret['message']}"
                end
              else
                puts " -> OK"
              end
            else
              puts "- Parsing table: #{mysql_table.table_name}"
            end
            # dump mysql_table for resume
            sync_fm.save_mysql_table_marshal_dump(mysql_table)
          },
          # insert record
          Proc.new { |mysql_table, values_set|
            mysql_table_name = mysql_table.table_name
            records = values_set.collect do |values|
              json = generate_json(mysql_table, values)
              {table_name: mysql_table_name, log: json}
            end
            ret = forwarder.emit(records)
            # NOTE(review): counts INSERT statements, not individual rows.
            tmp_num_inserted_record += 1
            print '.'
            ret
          },
          # checkpoint
          # NOTE(review): the binlog_pos block parameter shadows the outer
          # binlog_pos local that receives the parser's return value.
          Proc.new { |mysql_table, last_pos, binlog_pos, state, substate|
            # flush if buffer records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              puts # end the progress-dots line
              forwarder.flush # send buffer data to the server before checkpoint
            end
            # show the current progress
            puts " #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) #{Time.now.to_i - bench_start_time.to_i}sec"
            # save check point
            table_name = mysql_table.nil? ? '' : mysql_table.table_name
            sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate)
          }
        )
        forwarder.close

        # Benchmark mode: report timing and skip the server-side wait.
        if ENV['FLYDATA_BENCHMARK']
          puts "Done!"
          bench_end_time = Time.now
          elapsed_time = bench_end_time.to_i - bench_start_time.to_i
          puts "Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}"
          return true
        end

        # wait until finish
        puts "Start waiting until all data is processed on FlyData..."
        sleep 10
        until check
          sleep 10
        end
        sync_fm.save_dump_pos(STATUS_COMPLETE, '', dump_file_size, binlog_pos)
        puts "Congratulations! All data is processed on FlyData. Please check tables and data on your Redshift Cluster."
        puts "After checking, run 'flydata sync:complete' to start continuously synchronization."
      end

      # Builds a JSON object for one row, keyed by the MySQL column names in
      # definition order.
      def generate_json(mysql_table, values)
        h = {}
        mysql_table.columns.each_key.with_index do |k, i|
          h[k] = values[i]
        end
        h.to_json
      end
    end
  end

  # Forwarders that ship msgpack-encoded records to the FlyData servers.
  module Output
    # Picks and instantiates the forwarder class for a forwarder key.
    class ForwarderFactory
      # forwarder_key: nil or "tcpforwarder" => plain TCP,
      # "sslforwarder" => SSL. Raises on anything else.
      def self.create(forwarder_key, tag, servers, options = {})
        case forwarder_key
        when nil, "tcpforwarder"
          puts "Creating TCP connection"
          forward = TcpForwarder.new(tag, servers, options)
        when "sslforwarder"
          puts "Creating SSL connection"
          forward = SslForwarder.new(tag, servers, options)
        else
          raise "Unsupported Forwarding type #{forwarder_key}"
        end
        forward
      end
    end

    # Buffers emitted records in memory and sends them to one of the given
    # servers (round-robin) when the buffer exceeds its size limit or on
    # flush/close.
    class TcpForwarder
      # 0x92: msgpack header for a two-element array (tag + event stream).
      FORWARD_HEADER = [0x92].pack('C')
      BUFFER_SIZE = 1024 * 1024 * 32 # 32M
      # NOTE(review): constant name is misspelled ("DEFUALT"); renaming would
      # affect subclasses, so it is only flagged here.
      DEFUALT_SEND_TIMEOUT = 60 # 1 minute
      RETRY_INTERVAL = 2 # backoff base: waits RETRY_INTERVAL ** retry_count sec
      RETRY_LIMIT = 10

      # tag: forward-protocol tag; servers: non-empty array of "host:port".
      def initialize(tag, servers, options = {})
        @tag = tag
        unless servers and servers.kind_of?(Array) and not servers.empty?
          raise "Servers must not be empty."
        end
        @servers = servers
        @server_index = 0
        set_options(options)
        reset
      end

      # Applies options; currently only :buffer_size_limit is recognized.
      def set_options(options)
        if options[:buffer_size_limit]
          @buffer_size_limit = options[:buffer_size_limit]
        else
          @buffer_size_limit = BUFFER_SIZE
        end
      end

      attr_reader :buffer_record_count, :buffer_size

      # Appends records (a single record or an array) to the buffer as
      # msgpack-encoded [time, record] events. Sends the buffer once it grows
      # past the limit; otherwise returns false.
      def emit(records, time = Time.now.to_i)
        records = [records] unless records.kind_of?(Array)
        records.each do |record|
          event_data = [time,record].to_msgpack
          @buffer_records << event_data
          @buffer_record_count += 1
          @buffer_size += event_data.bytesize
        end
        if @buffer_size > @buffer_size_limit
          send
        else
          false
        end
      end

      #TODO retry logic
      # Sends the buffered events to a server with exponential-backoff retry,
      # then clears the buffer. Returns false when there is nothing to send.
      # NOTE(review): this overrides Object#send for these instances.
      def send
        if @buffer_size > 0
          puts " -> Sending #{@buffer_record_count}records #{@buffer_size}byte"
        else
          return false
        end
        # Benchmark mode: discard the buffer without touching the network.
        if ENV['FLYDATA_BENCHMARK']
          reset
          return true
        end
        sock = nil
        retry_count = 0
        begin
          sock = connect(pickup_server)

          # Write header
          sock.write FORWARD_HEADER
          # Write tag
          sock.write @tag.to_msgpack
          # Write records
          # 0xdb + 4-byte big-endian length: msgpack raw32 header framing the
          # concatenated event bytes, streamed from the buffer string.
          sock.write [0xdb, @buffer_records.bytesize].pack('CN')
          StringIO.open(@buffer_records) do |i|
            FileUtils.copy_stream(i, sock)
          end
        rescue => e
          retry_count += 1
          if retry_count > RETRY_LIMIT
            puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
            raise e
          end
          puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
          wait_time = RETRY_INTERVAL ** retry_count
          puts " Now waiting for next retry. time=#{wait_time}sec"
          sleep wait_time
          retry
        ensure
          if sock
            sock.close rescue nil
          end
        end
        reset
        true
      end

      #TODO: Check server status
      # Returns the next server ("host:port") in round-robin order.
      def pickup_server
        ret_server = @servers[@server_index]
        @server_index += 1
        if @server_index >= (@servers.count)
          @server_index = 0
        end
        ret_server
      end

      # Opens a TCP socket to "host:port" with SO_LINGER enabled and a send
      # timeout of DEFUALT_SEND_TIMEOUT seconds.
      def connect(server)
        host, port = server.split(':')
        sock = TCPSocket.new(host, port.to_i)
        # Set options
        opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
        opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
        sock
      end

      # Empties the buffer and its counters.
      def reset
        @buffer_records = ''
        @buffer_record_count = 0
        @buffer_size = 0
      end

      def flush
        send
      end

      # Flushes any remaining buffered data; no socket is kept open between
      # sends, so there is nothing else to tear down.
      def close
        flush
      end
    end

    # TcpForwarder variant that wraps each connection in a verifying SSL
    # session.
    class SslForwarder < TcpForwarder
      def connect(server)
        tcp_sock = super
        ssl_ctx = ssl_ctx_with_verification
        ssl_sock = OpenSSL::SSL::SSLSocket.new(tcp_sock, ssl_ctx)
        ssl_sock.sync_close = true
        ssl_sock.connect
        ssl_sock
      end

      private

      # SSL context using the system default CA paths with peer verification
      # enabled.
      def ssl_ctx_with_verification
        cert_store = OpenSSL::X509::Store.new
        cert_store.set_default_paths
        ssl_ctx = OpenSSL::SSL::SSLContext.new
        ssl_ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        ssl_ctx.cert_store = cert_store
        ssl_ctx
      end
    end
  end

  module Redshift
    # Helpers for mapping MySQL identifiers to valid Redshift identifiers.
    module Util
      MAX_TABLENAME_LENGTH = 127
      REDSHIFT_RESERVED_WORDS = %w[
        aes128 aes256 all allowoverwrite analyse analyze and any array as asc
        authorization backup between binary blanksasnull both bytedict case
        cast check collate column constraint create credentials cross
        current_date current_time current_timestamp current_user
        current_user_id default deferrable deflate defrag delta delta32k desc
        disable distinct do else emptyasnull enable encode encrypt encryption
        end except explicit false for foreign freeze from full globaldict256
        globaldict64k grant group gzip having identity ignore ilike in
        initially inner intersect into is isnull join leading left like limit
        localtime localtimestamp lun luns minus mostly13 mostly32 mostly8
        natural new not notnull null nulls off offline offset old on only
        open or order outer overlaps parallel partition percent placing
        primary raw readratio recover references rejectlog resort restore
        right select session_user similar some sysdate system table tag tdes
        text255 text32k then to top trailing true truncatecolumns union
        unique user using verbose wallet when where with without]
      # Create a symbol-keyed hash for performance
      REDSHIFT_RESERVED_WORDS_HASH =
        REDSHIFT_RESERVED_WORDS.inject({}) {|h, word| h[word.to_sym] = true; h}

      REDSHIFT_SYSTEM_COLUMNS = %w[oid tableoid xmin cmin xmax cmax ctid]
      REDSHIFT_SYSTEM_COLUMNS_HASH =
        REDSHIFT_SYSTEM_COLUMNS.inject({}) {|h, word| h[word.to_sym] = true; h}

      # Converts a MySQL identifier into a Redshift-safe one: downcases,
      # replaces invalid characters with '_', and prefixes '_' when the result
      # is a reserved word or starts with a digit or '$'. Returns nil when the
      # converted name exceeds MAX_TABLENAME_LENGTH.
      def convert_to_valid_name(key, type = :table)
        @memo ||= { table:{}, column:{} }
        key_sym = key.to_sym
        return @memo[type][key_sym] if @memo[type][key_sym]

        name = key.downcase.gsub(/[^a-z0-9_$]/, '_')
        name = "_#{name}" if is_redshift_reserved_word?(name, type) or name =~ /^[0-9$]/
        if name.length > MAX_TABLENAME_LENGTH
          name = nil
        end
        # NOTE(review): stores under @memo[key_sym] rather than
        # @memo[type][key_sym], so the lookup above never hits - the
        # memoization appears broken. Confirm and fix separately.
        @memo[key_sym] = name
        name
      end

      # True when name is a Redshift reserved word, or (for :column) also a
      # Redshift system column name. A nil name is never reserved.
      def is_redshift_reserved_word?(name, type = :table)
        return false unless name
        return true if REDSHIFT_RESERVED_WORDS_HASH[name.to_sym] == true
        case type
        when :table
          false
        when :column
          REDSHIFT_SYSTEM_COLUMNS_HASH[name.to_sym] == true
        else
          false
        end
      end
    end
  end

  module Mysql
    # In-memory representation of a MySQL table parsed from a dump file.
    class MysqlTable
      def initialize(table_name, columns = {}, primary_keys = [])
        @table_name = table_name
        @columns = columns
        @primary_keys = primary_keys
        @adapters = {} # e.g. :redshift => RedshiftTableAdapter
      end
      attr_accessor :table_name, :columns, :primary_keys

      # Registers a column hash under its :column_name key.
      def add_column(column)
        @columns[column[:column_name]] = column
      end

      def set_adapter(key, adapter)
        @adapters[key] = adapter
      end

      def adapter(key)
        @adapters[key]
      end
    end

    # Translates a MysqlTable into Redshift-compatible names and column types,
    # and renders the CREATE TABLE statement.
    class RedshiftTableAdapter
      include Flydata::Redshift::Util

      def initialize(mysql_table)
        @table_name = convert_to_valid_name(mysql_table.table_name)
        set_columns(mysql_table.columns)
        @primary_keys = mysql_table.primary_keys
      end
      attr_reader :table_name, :columns, :primary_keys

      # Builds the CREATE TABLE statement, appending a primary key clause
      # when the source table declared one.
      def create_table_sql
        col_def = @columns.inject([]) { |list, (cn, column)|
          list << build_column_def(column)
          list
        }
        if @primary_keys.count > 0
          col_def << "primary key (#{@primary_keys.join(',')})"
        end
        <<EOT
CREATE TABLE #{@table_name} (#{col_def.join(',')});
EOT
      end

      private

      # Renames each column to a Redshift-safe name and converts its type.
      def set_columns(columns)
        @columns = {}
        columns.each do |k, column|
          new_k = convert_to_valid_name(k, :column)
          new_column = column.dup
          new_column[:column_name] = new_k
          @columns[new_k] = convert_column_format_type(new_column)
        end
      end

      # Mysql Field Types
      # http://help.scibit.com/mascon/masconMySQL_Field_Types.html
      # Maps a MySQL column type to the closest Redshift type, widening where
      # needed for unsigned integers and turning blob/text types into bounded
      # varchars. :format_type_str, when set, carries the fully parameterized
      # type used in the DDL.
      def convert_column_format_type(column)
        ret_c = {}.merge(column)
        ret_c.delete(:format_type_str)
        ret_c[:format_type] = case column[:format_type]
        when 'tinyint'
          'smallint'
        when 'smallint'
          column[:unsigned] ? 'integer' : 'smallint'
        when 'mediumint'
          'integer'
        when 'int', 'integer'
          column[:unsigned] ? 'bigint' : 'integer'
        when 'bigint'
          # max unsigned bigint is 18446744073709551615
          column[:unsigned] ? 'decimal(20,0)' : 'bigint'
        when 'float'
          'real'
        when 'double', 'double precision', 'real'
          'double precision'
        when 'decimal', 'numeric'
          ret_c[:format_type_str] = "decimal(#{column[:decimal_precision]},#{column[:decimal_scale]})"
          'decimal'
        when 'date'
          'date'
        when 'datetime'
          'timestamp'
        when 'time'
          'timestamp' #TODO: redshift does not support time only column type
        when 'year'
          'smallint'
        when 'char'
          ret_c[:format_type_str] = "char(#{column[:format_size]})"
          'char'
        when 'varchar'
          ret_c[:format_type_str] = "varchar(#{column[:format_size]})"
          'varchar'
        when 'tinyblob','tinytext'
          ret_c[:format_size] = 255
          ret_c[:format_type_str] = "varchar(#{ret_c[:format_size]})"
          'varchar'
        when 'blob','text', 'mediumblob', 'mediumtext', 'longblob', 'longtext'
          ret_c[:format_size] = 65535 #TODO: review
          ret_c[:format_type_str] = "varchar(#{ret_c[:format_size]})"
          'varchar'
        else #TODO: discuss
          'varchar'
        end
        ret_c
      end

      # Renders one column definition, preferring the parameterized type
      # string and adding "not null" or a default value clause when present.
      def build_column_def(column)
        format_type = column[:format_type]
        format_type = column[:format_type_str] if column[:format_type_str]
        def_str = "#{column[:column_name]} #{format_type}"
        if column[:not_null]
          def_str << " not null"
        elsif column.has_key?(:default)
          val = column[:default]
          val = val.nil? ? 'null' : "'#{val}'"
          def_str << " default #{val}"
        end
        def_str
      end
    end

    # Builds and runs the mysqldump command used for the initial sync.
    # --master-data=2 writes the binlog position as a comment, which the
    # parser reads for resuming continuous replication.
    class MysqlDumpGenerator
      # host, port, username, password, database, tables
      MYSQL_DUMP_CMD_TEMPLATE = "mysqldump -h %s -P %s -u%s %s --skip-lock-tables --single-transaction --flush-logs --hex-blob --master-data=2 %s %s"

      # NOTE(review): the password is passed on the command line (-p...),
      # which is visible to other local users via the process list.
      def initialize(conf)
        password = conf['password'].to_s.empty? ? "" : "-p#{conf['password']}"
        tables = if conf['tables']
                   conf['tables'].split(',').join(' ')
                 else
                   ''
                 end
        @dump_cmd = MYSQL_DUMP_CMD_TEMPLATE %
                    [conf['host'], conf['port'], conf['username'], password, conf['database'], tables]
      end

      # Runs mysqldump, redirecting stdout into file_path via the shell.
      # Echoes non-"Warning:" stderr lines, removes the partial file and
      # raises on failure; also raises when the file is missing or empty.
      # Returns true on success.
      def dump(file_path)
        # NOTE(review): file_path is interpolated into a shell redirection
        # unquoted - paths containing spaces or metacharacters will break.
        cmd = "#{@dump_cmd} > #{file_path}"
        o, e, s = Open3.capture3(cmd)
        e.to_s.each_line {|l| puts l unless /^Warning:/ =~ l } unless e.to_s.empty?
        unless s.exitstatus == 0
          if File.exists?(file_path)
            File.open(file_path, 'r') {|f| f.each_line{|l| puts l}}
            FileUtils.rm(file_path)
          end
          raise "Failed to run mysqldump command."
        end
        unless File.exists?(file_path)
          raise "mysqldump file does not exist. Something wrong..."
        end
        if File.size(file_path) == 0
          raise "mysqldump file is empty. Something wrong..."
        end
        true
      end
    end

    # Resumable state-machine parser for a mysqldump file. Walks the dump
    # line by line, reporting table definitions, insert rows and checkpoint
    # positions to the caller-supplied blocks.
    class MysqlDumpParser
      # Parser states, persisted to dump.pos so parsing can resume.
      module State
        START = 'START'
        CREATE_TABLE = 'CREATE_TABLE'
        CREATE_TABLE_COLUMNS = 'CREATE_TABLE_COLUMNS'
        CREATE_TABLE_CONSTRAINTS = 'CREATE_TABLE_CONSTRAINTS'
        INSERT_RECORD = 'INSERT_RECORD'
        PARSING_INSERT_RECORD = 'PARSING_INSERT_RECORD'
      end

      attr_accessor :binlog_pos

      # option may carry resume data: :binlog_pos, :last_pos, :state,
      # :substate, :mysql_table.
      def initialize(file_path, option = {})
        @file_path = file_path
        raise "Dump file does not exist. file_path:#{file_path}" unless File.exist?(file_path)
        @binlog_pos = option[:binlog_pos]
        @option = option
      end

      # Parses the dump file, invoking:
      #   create_table_block.(mysql_table)            - table def complete
      #   insert_record_block.(mysql_table, rows)     - one INSERT's rows;
      #                                                 truthy return allows a
      #                                                 checkpoint
      #   check_point_block.(table, pos, binlog_pos, state[, substate])
      # Returns the binlog position ({binfile:, pos:}) found in the dump
      # header (or the one given for resume).
      def parse(create_table_block, insert_record_block, check_point_block)
        # NOTE(review): invalid_file is never used in this method.
        invalid_file = false
        current_state = State::START
        # NOTE(review): substate is restored from @option but never read
        # afterwards in this method.
        substate = nil

        # Scan for the binlog position comment written by --master-data=2.
        state_start = Proc.new do |f|
          line = f.readline.strip
          # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000002', MASTER_LOG_POS=120;
          m = /^\-\- CHANGE MASTER TO MASTER_LOG_FILE='(?<binfile>[^']+)', MASTER_LOG_POS=(?<pos>\d+)/.match(line)
          if m
            @binlog_pos = {binfile: m[:binfile], pos: m[:pos].to_i}
            current_state = State::CREATE_TABLE
            check_point_block.call(nil, f.pos, @binlog_pos, current_state)
          end
        end

        current_table = nil
        # Scan for the next CREATE TABLE statement.
        state_create_table = Proc.new do |f|
          line = f.readline.strip
          # CREATE TABLE `active_admin_comments` (
          m = /^CREATE TABLE `(?<table_name>[^`]+)`/.match(line)
          if m
            current_table = MysqlTable.new(m[:table_name])
            current_state = State::CREATE_TABLE_COLUMNS
          end
        end

        # Constraint lines after the column list; ')' closes the definition.
        state_create_table_constraints = Proc.new do |f|
          line = f.readline.strip
          # PRIMARY KEY (`id`),
          if line.start_with?(')')
            create_table_block.call(current_table)
            current_state = State::INSERT_RECORD
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          elsif m = /^PRIMARY KEY \((?<primary_keys>[^\)]+)\)/.match(line)
            # pk_str[1..-2] strips the surrounding backquotes.
            current_table.primary_keys = m[:primary_keys].split(',').collect do |pk_str|
              pk_str[1..-2]
            end
          end
        end

        # Column definition lines (start with a backquoted name). A
        # non-column line is re-read by the constraints state.
        state_create_table_columns = Proc.new do |f|
          start_pos = f.pos
          line = f.readline.strip
          # `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
          if line.start_with?("\`")
            column = {}
            # parse column line
            line = line[0..-2] if line.end_with?(',')
            items = line.split
            column[:column_name] = items.shift[1..-2]
            column[:format_type_str] = format_type_str = items.shift
            pos = format_type_str.index('(')
            if pos
              # Parameterized type, e.g. varchar(255) or decimal(10,2).
              ft = column[:format_type] = format_type_str[0..pos-1]
              if ft == 'decimal'
                precision, scale = format_type_str[pos+1..-2].split(',').collect{|v| v.to_i}
                column[:decimal_precision] = precision
                column[:decimal_scale] = scale
              else
                column[:format_size] = format_type_str[pos+1..-2].to_i
              end
            else
              column[:format_type] = format_type_str
            end
            # Remaining tokens: DEFAULT <val>, NOT NULL, unsigned, ...
            while (item = items.shift) do
              case item
              when 'DEFAULT'
                value = items.shift
                value = value.start_with?('\'') ? value[1..-2] : value
                value = nil if value == 'NULL'
                column[:default] = value
              when 'NOT'
                # NOTE(review): after shifting 'NOT' the next token is
                # items[0]; checking items[1] looks like an off-by-one, which
                # would leave :not_null unset for "NOT NULL" columns. Confirm
                # against real dump files before changing.
                if items[1] == 'NULL'
                  items.shift
                  column[:not_null] = true
                end
              when 'unsigned'
                column[:unsigned] = true
              else
                #ignore other options
              end
            end
            current_table.add_column(column)
          else
            # Not a column line: hand it to the constraints state unread.
            current_state = State::CREATE_TABLE_CONSTRAINTS
            f.pos = start_pos
            state_create_table_constraints.call(f)
          end
        end

        # Peek 6 bytes for "INSERT"; otherwise consume the line, and on
        # "UNLOCK" (TABLES) go back to looking for the next CREATE TABLE.
        state_insert_record = Proc.new do |f|
          original_pos = f.pos
          command = f.read(6)
          if command == 'INSERT'
            current_state = State::PARSING_INSERT_RECORD
          else
            f.pos = original_pos
            f.readline
            if command == 'UNLOCK'
              current_state = State::CREATE_TABLE
              check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
            end
          end
        end

        # Parse one INSERT's value tuples and report them; checkpoint only
        # when the insert block returns truthy (i.e. data was sent).
        state_parsing_insert_record = Proc.new do |f|
          values_set = InsertParser.new(f).parse
          current_state = State::INSERT_RECORD
          if insert_record_block.call(current_table, values_set)
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          end
        end

        # Start reading file from top
        File.open(@file_path, 'r') do |f|
          # NOTE(review): last_saved_pos is never used.
          last_saved_pos = 0

          # resume
          if @option[:last_pos]
            f.pos = @option[:last_pos].to_i
            current_state = @option[:state]
            substate = @option[:substate]
            current_table = @option[:mysql_table]
          end

          until f.eof? do
            case current_state
            when State::START
              state_start.call(f)
            when State::CREATE_TABLE
              state_create_table.call(f)
            when State::CREATE_TABLE_COLUMNS
              state_create_table_columns.call(f)
            when State::CREATE_TABLE_CONSTRAINTS
              state_create_table_constraints.call(f)
            when State::INSERT_RECORD
              state_insert_record.call(f)
            when State::PARSING_INSERT_RECORD
              state_parsing_insert_record.call(f)
            end
          end
        end
        @binlog_pos
      end

      # Parses one "INSERT INTO ... VALUES (...),(...);" line into an array
      # of row-value arrays, handling quoted strings, escapes and NULL.
      class InsertParser
        #INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2');
        module State
          IN_VALUE = 'IN_VALUE'
          NEXT_VALUES = 'NEXT_VALUES'
        end

        def initialize(file)
          @file = file
          @values = []
          @values_set = []
        end

        # No-op unless ruby-prof is loaded (see commented-out require above).
        def start_ruby_prof
          RubyProf.start if defined?(RubyProf) and not RubyProf.running?
        end

        # Dumps an HTML profile report when ruby-prof was running.
        def stop_ruby_prof
          if defined?(RubyProf) and RubyProf.running?
            result = RubyProf.stop
            #printer = RubyProf::GraphPrinter.new(result)
            printer = RubyProf::GraphHtmlPrinter.new(result)
            #printer.print(STDOUT)
            printer.print(File.new("ruby-prof-out-#{Time.now.to_i}.html", "w"), :min_percent => 3)
          end
        end

        # Reads one line from the file and parses it. Returns the array of
        # row-value arrays; prints timing when FLYDATA_BENCHMARK is set.
        def parse
          start_ruby_prof
          bench_start_time = Time.now
          target_line = @file.readline
          _parse(target_line)
        ensure
          stop_ruby_prof
          if ENV['FLYDATA_BENCHMARK']
            puts " -> time:#{Time.now.to_f - bench_start_time.to_f} size:#{target_line.size}"
          end
        end

        private

        # Tokenizes the VALUES section by splitting on ',' and then re-joins
        # pieces that belong to the same quoted string. Unquoted 'NULL'
        # becomes nil; ')' closes a row tuple.
        def _parse(target_line)
          target_line = target_line.strip
          # Keep everything from the first '(' up to (not including) the
          # trailing ';'.
          start_index = target_line.index('(')
          target_line = target_line[start_index..-2]
          items = target_line.split(',')
          index = 0
          cur_state = State::NEXT_VALUES

          loop do
            case cur_state
            when State::NEXT_VALUES
              chars = items[index]
              break unless chars
              # Drop the tuple-opening '('.
              items[index] = chars[1..-1]
              cur_state = State::IN_VALUE
            when State::IN_VALUE
              chars = items[index]
              index += 1
              if chars.start_with?("'")
                # single item (not last item)
                if chars.end_with?("'") and !last_char_escaped?(chars)
                  @values << replace_escape_char(chars[1..-2])
                # single item (last item)
                elsif chars.end_with?("')") and !last_char_escaped?(chars[0..-2])
                  @values << replace_escape_char(chars[1..-3])
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                # multi items
                else
                  # The string itself contained commas; stitch the split
                  # pieces back together until the closing quote.
                  cur_value = chars[1..-1]
                  loop do
                    next_chars = items[index]
                    index += 1
                    if next_chars.end_with?('\'') and !last_char_escaped?(next_chars)
                      cur_value << ','
                      cur_value << next_chars[0..-2]
                      @values << replace_escape_char(cur_value)
                      break
                    elsif next_chars.end_with?("')") and !last_char_escaped?(next_chars[0..-2])
                      cur_value << ','
                      cur_value << next_chars[0..-3]
                      @values << replace_escape_char(cur_value)
                      @values_set << @values
                      @values = []
                      cur_state = State::NEXT_VALUES
                      break
                    else
                      cur_value << ','
                      cur_value << next_chars
                    end
                  end
                end
              else
                # Unquoted value (number or NULL).
                if chars.end_with?(')')
                  chars = chars[0..-2]
                  @values << (chars == 'NULL' ? nil : chars)
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                else
                  @values << (chars == 'NULL' ? nil : chars)
                end
              end
            else
              raise "Invalid state: #{cur_state}"
            end
          end
          return @values_set
        end

        ESCAPE_HASH_TABLE = {"\\\\" => "\\", "\\'" => "'", "\\n" => "\n", "\\r" => "\r"}

        # Replaces mysqldump escape sequences with their literal characters.
        def replace_escape_char(original)
          original.gsub(/\\\\|\\'|\\n|\\r/, ESCAPE_HASH_TABLE)
        end

        # This method assumes that the last character is ' (single quotation).
        # Counts the run of backslashes immediately before it to decide
        # whether the quote is escaped:
        # abcd\' -> true
        # abcd\\' -> false (back slash escape back slash)
        # abcd\\\' -> true
        def last_char_escaped?(text)
          flag = false
          (text.length - 2).downto(0) do |i|
            if text[i] == '\\'
              flag = !flag
            else
              break
            end
          end
          flag
        end
      end
    end
  end
end