# coding: utf-8

require 'spec_helper'
require 'flydata/sync_file_manager'
require 'flydata/parser/mysql/dump_parser'

module Flydata
  # Specs for SyncFileManager, which manages the on-disk state files used by
  # FlyData Sync: the mysqldump output file, dump/binlog position files,
  # SSL CA file, per-table position counters and generated-DDL files.
  describe SyncFileManager do
    let(:subject_object) { described_class.new(default_data_entry) }
    # Unique temp dir per run so parallel/repeated runs don't collide.
    let(:default_mysqldump_dir) do
      File.join('/tmp', "sync_dump_#{Time.now.to_i}")
    end
    # A representative data-entry hash as returned by the FlyData API.
    let(:default_data_entry) do
      {"id"=>93,
       "name"=>"flydata_sync_mysql",
       "data_port_id"=>52,
       "display_name"=>"flydata_sync_mysql",
       "enabled"=>true,
       "heroku_log_type"=>nil,
       "heroku_resource_id"=>nil,
       "log_deletion"=>nil,
       "log_file_delimiter"=>nil,
       "log_file_type"=>nil,
       "log_path"=>nil,
       "redshift_schema_name"=>"",
       "redshift_table_name"=>nil,
       "created_at"=>"2014-01-22T18:58:43Z",
       "updated_at"=>"2014-01-30T02:42:26Z",
       "type"=>"RedshiftMysqlDataEntry",
       "tag_name"=>"flydata.a458c641_dp52.flydata_mysql",
       "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev",
       "data_port_key"=>"a458c641",
       "mysql_data_entry_preference" => {
         "host"=>"localhost",
         "port"=>3306,
         "username"=>"masashi",
         "password"=>"welcome",
         "database"=>"sync_test",
         "tables"=>nil,
         "mysqldump_dir"=>default_mysqldump_dir,
         "forwarder" => "tcpforwarder",
         "data_servers"=>"localhost:9905"
       }
      }
    end
    let(:status) { 'PARSING' }
    let(:table_name) { 'test_table' }
    let(:table_name2) { 'test_table2' }
    let(:last_pos) { 9999 }
    let(:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111} }
    let(:state) { 'CREATE_TABLE' }
    let(:substate) { nil }

    after :each do
      # The dump path may have been created as either a directory or a plain
      # file depending on the example; clean up whichever exists.
      # NOTE: Dir.exists?/File.exists? were deprecated aliases removed in
      # Ruby 3.2 — use the canonical exist? forms.
      if Dir.exist?(default_mysqldump_dir)
        Dir.delete(default_mysqldump_dir) rescue nil
      end
      if File.exist?(default_mysqldump_dir)
        File.delete(default_mysqldump_dir) rescue nil
      end
    end

    describe '#dump_file_path' do
      context 'when mysqldump_dir param is nil' do
        before do
          default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir')
        end
        it do
          # Falls back to the default dump dir under FLYDATA_HOME.
          stub_const('Flydata::FileUtil::SyncFileManager::DUMP_DIR',
                     File.join(Flydata::FLYDATA_HOME, 'dump'))
          expect(subject_object.dump_file_path).to eq(
            File.join(Flydata::FLYDATA_HOME, 'dump', 'flydata_sync_mysql.dump'))
        end
      end
      context 'when file exists' do
        # A plain file occupying the dump-dir path must be rejected.
        before { `touch #{default_mysqldump_dir}` }
        it do
          expect{subject_object.dump_file_path}.to raise_error
        end
      end
      context 'when directory exists' do
        before { `mkdir -p #{default_mysqldump_dir}` }
        it do
          # Existing directory must not be re-created.
          expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never
          expect(subject_object.dump_file_path).to eq(
            File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
        end
      end
      context 'when directory or file does not exist' do
        it do
          expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once
          expect(subject_object.dump_file_path).to eq(
            File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
        end
      end
      context 'when file name includes "~"' do
        let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" }
        it do
          # "~" must be expanded to the user's home directory.
          expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1])
          expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once
          expect(subject_object.dump_file_path).to eq(
            File.join(expected_dir, 'flydata_sync_mysql.dump'))
        end
      end
    end

    describe '#dump_pos_path' do
      it { expect(subject_object.dump_pos_path).to eq(
             File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos')) }
    end

    describe '#save_dump_pos' do
      context 'without mysql marshal data' do
        it do
          expect{subject_object.save_dump_pos(
            status, table_name, last_pos, binlog_pos, state, substate)}.not_to raise_error
          # Round-trips through load_dump_pos; mysql_table is nil because no
          # marshal dump was saved.
          expect(subject_object.load_dump_pos).to eq({
            status: status, table_name: table_name, last_pos: last_pos,
            binlog_pos: binlog_pos, state: state, substate: substate,
            mysql_table: nil })
        end
      end
    end

    describe '#load_dump_pos' do
      let(:mysql_table) do
        Flydata::Parser::Mysql::MysqlTable.new(
          table_name,
          { 'id' => { format_type: 'int' },
            'value' => { format_type: 'text' } }
        )
      end
      context 'with mysql marshal data' do
        before do
          subject_object.save_mysql_table_marshal_dump(mysql_table)
          subject_object.save_dump_pos(status, table_name, last_pos,
                                       binlog_pos, state, substate)
        end
        it do
          ret = subject_object.load_dump_pos
          # The marshaled table object is compared field-by-field (object
          # identity differs after the Marshal round-trip).
          mt = ret.delete(:mysql_table)
          expect(ret).to eq({
            status: status, table_name: table_name, last_pos: last_pos,
            binlog_pos: binlog_pos, state: state, substate: substate,
          })
          expect(mt.table_name).to eq(table_name)
          expect(mt.columns).to eq({
            'id' => { format_type: 'int' },
            'value' => { format_type: 'text' },
          })
        end
      end
    end

    describe '#save_binlog' do
      let(:binfile) { 'mysqlbinlog.000001' }
      let(:pos) { 107 }
      let(:binlog_pos) { {binfile: binfile, pos: pos} }
      it do
        subject_object.save_binlog(binlog_pos)
        # Position file format: "<binfile>\t<pos>" with no trailing newline.
        expect(`cat #{subject_object.binlog_path}`).to eq("#{binfile}\t#{pos}")
      end
    end

    describe '#load_binlog' do
      let(:binfile) { 'mysqlbinlog.000001' }
      let(:pos) { 107 }
      let(:binlog_pos) { {binfile: binfile, pos: pos} }
      subject { subject_object.load_binlog }
      context 'when binlog pos does not exist' do
        before do
          File.delete(subject_object.binlog_path) if File.exist?(subject_object.binlog_path)
        end
        it { is_expected.to be_nil }
      end
      context 'when binlog pos exists' do
        before do
          subject_object.save_binlog(binlog_pos)
        end
        it { is_expected.to eq(binlog_pos) }
      end
    end

    describe '#binlog_path' do
      it { expect(subject_object.binlog_path).to eq(
             "#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos") }
    end

    describe '#sent_binlog_path' do
      context 'with no args' do
        subject { subject_object.sent_binlog_path }
        it { is_expected.to eq("#{FLYDATA_HOME}/flydata_sync_mysql.binlog.sent.pos") }
      end
      context 'with invalid args' do
        # Path must end in ".binlog.pos" to be convertible.
        subject { subject_object.sent_binlog_path('/home/ec2-user/.flydata/flydata_sync.pos') }
        it { expect{subject}.to raise_error(ArgumentError) }
      end
      context 'with valid args' do
        subject { subject_object.sent_binlog_path('/home/ec2-user/.flydata/flydata_sync.binlog.pos') }
        it { is_expected.to eq('/home/ec2-user/.flydata/flydata_sync.binlog.sent.pos') }
      end
    end

    describe '#ssl_ca_path' do
      context 'with no args' do
        subject { subject_object.ssl_ca_path }
        it { is_expected.to eq("#{FLYDATA_HOME}/flydata_sync_mysql.ssl_ca.pem") }
      end
      context 'with invalid args' do
        subject { subject_object.ssl_ca_path('/home/ec2-user/.flydata/flydata_sync.pos') }
        it { expect{subject}.to raise_error(ArgumentError) }
      end
      context 'with valid args' do
        subject { subject_object.ssl_ca_path('/home/ec2-user/.flydata/flydata_sync.binlog.pos') }
        it { is_expected.to eq('/home/ec2-user/.flydata/flydata_sync.ssl_ca.pem') }
      end
    end

    describe '#save_ssl_ca_path' do
      let(:ssl_ca_path) { subject_object.ssl_ca_path }
      let(:ssl_ca_content) { 'aaaaabbbbbccccc' }
      subject { subject_object.save_ssl_ca(ssl_ca_content) }
      after do
        File.delete(ssl_ca_path)
      end
      it 'write a ssl_ca_content to ssl_ca_path' do
        subject
        expect(IO.read(ssl_ca_path)).to eq(ssl_ca_content)
      end
    end

    describe '#increment_and_save_table_position' do
      let(:test_table) { 'test_table' }
      before do
        # Start each example from a clean per-table position file.
        table_pos_dir = SyncFileManager::TABLE_POSITIONS_DIR
        FileUtils.mkdir_p table_pos_dir
        table_pos_file = "#{table_pos_dir}/#{test_table}.pos"
        FileUtils.rm(table_pos_file) if File.exist?(table_pos_file)
      end
      subject { subject_object }
      context 'when an exception happens in the block' do
        let(:first_pos) { 1 }
        let(:last_pos) { first_pos + 1 }
        before do
          subject.increment_and_save_table_position(test_table) do |seq|
            expect(seq).to eq first_pos
          end
          # When the block raises, the incremented position must NOT be
          # persisted — the next call should see the same value again.
          begin
            subject.increment_and_save_table_position(test_table) do |seq|
              expect(seq).to eq last_pos
              raise "error"
            end
          rescue
            # intentionally left blank
          end
        end
        it 'yields with the previous value' do
          expect{|b| subject.increment_and_save_table_position(test_table, &b)}.
            to yield_with_args(last_pos)
        end
      end
    end

    describe '#save_generated_ddl' do
      subject { subject_object.save_generated_ddl(table_names, contents) }
      let(:contents) { '2' }
      let(:io) { double('io') }
      shared_examples 'writing to per-table .generated_ddl files in positions directory' do
        it do
          table_name_array.each do |tn|
            expect(File).to receive(:open).
              with("#{FLYDATA_HOME}/positions/#{tn}.generated_ddl", 'w').and_yield(io)
            expect(io).to receive(:write).with(contents)
          end
          subject
        end
      end
      context 'single table name' do
        # A bare string (not wrapped in an array) must also be accepted.
        let(:table_names) { table_name }
        let(:table_name_array) { [ table_name ] }
        it_behaves_like 'writing to per-table .generated_ddl files in positions directory'
      end
      context 'multiple table names' do
        let(:table_names) { [ table_name, table_name2 ] }
        let(:table_name_array) { table_names }
        it_behaves_like 'writing to per-table .generated_ddl files in positions directory'
      end
    end

    describe "#load_generated_ddl" do
      subject { subject_object.load_generated_ddl(table_names) }
      let(:contents) { { table_name => '2', table_name2 => '3' } }
      let(:io) { double('io') }
      let(:expected_result) { table_name_array.collect{|tn| contents[tn] } }
      shared_examples 'reading from per-table .generated_ddl files in positions directory' do
        it do
          table_name_array.each do |tn|
            expect(File).to receive(:open).
              with("#{FLYDATA_HOME}/positions/#{tn}.generated_ddl").and_yield(io)
            expect(io).to receive(:read).and_return(contents[tn])
          end
          is_expected.to eq expected_result
        end
      end
      context 'single table name' do
        let(:table_names) { table_name }
        let(:table_name_array) { [ table_name ] }
        it_behaves_like 'reading from per-table .generated_ddl files in positions directory'
      end
      context 'multiple table names' do
        let(:table_names) { [ table_name, table_name2 ] }
        let(:table_name_array) { table_names }
        context 'normal case' do
          it_behaves_like 'reading from per-table .generated_ddl files in positions directory'
        end
        context 'first file is missing' do
          # A missing file yields nil for that table; remaining tables are
          # still read.
          let(:table_name_array) { [ table_name2 ] }
          let(:expected_result) { [nil, contents[table_name2]] }
          let(:file_path) { "#{FLYDATA_HOME}/positions/#{table_name}.generated_ddl" }
          before do
            expect(File).to receive(:open).with(file_path).and_raise(Errno::ENOENT.new(file_path))
          end
          it_behaves_like 'reading from per-table .generated_ddl files in positions directory'
        end
      end
    end
  end
end