# encoding: utf-8 require 'fluent_plugins_spec_helper' require 'in_mysql_binlog_flydata' require 'timecop' module Fluent FLYDATA_HOME = Flydata::FLYDATA_HOME TEST_TAG = "test_tag" TEST_DB = "test_db" TEST_TABLE = "test_table" #TEST_SEQUENCE_FILE = /positions\/#{TEST_TABLE}.pos$/ TEST_SEQUENCE_FILE = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.pos") TEST_SEQUENCE_NUM = 1 TEST_TABLE_BINLOG_POS = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.binlog.pos") TEST_TABLES = "#{TEST_TABLE},test_table_1,test_table_2,test_table_3" TEST_POSITION_FILE = "test_position.log" TEST_REVISION_FILE = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.rev") TEST_TIMESTAMP = 1389214083 TEST_CONFIG = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "foo"],["2","var"],["3","hoge"]]} EOT TEST_EVENT_TWO_BYTE_INSERT=<0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "føø"],["2","vår"],["3","høgé"]]} EOT TEST_EVENT_THREE_BYTE_INSERT=<0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "富无无"],["2","易变的"],["3","切实切实"]]} EOT # - 18 UPDATE_ROWS_EVENT TEST_EVENT_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "foo"], ["1", "wow"]],[["3", "hoge"], ["3", "fuga"]]]} EOT TEST_EVENT_TWO_BYTE_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "føø"], ["1", "∑ø∑"]],[["3", "høgé"], ["3", "fügå"]]]} EOT TEST_EVENT_THREE_BYTE_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "富无无"], ["1", "很兴奋"]],[["3", "切实切实"], ["3", "興奮虎"]]]} EOT # - 19 DELETE_ROWS_EVENT TEST_EVENT_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "var"],["3","hoge"]]} EOT TEST_EVENT_TWO_BYTE_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "vår"],["3","høgé"]]} EOT TEST_EVENT_THREE_BYTE_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "易变的"],["3","切实切实"]]} EOT # Unsupported event # - 02 QUERY_EVENT # QUERY: BEGIN TEST_EVENT_QUERY_BEGIN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>73, "next_position"=>2472, "flags"=>8, "event_type"=>"Query", "thread_id"=>71, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"#{TEST_DB}", "query"=>"BEGIN"} EOT # QUERY: create database TEST_EVENT_QUERY_CREATE_DATABSE = <0, "timestamp"=>1389309478, "type_code"=>2, "server_id"=>1, "event_length"=>89, "next_position"=>196, "flags"=>8, "event_type"=>"Query", "thread_id"=>39, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create database test_db"} EOT # QUERY: create table TEST_EVENT_QUERY_CREATE_TABLE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>121, "next_position"=>317, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create table test_table(id int primary key, value text)"} EOT # QUERY: drop table TEST_EVENT_QUERY_DROP_TABLE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>115, "next_position"=>568, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"DROP TABLE `test_table` /* generated by server */"} EOT # QUERY: alter table add column TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>111, "next_position"=>800, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table add column sum integer"} EOT # QUERY: alter table drop column TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>104, "next_position"=>904, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table drop column sum"} EOT # QUERY: alter table modify column TEST_EVENT_QUERY_ALTER_TABLE_MODIFY_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>112, "next_position"=>1352, "flags"=>0, "event_type"=>"Query", "thread_id"=>48, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table modify column sum float"} EOT # - 03 STOP_EVENT TEST_EVENT_STOP = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>3, "server_id"=>1, "event_length"=>19, "next_position"=>2929, "flags"=>0, "event_type"=>"Stop"} EOT # - 04 ROTATE_EVENT TEST_EVENT_ROTATE = <0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_file"=>"mysql-bin.000048", "binlog_pos"=>2883} EOT # - 0f FORMAT_DESCRIPTION_EVENT TEST_EVENT_FORMAT_DESC = <0, "timestamp"=>1389292075, "type_code"=>15, "server_id"=>1, "event_length"=>103, "next_position"=>0, "flags"=>0, "event_type"=>"Format_desc", "binlog_version"=>0, "created_ts"=>2560145104, "log_header_len"=>71} EOT # - 10 XID_EVENT TEST_EVENT_XID = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>16, "server_id"=>1, "event_length"=>27, "next_position"=>2633, "flags"=>0, "event_type"=>"Xid", "xid_id"=>129345} EOT # - 13 TABLE_MAP_EVENT TEST_EVENT_TABLE_MAP = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>19, "server_id"=>1, "event_length"=>56, "next_position"=>2528, "flags"=>1, "event_type"=>"Table_map", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "raw_columns"=>[3, 252], "columns"=>["LONG", "BLOB"], "metadata"=>[2], "null_bits"=>[3]} EOT # la TEST_EVENT_INCIDENT = <0, "timestamp"=>0, "type_code"=>26, "server_id"=>1, "event_length"=>40, "next_position"=>2883, "flags"=>32, "event_type"=>"Incident", "incident_type"=>175, "message"=>"Operation canceled"} EOT # - 04 ROTATE_EVENT TEST_EVENT_ROTATE_MISSING_BINLOG_FILE = <0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_pos"=>2883} EOT describe MysqlBinlogFlydataInput do def create_event(event_str) event = eval(event_str) allow(event).to receive(:event_type).and_return(event["event_type"]) event end def expect_emitted_records(event, records) records = [records] unless records.kind_of?(Array) records.each do |r| expect(Engine).to receive(:emit).with(TEST_TAG, TEST_TIMESTAMP, r).ordered end plugin.event_listener(event) end def expect_no_emitted_record(event) expect(Engine).to receive(:emit).never plugin.event_listener(event) end def expect_emitted_records_with_rows(event, type, table, position, binlog_file, rows, start_seq = nil) rows = [rows] unless rows.kind_of?(Array) @seq ||= TEST_SEQUENCE_NUM records = rows.collect do |row| @seq += 1 { :type=>type, :table_name=>table, :respect_order=>true, :seq=>@seq, :src_pos=>"#{binlog_file}\t#{position}", :table_rev=>1, :row=>row } end expect_emitted_records(event, records) end let(:plugin) { MysqlBinlogFlydataInput.new } let(:insert_event) { create_event(TEST_EVENT_INSERT) } let(:insert_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_INSERT) } let(:insert_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_INSERT) } let(:delete_event) { create_event(TEST_EVENT_DELETE) } let(:delete_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_DELETE) } let(:delete_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_DELETE) } let(:update_event) { create_event(TEST_EVENT_UPDATE) } let(:update_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_UPDATE) } let(:update_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_UPDATE) } let(:alter_table_add_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN) } let(:alter_table_drop_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN) } let(:query_event) { create_event(TEST_EVENT_QUERY_CREATE_DATABSE) } let(:table_map_event) { create_event(TEST_EVENT_TABLE_MAP) } let(:xid_event) { create_event(TEST_EVENT_XID) } let(:rotate_event){ create_event(TEST_EVENT_ROTATE) } let(:rotate_event_corrupt){ create_event(TEST_EVENT_ROTATE_MISSING_BINLOG_FILE) } let(:now) { Time.now } let(:binlog_position_file) do f = double('binlog_position_file') allow(f).to receive(:exists?).and_return(true) allow(f).to receive(:read).and_return('test_position') f end def create_file(file_path, content) File.open(file_path, 'w') {|f| f.write(content)} end def delete_file(file_path) FileUtils.rm(file_path) if File.exists?(file_path) end def setup_initial_flydata_files %w(positions dump conf).each{|f| FileUtils.mkdir_p(File.join(FLYDATA_HOME, f))} create_file(TEST_SEQUENCE_FILE, TEST_SEQUENCE_NUM.to_s) end def cleanup_flydata_files %w(positions dump conf).each{|f| FileUtils.rm_rf(File.join(FLYDATA_HOME, f))} delete_file(TEST_SEQUENCE_FILE) end before do cleanup_flydata_files setup_initial_flydata_files allow(MysqlBinlogInput::BinlogUtil).to receive(:to_hash) {|e| e} allow(Mysql::BinLogPositionFile).to receive(:new).with(TEST_POSITION_FILE).and_return(binlog_position_file) Timecop.freeze(now) end after do cleanup_flydata_files Timecop.return end describe '#event_listener' do context 'when received rotate event' do before do Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event) end context 'when received insert event' do it do expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received insert event containing two byte UTF8 chars' do it do expect_emitted_records_with_rows(insert_two_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"føø"}, {"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}]) end end context 'when received insert event containing three byte UTF8 chars' do it do expect_emitted_records_with_rows(insert_three_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"富无无"}, {"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}]) end end context 'when received delete event' do it do expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received delete event containing two byte UTF8 chars' do it do expect_emitted_records_with_rows(delete_two_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}]) end end context 'when received delete event with containing byte UTF8 chars' do it do expect_emitted_records_with_rows(delete_three_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}]) end end context 'when received update event' do it do expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}]) end end context 'when received update event with two byte utf8 chars' do it do expect_emitted_records_with_rows(update_two_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"∑ø∑"}, {"1"=>"0SL00000003", "2"=>"fügå"}]) end end context 'when received update event with three byte utf8 chars' do it do expect_emitted_records_with_rows(update_three_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"很兴奋"}, {"1"=>"0SL00000003", "2"=>"興奮虎"}]) end end context 'when received alter table event' do it do expect_no_emitted_record(alter_table_add_column_event) end end #TODO: Uncomment the following test and delete the above one after supporting alter table context 'when received alter table add column event' do it do skip "Now alter table add column is not supported" expect_emitted_records(alter_table_add_column_event, { type: :alter_table, table_name: "test_table", respect_order: true, src_pos: "mysql-bin.000048\t689", table_rev: 2, # increment revision seq: 2, actions: [{ action: :add_column, column: "sum", :type=>'int4'}], }) end end context 'when received alter table drop column event' do it do skip "Now alter table drop column is not supported" expect_emitted_records(alter_table_drop_column_event, { type: :alter_table, table_name: "test_table", respect_order: true, src_pos: "mysql-bin.000048\t800", table_rev: 2, # increment revision seq: 2, actions: [{ action: :drop_column, column: "sum"}], }) end end context 'when received event with another database name' do it do event = insert_event event['db_name'] = 'another_db' expect_no_emitted_record(event) end end context 'when received event with unsupported table name' do it do event = insert_event event['table_name'] = 'another_table' expect_no_emitted_record(event) end end context 'when received unsupported event' do it do expect_no_emitted_record(query_event) expect_no_emitted_record(table_map_event) expect_no_emitted_record(xid_event) end end end context 'when rotate event is not received' do before do Test.configure_plugin(plugin, TEST_CONFIG) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an update event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "", [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}]) end it 'logs a warning emits FET with a blank binlog file name, when it receives a delete event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'logs warning once when it receives consecutive events' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "", [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}]) expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received rotate event with missing binlog file' do before do Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event_corrupt) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when per-table binlog file is present' do before do create_file(TEST_TABLE_BINLOG_POS, "mysql-bin.000048\t1010") Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event) end it 'emits no record when it receives an event whose binlog position is less than per-table binlog position' do expect_no_emitted_record(insert_event) end it 'deletes the per-table-binlog file (if there is one) as soon as a record with bigger binlog position is received' do event = insert_event event['next_position'] = 1250 expect_emitted_records_with_rows(event, :insert, TEST_TABLE, 1211, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect(File.exists?(TEST_TABLE_BINLOG_POS)).to be(false) end it 'emits no record when it receives an event whose binlog position is equal to per-table binlog position' do event = insert_event event['next_position'] = 1049 expect_no_emitted_record(event) end end end end end