spec/flydata/command/sync_spec.rb in flydata-0.2.8 vs spec/flydata/command/sync_spec.rb in flydata-0.2.9

- old
+ new

@@ -139,1166 +139,6 @@ include_examples 'throws an error' end end end end - - module FileUtil - describe SyncFileManager do - let(:default_mysqldump_dir) do - File.join('/tmp', "sync_dump_#{Time.now.to_i}") - end - let(:default_data_entry) do - {"id"=>93, - "name"=>"flydata_sync_mysql", - "data_port_id"=>52, - "display_name"=>"flydata_sync_mysql", - "enabled"=>true, - "heroku_log_type"=>nil, - "heroku_resource_id"=>nil, - "log_deletion"=>nil, - "log_file_delimiter"=>nil, - "log_file_type"=>nil, - "log_path"=>nil, - "redshift_schema_name"=>"", - "redshift_table_name"=>nil, - "created_at"=>"2014-01-22T18:58:43Z", - "updated_at"=>"2014-01-30T02:42:26Z", - "type"=>"RedshiftMysqlDataEntry", - "tag_name"=>"flydata.a458c641_dp52.flydata_mysql", - "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev", - "data_port_key"=>"a458c641", - "mysql_data_entry_preference" => - { "host"=>"localhost", "port"=>3306, "username"=>"masashi", - "password"=>"welcome", "database"=>"sync_test", "tables"=>nil, - "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder", - "data_servers"=>"localhost:9905" } - } - end - let(:default_sfm) { SyncFileManager.new(default_data_entry) } - - let (:status) { 'PARSING' } - let (:table_name) { 'test_table' } - let (:last_pos) { 9999 } - let (:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111} } - let (:state) { 'CREATE_TABLE' } - let (:substate) { nil } - - after :each do - if Dir.exists?(default_mysqldump_dir) - Dir.delete(default_mysqldump_dir) rescue nil - end - if File.exists?(default_mysqldump_dir) - File.delete(default_mysqldump_dir) rescue nil - end - end - - describe '#dump_file_path' do - context 'when mysqldump_dir param is nil' do - before do - default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir') - end - it do - expect(default_sfm.dump_file_path).to eq( - File.join(Flydata::FLYDATA_HOME, 'dump', 'flydata_sync_mysql.dump')) - end - end - context 'when file exists' do - before { `touch #{default_mysqldump_dir}`} - it do - expect{default_sfm.dump_file_path}.to raise_error - end - end - context 'when directory exists' do - before { `mkdir -p #{default_mysqldump_dir}`} - it do - expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never - expect(default_sfm.dump_file_path).to eq( - File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) - end - end - context 'when directory or file does not exist' do - it do - expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once - expect(default_sfm.dump_file_path).to eq( - File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) - end - end - context 'when file name includes "~"' do - let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" } - it do - expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1]) - expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once - expect(default_sfm.dump_file_path).to eq( - File.join(expected_dir, 'flydata_sync_mysql.dump')) - end - end - end - - describe '#dump_pos_path' do - it { expect(default_sfm.dump_pos_path).to eq( - File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos')) } - end - - describe '#save_dump_pos' do - context 'without mysql marshal data' do - it do - expect{default_sfm.save_dump_pos( - status, table_name, last_pos, binlog_pos, state, substate)}.not_to raise_error - expect(default_sfm.load_dump_pos).to eq({ - status: status, table_name: table_name, last_pos: last_pos, - binlog_pos: binlog_pos, state: state, substate: substate, - mysql_table: nil - }) - end - end - end - - describe '#load_dump_pos' do - let (:mysql_table) do - Flydata::Mysql::MysqlTable.new( - table_name, { 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' } } - ) - end - - context 'with mysql marshal data' do - before do - default_sfm.save_mysql_table_marshal_dump(mysql_table) - default_sfm.save_dump_pos(status, table_name, last_pos, binlog_pos, state, substate) - end - it do - ret = default_sfm.load_dump_pos - mt = ret.delete(:mysql_table) - expect(ret).to eq({ - status: status, table_name: table_name, last_pos: last_pos, - binlog_pos: binlog_pos, state: state, substate: substate, - }) - expect(mt.table_name).to eq(table_name) - expect(mt.columns).to eq({ - 'id' => { format_type: 'int' }, - 'value' => { format_type: 'text' }, - }) - end - end - end - - describe '#save_binlog' do - let(:binfile) { 'mysqlbinlog.000001' } - let(:pos) { 107 } - let(:binlog_pos) { {binfile: binfile, pos: pos} } - it do - default_sfm.save_binlog(binlog_pos) - expect(`cat #{default_sfm.binlog_path}`).to eq("#{binfile}\t#{pos}") - end - end - - describe '#binlog_path' do - it { expect(default_sfm.binlog_path).to eq("#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos") } - end - end - end - - module Output - describe ForwarderFactory do - let(:forwarder) do - ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) - end - before :each do - conn = double(TCPSocket) - allow(conn).to receive(:setsockopt) - allow(conn).to receive(:write) - allow(conn).to receive(:close) - allow(TCPSocket).to receive(:new).and_return(conn) - allow(StringIO).to receive(:open) - end - - describe '.create' do - context 'with nil forwarder_key' do - it 'should return TcpForwarder object' do - forwarder = ForwarderFactory.create(nil, 'test_tag', ['localhost:3456', 'localhost:4567']) - expect(forwarder.kind_of?(TcpForwarder)).to be_truthy - end - end - - context 'with tcpforwarder forwarder_key' do - it 'should return TcpForwarder object' do - forwarder = ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) - expect(forwarder.kind_of?(TcpForwarder)).to be_truthy - end - end - - context 'with sslforwarder forwarder_key' do - it 'should return SslForwarder object' do - forwarder = ForwarderFactory.create('sslforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) - expect(forwarder.kind_of?(SslForwarder)).to be_truthy - end - end - end - - describe '#initialize' do - context 'servers is empty' do - it do - expect{ForwarderFactory.create('tcpforwarder', 'test_tag', [])}.to raise_error - end - end - context 'servers is nil' do - it do - expect{ForwarderFactory.create('tcpforwarder', 'test_tag', nil)}.to raise_error - end - end - end - - describe '#emit' do - let(:record) { {table_name: 'test_table_name', log: '{"key":"value"}'} } - before :each do - forwarder.set_options({buffer_size_limit: ([Time.now.to_i,record].to_msgpack.bytesize * 2.5)}) - end - context 'when the buffer size is less than threthold' do - it do - expect(forwarder.emit(record)).to be(false) - expect(forwarder.buffer_record_count).to be(1) - end - end - context 'when the buffer size exceeds threthold' do - it do - expect(forwarder.emit(record)).to be(false) - expect(forwarder.emit(record)).to be(false) - expect(forwarder.buffer_record_count).to be(2) - expect(forwarder.emit(record)).to be(true) - expect(forwarder.buffer_record_count).to be(0) - end - end - end - - describe '#pickup_server' do - context 'with only one server' do - let(:servers) { ['localhost:1111'] } - let(:forwarder) { - ForwarderFactory.create('tcpforwarder', 'test_tag', servers) - } - it 'expect to return same server' do - expect(forwarder.pickup_server).to eq('localhost:1111') - expect(forwarder.pickup_server).to eq('localhost:1111') - expect(forwarder.pickup_server).to eq('localhost:1111') - end - end - context 'with servers' do - let(:servers) { ['localhost:1111', 'localhost:2222', 'localhost:3333'] } - let(:forwarder) { - ForwarderFactory.create('tcpforwarder', 'test_tag', servers) - } - it 'expect to return same server' do - expect(forwarder.pickup_server).to eq('localhost:1111') - expect(forwarder.pickup_server).to eq('localhost:2222') - expect(forwarder.pickup_server).to eq('localhost:3333') - expect(forwarder.pickup_server).to eq('localhost:1111') - end - end - end - end - end - - module Mysql - describe MysqlDumpGeneratorMasterData do - let(:stdout) { double(:stdout) } - let(:stderr) { double(:stderr) } - let(:status) { double(:status) } - let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_#{Time.now.to_i}") } - let(:default_conf) { { - 'host' => 'localhost', 'port' => 3306, 'username' => 'admin', - 'password' => 'pass', 'database' => 'dev', 'tables' => 'users,groups', - } } - let(:default_dump_generator) { MysqlDumpGeneratorMasterData.new(default_conf) } - - describe '#initialize' do - context 'with password' do - subject { default_dump_generator.instance_variable_get(:@dump_cmd) } - it { is_expected.to eq('mysqldump --protocol=tcp -h localhost -P 3306 -uadmin -ppass --skip-lock-tables ' + - '--single-transaction --hex-blob --flush-logs --master-data=2 dev users groups') } - end - context 'without password' do - let (:dump_generator) do - MysqlDumpGeneratorMasterData.new(default_conf.merge({'password' => ''})) - end - subject { dump_generator.instance_variable_get(:@dump_cmd) } - it { is_expected.to eq('mysqldump --protocol=tcp -h localhost -P 3306 -uadmin --skip-lock-tables ' + - '--single-transaction --hex-blob --flush-logs --master-data=2 dev users groups') } - end - end - - describe '#dump' do - context 'when exit status is not 0' do - before do - `touch #{file_path}` - expect(status).to receive(:exitstatus).and_return 1 - expect(Open3).to receive(:capture3).and_return( - ['(dummy std out)', '(dummy std err)', status] - ) - end - it do - expect{ default_dump_generator.dump(file_path) }.to raise_error - expect(File.exists?(file_path)).to be_falsey - end - end - context 'when exit status is 0 but no file' do - before do - expect(status).to receive(:exitstatus).and_return 0 - expect(Open3).to receive(:capture3).and_return( - ['(dummy std out)', '(dummy std err)', status] - ) - end - it do - expect{ default_dump_generator.dump(file_path) }.to raise_error - expect(File.exists?(file_path)).to be_falsey - end - end - context 'when exit status is 0 but file size is 0' do - before do - `touch #{file_path}` - expect(status).to receive(:exitstatus).and_return 0 - expect(Open3).to receive(:capture3).and_return( - ['(dummy std out)', '(dummy std err)', status] - ) - end - it do - expect{ default_dump_generator.dump(file_path) }.to raise_error - expect(File.exists?(file_path)).to be_truthy - end - end - context 'when exit status is 0' do - before do - `echo "something..." > #{file_path}` - expect(status).to receive(:exitstatus).and_return 0 - expect(Open3).to receive(:capture3).and_return( - ['(dummy std out)', '(dummy std err)', status] - ) - end - it do - expect(default_dump_generator.dump(file_path)).to be_truthy - expect(File.exists?(file_path)).to be_truthy - end - end - after :each do - File.delete(file_path) if File.exists?(file_path) - end - end - end - - describe MysqlDumpParser do - let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_parse_#{Time.now.to_i}") } - let(:default_parser) { MysqlDumpParser.new(file_path) } - - def generate_dump_file(content) - File.open(file_path, 'w') {|f| f.write(content)} - end - - after do - File.delete(file_path) if File.exists?(file_path) - end - - describe '#initialize' do - context 'when file does not exist' do - it do - expect{ MysqlDumpParser.new(file_path) }.to raise_error - end - end - context 'when file exists' do - before { generate_dump_file('') } - it do - expect(MysqlDumpParser.new(file_path)).to be_an_instance_of(MysqlDumpParser) - end - end - end - - describe '#parse' do - DUMP_HEADER = <<EOT --- MySQL dump 10.13 Distrib 5.6.13, for osx10.9 (x86_64) --- --- Host: localhost Database: sync_test --- ------------------------------------------------------ --- Server version>--5.6.13-log - -/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; -/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; -/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; -/*!40101 SET NAMES utf8 */; -/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; -/*!40103 SET TIME_ZONE='+00:00' */; -/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; -/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; -/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; -/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; - --- --- Position to start replication or point-in-time recovery from --- - --- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000267', MASTER_LOG_POS=120; -EOT - - def index_after(content, string) - content.index(string) + string.bytesize + 1 - end - - let(:default_parser) { MysqlDumpParser.new(file_path) } - let(:default_binlog_pos) { {binfile: 'mysql-bin.000267', pos: 120 } } - let(:dump_pos_after_binlog_pos) { index_after(DUMP_HEADER, 'MASTER_LOG_POS=120;') } - - let(:create_table_block) { double('create_table_block') } - let(:insert_record_block) { double('insert_record_block') } - let(:check_point_block) { double('check_point_block') } - - before do - generate_dump_file('') - end - - context 'when dump does not contain binlog pos' do - before { generate_dump_file('dummy content') } - it do - expect(create_table_block).to receive(:call).never - expect(insert_record_block).to receive(:call).never - expect(check_point_block).to receive(:call).never - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to be_nil - end - end - - context 'when dump contains only binlog pos' do - before { generate_dump_file(<<EOT -#{DUMP_HEADER} -EOT -) } - it do - expect(create_table_block).to receive(:call).never - expect(insert_record_block).to receive(:call).never - expect(check_point_block).to receive(:call).once - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'when dump contains create table without records' do - let(:dump_content) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=''; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; - -EOT - } - before { generate_dump_file(dump_content) } - it do - expect(create_table_block).to receive(:call) { |mysql_table| - expect(mysql_table.table_name).to eq('users_login') - }.once - expect(insert_record_block).to receive(:call).never - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table).to be_nil - expect(last_pos).to eq(dump_pos_after_binlog_pos) - expect(binlog_pos).to eq(default_binlog_pos) - expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE) - expect(substate).to be_nil - } - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') - expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'create_time', 'update_time']) - expect(last_pos).to eq(index_after(dump_content, 'ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=\'\';')) - expect(binlog_pos).to eq(default_binlog_pos) - expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD) - expect(substate).to be_nil - } - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') - expect(last_pos).to eq(index_after(dump_content, 'UNLOCK TABLES;')) - expect(binlog_pos).to eq(default_binlog_pos) - expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE) - expect(substate).to be_nil - } - expect(check_point_block).to receive(:call).never - - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'when dump contains create table with multi inserts' do - let(:dump_content) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `comment` varchar(255) DEFAULT NULL COMMENT 'comment', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missあいうえおteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'sub\\\\field','1972-08-23 20:16:08','1974-10-10 23:28:11'); -INSERT INTO `users_login` VALUES (373,31,'out\\'swearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schizophrenic','1979-07-06 07:34:07','1970-08-09 01:21:01'); -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; - -EOT - } - before { generate_dump_file(dump_content) } - it do - # create_table_block - expect(create_table_block).to receive(:call) { |mysql_table| - expect(mysql_table.table_name).to eq('users_login') - expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) - }.once - expect(create_table_block).to receive(:call).never - - # insert_record_block - [ - [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), - %w(35 6 missあいうえおteer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), - %w(52 33 sub\\field 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], - [ %w(373 31 out'swearing 1979-10-07\ 08:10:08 2006-02-22\ 16:26:04), - %w(493 8 schizophrenic 1979-07-06\ 07:34:07 1970-08-09\ 01:21:01),] - ].each do |expected_values| - expect(insert_record_block).to receive(:call) { |mysql_table, values_set| - expect(mysql_table.table_name).to eq('users_login') - expect(values_set).to eq(expected_values) - nil - }.once - end - expect(insert_record_block).to receive(:call).never - - # insert_record_block - [ - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, - {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, - last_pos: index_after(dump_content, 'UNLOCK TABLES;') + 10} - ].each do |expected_params| - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') if mysql_table - expect(state).to eq(expected_params[:state]) - if expected_params[:last_pos] - expect(last_pos).to eq(expected_params[:last_pos]) - end - expect(binlog_pos).to eq(default_binlog_pos) - expect(substate).to eq(expected_params[:substate]) - expected_params[:result] - }.once - end - expect(check_point_block).to receive(:call).never - - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'when dump contains create table with records' do - let(:dump_content) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `comment` varchar(255) DEFAULT NULL COMMENT 'comment', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,NULL,'1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; - -EOT - } - before { generate_dump_file(dump_content) } - it do - # create_table_block - expect(create_table_block).to receive(:call) { |mysql_table| - expect(mysql_table.table_name).to eq('users_login') - expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) - }.once - expect(create_table_block).to receive(:call).never - - # insert_record_block - [ - [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), - ['35', '6', nil, '1991-10-15 19:38:07', '1970-10-01 22:03:10'], - %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], - ].each do |expected_values| - expect(insert_record_block).to receive(:call) { |mysql_table, values_set| - expect(mysql_table.table_name).to eq('users_login') - expect(values_set).to eq(expected_values) - nil - }.once - end - expect(insert_record_block).to receive(:call).never - - # insert_record_block - [ - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, - {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, - last_pos: index_after(dump_content, 'UNLOCK TABLES;')} - ].each do |expected_params| - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') if mysql_table - expect(state).to eq(expected_params[:state]) - if expected_params[:last_pos] - expect(last_pos).to eq(expected_params[:last_pos]) - end - expect(binlog_pos).to eq(default_binlog_pos) - expect(substate).to eq(expected_params[:substate]) - expected_params[:result] - }.once - end - expect(check_point_block).to receive(:call).never - - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'with resume after creating table' do - let(:dump_content_head) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `comment` varchar(255) DEFAULT NULL COMMENT 'comment', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - -EOT - } - let(:dump_content_all) { <<EOT -#{dump_content_head} --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; -EOT - } - before do - generate_dump_file(dump_content_head) - default_parser.parse( - Proc.new{}, - Proc.new{ - raise "Should not be called" - }, - Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate| - if mysql_table - @mysql_table = mysql_table - @option = { status: Flydata::Command::Sync::STATUS_PARSING, - table_name: mysql_table.table_name, - last_pos: last_pos, - binlog_pos: binlog_pos, - state: state, - substate: substate, - mysql_table: mysql_table } - end - } - ) - expect(@option).to eq({ - status: Flydata::Command::Sync::STATUS_PARSING, - table_name: 'users_login', - last_pos: index_after(dump_content_head, 'ENGINE=InnoDB DEFAULT CHARSET=utf8;'), - binlog_pos: default_binlog_pos, - state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD, - substate: nil, - mysql_table: @mysql_table - }) - end - it do - generate_dump_file(dump_content_all) - parser_for_resume = MysqlDumpParser.new(file_path, @option) - - # create_table_block - expect(create_table_block).to receive(:call).never - - # insert_record_block - [ - [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), - %w(35 6 missteer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), - %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], - ].each do |expected_values| - expect(insert_record_block).to receive(:call) { |mysql_table, values| - expect(mysql_table.table_name).to eq('users_login') - expect(values).to eq(expected_values) - nil - }.once - end - expect(insert_record_block).to receive(:call).never - - # check_point_block - [ - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, - last_pos: index_after(dump_content_all, 'UNLOCK TABLES;')} - ].each do |expected_params| - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') if mysql_table - expect(state).to eq(expected_params[:state]) - if expected_params[:last_pos] - expect(last_pos).to eq(expected_params[:last_pos]) - end - expect(binlog_pos).to eq(default_binlog_pos) - expect(substate).to eq(expected_params[:substate]) - expected_params[:result] - }.once - end - expect(check_point_block).to receive(:call).never - - binlog_pos = parser_for_resume.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'with resume during insert records' do - let(:dump_content) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `comment` varchar(255) DEFAULT NULL COMMENT 'comment', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11'); -INSERT INTO `users_login` VALUES (194,11,'pandemonium','2008-01-22 22:15:10','1991-04-04 17:30:05'),(230,7,'cloudburst','2010-12-28 11:46:11','1971-06-22 13:08:01'); -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; -EOT - } - before do - generate_dump_file(dump_content) - - insert_record_block_for_resume = double('insert_record_block_for_resume') - expect(insert_record_block_for_resume).to receive(:call) { true }.once - expect(insert_record_block_for_resume).to receive(:call) { false }.once - - default_parser.parse( - Proc.new{}, - insert_record_block_for_resume, - Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate| - if last_pos == index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');") - @mysql_table = mysql_table - @option = { status: Flydata::Command::Sync::STATUS_PARSING, - table_name: mysql_table.table_name, - last_pos: last_pos, - binlog_pos: binlog_pos, - state: state, - substate: substate, - mysql_table: mysql_table } - end - } - ) - expect(@option).to eq({ - status: Flydata::Command::Sync::STATUS_PARSING, - table_name: 'users_login', - last_pos: index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');"), - binlog_pos: default_binlog_pos, - state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD, - substate: nil, - mysql_table: @mysql_table - }) - end - it do - generate_dump_file(dump_content) - parser_for_resume = MysqlDumpParser.new(file_path, @option) - - # create_table_block - expect(create_table_block).to receive(:call).never - - # insert_record_block - [ - [ %w(194 11 pandemonium 2008-01-22\ 22:15:10 1991-04-04\ 17:30:05), - %w(230 7 cloudburst 2010-12-28\ 11:46:11 1971-06-22\ 13:08:01),], - ].each do |expected_values| - expect(insert_record_block).to receive(:call) { |mysql_table, values_set| - expect(mysql_table.table_name).to eq('users_login') - expect(values_set).to eq(expected_values) - nil - }.once - end - expect(insert_record_block).to receive(:call).never - - # check_point_block - [ - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, - last_pos: index_after(dump_content, 'UNLOCK TABLES;')} - ].each do |expected_params| - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') if mysql_table - expect(state).to eq(expected_params[:state]) - if expected_params[:last_pos] - expect(last_pos).to eq(expected_params[:last_pos]) - end - expect(binlog_pos).to eq(default_binlog_pos) - expect(substate).to eq(expected_params[:substate]) - expected_params[:result] - }.once - end - expect(check_point_block).to receive(:call).never - - binlog_pos = parser_for_resume.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - - context 'when dump contains contain escape characters' do - let(:dump_content) { <<EOT -#{DUMP_HEADER} - -DROP TABLE IF EXISTS `users_login`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `users_login` ( - `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id', - `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count', - `comment` varchar(255) DEFAULT NULL COMMENT 'comment', - `create_time` datetime NOT NULL COMMENT 'create time', - `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`user_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `users_login` --- - -LOCK TABLES `users_login` WRITE; -/*!40000 ALTER TABLE `users_login` DISABLE KEYS */; -INSERT INTO `users_login` VALUES (15,46,'moodier)','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'miss,teer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield\\'','1972-08-23 20:16:08','1974-10-10 23:28:11'); -INSERT INTO `users_login` VALUES (373,31,'outs\\nwearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schiz\tophrenic','1979-07-06 07:34:07\\'),','1970-08-09,01:21:01,\\')'); -/*!40000 ALTER TABLE `users_login` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; - -EOT - } - before { generate_dump_file(dump_content) } - it do - # create_table_block - expect(create_table_block).to receive(:call) { |mysql_table| - expect(mysql_table.table_name).to eq('users_login') - expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time']) - }.once - expect(create_table_block).to receive(:call).never - - # insert_record_block - [ - [ %w(15 46 moodier\) 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10), - %w(35 6 miss,teer 1991-10-15\ 19:38:07 1970-10-01\ 22:03:10), - %w(52 33 subfield' 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),], - [ ['373','31',"outs\nwearing",'1979-10-07 08:10:08','2006-02-22 16:26:04'], - ['493','8',"schiz\tophrenic",'1979-07-06 07:34:07\'),','1970-08-09,01:21:01,\')'] ] - ].each do |expected_values| - expect(insert_record_block).to receive(:call) { |mysql_table, values_set| - expect(mysql_table.table_name).to eq('users_login') - expect(values_set).to eq(expected_values) - nil - }.once - end - expect(insert_record_block).to receive(:call).never - - # insert_record_block - [ - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE}, - {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD}, - {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE, - last_pos: index_after(dump_content, 'UNLOCK TABLES;')} - ].each do |expected_params| - expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate| - expect(mysql_table.table_name).to eq('users_login') if mysql_table - expect(state).to eq(expected_params[:state]) - if expected_params[:last_pos] - expect(last_pos).to eq(expected_params[:last_pos]) - end - expect(binlog_pos).to eq(default_binlog_pos) - expect(substate).to eq(expected_params[:substate]) - expected_params[:result] - }.once - end - expect(check_point_block).to receive(:call).never - - binlog_pos = default_parser.parse( - create_table_block, insert_record_block, check_point_block - ) - expect(binlog_pos).to eq(default_binlog_pos) - end - end - end - end - describe MysqlDumpParser::InsertParser do - describe '#parse' do - let(:target_line) { '' } - let(:parser) { MysqlDumpParser::InsertParser.new(StringIO.new(target_line)) } - subject { parser.parse } - - context 'when single record' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'hogehoge','2014-04-15 13:49:14');" } - it 'should return one element array' do - expect(subject).to eq([['1','hogehoge','2014-04-15 13:49:14']]) - end - end - - context 'when 2 records' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'foo','2014-04-15 13:49:14'),(2,'var','2014-04-15 14:21:05');" } - it 'should return two element array' do - expect(subject).to eq([['1','foo','2014-04-15 13:49:14'],['2','var','2014-04-15 14:21:05']]) - end - end - - context 'when data includes carriage return' do - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'abcd\refgh','2014-04-15 13:49:14');| } - it 'should escape return carriage' do - expect(subject).to eq([['1',"abcd\refgh",'2014-04-15 13:49:14']]) - end - end - - context 'when data includes new line' do - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'abcd\nefgh','2014-04-15 13:49:14');| } - it 'should escape new line' do - expect(subject).to eq([['1',"abcd\nefgh",'2014-04-15 13:49:14']]) - end - end - - context 'when data includes escaped single quotation' do - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'abcd\',\'efgh','2014-04-15 13:49:14');| } - it 'should escape single quotation' do - expect(subject).to eq([['1',"abcd','efgh",'2014-04-15 13:49:14']]) - end - end - - context 'when data includes back slash' do - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'abcd\\efgh','2014-04-15 13:49:14');| } - it 'should escape back slash' do - expect(subject).to eq([['1',"abcd\\efgh",'2014-04-15 13:49:14']]) - end - end - - context 'when data includes back slash with double quote' do - # \"\'\' value on insert query shoulde be "'' - let(:target_line) { %q|INSERT INTO `tast_table` VALUES (1,'\"\'\'','2014-04-15 13:49:14');| } - it 'should escape back slash' do - expect(subject).to eq([['1',%q|"''| ,'2014-04-15 13:49:14']]) - end - end - - context 'when data includes back slash with double quote another case' do - # \"\"\"\"\"''\"'' value on insert query shoulde be """""''"'' - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'\"\"\"\"\"\'\'\"\'\'','2014-04-15 13:49:14');| } - it 'should escape back slash' do - expect(subject).to eq([['1',%q|"""""''"''|,'2014-04-15 13:49:14']]) - end - end - - context 'when data includes mixed escaped characters' do - let(:target_line) { %q|INSERT INTO `test_table` VALUES (1,'ab\rcd\\e\nf\',\'gh','2014-04-15 13:49:14');| } - it 'should escape all' do - expect(subject).to eq([['1',"ab\rcd\\e\nf','gh",'2014-04-15 13:49:14']]) - end - end - - context 'when data includes mixed escaped characters in a row' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'ab\\\\ncd\\\\\\nefgh','2014-04-15 13:49:14');" } - it 'should escape all' do - expect(subject).to eq([['1',"ab\\ncd\\\nefgh",'2014-04-15 13:49:14']]) - end - end - - context 'when data end with back slash' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'D:\\\\download\\\\','2014-04-15 13:49:14');" } - it 'should escape back slash' do - expect(subject).to eq([['1',"D:\\download\\",'2014-04-15 13:49:14']]) - end - end - - context 'when comma is the first character of a string' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,',9','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1',',9','2014-04-15 13:49:14']]) - end - end - - context 'when comma is the last character of a string' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'9,','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1','9,','2014-04-15 13:49:14']]) - end - end - - context 'when a string consists of a comma' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,',','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1',',','2014-04-15 13:49:14']]) - end - end - - context 'when two commas are given as a string' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,',,','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1',',,','2014-04-15 13:49:14']]) - end - end - - context 'when an empty string value is given' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1','','2014-04-15 13:49:14']]) - end - end - - context 'when a value consists of a comma followed by closing bracket' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'),ksd','2014-04-15 13:49:14');" } - it 'should parse the string correctly' do - expect(subject).to eq([['1','),ksd','2014-04-15 13:49:14']]) - end - end - - context 'when there is a white space before the closing bracket' do - let(:target_line) { "INSERT INTO `test_table` VALUES (1,'aa','2014-04-15 13:49:14' );" } - it 'should fail to parse. This is intentional for performance reason' do - expect{subject}.to raise_error - end - end - - context 'when an integer that has leading zeros is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,00013);"} - it 'should remove the leading zeros' do - expect(subject).to eq([['1', '13']]) - end - end - - context 'when a decimal that has leading zeros is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,00013.40);"} - it 'should remove the leading zeros' do - expect(subject).to eq([['1', '13.40']]) - end - end - - context 'when a timestamp that has leading zeros is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,'0004-04-15 13:49:14');"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '0004-04-15 13:49:14']]) - end - end - - context 'when a string that has leading zeros is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,'00000aa');"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '00000aa']]) - end - end - - context 'when a string that has leading zeros, numbers and a comma in between is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,'0003,00033');"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '0003,00033']]) - end - end - - context 'when a string that has leading zeros, has only numbers is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,'00033');"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '00033']]) - end - end - - context 'when 0 is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,0);"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '0']]) - end - end - - context 'when 0.00 is given' do - let(:target_line) {"INSERT INTO `test_table` VALUES (1,0.00);"} - it 'should not remove the leading zeros' do - expect(subject).to eq([['1', '0.00']]) - end - end - end - end - end - end