# coding: utf-8 require 'spec_helper' module Flydata module Command describe Sync do let(:default_mysqldump_dir) do File.join('/tmp', "sync_dump_#{Time.now.to_i}") end let(:default_data_entry) do {"id"=>93, "name"=>"flydata_sync_mysql", "data_port_id"=>52, "display_name"=>"flydata_sync_mysql", "enabled"=>true, "heroku_log_type"=>nil, "heroku_resource_id"=>nil, "log_deletion"=>nil, "log_file_delimiter"=>nil, "log_file_type"=>nil, "log_path"=>nil, "redshift_schema_name"=>"", "redshift_table_name"=>nil, "created_at"=>"2014-01-22T18:58:43Z", "updated_at"=>"2014-01-30T02:42:26Z", "type"=>"RedshiftMysqlDataEntry", "tag_name"=>"flydata.a458c641_dp52.flydata_mysql", "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev", "data_port_key"=>"a458c641", "mysql_data_entry_preference" => { "host"=>"localhost", "port"=>3306, "username"=>"masashi", "password"=>"welcome", "database"=>"sync_test", "tables"=>"table1, table2", "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder", "data_servers"=>"localhost:9905" } } end after :each do if Dir.exists?(default_mysqldump_dir) Dir.delete(default_mysqldump_dir) rescue nil end if File.exists?(default_mysqldump_dir) File.delete(default_mysqldump_dir) rescue nil end end describe '#do_generate_table_ddl' do subject { Sync.new } context 'with full options' do it 'issues mysqldump command with expected parameters' do expect(Open3).to receive(:popen3).with( 'mysqldump --protocol=tcp -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1 table2').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_host' do before do default_data_entry['mysql_data_entry_preference'].delete('host') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_port' do before do default_data_entry['mysql_data_entry_preference'].delete('port') end it "uses the default port" do expect(Open3).to receive(:popen3).with( 'mysqldump --protocol=tcp -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1 table2').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'with_port_override' do before do default_data_entry['mysql_data_entry_preference']['port'] = 1234 end it "uses the specified port" do expect(Open3).to receive(:popen3).with( 'mysqldump --protocol=tcp -d -h localhost -P 1234 -u masashi -pwelcome sync_test table1 table2').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_username' do before do default_data_entry['mysql_data_entry_preference'].delete('username') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_password' do before do default_data_entry['mysql_data_entry_preference'].delete('password') end it "call mysqldump without -p option" do expect(Open3).to receive(:popen3).with( 'mysqldump --protocol=tcp -d -h localhost -P 3306 -u masashi sync_test table1 table2').and_call_original subject.send(:do_generate_table_ddl, default_data_entry) end end context 'without_database' do before do default_data_entry['mysql_data_entry_preference'].delete('database') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end context 'without_tables' do before do default_data_entry['mysql_data_entry_preference'].delete('tables') end it "throws an error" do expect { subject.send(:do_generate_table_ddl, default_data_entry) }.to raise_error end end end end end module FileUtil describe SyncFileManager do let(:default_mysqldump_dir) do File.join('/tmp', "sync_dump_#{Time.now.to_i}") end let(:default_data_entry) do {"id"=>93, "name"=>"flydata_sync_mysql", "data_port_id"=>52, "display_name"=>"flydata_sync_mysql", "enabled"=>true, "heroku_log_type"=>nil, "heroku_resource_id"=>nil, "log_deletion"=>nil, "log_file_delimiter"=>nil, "log_file_type"=>nil, "log_path"=>nil, "redshift_schema_name"=>"", "redshift_table_name"=>nil, "created_at"=>"2014-01-22T18:58:43Z", "updated_at"=>"2014-01-30T02:42:26Z", "type"=>"RedshiftMysqlDataEntry", "tag_name"=>"flydata.a458c641_dp52.flydata_mysql", "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev", "data_port_key"=>"a458c641", "mysql_data_entry_preference" => { "host"=>"localhost", "port"=>3306, "username"=>"masashi", "password"=>"welcome", "database"=>"sync_test", "tables"=>nil, "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder", "data_servers"=>"localhost:9905" } } end let(:default_sfm) { SyncFileManager.new(default_data_entry) } let (:status) { 'PARSING' } let (:table_name) { 'test_table' } let (:last_pos) { 9999 } let (:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111} } let (:state) { 'CREATE_TABLE' } let (:substate) { nil } after :each do if Dir.exists?(default_mysqldump_dir) Dir.delete(default_mysqldump_dir) rescue nil end if File.exists?(default_mysqldump_dir) File.delete(default_mysqldump_dir) rescue nil end end describe '#dump_file_path' do context 'when mysqldump_dir param is nil' do before do default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir') end it do expect(default_sfm.dump_file_path).to eq( File.join(ENV['HOME'], '.flydata', 'dump', 'flydata_sync_mysql.dump')) end end context 'when file exists' do before { `touch #{default_mysqldump_dir}`} it do expect{default_sfm.dump_file_path}.to raise_error end end context 'when directory exists' do before { `mkdir -p #{default_mysqldump_dir}`} it do expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never expect(default_sfm.dump_file_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) end end context 'when directory or file does not exist' do it do expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once expect(default_sfm.dump_file_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump')) end end context 'when file name includes "~"' do let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" } it do expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1]) expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once expect(default_sfm.dump_file_path).to eq( File.join(expected_dir, 'flydata_sync_mysql.dump')) end end end describe '#dump_pos_path' do it { expect(default_sfm.dump_pos_path).to eq( File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos')) } end describe '#save_dump_pos' do context 'without mysql marshal data' do it do expect{default_sfm.save_dump_pos( status, table_name, last_pos, binlog_pos, state, substate)}.not_to raise_error expect(default_sfm.load_dump_pos).to eq({ status: status, table_name: table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, mysql_table: nil }) end end end describe '#load_dump_pos' do let (:mysql_table) do Flydata::Mysql::MysqlTable.new( table_name, { 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' } } ) end context 'with mysql marshal data' do before do default_sfm.save_mysql_table_marshal_dump(mysql_table) default_sfm.save_dump_pos(status, table_name, last_pos, binlog_pos, state, substate) end it do ret = default_sfm.load_dump_pos mt = ret.delete(:mysql_table) expect(ret).to eq({ status: status, table_name: table_name, last_pos: last_pos, binlog_pos: binlog_pos, state: state, substate: substate, }) expect(mt.table_name).to eq(table_name) expect(mt.columns).to eq({ 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' }, }) end end end describe '#save_binlog' do let(:binfile) { 'mysqlbinlog.000001' } let(:pos) { 107 } let(:binlog_pos) { {binfile: binfile, pos: pos} } it do default_sfm.save_binlog(binlog_pos) expect(`cat #{default_sfm.binlog_path}`).to eq("#{binfile}\t#{pos}") end end describe '#binlog_path' do it { expect(default_sfm.binlog_path).to eq("#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos") } end end end module Output describe ForwarderFactory do let(:forwarder) do ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) end before :each do conn = double(TCPSocket) allow(conn).to receive(:setsockopt) allow(conn).to receive(:write) allow(conn).to receive(:close) allow(TCPSocket).to receive(:new).and_return(conn) allow(StringIO).to receive(:open) end describe '.create' do context 'with nil forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create(nil, 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_truthy end end context 'with tcpforwarder forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_truthy end end context 'with sslforwarder forwarder_key' do it 'should return SslForwarder object' do forwarder = ForwarderFactory.create('sslforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(SslForwarder)).to be_truthy end end end describe '#initialize' do context 'servers is empty' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', [])}.to raise_error end end context 'servers is nil' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', nil)}.to raise_error end end end describe '#emit' do let(:record) { {table_name: 'test_table_name', log: '{"key":"value"}'} } before :each do forwarder.set_options({buffer_size_limit: ([Time.now.to_i,record].to_msgpack.bytesize * 2.5)}) end context 'when the buffer size is less than threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(1) end end context 'when the buffer size exceeds threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(2) expect(forwarder.emit(record)).to be(true) expect(forwarder.buffer_record_count).to be(0) end end end describe '#pickup_server' do context 'with only one server' do let(:servers) { ['localhost:1111'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') end end context 'with servers' do let(:servers) { ['localhost:1111', 'localhost:2222', 'localhost:3333'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:2222') expect(forwarder.pickup_server).to eq('localhost:3333') expect(forwarder.pickup_server).to eq('localhost:1111') end end end end end module Mysql describe MysqlDumpGeneratorMasterData do let(:stdout) { double(:stdout) } let(:stderr) { double(:stderr) } let(:status) { double(:status) } let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_#{Time.now.to_i}") } let(:default_conf) { { 'host' => 'localhost', 'port' => 3306, 'username' => 'admin', 'password' => 'pass', 'database' => 'dev', 'tables' => 'users,groups', } } let(:default_dump_generator) { MysqlDumpGeneratorMasterData.new(default_conf) } describe '#initialize' do context 'with password' do subject { default_dump_generator.instance_variable_get(:@dump_cmd) } it { is_expected.to eq('mysqldump --protocol=tcp -h localhost -P 3306 -uadmin -ppass --skip-lock-tables ' + '--single-transaction --hex-blob --flush-logs --master-data=2 dev users groups') } end context 'without password' do let (:dump_generator) do MysqlDumpGeneratorMasterData.new(default_conf.merge({'password' => ''})) end subject { dump_generator.instance_variable_get(:@dump_cmd) } it { is_expected.to eq('mysqldump --protocol=tcp -h localhost -P 3306 -uadmin --skip-lock-tables ' + '--single-transaction --hex-blob --flush-logs --master-data=2 dev users groups') } end end describe '#dump' do context 'when exit status is not 0' do before do `touch #{file_path}` expect(status).to receive(:exitstatus).and_return 1 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_falsey end end context 'when exit status is 0 but no file' do before do expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_falsey end end context 'when exit status is 0 but file size is 0' do before do `touch #{file_path}` expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect{ default_dump_generator.dump(file_path) }.to raise_error expect(File.exists?(file_path)).to be_truthy end end context 'when exit status is 0' do before do `echo "something..." > #{file_path}` expect(status).to receive(:exitstatus).and_return 0 expect(Open3).to receive(:capture3).and_return( ['(dummy std out)', '(dummy std err)', status] ) end it do expect(default_dump_generator.dump(file_path)).to be_truthy expect(File.exists?(file_path)).to be_truthy end end after :each do File.delete(file_path) if File.exists?(file_path) end end end describe MysqlDumpParser do let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_parse_#{Time.now.to_i}") } let(:default_parser) { MysqlDumpParser.new(file_path) } def generate_dump_file(content) File.open(file_path, 'w') {|f| f.write(content)} end after do File.delete(file_path) if File.exists?(file_path) end describe '#initialize' do context 'when file does not exist' do it do expect{ MysqlDumpParser.new(file_path) }.to raise_error end end context 'when file exists' do before { generate_dump_file('') } it do expect(MysqlDumpParser.new(file_path)).to be_an_instance_of(MysqlDumpParser) end end end describe '#parse' do DUMP_HEADER = <--5.6.13-log /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; /*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; /*!40101 SET NAMES utf8 */; /*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; /*!40103 SET TIME_ZONE='+00:00' */; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; -- -- Position to start replication or point-in-time recovery from -- -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000267', MASTER_LOG_POS=120; EOT def index_after(content, string) content.index(string) + string.bytesize + 1 end let(:default_parser) { MysqlDumpParser.new(file_path) } let(:default_binlog_pos) { {binfile: 'mysql-bin.000267', pos: 120 } } let(:dump_pos_after_binlog_pos) { index_after(DUMP_HEADER, 'MASTER_LOG_POS=120;') } let(:create_table_block) { double('create_table_block') } let(:insert_record_block) { double('insert_record_block') } let(:check_point_block) { double('check_point_block') } before do generate_dump_file('') end context 'when dump does not contain binlog pos' do before { generate_dump_file('dummy content') } it do expect(create_table_block).to receive(:call).never expect(insert_record_block).to receive(:call).never expect(check_point_block).to receive(:call).never binlog_pos = default_parser.parse( create_table_block, insert_record_block, check_point_block ) expect(binlog_pos).to be_nil end end context 'when dump contains only binlog pos' do before { generate_dump_file(<