# coding: utf-8 require 'spec_helper' require 'socket' module Flydata module Command describe Sync do let(:default_mysqldump_dir) do File.join('/tmp', "sync_dump_#{Time.now.to_i}") end let(:default_data_entry) do {"id"=>93, "name"=>"flydata_sync_mysql", "data_port_id"=>52, "display_name"=>"flydata_sync_mysql", "enabled"=>true, "heroku_log_type"=>nil, "heroku_resource_id"=>nil, "log_deletion"=>nil, "log_file_delimiter"=>nil, "log_file_type"=>nil, "log_path"=>nil, "redshift_schema_name"=>"", "redshift_table_name"=>nil, "created_at"=>"2014-01-22T18:58:43Z", "updated_at"=>"2014-01-30T02:42:26Z", "type"=>"RedshiftMysqlDataEntry", "tag_name"=>"flydata.a458c641_dp52.flydata_mysql", "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev", "data_port_key"=>"a458c641", "mysql_data_entry_preference" => { "host"=>"localhost", "port"=>3306, "username"=>"masashi", "password"=>"welcome", "database"=>"sync_test", "tables"=>"table1, table2", "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder", "data_servers"=>"localhost:9905" } } end after :each do if Dir.exists?(default_mysqldump_dir) Dir.delete(default_mysqldump_dir) rescue nil end if File.exists?(default_mysqldump_dir) File.delete(default_mysqldump_dir) rescue nil end end describe '#do_generate_table_ddl' do subject { Sync.new } context 'with full options' do it 'issues mysqldump command with expected parameters' do expect(IO).to receive(:popen).with( 'mysqldump -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1 table2', 'r').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_host' do before do default_data_entry['mysql_data_entry_preference'].delete('host') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_port' do before do default_data_entry['mysql_data_entry_preference'].delete('port') end it "uses the default port" do expect(IO).to receive(:popen).with( 'mysqldump -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1 table2', 'r').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'with_port_override' do before do default_data_entry['mysql_data_entry_preference']['port'] = 1234 end it "uses the specified port" do expect(IO).to receive(:popen).with( 'mysqldump -d -h localhost -P 1234 -u masashi -pwelcome sync_test table1 table2', 'r').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_username' do before do default_data_entry['mysql_data_entry_preference'].delete('username') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_password' do before do default_data_entry['mysql_data_entry_preference'].delete('password') end it "call mysqldump without -p option" do expect(IO).to receive(:popen).with( 'mysqldump -d -h localhost -P 3306 -u masashi sync_test table1 table2', 'r').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_database' do before do default_data_entry['mysql_data_entry_preference'].delete('database') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_tables' do before do default_data_entry['mysql_data_entry_preference'].delete('tables') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end end end end module FileUtil describe SyncFileManager do let(:default_mysqldump_dir) do File.join('/tmp', "sync_dump_#{Time.now.to_i}") end let(:default_data_entry) do {"id"=>93, "name"=>"flydata_sync_mysql", "data_port_id"=>52, "display_name"=>"flydata_sync_mysql", "enabled"=>true, "heroku_log_type"=>nil, "heroku_resource_id"=>nil, "log_deletion"=>nil, "log_file_delimiter"=>nil, "log_file_type"=>nil, "log_path"=>nil, "redshift_schema_name"=>"", "redshift_table_name"=>nil, "created_at"=>"2014-01-22T18:58:43Z", "updated_at"=>"2014-01-30T02:42:26Z", "type"=>"RedshiftMysqlDataEntry", "tag_name"=>"flydata.a458c641_dp52.flydata_mysql", "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev", "data_port_key"=>"a458c641", "mysql_data_entry_preference" => { "host"=>"localhost", "port"=>3306, "username"=>"masashi", "password"=>"welcome", "database"=>"sync_test", "tables"=>nil, "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder", "data_servers"=>"localhost:9905" } } end let(:default_sfm) { SyncFileManager.new(default_data_entry) } let (:status) { 'PARSING' } let (:table_name) { 'test_table' } let (:last_pos) { 9999 } let (:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111} } let (:state) { 'CREATE_TABLE' } let (:substate) { nil } after :each do if Dir.exists?(default_mysqldump_dir) Dir.delete(default_mysqldump_dir) rescue nil end if File.exists?(default_mysqldump_dir) File.delete(default_mysqldump_dir) rescue nil end end describe '#dump_file_path' do context 'when mysqldump_dir param is nil' do before do default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir') end it do expect(default_sfm.dump_file_path).to eq( File.join(ENV['HOME'], '.flydata', 'dump', 'flydata_sync_mysql.dump')) end end context 'when file exists' do before { `touch #{default_mysqldump_dir}`} it do expect{default_sfm.dump_file_path}.to raise_error end end context 'when directory exists' do before { `mkdir -p #{default_mysqldump_dir}`} it do expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never expect(default_sfm.dump_file_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) end end context 'when directory or file does not exist' do it do expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once expect(default_sfm.dump_file_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) end end context 'when file name includes "~"' do let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" } it do expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1]) expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once expect(default_sfm.dump_file_path).to eq( File.join(expected_dir, 'flydata_sync_mysql.dump')) end end end describe '#dump_pos_path' do it { expect(default_sfm.dump_pos_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos')) } end describe '#save_dump_pos' do context 'without mysql marshal data' do it do expect{default_sfm.save_dump_pos( status, table_name, last_pos, binlog_pos, state, substate)}.not_to raise_error expect(default_sfm.load_dump_pos).to eq({ status: status, table_name: table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, mysql_table: nil }) end end end describe '#load_dump_pos' do let (:mysql_table) do Flydata::Mysql::MysqlTable.new( table_name, { 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' } } ) end context 'with mysql marshal data' do before do default_sfm.save_mysql_table_marshal_dump(mysql_table) default_sfm.save_dump_pos(status, table_name, last_pos, binlog_pos, state, substate) end it do ret = default_sfm.load_dump_pos mt = ret.delete(:mysql_table) expect(ret).to eq({ status: status, table_name: table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, }) expect(mt.table_name).to eq(table_name) expect(mt.columns).to eq({ 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' }, }) end end end describe '#save_binlog' do let(:binfile) { 'mysqlbinlog.000001' } let(:pos) { 107 } let(:binlog_pos) { {binfile: binfile, pos: pos} } it do default_sfm.save_binlog(binlog_pos) expect(`cat #{default_sfm.binlog_path}`).to eq("#{binfile}\t#{pos}") end end describe '#binlog_path' do it { expect(default_sfm.binlog_path).to eq("#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos") } end end end module Output describe ForwarderFactory do let(:forwarder) do ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) end before :each do conn = double(TCPSocket) allow(conn).to receive(:setsockopt) allow(conn).to receive(:write) allow(conn).to receive(:close) allow(TCPSocket).to receive(:new).and_return(conn) allow(StringIO).to receive(:open) end describe '.create' do context 'with nil forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create(nil, 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_true end end context 'with tcpforwarder forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_true end end context 'with sslforwarder forwarder_key' do it 'should return SslForwarder object' do forwarder = ForwarderFactory.create('sslforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(SslForwarder)).to be_true end end end describe '#initialize' do context 'servers is empty' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', [])}.to raise_error end end context 'servers is nil' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', nil)}.to raise_error end end end describe '#emit' do let(:record) { {table_name: 'test_table_name', log: '{"key":"value"}'} } before :each do forwarder.set_options({buffer_size_limit: ([Time.now.to_i,record].to_msgpack.bytesize * 2.5)}) end context 'when the buffer size is less than threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(1) end end context 'when the buffer size exceeds threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(2) expect(forwarder.emit(record)).to be(true) expect(forwarder.buffer_record_count).to be(0) end end end describe '#pickup_server' do context 'with only one server' do let(:servers) { ['localhost:1111'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') end end context 'with servers' do let(:servers) { ['localhost:1111', 'localhost:2222', 'localhost:3333'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:2222') expect(forwarder.pickup_server).to eq('localhost:3333') expect(forwarder.pickup_server).to eq('localhost:1111') end end end end end module Mysql describe MysqlDumpGenerator do let(:stdout) { double(:stdout) } let(:stderr) { double(:stderr) } let(:status) { double(:status) } let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_#{Time.now.to_i}") } let(:default_conf) { { 'host' => 'localhost', 'port' => 3306, 'username' => 'admin', 'password' => 'pass', 'database' => 'dev', 'tables' => 'users,groups', } } let(:default_dump_generator) { MysqlDumpGenerator.new(default_conf) } describe '#initialize' do context 'with password' do subject { default_dump_generator.instance_variable_get(:@dump_cmd) } it { should eq('mysqldump -h localhost -P 3306 -uadmin -ppass --skip-lock-tables ' + '--single-transaction --flush-logs --hex-blob --master-data=2 dev users groups') } end context 'without password' do let (:dump_generator) do MysqlDumpGenerator.new(default_conf.merge({'password' => ''})) end subject { dump_generator.instance_variable_get(:@dump_cmd) } it { should eq('mysqldump -h localhost -P 3306 -uadmin --skip-lock-tables ' + '--single-transaction --flush-logs --hex-blob --master-data=2 dev users groups') } end end describe '#dump' do context 'when exit status is not 0' do before do `touch #{file_path}` expect(status).to receive(:exitstatus).and_return 1 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_false end end context 'when exit status is 0 but no file' do before do expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_false end end context 'when exit status is 0 but file size is 0' do before do `touch #{file_path}` expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_true end end context 'when exit status is 0' do before do `echo "something..." > #{file_path}` expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect(default_dump_generator.dump(file_path)).to be_true expect(File.exists?(file_path)).to be_true end end after :each do File.delete(file_path) if File.exists?(file_path) end end end describe MysqlDumpParser do let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_parse_#{Time.now.to_i}") } let(:default_parser) { MysqlDumpParser.new(file_path) } def generate_dump_file(content) File.open(file_path, 'w') {|f| f.write(content)} end after do File.delete(file_path) if File.exists?(file_path) end describe '#initialize' do context 'when file does not exist' do it do expect{ MysqlDumpParser.new(file_path) }.to raise_error end end context 'when file exists' do before { generate_dump_file('') } it do expect(MysqlDumpParser.new(file_path)).to be_an_instance_of(MysqlDumpParser) end end end describe '#parse' do DUMP_HEADER = <<EOT -- MySQL dump 10.13 Distrib 5.6.13, for osx10.9 (x86_64) -- -- Host: localhost Database: sync_test -- ------------------------------------------------------ -- Server version>--5.6.13-log /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; /*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; /*!40101 SET NAMES utf8 */; /*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; /*!40103 SET TIME_ZONE='+00:00' */; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; -- -- Position to start replication or point-in-time recovery from -- -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000267', MASTER_LOG_POS=120; EOT def index_after(content, string) content.index(string) + string.bytesize + 1 end let(:default_parser) { MysqlDumpParser.new(file_path) } let(:default_binlog_pos) { {binfile: 'mysql-bin.000267', pos: 120 } } let(:dump_pos_after_binlog_pos) { index_after(DUMP_HEADER, 'MASTER_LOG_POS=120;') } let(:create_table_block) { double('create_table_block') } let(:insert_record_block) { double('insert_record_block') } let(:check_point_block) { double('check_point_block') } before do generate_dump_file('') end context 'when dump does not contain binlog pos' do before { generate_dump_file('dummy content') } it do expect(create_table_block).to receive(:call).never expect(insert_record_block).to receive(:call).never expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to be_nil end end context 'when dump contains only binlog pos' do before { generate_dump_file(<<EOT #{DUMP_HEADER} EOT ) } it do expect(create_table_block).to receive(:call).never expect(insert_record_block).to receive(:call).never expect(check_point_block).to receive(:call).once binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'when dump contains create table without records' do let(:dump_content) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=''; /*!40101 SET character_set_client = @saved_cs_client */; -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; EOT } before { generate_dump_file(dump_content) } it do expect(create_table_block).to receive(:call) { |mysql_table| expect(mysql_table.table_name).to eq('users_login') }.once expect(insert_record_block).to receive(:call).never expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table).to be_nil expect(last_pos).to eq(dump_pos_after_binlog_pos) expect(binlog_pos).to eq(default_binlog_pos) expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE) expect(substate).to be_nil } expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'create_time', 'update_time']) expect(last_pos).to eq(index_after(dump_content, 'ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=\'\';')) expect(binlog_pos).to eq(default_binlog_pos) expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD) expect(substate).to be_nil } expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') expect(last_pos).to eq(index_after(dump_content, 'UNLOCK TABLES;')) expect(binlog_pos).to eq(default_binlog_pos) expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE) expect(substate).to be_nil } expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'when dump contains create table with multi inserts' do let(:dump_content) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `comment` varchar(255) DEFAULT NULL COMMENT 'comment', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missあいうえおteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'sub\\\\field','1972-08-23 20:16:08','1974-10-10 23:28:11'); INSERT INTO `users_login` VALUES (373,31,'out\\'swearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schizophrenic','1979-07-06 07:34:07','1970-08-09 01:21:01'); /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; EOT } before { generate_dump_file(dump_content) } it do # create_table_block expect(create_table_block).to receive(:call) { |mysql_table| expect(mysql_table.table_name).to eq('users_login') expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) }.once expect(create_table_block).to receive(:call).never # insert_record_block [ [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), %w(35 6 missあいうえおteer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), %w(52 33 sub\\field 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], [ %w(373 31 out'swearing 1979-10-07\ 08:10:08 2006-02-22\ 16:26:04), %w(493 8 schizophrenic 1979-07-06\ 07:34:07 1970-08-09\ 01:21:01),] ].each do |expected_values| expect(insert_record_block).to receive(:call) { |mysql_table, values_set| expect(mysql_table.table_name).to eq('users_login') expect(values_set).to eq(expected_values) nil }.once end expect(insert_record_block).to receive(:call).never # insert_record_block [ {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, last_pos: index_after(dump_content, 'UNLOCK TABLES;') + 10} ].each do |expected_params| expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') if mysql_table expect(state).to eq(expected_params[:state]) if expected_params[:last_pos] expect(last_pos).to eq(expected_params[:last_pos]) end expect(binlog_pos).to eq(default_binlog_pos) expect(substate).to eq(expected_params[:substate]) expected_params[:result] }.once end expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'when dump contains create table with records' do let(:dump_content) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `comment` varchar(255) DEFAULT NULL COMMENT 'comment', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,NULL,'1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; EOT } before { generate_dump_file(dump_content) } it do # create_table_block expect(create_table_block).to receive(:call) { |mysql_table| expect(mysql_table.table_name).to eq('users_login') expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) }.once expect(create_table_block).to receive(:call).never # insert_record_block [ [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), ['35', '6', nil, '1991-10-15 19:38:07', '1970-10-01 22:03:10'], %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], ].each do |expected_values| expect(insert_record_block).to receive(:call) { |mysql_table, values_set| expect(mysql_table.table_name).to eq('users_login') expect(values_set).to eq(expected_values) nil }.once end expect(insert_record_block).to receive(:call).never # insert_record_block [ {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, last_pos: index_after(dump_content, 'UNLOCK TABLES;')} ].each do |expected_params| expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') if mysql_table expect(state).to eq(expected_params[:state]) if expected_params[:last_pos] expect(last_pos).to eq(expected_params[:last_pos]) end expect(binlog_pos).to eq(default_binlog_pos) expect(substate).to eq(expected_params[:substate]) expected_params[:result] }.once end expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'with resume after creating table' do let(:dump_content_head) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `comment` varchar(255) DEFAULT NULL COMMENT 'comment', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; EOT } let(:dump_content_all) { <<EOT #{dump_content_head} -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; EOT } before do generate_dump_file(dump_content_head) default_parser.parse( Proc.new{}, Proc.new{ raise "Should not be called" }, Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate| if mysql_table @mysql_table = mysql_table @option = { status: Flydata::Command::Sync::STATUS_PARSING, table_name: mysql_table.table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, mysql_table: mysql_table } end } ) expect(@option).to eq({ status: Flydata::Command::Sync::STATUS_PARSING, table_name: 'users_login', last_pos: index_after(dump_content_head, 'ENGINE=InnoDB DEFAULT CHARSET=utf8;'), binlog_pos: default_binlog_pos, state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD, substate: nil, mysql_table: @mysql_table }) end it do generate_dump_file(dump_content_all) parser_for_resume = MysqlDumpParser.new(file_path, @option) # create_table_block expect(create_table_block).to receive(:call).never # insert_record_block [ [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), %w(35 6 missteer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], ].each do |expected_values| expect(insert_record_block).to receive(:call) { |mysql_table, values| expect(mysql_table.table_name).to eq('users_login') expect(values).to eq(expected_values) nil }.once end expect(insert_record_block).to receive(:call).never # check_point_block [ {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, last_pos: index_after(dump_content_all, 'UNLOCK TABLES;')} ].each do |expected_params| expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') if mysql_table expect(state).to eq(expected_params[:state]) if expected_params[:last_pos] expect(last_pos).to eq(expected_params[:last_pos]) end expect(binlog_pos).to eq(default_binlog_pos) expect(substate).to eq(expected_params[:substate]) expected_params[:result] }.once end expect(check_point_block).to receive(:call).never binlog_pos = parser_for_resume.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'with resume during insert records' do let(:dump_content) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `comment` varchar(255) DEFAULT NULL COMMENT 'comment', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); INSERT INTO `users_login` VALUES (194,11,'pandemonium','2008-01-22 22:15:10','1991-04-04 17:30:05'),(230,7,'cloudburst','2010-12-28 11:46:11','1971-06-22 13:08:01'); /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; EOT } before do generate_dump_file(dump_content) insert_record_block_for_resume = double('insert_record_block_for_resume') expect(insert_record_block_for_resume).to receive(:call) { true }.once expect(insert_record_block_for_resume).to receive(:call) { false }.once default_parser.parse( Proc.new{}, insert_record_block_for_resume, Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate| if last_pos == index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');") @mysql_table = mysql_table @option = { status: Flydata::Command::Sync::STATUS_PARSING, table_name: mysql_table.table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, mysql_table: mysql_table } end } ) expect(@option).to eq({ status: Flydata::Command::Sync::STATUS_PARSING, table_name: 'users_login', last_pos: index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');"), binlog_pos: default_binlog_pos, state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD, substate: nil, mysql_table: @mysql_table }) end it do generate_dump_file(dump_content) parser_for_resume = MysqlDumpParser.new(file_path, @option) # create_table_block expect(create_table_block).to receive(:call).never # insert_record_block [ [ %w(194 11 pandemonium 2008-01-22\ 22:15:10 1991-04-04\ 17:30:05), %w(230 7 cloudburst 2010-12-28\ 11:46:11 1971-06-22\ 13:08:01),], ].each do |expected_values| expect(insert_record_block).to receive(:call) { |mysql_table, values_set| expect(mysql_table.table_name).to eq('users_login') expect(values_set).to eq(expected_values) nil }.once end expect(insert_record_block).to receive(:call).never # check_point_block [ {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, last_pos: index_after(dump_content, 'UNLOCK TABLES;')} ].each do |expected_params| expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') if mysql_table expect(state).to eq(expected_params[:state]) if expected_params[:last_pos] expect(last_pos).to eq(expected_params[:last_pos]) end expect(binlog_pos).to eq(default_binlog_pos) expect(substate).to eq(expected_params[:substate]) expected_params[:result] }.once end expect(check_point_block).to receive(:call).never binlog_pos = parser_for_resume.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end context 'when dump contains contain escape characters' do let(:dump_content) { <<EOT #{DUMP_HEADER} DROP TABLE IF EXISTS `users_login`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `users_login` ( `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', `comment` varchar(255) DEFAULT NULL COMMENT 'comment', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*!40101 SET character_set_client = @saved_cs_client */; -- -- Dumping data for table `users_login` -- LOCK TABLES `users_login` WRITE; /*!40000 ALTER TABLE `users_login` DISABLE KEYS */; INSERT INTO `users_login` VALUES (15,46,'moodier)','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'miss,teer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield\\'','1972-08-23 20:16:08','1974-10-10 23:28:11'); INSERT INTO `users_login` VALUES (373,31,'outs\\nwearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schiz\tophrenic','1979-07-06 07:34:07\\'),','1970-08-09,01:21:01,\\')'); /*!40000 ALTER TABLE `users_login` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; EOT } before { generate_dump_file(dump_content) } it do # create_table_block expect(create_table_block).to receive(:call) { |mysql_table| expect(mysql_table.table_name).to eq('users_login') expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) }.once expect(create_table_block).to receive(:call).never # insert_record_block [ [ %w(15 46 moodier\) 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), %w(35 6 miss,teer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), %w(52 33 subfield' 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], [ ['373','31',"outs\nwearing",'1979-10-07 08:10:08','2006-02-22 16:26:04'], ['493','8',"schiz\tophrenic",'1979-07-06 07:34:07\'),','1970-08-09,01:21:01,\')'] ] ].each do |expected_values| expect(insert_record_block).to receive(:call) { |mysql_table, values_set| expect(mysql_table.table_name).to eq('users_login') expect(values_set).to eq(expected_values) nil }.once end expect(insert_record_block).to receive(:call).never # insert_record_block [ {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, last_pos: index_after(dump_content, 'UNLOCK TABLES;')} ].each do |expected_params| expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| expect(mysql_table.table_name).to eq('users_login') if mysql_table expect(state).to eq(expected_params[:state]) if expected_params[:last_pos] expect(last_pos).to eq(expected_params[:last_pos]) end expect(binlog_pos).to eq(default_binlog_pos) expect(substate).to eq(expected_params[:substate]) expected_params[:result] }.once end expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to eq(default_binlog_pos) end end end end describe MysqlDumpParser::InsertParser do describe '#parse' do let(:target_line) { '' } let(:parser) { MysqlDumpParser::InsertParser.new(StringIO.new(target_line)) } subject { parser.parse } context 'when single record' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'hogehoge','2014-04-15 13:49:14');" } it 'should return one element array' do expect(subject).to eq([['1','hogehoge','2014-04-15 13:49:14']]) end end context 'when 2 records' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'foo','2014-04-15 13:49:14'),(2,'var','2014-04-15 14:21:05');" } it 'should return two element array' do expect(subject).to eq([['1','foo','2014-04-15 13:49:14'],['2','var','2014-04-15 14:21:05']]) end end context 'when data includes carriage return' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'abcd\\refgh','2014-04-15 13:49:14');" } it 'should escape return carriage' do expect(subject).to eq([['1',"abcd\refgh",'2014-04-15 13:49:14']]) end end context 'when data includes new line' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'abcd\\nefgh','2014-04-15 13:49:14');" } it 'should escape new line' do expect(subject).to eq([['1',"abcd\nefgh",'2014-04-15 13:49:14']]) end end context 'when data includes escaped single quotation' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'abcd\\',\\'efgh','2014-04-15 13:49:14');" } it 'should escape single quotation' do expect(subject).to eq([['1',"abcd','efgh",'2014-04-15 13:49:14']]) end end context 'when data includes back slash' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'abcd\\\\efgh','2014-04-15 13:49:14');" } it 'should escape back slash' do expect(subject).to eq([['1',"abcd\\efgh",'2014-04-15 13:49:14']]) end end context 'when data includes mixed escaped characters' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'ab\\rcd\\\\e\\nf\\',\\'gh','2014-04-15 13:49:14');" } it 'should escape all' do expect(subject).to eq([['1',"ab\rcd\\e\nf','gh",'2014-04-15 13:49:14']]) end end context 'when data includes mixed escaped characters in a row' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'ab\\\\ncd\\\\\\nefgh','2014-04-15 13:49:14');" } it 'should escape all' do expect(subject).to eq([['1',"ab\\ncd\\\nefgh",'2014-04-15 13:49:14']]) end end context 'when data end with back slash' do let(:target_line) { "INSERT INTO `test_table` VALUES (1,'D:\\\\download\\\\','2014-04-15 13:49:14');" } it 'should escape back slash' do expect(subject).to eq([['1',"D:\\download\\",'2014-04-15 13:49:14']]) end end end end end end