# encoding: utf-8 require 'fluent_plugins_spec_helper' require 'in_mysql_binlog_flydata' require 'timecop' module Fluent FLYDATA_HOME = Flydata::FLYDATA_HOME TEST_TAG = "test_tag" TEST_DB = "test_db" TEST_TABLE = "test_table" #TEST_SEQUENCE_FILE = /positions\/#{TEST_TABLE}.pos$/ TEST_SEQUENCE_FILE = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.pos") TEST_SEQUENCE_NUM = 1 TEST_TABLE_BINLOG_POS = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.binlog.pos") TEST_TABLES = "#{TEST_TABLE},test_table_1,test_table_2,test_table_3" TEST_POSITION_FILE = "test_position.binlog.pos" TEST_REVISION_FILE = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE}.rev") TEST_TIMESTAMP = 1389214083 TEST_TABLE_APPEND_ONLY = "test_table_4" TEST_SEQUENCE_FILE_1 = File.join(FLYDATA_HOME, "positions/#{TEST_TABLE_APPEND_ONLY}.pos") TEST_CONFIG = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "foo"],["2","var"],["3","hoge"]]} EOT TEST_EVENT_TWO_BYTE_INSERT=<0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "føø"],["2","vår"],["3","høgé"]]} EOT TEST_EVENT_THREE_BYTE_INSERT=<0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "富无无"],["2","易变的"],["3","切实切实"]]} EOT # - 18 UPDATE_ROWS_EVENT TEST_EVENT_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "foo"], ["1", "wow"]],[["3", "hoge"], ["3", "fuga"]]]} EOT TEST_EVENT_TWO_BYTE_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "føø"], ["1", "∑ø∑"]],[["3", "høgé"], ["3", "fügå"]]]} EOT TEST_EVENT_THREE_BYTE_UPDATE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "富无无"], ["1", "很兴奋"]],[["3", "切实切实"], ["3", "興奮虎"]]]} EOT # - 19 DELETE_ROWS_EVENT TEST_EVENT_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "var"],["3","hoge"]]} EOT TEST_EVENT_TWO_BYTE_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "vår"],["3","høgé"]]} EOT TEST_EVENT_THREE_BYTE_DELETE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "易变的"],["3","切实切实"]]} EOT # Unsupported event # - 02 QUERY_EVENT # QUERY: BEGIN TEST_EVENT_QUERY_BEGIN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>73, "next_position"=>2472, "flags"=>8, "event_type"=>"Query", "thread_id"=>71, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"#{TEST_DB}", "query"=>"BEGIN"} EOT # QUERY: create database TEST_EVENT_QUERY_CREATE_DATABSE = <0, "timestamp"=>1389309478, "type_code"=>2, "server_id"=>1, "event_length"=>89, "next_position"=>196, "flags"=>8, "event_type"=>"Query", "thread_id"=>39, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create database test_db"} EOT # QUERY: create table TEST_EVENT_QUERY_CREATE_TABLE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>121, "next_position"=>317, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create table test_table(id int primary key, value text)"} EOT # QUERY: drop table TEST_EVENT_QUERY_DROP_TABLE = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>115, "next_position"=>568, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"DROP TABLE `test_table` /* generated by server */"} EOT # QUERY: alter table add column TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>111, "next_position"=>800, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table add column sum integer"} EOT # QUERY: alter table drop column TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>104, "next_position"=>904, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table drop column sum"} EOT # QUERY: alter table modify column TEST_EVENT_QUERY_ALTER_TABLE_MODIFY_COLUMN = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>112, "next_position"=>1352, "flags"=>0, "event_type"=>"Query", "thread_id"=>48, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table modify column sum float"} EOT # QUERY: alter table add index TEST_EVENT_QUERY_ALTER_TABLE_ADD_INDEX = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>217, "next_position"=>337, "flags"=>0, "event_type"=>"Query", "thread_id"=>10097, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 64, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0, 12, 1, 109, 97, 107, 95, 100, 101, 118, 101, 108, 111, 112, 109, 101, 110, 116, 0], "db_name"=>"test_db", "query"=>"ALTER TABLE `test_db`.`test_table` \nADD INDEX `my_index` USING HASH (`extra` ASC) COMMENT 'How does this look in binlog?'"} EOT # - 03 STOP_EVENT TEST_EVENT_STOP = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>3, "server_id"=>1, "event_length"=>19, "next_position"=>2929, "flags"=>0, "event_type"=>"Stop"} EOT # - 04 ROTATE_EVENT TEST_EVENT_ROTATE = <0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_file"=>"mysql-bin.000048", "binlog_pos"=>2883} EOT # - 0f FORMAT_DESCRIPTION_EVENT TEST_EVENT_FORMAT_DESC = <0, "timestamp"=>1389292075, "type_code"=>15, "server_id"=>1, "event_length"=>103, "next_position"=>0, "flags"=>0, "event_type"=>"Format_desc", "binlog_version"=>0, "created_ts"=>2560145104, "log_header_len"=>71} EOT # - 10 XID_EVENT TEST_EVENT_XID = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>16, "server_id"=>1, "event_length"=>27, "next_position"=>2633, "flags"=>0, "event_type"=>"Xid", "xid_id"=>129345} EOT # - 13 TABLE_MAP_EVENT TEST_EVENT_TABLE_MAP = <0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>19, "server_id"=>1, "event_length"=>56, "next_position"=>2528, "flags"=>1, "event_type"=>"Table_map", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "raw_columns"=>[3, 252], "columns"=>["LONG", "BLOB"], "metadata"=>[2], "null_bits"=>[3]} EOT # la TEST_EVENT_INCIDENT = <0, "timestamp"=>0, "type_code"=>26, "server_id"=>1, "event_length"=>40, "next_position"=>2883, "flags"=>32, "event_type"=>"Incident", "incident_type"=>175, "message"=>"Operation canceled"} EOT # - 04 ROTATE_EVENT TEST_EVENT_ROTATE_MISSING_BINLOG_FILE = <0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_pos"=>2883} EOT describe MysqlBinlogFlydataInput do def create_event(event_str) event = eval(event_str) allow(event).to receive(:event_type).and_return(event["event_type"]) event end def expect_emitted_records(event, records) records = [records] unless records.kind_of?(Array) records.each do |r| expect(Engine).to receive(:emit).with(TEST_TAG, TEST_TIMESTAMP, r).ordered end plugin.event_listener(event) end def expect_no_emitted_record(event) expect(Engine).to receive(:emit).never plugin.event_listener(event) end def expect_emitted_records_with_rows(event, type, table, position, binlog_file, rows, start_seq = nil) rows = [rows] unless rows.kind_of?(Array) @seq ||= TEST_SEQUENCE_NUM records = rows.collect do |row| record = row.kind_of?(Hash) && row.keys.include?(:row) ? row : { row: row } @seq += 1 record.merge({ :type=>type, :table_name=>table, :respect_order=>true, :seq=>@seq, :src_pos=>"#{binlog_file}\t#{position}", :table_rev=>1 }) end expect_emitted_records(event, records) end let(:plugin) { MysqlBinlogFlydataInput.new } let(:insert_event) { create_event(TEST_EVENT_INSERT) } let(:insert_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_INSERT) } let(:insert_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_INSERT) } let(:delete_event) { create_event(TEST_EVENT_DELETE) } let(:delete_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_DELETE) } let(:delete_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_DELETE) } let(:update_event) { create_event(TEST_EVENT_UPDATE) } let(:update_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_UPDATE) } let(:update_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_UPDATE) } let(:alter_table_add_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN) } let(:alter_table_drop_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN) } let(:alter_table_add_index_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_ADD_INDEX) } let(:query_event) { create_event(TEST_EVENT_QUERY_CREATE_DATABSE) } let(:table_map_event) { create_event(TEST_EVENT_TABLE_MAP) } let(:xid_event) { create_event(TEST_EVENT_XID) } let(:rotate_event){ create_event(TEST_EVENT_ROTATE) } let(:rotate_event_corrupt){ create_event(TEST_EVENT_ROTATE_MISSING_BINLOG_FILE) } let(:now) { Time.now } let(:binlog_position_file) do f = double('binlog_position_file') allow(f).to receive(:exists?).and_return(true) allow(f).to receive(:read).and_return('test_position') f end def create_file(file_path, content) File.open(file_path, 'w') {|f| f.write(content)} end def delete_file(file_path) FileUtils.rm(file_path) if File.exists?(file_path) end def setup_initial_flydata_files %w(positions dump conf).each{|f| FileUtils.mkdir_p(File.join(FLYDATA_HOME, f))} create_file(TEST_SEQUENCE_FILE, TEST_SEQUENCE_NUM.to_s) create_file(TEST_SEQUENCE_FILE_1, TEST_SEQUENCE_NUM.to_s) end def cleanup_flydata_files %w(positions dump conf).each{|f| FileUtils.rm_rf(File.join(FLYDATA_HOME, f))} delete_file(TEST_SEQUENCE_FILE) delete_file(TEST_SEQUENCE_FILE_1) end before do cleanup_flydata_files setup_initial_flydata_files allow(MysqlBinlogInput::BinlogUtil).to receive(:to_hash) {|e| e} allow(Mysql::BinLogPositionFile).to receive(:new).with(TEST_POSITION_FILE).and_return(binlog_position_file) allow(Mysql::TableMeta).to receive(:update) allow(plugin).to receive(:`).with("mysql -V").and_return("mysql Ver 14.14 Distrib 5.5.40") allow(plugin).to receive(:`).with(/^echo 'select version\(\);'/).and_return("version()\n5.5.40-0ubuntu0.12.04.1-log") Timecop.freeze(now) end after do cleanup_flydata_files Timecop.return end describe '#event_listener' do context 'when received rotate event' do before do Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event) end context 'when received insert event' do it do expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received insert event containing two byte UTF8 chars' do it do expect_emitted_records_with_rows(insert_two_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"føø"}, {"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}]) end end context 'when received insert event containing three byte UTF8 chars' do it do expect_emitted_records_with_rows(insert_three_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"富无无"}, {"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}]) end end context 'when received delete event' do it do expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received delete event containing two byte UTF8 chars' do it do expect_emitted_records_with_rows(delete_two_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}]) end end context 'when received delete event with containing byte UTF8 chars' do it do expect_emitted_records_with_rows(delete_three_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}]) end end context 'when received update event' do it do expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{old:{"1"=>"0SL00000001", "2"=>"foo"}, row:{"1"=>"0SL00000001", "2"=>"wow"}}, {old:{"1"=>"0SL00000003", "2"=>"hoge"}, row:{"1"=>"0SL00000003", "2"=>"fuga"}}]) end end context 'when received update event with two byte utf8 chars' do it do expect_emitted_records_with_rows(update_two_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{old:{"1"=>"0SL00000001", "2"=>"føø"}, row:{"1"=>"0SL00000001", "2"=>"∑ø∑"}}, {old:{"1"=>"0SL00000003", "2"=>"høgé"}, row:{"1"=>"0SL00000003", "2"=>"fügå"}}]) end end context 'when received update event with three byte utf8 chars' do it do expect_emitted_records_with_rows(update_three_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048", [{old:{"1"=>"0SL00000001", "2"=>"富无无"}, row:{"1"=>"0SL00000001", "2"=>"很兴奋"}}, {old:{"1"=>"0SL00000003", "2"=>"切实切实"}, row:{"1"=>"0SL00000003", "2"=>"興奮虎"}}]) end end context 'when received alter table add column event' do it do expect_emitted_records(alter_table_add_column_event, { type: :alter_table, table_name: "test_table", respect_order: true, src_pos: "mysql-bin.000048\t689", table_rev: 2, # increment revision seq: 2, actions: [{ action: :add_column, column: "sum", :type=>'int4(11)', :query=>'add column sum integer'}], }) end end context 'when received alter table drop column event' do it do expect_emitted_records(alter_table_drop_column_event, { type: :alter_table, table_name: "test_table", respect_order: true, src_pos: "mysql-bin.000048\t800", table_rev: 2, # increment revision seq: 2, actions: [{ action: :drop_column, column: "sum", :query=>'drop column sum'}], }) end end context 'when received alter table add index event' do it 'emits a nonbreaking event without table_rev increment' do expect_emitted_records(alter_table_add_index_event, { type: :alter_table, table_name: "test_table", schema_name: "test_db", respect_order: true, src_pos: "mysql-bin.000048\t#{337 - 217}", table_rev: 1, seq: 2, actions: [{ action: :add_index, support_level: :nonbreaking, query: "ADD INDEX `my_index` USING HASH (`extra` ASC) COMMENT 'How does this look in binlog?'", }], }) end end context 'when received event with another database name' do it do event = insert_event event['db_name'] = 'another_db' expect_no_emitted_record(event) end end context 'when received event with unsupported table name' do it do event = insert_event event['table_name'] = 'another_table' expect_no_emitted_record(event) end end context 'when received unsupported event' do it do expect_no_emitted_record(query_event) expect_no_emitted_record(table_map_event) expect_no_emitted_record(xid_event) end end end context 'when rotate event is not received' do before do Test.configure_plugin(plugin, TEST_CONFIG) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an update event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "", [{old:{"1"=>"0SL00000001", "2"=>"foo"}, row:{"1"=>"0SL00000001", "2"=>"wow"}}, {old:{"1"=>"0SL00000003", "2"=>"hoge"}, row:{"1"=>"0SL00000003", "2"=>"fuga"}}]) end it 'logs a warning emits FET with a blank binlog file name, when it receives a delete event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'logs warning once when it receives consecutive events' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "", [{old:{"1"=>"0SL00000001", "2"=>"foo"}, row:{"1"=>"0SL00000001", "2"=>"wow"}}, {old:{"1"=>"0SL00000003", "2"=>"hoge"}, row:{"1"=>"0SL00000003", "2"=>"fuga"}}]) expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when received rotate event with missing binlog file' do before do Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event_corrupt) end it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'when per-table binlog file is present' do before do create_file(TEST_TABLE_BINLOG_POS, "mysql-bin.000048\t1010") Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event) end it 'emits no record when it receives an event whose binlog position is less than per-table binlog position' do expect_no_emitted_record(insert_event) end it 'deletes the per-table-binlog file (if there is one) as soon as a record with bigger binlog position is received' do event = insert_event event['next_position'] = 1250 expect_emitted_records_with_rows(event, :insert, TEST_TABLE, 1211, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) expect(File.exists?(TEST_TABLE_BINLOG_POS)).to be(false) end it 'emits no record when it receives an event whose binlog position is equal to per-table binlog position' do event = insert_event event['next_position'] = 1049 expect_no_emitted_record(event) end end context 'for append only' do shared_examples 'emits records correctly' do it 'emits records when it receives an insert event for append only table' do event = insert_event event['table_name'] = TEST_TABLE_APPEND_ONLY expect_emitted_records_with_rows(event, :insert, TEST_TABLE_APPEND_ONLY, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'does not emit a record when it receives a delete event for append only table' do event = delete_event event['table_name'] = TEST_TABLE_APPEND_ONLY expect_no_emitted_record(event) end it 'emits records when it receives an update event for append only table' do event = update_event event['table_name'] = TEST_TABLE_APPEND_ONLY expect_emitted_records_with_rows(event, :update, TEST_TABLE_APPEND_ONLY, 2528, "mysql-bin.000048", [{old:{"1"=>"0SL00000001", "2"=>"foo"}, row:{"1"=>"0SL00000001", "2"=>"wow"}}, {old:{"1"=>"0SL00000003", "2"=>"hoge"}, row:{"1"=>"0SL00000003", "2"=>"fuga"}}]) end it 'emits a record when it receives a delete event for non-append only table' do expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048", [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end it 'emits records when it receives an insert event for non-append only table' do expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "mysql-bin.000048", [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}]) end end context 'no duplicate entries in tables and tables_append_only' do before do Test.configure_plugin(plugin, TEST_TABLES_APPEND_ONLY_CONFIG) plugin.event_listener(rotate_event) end include_examples 'emits records correctly' end context 'duplicate entries in tables and tables_append_only' do before do Test.configure_plugin(plugin, TEST_TABLES_DUPLICATE_CONFIG) plugin.event_listener(rotate_event) end include_examples 'emits records correctly' end context 'do not watch table if no position file is found' do # Stub SyncFileManager.new(nil).get_new_table_list() and return [TEST_TABLE] # Need to make sure no event is sent... how do I do that let(:sync_fm) { double('sync_fm') } before do Flydata::SyncFileManager.any_instance.should_receive(:get_new_table_list).with(TEST_TABLES.split(","), "pos").and_return([TEST_TABLE]) Test.configure_plugin(plugin, TEST_CONFIG) plugin.event_listener(rotate_event) end it do event = insert_event event['table_name'] = TEST_TABLE expect_no_emitted_record(event) end end end end end end