# coding: utf-8
require 'fileutils'
require 'spec_helper'
require 'flydata/sync_file_manager'
require 'flydata/parser/mysql/dump_parser'

# Specs for Flydata::FileUtil::SyncFileManager: where it places the dump,
# dump-position, binlog-position and per-table position files, and how those
# files round-trip through the save/load methods.
module Flydata
  module FileUtil
    describe SyncFileManager do
      # Scratch location handed to the manager as "mysqldump_dir". `let` is
      # memoized per example, so the timestamp is stable within one example.
      let(:default_mysqldump_dir) do
        File.join('/tmp', "sync_dump_#{Time.now.to_i}")
      end

      # Data entry fixture passed to SyncFileManager.new.
      let(:default_data_entry) do
        {
          "id" => 93,
          "name" => "flydata_sync_mysql",
          "data_port_id" => 52,
          "display_name" => "flydata_sync_mysql",
          "enabled" => true,
          "heroku_log_type" => nil,
          "heroku_resource_id" => nil,
          "log_deletion" => nil,
          "log_file_delimiter" => nil,
          "log_file_type" => nil,
          "log_path" => nil,
          "redshift_schema_name" => "",
          "redshift_table_name" => nil,
          "created_at" => "2014-01-22T18:58:43Z",
          "updated_at" => "2014-01-30T02:42:26Z",
          "type" => "RedshiftMysqlDataEntry",
          "tag_name" => "flydata.a458c641_dp52.flydata_mysql",
          "tag_name_dev" => "flydata.a458c641_dp52.flydata_mysql.dev",
          "data_port_key" => "a458c641",
          "mysql_data_entry_preference" => {
            "host" => "localhost",
            "port" => 3306,
            "username" => "masashi",
            "password" => "welcome",
            "database" => "sync_test",
            "tables" => nil,
            "mysqldump_dir" => default_mysqldump_dir,
            "forwarder" => "tcpforwarder",
            "data_servers" => "localhost:9905"
          }
        }
      end

      let(:default_sfm) { SyncFileManager.new(default_data_entry) }

      # Arguments shared by the save_dump_pos / load_dump_pos examples.
      let(:status) { 'PARSING' }
      let(:table_name) { 'test_table' }
      let(:last_pos) { 9999 }
      let(:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111} }
      let(:state) { 'CREATE_TABLE' }
      let(:substate) { nil }

      after :each do
        # Remove whatever the example left at the scratch path, whether it is
        # a plain file, an empty directory, or a directory with files in it.
        # Dir.delete cannot remove a non-empty directory, so rm_rf is used;
        # it is a no-op when the path does not exist.
        FileUtils.rm_rf(default_mysqldump_dir)
      end

      describe '#dump_file_path' do
        context 'when mysqldump_dir param is nil' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir')
          end

          it do
            expect(default_sfm.dump_file_path).to eq(
              File.join(Flydata::FLYDATA_HOME, 'dump', 'flydata_sync_mysql.dump'))
          end
        end

        context 'when file exists' do
          # A regular file occupies the path that should be a directory.
          before { FileUtils.touch(default_mysqldump_dir) }

          it do
            # NOTE(review): a bare raise_error matches any exception; narrow it
            # to the specific error class once that class is confirmed.
            expect { default_sfm.dump_file_path }.to raise_error
          end
        end

        context 'when directory exists' do
          # Created here, before the mkdir_p expectation below is installed.
          before { FileUtils.mkdir_p(default_mysqldump_dir) }

          it do
            expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never
            expect(default_sfm.dump_file_path).to eq(
              File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
          end
        end

        context 'when directory or file does not exist' do
          it do
            expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once
            expect(default_sfm.dump_file_path).to eq(
              File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
          end
        end

        context 'when file name includes "~"' do
          let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" }

          it do
            # The leading "~" is expected to be replaced by $HOME.
            expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1])
            expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once
            expect(default_sfm.dump_file_path).to eq(
              File.join(expected_dir, 'flydata_sync_mysql.dump'))
          end
        end
      end

      describe '#dump_pos_path' do
        it do
          expect(default_sfm.dump_pos_path).to eq(
            File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos'))
        end
      end

      describe '#save_dump_pos' do
        context 'without mysql marshal data' do
          it do
            expect do
              default_sfm.save_dump_pos(
                status, table_name, last_pos, binlog_pos, state, substate)
            end.not_to raise_error

            expect(default_sfm.load_dump_pos).to eq({
              status: status,
              table_name: table_name,
              last_pos: last_pos,
              binlog_pos: binlog_pos,
              state: state,
              substate: substate,
              mysql_table: nil
            })
          end
        end
      end

      describe '#load_dump_pos' do
        let(:mysql_table) do
          Flydata::Parser::Mysql::MysqlTable.new(
            table_name,
            { 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' } }
          )
        end

        context 'with mysql marshal data' do
          before do
            default_sfm.save_mysql_table_marshal_dump(mysql_table)
            default_sfm.save_dump_pos(status, table_name, last_pos, binlog_pos, state, substate)
          end

          it do
            ret = default_sfm.load_dump_pos
            # The table object is checked attribute by attribute below, so it
            # is taken out of the hash before the hash comparison.
            mt = ret.delete(:mysql_table)

            expect(ret).to eq({
              status: status,
              table_name: table_name,
              last_pos: last_pos,
              binlog_pos: binlog_pos,
              state: state,
              substate: substate,
            })
            expect(mt.table_name).to eq(table_name)
            expect(mt.columns).to eq({
              'id' => { format_type: 'int' },
              'value' => { format_type: 'text' },
            })
          end
        end
      end

      describe '#save_binlog' do
        let(:binfile) { 'mysqlbinlog.000001' }
        let(:pos) { 107 }
        let(:binlog_pos) { {binfile: binfile, pos: pos} }

        it do
          default_sfm.save_binlog(binlog_pos)
          expect(File.read(default_sfm.binlog_path)).to eq("#{binfile}\t#{pos}")
        end
      end

      describe '#binlog_path' do
        it do
          expect(default_sfm.binlog_path).to eq(
            "#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos")
        end
      end

      describe '#increment_and_save_table_position' do
        let(:test_table) { 'test_table' }

        before do
          # Start every example without a position file for the test table.
          table_pos_dir = SyncFileManager::TABLE_POSITIONS_DIR
          FileUtils.mkdir_p table_pos_dir
          table_pos_file = "#{table_pos_dir}/#{test_table}.pos"
          FileUtils.rm(table_pos_file) if File.exist?(table_pos_file)
        end

        subject { default_sfm }

        context 'when an exception happens in the block' do
          let(:first_pos) { 1 }
          let(:last_pos) { first_pos + 1 }

          before do
            # First call completes normally and yields first_pos.
            subject.increment_and_save_table_position(test_table) do |seq|
              expect(seq).to eq first_pos
            end

            # Second call yields last_pos, but its block raises.
            begin
              subject.increment_and_save_table_position(test_table) do |seq|
                expect(seq).to eq last_pos
                raise "error"
              end
            rescue
              # intentionally left blank
            end
          end

          it 'yields with the previous value' do
            # The failed call above must not have advanced the position, so
            # the same value is yielded again.
            expect { |b| subject.increment_and_save_table_position(test_table, &b) }
              .to yield_with_args(last_pos)
          end
        end
      end
    end
  end
end