# coding: utf-8
#
# Specs for Flydata::Command::Sync: server cleanup retries, mysqldump
# generation, value conversion, data-entry expansion, table-list selection,
# and the `reset` sub-command.

require 'spec_helper'
require 'flydata/command/sync'

module Flydata
  module Command
    describe Sync do
      subject { subject_object }
      let(:subject_object) { described_class.new }

      # Unique per-run scratch dir; removed (best effort) in the after hook.
      let(:default_dump_dir) do
        File.join('/tmp', "sync_dump_#{Time.now.to_i}")
      end

      # Canonical data entry fixture shared across the specs below.
      let(:default_data_entry) do
        {"id"=>93, "name"=>"flydata_sync_mysql", "data_port_id"=>52,
         "display_name"=>"flydata_sync_mysql", "enabled"=>true,
         "heroku_log_type"=>nil, "heroku_resource_id"=>nil,
         "log_deletion"=>nil, "log_file_delimiter"=>nil,
         "log_file_type"=>nil, "log_path"=>nil,
         "redshift_schema_name"=>"", "redshift_table_name"=>nil,
         "created_at"=>"2014-01-22T18:58:43Z",
         "updated_at"=>"2014-01-30T02:42:26Z",
         "type"=>"RedshiftMysqlDataEntry",
         "tag_name"=>"flydata.a458c641_dp52.flydata_mysql",
         "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev",
         "data_port_key"=>"a458c641",
         "mysql_data_entry_preference" => {
           "host"=>"localhost", "port"=>3306, "username"=>"masashi",
           "password"=>"welcome", "database"=>"sync_test",
           "tables"=>["table1", "table2", "table4"],
           "invalid_tables"=>["table3"],
           "new_tables"=>["table4"],
           "dump_dir"=>default_dump_dir,
           "forwarder" => "tcpforwarder",
           "data_servers"=>"localhost:9905"
         }
        }
      end

      # Column 4 ("bin") type/width are overridable per context.
      let(:col4_type) { "binary" }
      let(:col4_width) { 34 }
      let(:col4_type_str) { "#{col4_type}(#{col4_width})" }

      let(:mysql_table_columns) {
        {"id"=>{:column_name=>"id", :format_type_str=>"int(11)",
                :format_type=>"int", :format_size=>11},
         "name"=>{:column_name=>"name", :format_type_str=>"varchar(40)",
                  :format_type=>"varchar", :format_size=>40, :default=>nil},
         "created_at"=>{:column_name=>"created_at", :format_type_str=>"timestamp",
                        :format_type=>"timestamp", :default=>"CURRENT_TIMESTAMP"},
         "bin"=>{:column_name=>"bin", :format_type_str=>col4_type_str,
                 :format_type=>col4_type, :format_size=>col4_width, :default=>nil},
         "bin2"=>{:column_name=>"bin2", :format_type_str=>"blob",
                  :format_type=>"blob"},
         "varbin"=>{:column_name=>"varbin", :format_type_str=>"varchar(34)",
                    :format_type=>"varchar", :format_size=>34, :default=>nil}} }

      after :each do
        # Best-effort cleanup of the scratch path, whether it was created as a
        # directory or a file.  `Dir.exist?`/`File.exist?` replace the
        # deprecated `exists?` aliases (removed in Ruby 3.2).
        if Dir.exist?(default_dump_dir)
          Dir.delete(default_dump_dir) rescue nil
        end
        if File.exist?(default_dump_dir)
          File.delete(default_dump_dir) rescue nil
        end
      end

      describe '#cleanup_sync_server' do
        let(:rest_client) { double('rest_client') }

        context "when rest client throws timeout errors" do
          before do
            allow(RestClient::Resource).to receive(:new).and_return(rest_client)
          end

          # The first `post` expectation raises; the second returns success,
          # so passing proves the command retried after the timeout.
          it "should retry in case of RestClient::RequestTimeout" do
            expect(rest_client).to receive(:post).and_raise(RestClient::RequestTimeout)
            expect(rest_client).to receive(:post).and_return("{}")
            subject.send(:cleanup_sync_server, default_data_entry)
          end

          it "should retry in case of RestClient::GatewayTimeout" do
            expect(rest_client).to receive(:post).and_raise(RestClient::GatewayTimeout)
            expect(rest_client).to receive(:post).and_return("{}")
            subject.send(:cleanup_sync_server, default_data_entry)
          end
        end
      end

      describe '#generate_source_dump' do
        let (:flydata) { double('flydata') }
        let (:dp) { double('dp') }
        let (:default_data_port) { double('default_data_port') }
        let (:default_sync_fm) { double('default_sync_fm') }
        let (:default_dump_pos) do
          { status: "DO_NOT_SKIP_DUMP" }
        end
        let (:default_fp) { double('default_fp') }
        let (:default_backup) { double('default_backup') }
        let (:target_tables) { ["test_table_1"] }
        let (:db_byte) { 1 }
        let (:disk_byte) { 100 }

        before do
          # Lazy-require so the stub targets exist before any example runs.
          require 'flydata/source_mysql/generate_source_dump'
          allow_any_instance_of(Flydata::SourceMysql::GenerateSourceDump).to receive(:dump_size).and_return(db_byte)
          allow_any_instance_of(Flydata::SourceMysql::GenerateSourceDump).to receive(:run_compatibility_check)
        end

        before do
          expect(subject).to receive(:flydata).and_return(flydata).at_least(:once)
          allow(subject).to receive(:data_entry).and_return(default_data_entry)
          expect(flydata).to receive(:data_port).and_return(dp)
          expect(dp).to receive(:get).and_return(default_data_port)
          # NOTE(review): stubs File.exists? because that is what the
          # production code appears to call; keep the stubbed name in sync
          # with the implementation.
          allow(File).to receive(:exists?).and_return(false)
          expect(default_sync_fm).to receive(:load_dump_pos).and_return(default_dump_pos)
          expect(default_sync_fm).to receive(:dump_file_path).and_return(default_fp)
          expect(default_sync_fm).to receive(:backup_dir).and_return(default_backup).at_least(:once)
          expect(subject).to receive(:target_tables).and_return(target_tables).at_least(:once)
          expect(subject).to receive(:print)
          expect(subject).to receive(:log_info)
          expect(subject).to receive(:log_info_stdout).at_least(:once)
          expect(subject).to receive(:ask_yes_no).and_return(true).at_least(:once)
          expect_any_instance_of(FlydataCore::Event::ApiEventSender).to receive(:send_event).once
        end

        context 'with no stream option' do
          before do
            expect(default_sync_fm).to receive(:save_sync_info).once
            expect(subject).to receive(:free_disk_space).and_return(disk_byte)
            expect(File).to receive(:dirname)
          end

          it 'will export to dump file' do
            expect(subject).to receive(:call_block_or_return_io)
            expect_any_instance_of(Flydata::SourceMysql::GenerateSourceDump).
              to receive(:dump).with(target_tables, default_fp)
            subject.send(:generate_source_dump, default_data_entry, default_sync_fm)
          end

          it 'will remove dump file on interrupt' do
            expect(default_sync_fm).to receive(:delete_dump_file)
            expect_any_instance_of(Flydata::SourceMysql::Parser::MysqlDumpGeneratorNoMasterData).to receive(:dump).and_raise(Interrupt)
            # NOTE(review): bare raise_error accepts any exception; the
            # Interrupt may be re-raised wrapped, so it is left unpinned.
            expect {
              subject.send(:generate_source_dump, default_data_entry, default_sync_fm)
            }.to raise_error
          end
        end

        context 'with stream option' do
          it 'will export to io' do
            expect(default_sync_fm).to receive(:save_sync_info).once
            expect_any_instance_of(Flydata::SourceMysql::Parser::MysqlDumpGeneratorNoMasterData).to receive(:dump)
            subject.send(:generate_source_dump, default_data_entry, default_sync_fm, false)
          end
        end
      end

      describe '#convert_to_flydata_values' do
        subject { subject_object.send(:convert_to_flydata_values, source_table, values) }
        let(:values) { [4, 'John', nil, col4_value, nil, nil] }
        let(:source_table) do
          Flydata::Parser::SourceTable.new("test_table", mysql_table_columns)
        end

        before do
          require 'flydata/parser/source_table'
        end
        before do
          source_table.set_value_converters(FlydataCore::TableDef::MysqlTableDef::VALUE_CONVERTERS)
        end

        context 'with binary column' do
          let(:col4_type) { "binary" }
          let(:col4_width) { 5 }
          let(:truncated_binary) { "0xC04482" }
          let(:col4_value) { "#{truncated_binary}0000" }

          # Conversion mutates `values` in place, hence asserting on `values`
          # rather than on the return value.
          it 'truncates trailing "0" if the type is binary' do
            expected_values = values.dup
            expected_values[3] = truncated_binary
            subject
            expect(values).to eq expected_values
          end
        end
      end

      describe '#data_entry' do
        subject { subject_object.send(:data_entry) }
        let(:de) { { 'mysql_data_entry_preference' => mp } }
        let(:mp) { { 'tables' => default_tables_str,
                     'table_attributes' => default_table_attributes,
                     'pk_override' => pk_override_hash } }
        let(:default_tables_str) { 'Users,Addresses' }
        let(:default_table_attributes) {[
          {"table_name"=>"Users", "status"=>"init_sync_pending"},
          {"table_name"=>"Addresses", "status"=>"init_sync_pending"}
        ]}
        let(:append_only_tables_list) { %w|Invoices Sessions Addresses| }
        let(:append_only_tables_str) { 'Invoices,Sessions,Addresses' }
        let(:tbl_attrs_for_append_only_tables) { [
          {"table_name"=>"Invoices", "omit_events"=>["delete"], "status"=>"init_sync_pending"},
          {"table_name"=>"Sessions", "omit_events"=>["delete"], "status"=>"init_sync_pending"},
          {"table_name"=>"Addresses", "omit_events"=>["delete"], "status"=>"init_sync_pending"}
        ] }
        let(:invalid_tables_list) { %w|error_fullsync_1 error_append_2| }
        let(:invalid_tables_str) { 'error_fullsync_1,error_append_2' }
        let(:tbl_attrs_for_invalid_tables) {[
          {"table_name"=>"error_fullsync_1", "status"=>"init_sync_pending",
           "invalid_table_reason"=>"No primary key defined"},
          {"table_name"=>"error_append_2", "omit_events"=>["delete"],
           "status"=>"init_sync_pending",
           "invalid_table_reason"=>"Table does not exist in the MySQL database"},
        ]}
        let(:pk_override_hash) { {"Users"=>["id"]} }
        let(:sfm) { double('sfm') }
        let(:ssl_ca_content) { double('ssl_ca_content') }
        let(:ssl_ca_path) { double('ssl_ca_path') }

        before do
          allow(subject_object).to receive(:retrieve_data_entries).
            and_return([de])
        end

        context 'type RedshiftMysqlDataEntry' do
          before { de['type'] = 'RedshiftMysqlDataEntry' }

          context 'called once' do
            before do
              expect(subject_object).to receive(:retrieve_data_entries).
                and_return([de])
            end

            context 'without tables_append_only' do
              it "expands a table list string to an array of tables" do
                subject
                expect(mp['tables']).to eq %w(Users Addresses)
              end
            end

            context 'with tables_append_only' do
              before do
                mp['tables_append_only'] = append_only_tables_str
                mp['table_attributes'] = default_table_attributes + tbl_attrs_for_append_only_tables
              end

              it "creates an array of tables from 'tables' and 'tables_append_only' combined" do
                subject
                expect(mp['tables']).to eq %w(Users Addresses Invoices Sessions)
              end

              it 'creates an array of append-only tables' do
                subject
                expect(mp['tables_append_only']).to eq append_only_tables_list
              end
            end

            context 'with invalid tables and invalid append-only tables' do
              before do
                mp['tables_append_only'] = append_only_tables_str
                mp['table_attributes'] = default_table_attributes + tbl_attrs_for_append_only_tables
                mp['invalid_tables'] = invalid_tables_str
                # The final assignment (including invalid-table attributes)
                # supersedes the one above.
                mp['table_attributes'] = default_table_attributes + tbl_attrs_for_append_only_tables + tbl_attrs_for_invalid_tables
              end

              it 'does not change value for `tables`' do
                subject
                expect(mp['tables']).to eq %w(Users Addresses Invoices Sessions)
              end

              it 'does not change value for `tables_append_only`' do
                subject
                expect(mp['tables_append_only']).to eq append_only_tables_list
              end

              it 'creates an array of invalid tables' do
                subject
                expect(mp['invalid_tables']).to eq invalid_tables_list
              end

              it 'categorize invalid tables by sync type' do
                subject
                expect(mp['invalid_tables_append_only']).to eq %w(error_append_2)
                expect(mp['invalid_tables_full_sync']).to eq %w(error_fullsync_1)
              end
            end

            context 'with an invalid table which do not have table_attributes entry' do
              before do
                mp['invalid_tables'] = invalid_tables_str + ',table_from_conf'
                mp['table_attributes'] += default_table_attributes + tbl_attrs_for_invalid_tables
              end

              it 'cannot determine sync type for the table and raise an error' do
                expect { subject }.to raise_error /Sync type for invalid table `table_from_conf` not known/
              end
            end

            context 'with ssl_ca_content' do
              before { mp["ssl_ca_content"] = ssl_ca_content }

              it "saves the content to a local file via SyncFileManager" do
                expect(SyncFileManager).to receive(:new).with(de).
                  and_return(sfm)
                expect(sfm).to receive(:save_ssl_ca).with(ssl_ca_content)
                expect(sfm).to receive(:ssl_ca_path).and_return(ssl_ca_path)
                subject
                expect(mp['ssl_ca']).to eq ssl_ca_path
                expect(mp['sslca']).to eq ssl_ca_path
              end
            end
          end

          context 'called twice' do
            before { subject }

            it "repurposes the saved de" do
              expect(subject_object).to receive(:retrieve_data_entries).never
              subject
              expect(mp['tables']).to eq %w(Users Addresses)
            end
          end
        end

        context 'type RedshiftFileDataEntry' do
          before { de['type'] = 'RedshiftFileDataEntry' }

          it "raises an error about unsupported data entry" do
            expect { subject }.to raise_error /(supported data entry|data entry.*support)/
          end
        end
      end

      describe '#set_current_tables' do
        subject { subject_object.send(:set_current_tables, input_tables, options) }
        let(:input_tables) { nil }
        let(:table_lists) {{
          "tables" => tables,
          "invalid_tables" => invalid_tables,
          "new_tables" => new_tables,
          "tables_append_only" => tables_append_only,
          "invalid_tables_append_only" => invalid_tables_append_only,
        }}
        let(:real_new_tables) { [] }
        let(:tables) { ["table1", "table2", "table4"] }
        let(:invalid_tables) { ["table3_invalid","append3_invalid"] }
        let(:invalid_tables_append_only) { ["append3_invalid"] }
        let(:new_tables) { ["table4"] }
        let(:data_entry_with_table_lists) do
          new_de = default_data_entry.dup
          new_de['mysql_data_entry_preference'].merge!(table_lists)
          new_de
        end

        before do
          allow(subject_object).to receive(:data_entry).and_return(data_entry_with_table_lists)
        end

        context 'when include_all_tables option is true' do
          let(:options) { { include_all_tables: true } }

          context 'when there are append-only tables (valid & invalid)' do
            let(:tables_append_only) { ["append1","append2"] }
            let(:invalid_tables_append_only) { ["append3_invalid"] }

            it do
              subject
              expect(subject_object.instance_variable_get(:@full_tables)).to eq(
                tables + invalid_tables)
              expect(subject_object.instance_variable_get(:@append_only_tables)).to eq(
                tables_append_only + invalid_tables_append_only)
            end
          end

          context "when table_lists['tables_append_only'] is nil" do
            let(:tables_append_only) { nil }

            it do
              subject
              expect(subject_object.instance_variable_get(:@full_tables)).to eq(
                tables + invalid_tables)
              expect(subject_object.instance_variable_get(:@append_only_tables)).to eq(
                invalid_tables_append_only)
            end
          end

          context 'when there is no append-only tables' do
            let(:tables_append_only) { [] }
            let(:invalid_tables_append_only) { [] }

            it do
              subject
              expect(subject_object.instance_variable_get(:@full_tables)).to eq(
                tables + invalid_tables)
              expect(subject_object.instance_variable_get(:@append_only_tables)).to eq([])
            end
          end
        end

        context 'when include_all_tables option is false' do
          let(:options) { { include_all_tables: false } }

          context 'when there is no real_new_tables' do
            let(:real_new_tables) { [] }
            let(:tables_append_only) { ["append1","append2"] }
            let(:invalid_tables_append_only) { ["append3_invalid"] }

            before do
              allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).and_return(real_new_tables)
            end

            it do
              subject
              expect(subject_object.instance_variable_get(:@full_tables)).to eq(tables - real_new_tables)
              expect(subject_object.instance_variable_get(:@append_only_tables)).to eq(tables_append_only)
            end
          end

          context 'there is real_new_tables' do
            let(:real_new_tables) { ['table1'] }
            let(:tables_append_only) { ["append1","append2"] }
            let(:invalid_tables_append_only) { ["append3_invalid"] }

            before do
              allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).and_return(real_new_tables)
            end

            it do
              subject
              expect(subject_object.instance_variable_get(:@full_tables)).to eq(tables - real_new_tables)
              expect(subject_object.instance_variable_get(:@append_only_tables)).to eq(tables_append_only)
            end
          end
        end
      end

      describe '#reset' do
        let(:subject_object) { described_class.new(reset_opts) }
        subject { subject_object.reset(*arg_tables) }
        let(:command_options) { ["-y"] }
        let(:reset_opts) { Sync.slop_reset }
        let(:timeout_value) { Sync::SERVER_DATA_PROCESSING_TIMEOUT } #3600
        let(:test_data_entry) { default_data_entry }
        let(:valid_tables_append_only) { nil }
        let(:invalid_tables_append_only) { nil }
        let(:full_tables) { ["table1", "table2"] } #not include invalid_tables and real_new_tables
        let(:new_tables) { ["table4"] }

        before do
          command_options.each {|opt_str| reset_opts.parse!([opt_str]) }
          allow(subject_object).to receive(:data_entry).and_return(test_data_entry)
          allow(subject_object).to receive(:check_server_status).and_return(
            { "complete" => true, "state" => "complete" }
          )
          allow_any_instance_of(Api::DataEntry).to receive(:cleanup_sync)
          allow_any_instance_of(SyncFileManager).to receive(:load_sync_info).and_return(sync_info)
          allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "generated_ddl").and_return(full_tables)
          allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(new_tables, "pos").and_return(new_tables)
          allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "pos").and_return([])
        end

        context 'when called with --all' do
          let(:arg_tables) { [] }
          let(:command_options) { ["-y", "--all"] }
          let(:sync_info) { nil }

          it do
            expect(subject_object).to receive(:set_current_tables).with([], resume: false).and_call_original
            expect(subject_object).to receive(:wait_for_server_buffer).with(
              {:timeout=>timeout_value, :tables=>arg_tables}
            )
            expect(subject_object).to receive(:cleanup_sync_server).with(
              default_data_entry, arg_tables
            )
            subject
          end
        end

        context 'when called with --init' do
          let(:arg_tables) { [] }
          let(:command_options) { ["-y", "--init"] }
          let(:sync_info) { {initial_sync: is_full_init_sync, tables: sync_info_tables} }
          let(:full_tables) { ["table1", "table2", "table4"] }
          let(:new_tables) { [] }

          before do
            # no new tables.
            test_data_entry["mysql_data_entry_preference"]["new_tables"] = []
            # continuous sync has started behind the aborted init-sync and all tables have .pos file
            allow_any_instance_of(SyncFileManager).to receive(:load_sync_info).and_return(sync_info)
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "generated_ddl").and_return([])
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(new_tables, "pos").and_return([])
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "pos").and_return([])
          end

          context 'when the aborted init-sync was for all tables in the application' do
            let(:is_full_init_sync) { true }
            let(:sync_info_tables) { ["table1", "table2", "table4"] }
            let(:target_tables_for_api) { [] }

            it do
              expect(subject_object).to receive(:set_current_tables).with([], resume: true).and_call_original
              expect(subject_object).to receive(:wait_for_server_buffer).with(
                {:timeout=>timeout_value, :tables=>target_tables_for_api}
              )
              expect(subject_object).to receive(:cleanup_sync_server).with(
                default_data_entry, sync_info_tables
              )
              subject
            end
          end

          context 'when the aborted init-sync was for a part of tables in the application' do
            let(:is_full_init_sync) { false }
            let(:sync_info_tables) { ["table2"] }
            let(:target_tables_for_api) { sync_info_tables }

            it do
              expect(subject_object).to receive(:set_current_tables).with([], resume: true).and_call_original
              expect(subject_object).to receive(:wait_for_server_buffer).with(
                {:timeout=>timeout_value, :tables=>target_tables_for_api}
              )
              expect(subject_object).to receive(:cleanup_sync_server).with(
                default_data_entry, sync_info_tables
              )
              subject
            end
          end
        end

        context 'when called with a table as an argument' do
          let(:arg_tables) { ["table2"] }
          let(:command_options) { ["-y"] }
          let(:sync_info) { nil }
          let(:target_tables_for_api) { arg_tables }

          before do
            # (A duplicate of the (full_tables, "pos") stub below was removed;
            # the two lines were identical.)
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "pos").and_return([])
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(new_tables, "pos").and_return(new_tables)
            allow_any_instance_of(SyncFileManager).to receive(:get_new_table_list).with(full_tables, "generated_ddl").and_return([])
          end

          it do
            expect(subject_object).to receive(:set_current_tables).with(arg_tables, resume: true).and_call_original
            expect(subject_object).to receive(:wait_for_server_buffer).with(
              {:timeout=>timeout_value, :tables=>arg_tables}
            )
            expect(subject_object).to receive(:cleanup_sync_server).with(
              default_data_entry, arg_tables
            )
            subject
          end
        end
      end
    end
  end
end