# encoding: utf-8

require 'fluent_plugins_spec_helper'
require 'in_mysql_binlog_flydata'
require 'timecop'

module Fluent

  # Base directory under which this spec creates and removes its files.
  FLYDATA_HOME = Flydata::FLYDATA_HOME

  # Identifiers interpolated into TEST_CONFIG and the event fixtures.
  TEST_TAG = "test_tag"
  TEST_DB = "test_db"
  TEST_TABLE = "test_table"

  # Sequence file for TEST_TABLE and the value it is seeded with.
  TEST_SEQUENCE_FILE = File.join(FLYDATA_HOME, "positions", "#{TEST_TABLE}.pos")
  TEST_SEQUENCE_NUM = 1

  # Per-table binlog position file and revision file for TEST_TABLE.
  TEST_TABLE_BINLOG_POS = File.join(FLYDATA_HOME, "positions", "#{TEST_TABLE}.binlog.pos")
  TEST_REVISION_FILE = File.join(FLYDATA_HOME, "positions", "#{TEST_TABLE}.rev")

  # Comma-separated table list passed to the plugin through TEST_CONFIG.
  TEST_TABLES = [TEST_TABLE, "test_table_1", "test_table_2", "test_table_3"].join(",")
  TEST_POSITION_FILE = "test_position.log"

  # Timestamp shared by most event fixtures and by the expected emits.
  TEST_TIMESTAMP = 1_389_214_083

  # Plugin configuration text handed to Test.configure_plugin.
  TEST_CONFIG = <<EOT
  tag #{TEST_TAG}
  database #{TEST_DB}
  tables #{TEST_TABLES}
  position_file #{TEST_POSITION_FILE}
EOT

  # mysqlbinlog event
  #   http://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog-hexdump.html

  # Supported events
  # - 17 WRITE_ROWS_EVENT
  TEST_EVENT_INSERT=<<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "foo"],["2","var"],["3","hoge"]]}
EOT
  TEST_EVENT_TWO_BYTE_INSERT=<<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "føø"],["2","vår"],["3","høgé"]]}
EOT
  TEST_EVENT_THREE_BYTE_INSERT=<<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>23, "server_id"=>1, "event_length"=>39, "next_position"=>667, "flags"=>1, "event_type"=>"Write_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 3, 0, 0, 0, 3, 0, 102, 111, 111], "rows"=>[["1", "富无无"],["2","易变的"],["3","切实切实"]]}
EOT
  # - 18 UPDATE_ROWS_EVENT
  TEST_EVENT_UPDATE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "foo"], ["1", "wow"]],[["3", "hoge"], ["3", "fuga"]]]}
EOT
  TEST_EVENT_TWO_BYTE_UPDATE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "føø"], ["1", "∑ø∑"]],[["3", "høgé"], ["3", "fügå"]]]}
EOT
  TEST_EVENT_THREE_BYTE_UPDATE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>24, "server_id"=>1, "event_length"=>78, "next_position"=>2606, "flags"=>1, "event_type"=>"Update_rows", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[255], "raw_used_columns"=>[255], "raw_row"=>[252, 6, 0, 0, 0, 1, 0, 97, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 98, 252, 6, 0, 0, 0, 1, 0, 100, 252, 6, 0, 0, 0, 1, 0, 99, 252, 6, 0, 0, 0, 1, 0, 100], "rows"=>[[["1", "富无无"], ["1", "很兴奋"]],[["3", "切实切实"], ["3", "興奮虎"]]]}
EOT
  # - 19 DELETE_ROWS_EVENT
  TEST_EVENT_DELETE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "var"],["3","hoge"]]}
EOT
  TEST_EVENT_TWO_BYTE_DELETE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "vår"],["3","høgé"]]}
EOT
  TEST_EVENT_THREE_BYTE_DELETE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>25, "server_id"=>1, "event_length"=>41, "next_position"=>5365, "flags"=>1, "event_type"=>"Delete_rows", "table_id"=>170, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "columns"=>["LONG", "BLOB"], "columns_len"=>2, "null_bits_len"=>1, "raw_columns_before_image"=>[], "raw_used_columns"=>[255], "raw_row"=>[252, 2, 0, 0, 0, 5, 0, 104, 111, 104, 111, 103], "rows"=>[["2", "易变的"],["3","切实切实"]]}
EOT
  # Unsupported event
  # - 02 QUERY_EVENT
  # QUERY: BEGIN
  TEST_EVENT_QUERY_BEGIN = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>73, "next_position"=>2472, "flags"=>8, "event_type"=>"Query", "thread_id"=>71, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"#{TEST_DB}", "query"=>"BEGIN"}
EOT
  # QUERY: create database
  TEST_EVENT_QUERY_CREATE_DATABSE = <<EOT
{"marker"=>0, "timestamp"=>1389309478, "type_code"=>2, "server_id"=>1, "event_length"=>89, "next_position"=>196, "flags"=>8, "event_type"=>"Query", "thread_id"=>39, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create database test_db"}
EOT
  # QUERY: create table
  TEST_EVENT_QUERY_CREATE_TABLE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>121, "next_position"=>317, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"create table test_table(id int primary key, value text)"}
EOT
  # QUERY: drop table
  TEST_EVENT_QUERY_DROP_TABLE = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>115, "next_position"=>568, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"DROP TABLE `test_table` /* generated by server */"}
EOT
  # QUERY: alter table add column
  TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>111, "next_position"=>800, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table add column sum integer"}
EOT
  # QUERY: alter table drop column
  TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>104, "next_position"=>904, "flags"=>0, "event_type"=>"Query", "thread_id"=>42, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table drop column sum"}
EOT
  # QUERY: alter table modify column
  TEST_EVENT_QUERY_ALTER_TABLE_MODIFY_COLUMN = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>2, "server_id"=>1, "event_length"=>112, "next_position"=>1352, "flags"=>0, "event_type"=>"Query", "thread_id"=>48, "exec_time"=>0, "error_code"=>0, "variables"=>[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0], "db_name"=>"test_db", "query"=>"alter table test_table modify column sum float"}
EOT
  # - 03 STOP_EVENT
  TEST_EVENT_STOP = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>3, "server_id"=>1, "event_length"=>19, "next_position"=>2929, "flags"=>0, "event_type"=>"Stop"}
EOT
  # - 04 ROTATE_EVENT
  TEST_EVENT_ROTATE = <<EOT
{"marker"=>0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_file"=>"mysql-bin.000048", "binlog_pos"=>2883}
EOT
  # - 0f FORMAT_DESCRIPTION_EVENT
  TEST_EVENT_FORMAT_DESC = <<EOT
{"marker"=>0, "timestamp"=>1389292075, "type_code"=>15, "server_id"=>1, "event_length"=>103, "next_position"=>0, "flags"=>0, "event_type"=>"Format_desc", "binlog_version"=>0, "created_ts"=>2560145104, "log_header_len"=>71}
EOT
  # - 10 XID_EVENT
  TEST_EVENT_XID = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>16, "server_id"=>1, "event_length"=>27, "next_position"=>2633, "flags"=>0, "event_type"=>"Xid", "xid_id"=>129345}
EOT

  # - 13 TABLE_MAP_EVENT
  TEST_EVENT_TABLE_MAP = <<EOT
{"marker"=>0, "timestamp"=>#{TEST_TIMESTAMP}, "type_code"=>19, "server_id"=>1, "event_length"=>56, "next_position"=>2528, "flags"=>1, "event_type"=>"Table_map", "table_id"=>163, "db_name"=>"#{TEST_DB}", "table_name"=>"#{TEST_TABLE}", "raw_columns"=>[3, 252], "columns"=>["LONG", "BLOB"], "metadata"=>[2], "null_bits"=>[3]}
EOT
  # - 1a INCIDENT_EVENT
  TEST_EVENT_INCIDENT = <<EOT
{"marker"=>0, "timestamp"=>0, "type_code"=>26, "server_id"=>1, "event_length"=>40, "next_position"=>2883, "flags"=>32, "event_type"=>"Incident", "incident_type"=>175, "message"=>"Operation canceled"}
EOT
  # - 04 ROTATE_EVENT (without the "binlog_file" entry)
  TEST_EVENT_ROTATE_MISSING_BINLOG_FILE = <<EOT
{"marker"=>0, "timestamp"=>0, "type_code"=>4, "server_id"=>1, "event_length"=>43, "next_position"=>0, "flags"=>32, "event_type"=>"Rotate", "binlog_pos"=>2883}
EOT

  describe MysqlBinlogFlydataInput do
    # Builds an event fixture from its Ruby-literal string form.
    #
    # event_str - one of the TEST_EVENT_* strings. It is evaluated as Ruby
    #             source, so only trusted fixture text may be passed.
    #
    # Returns the evaluated object with #event_type stubbed to return its
    # own "event_type" entry.
    def create_event(event_str)
      eval(event_str).tap do |fixture|
        allow(fixture).to receive(:event_type).and_return(fixture["event_type"])
      end
    end

    # Expects Engine.emit to be called once per record, in order, with
    # TEST_TAG and TEST_TIMESTAMP, then feeds the event to the plugin.
    #
    # event   - event fixture passed to plugin.event_listener.
    # records - a single expected record or an Array of them.
    #
    # Returns whatever plugin.event_listener returns.
    def expect_emitted_records(event, records)
      # A lone record (e.g. a Hash) is wrapped; Array() would split a Hash
      # into key/value pairs, so the explicit type check is kept.
      expected = records.is_a?(Array) ? records : [records]
      expected.each do |record|
        expect(Engine).to receive(:emit).with(TEST_TAG, TEST_TIMESTAMP, record).ordered
      end
      plugin.event_listener(event)
    end

    # Expects that feeding the event to the plugin never calls Engine.emit.
    #
    # event - event fixture passed to plugin.event_listener.
    #
    # Returns whatever plugin.event_listener returns.
    def expect_no_emitted_record(event)
      emit_never_called = receive(:emit).never
      expect(Engine).to emit_never_called
      plugin.event_listener(event)
    end

    # Builds one expected record per row and delegates to
    # expect_emitted_records.
    #
    # event       - event fixture to feed to the plugin.
    # type        - record type (e.g. :insert, :update, :delete).
    # table       - expected table name.
    # position    - expected binlog position, second part of :src_pos.
    # binlog_file - expected binlog file name, first part of :src_pos.
    # rows        - a single expected row or an Array of rows.
    # start_seq   - currently unused; kept so the signature stays stable.
    #
    # The sequence counter lives in @seq: it starts at TEST_SEQUENCE_NUM and
    # keeps increasing across calls on the same object, so consecutive calls
    # continue the numbering.
    def expect_emitted_records_with_rows(event, type, table, position, binlog_file, rows, start_seq = nil)
      row_list = rows.is_a?(Array) ? rows : [rows]
      @seq ||= TEST_SEQUENCE_NUM
      expected = row_list.map do |row_values|
        @seq += 1
        {
          :type          => type,
          :table_name    => table,
          :respect_order => true,
          :seq           => @seq,
          :src_pos       => "#{binlog_file}\t#{position}",
          :table_rev     => 1,
          :row           => row_values
        }
      end
      expect_emitted_records(event, expected)
    end

    # Plugin instance under test.
    let(:plugin) { MysqlBinlogFlydataInput.new }

    # Row event fixtures (insert / delete / update), each in a plain variant
    # plus "two byte" and "three byte" UTF-8 variants.
    let(:insert_event) { create_event(TEST_EVENT_INSERT) }
    let(:insert_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_INSERT) }
    let(:insert_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_INSERT) }
    let(:delete_event) { create_event(TEST_EVENT_DELETE) }
    let(:delete_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_DELETE) }
    let(:delete_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_DELETE) }
    let(:update_event) { create_event(TEST_EVENT_UPDATE) }
    let(:update_two_byte_event) { create_event(TEST_EVENT_TWO_BYTE_UPDATE) }
    let(:update_three_byte_event) { create_event(TEST_EVENT_THREE_BYTE_UPDATE) }

    # ALTER TABLE query event fixtures.
    let(:alter_table_add_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_ADD_COLUMN) }
    let(:alter_table_drop_column_event) { create_event(TEST_EVENT_QUERY_ALTER_TABLE_DROP_COLUMN) }

    # Event fixtures the examples expect to produce no emitted record.
    let(:query_event) { create_event(TEST_EVENT_QUERY_CREATE_DATABSE) }
    let(:table_map_event) { create_event(TEST_EVENT_TABLE_MAP) }
    let(:xid_event) { create_event(TEST_EVENT_XID) }

    # Rotate fixtures: the regular one names binlog file "mysql-bin.000048";
    # the "corrupt" one has no "binlog_file" entry.
    let(:rotate_event){ create_event(TEST_EVENT_ROTATE) }
    let(:rotate_event_corrupt){ create_event(TEST_EVENT_ROTATE_MISSING_BINLOG_FILE) }

    # Time passed to Timecop.freeze in the before hook.
    let(:now) { Time.now }

    # Stand-in for the binlog position file: reports that it exists and
    # reads back the fixed string 'test_position'.
    let(:binlog_position_file) do
      double('binlog_position_file', :exists? => true, :read => 'test_position')
    end

    # Writes content to file_path, creating the file or truncating an
    # existing one.
    #
    # Returns the number of bytes written.
    def create_file(file_path, content)
      File.write(file_path, content)
    end

    # Removes file_path if it exists; does nothing (and returns nil) when
    # the path is absent.
    #
    # Uses File.exist? because File.exists? is deprecated and no longer
    # defined on Ruby 3.2 and later.
    def delete_file(file_path)
      FileUtils.rm(file_path) if File.exist?(file_path)
    end

    # Creates the positions/dump/conf directories under FLYDATA_HOME and
    # seeds the sequence file with TEST_SEQUENCE_NUM.
    def setup_initial_flydata_files
      work_dirs = %w(positions dump conf).map { |name| File.join(FLYDATA_HOME, name) }
      FileUtils.mkdir_p(work_dirs)
      create_file(TEST_SEQUENCE_FILE, TEST_SEQUENCE_NUM.to_s)
    end

    # Removes the positions/dump/conf directories under FLYDATA_HOME along
    # with their contents, then removes the sequence file if still present.
    def cleanup_flydata_files
      work_dirs = %w(positions dump conf).map { |name| File.join(FLYDATA_HOME, name) }
      FileUtils.rm_rf(work_dirs)
      delete_file(TEST_SEQUENCE_FILE)
    end

    # Resets the on-disk fixture files, stubs the binlog collaborators and
    # freezes time before every example.
    before do
      # Remove leftovers first so every example starts from the same files.
      cleanup_flydata_files
      setup_initial_flydata_files
      # Pass events through unchanged; the fixtures are already hashes.
      allow(MysqlBinlogInput::BinlogUtil).to receive(:to_hash) {|e| e}
      # Hand back the double whenever a position file is built for TEST_POSITION_FILE.
      allow(Mysql::BinLogPositionFile).to receive(:new).with(TEST_POSITION_FILE).and_return(binlog_position_file)
      Timecop.freeze(now)
    end

    # Removes the fixture files and unfreezes time after every example.
    after do
      begin
        cleanup_flydata_files
      ensure
        # Always undo Timecop.freeze, even when the cleanup raises, so that
        # frozen time cannot leak into later examples.
        Timecop.return
      end
    end

    describe '#event_listener' do

      # Examples that run after the plugin has seen the regular rotate event,
      # so emitted records carry "mysql-bin.000048" as the binlog file name.
      # The expected positions equal each fixture's "next_position" minus its
      # "event_length" (e.g. 667 - 39 = 628 for the insert fixtures).
      context 'when received rotate event' do
        before do
          Test.configure_plugin(plugin, TEST_CONFIG)
          plugin.event_listener(rotate_event)
        end

        context 'when received insert event' do
          it do
            expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
          end
        end

        context 'when received insert event containing two byte UTF8 chars' do
          it do
            expect_emitted_records_with_rows(insert_two_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"føø"}, {"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}])
          end
        end

        context 'when received insert event containing three byte UTF8 chars' do
          it do
            expect_emitted_records_with_rows(insert_three_byte_event, :insert, TEST_TABLE, 628, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"富无无"}, {"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}])
          end
        end

        context 'when received delete event' do
          it do
            expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048",
              [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
          end
        end

        context 'when received delete event containing two byte UTF8 chars' do
          it do
            expect_emitted_records_with_rows(delete_two_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048",
              [{"1"=>"0SL00000002", "2"=>"vår"}, {"1"=>"0SL00000003", "2"=>"høgé"}])
          end
        end

        # Description fixed: it previously read "with containing byte UTF8 chars".
        context 'when received delete event containing three byte UTF8 chars' do
          it do
            expect_emitted_records_with_rows(delete_three_byte_event, :delete, TEST_TABLE, 5324, "mysql-bin.000048",
              [{"1"=>"0SL00000002", "2"=>"易变的"}, {"1"=>"0SL00000003", "2"=>"切实切实"}])
          end
        end

        # For updates only the second row of each before/after pair in the
        # fixture is expected to be emitted.
        context 'when received update event' do
          it do
            expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}])
          end
        end

        context 'when received update event with two byte utf8 chars' do
          it do
            expect_emitted_records_with_rows(update_two_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"∑ø∑"}, {"1"=>"0SL00000003", "2"=>"fügå"}])
          end
        end

        context 'when received update event with three byte utf8 chars' do
          it do
            expect_emitted_records_with_rows(update_three_byte_event, :update, TEST_TABLE, 2528, "mysql-bin.000048",
              [{"1"=>"0SL00000001", "2"=>"很兴奋"}, {"1"=>"0SL00000003", "2"=>"興奮虎"}])
          end
        end

        # ALTER TABLE events are currently expected to emit nothing; the
        # disabled expectations below describe the intended future records.
        context 'when received alter table add column event' do
          it do
# TODO Replace the expectation when enabling alter table
            expect_no_emitted_record(alter_table_add_column_event)
=begin
            expect_emitted_records(alter_table_add_column_event, {
              type: :alter_table,
              table_name: "test_table",
              respect_order: true,
              src_pos: "mysql-bin.000048\t689",
              table_rev: 2,  # increment revision
              seq: 2,
              actions: [{
                action: :add_column, column: "sum", :type=>'int4'}],
            })
=end
          end
        end

        context 'when received alter table drop column event' do
          it do
# TODO Replace the expectation when enabling alter table
            expect_no_emitted_record(alter_table_drop_column_event)
=begin
            expect_emitted_records(alter_table_drop_column_event, {
              type: :alter_table,
              table_name: "test_table",
              respect_order: true,
              src_pos: "mysql-bin.000048\t800",
              table_rev: 2,  # increment revision
              seq: 2,
              actions: [{
                action: :drop_column, column: "sum"}],
            })
=end
          end
        end

        # Events for a database other than the configured one are ignored.
        context 'when received event with another database name' do
          it do
            event = insert_event
            event['db_name'] = 'another_db'
            expect_no_emitted_record(event)
          end
        end

        # Events for a table outside the configured table list are ignored.
        context 'when received event with unsupported table name' do
          it do
            event = insert_event
            event['table_name'] = 'another_table'
            expect_no_emitted_record(event)
          end
        end

        context 'when received unsupported event' do
          it do
            expect_no_emitted_record(query_event)
            expect_no_emitted_record(table_map_event)
            expect_no_emitted_record(xid_event)
          end
        end
      end

      # Examples that run without any rotate event, so the plugin has no
      # binlog file name: records are expected with an empty file name in
      # :src_pos and a single warning is expected on $log.
      context 'when rotate event is not received' do
        # Warning text expected on $log; shared by every example below.
        let(:missing_rotate_warning) { "Binlog file name is empty. Rotate event not received!" }

        before do
          Test.configure_plugin(plugin, TEST_CONFIG)
        end

        it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do
          expect($log).to receive(:warn).with(missing_rotate_warning).once
          expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "",
            [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
        end

        it 'logs a warning and emits FET with a blank binlog file name, when it receives an update event' do
          expect($log).to receive(:warn).with(missing_rotate_warning).once
          expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "",
            [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}])
        end

        # Description fixed: it previously read "logs a warning emits FET ...".
        it 'logs a warning and emits FET with a blank binlog file name, when it receives a delete event' do
          expect($log).to receive(:warn).with(missing_rotate_warning).once
          expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "",
            [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
        end

        # The sequence numbers keep increasing across the four calls because
        # the helper stores its counter in @seq.
        it 'logs warning once when it receives consecutive events' do
          expect($log).to receive(:warn).with(missing_rotate_warning).once
          expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "",
            [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
          expect_emitted_records_with_rows(update_event, :update, TEST_TABLE, 2528, "",
            [{"1"=>"0SL00000001", "2"=>"wow"}, {"1"=>"0SL00000003", "2"=>"fuga"}])
          expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "",
            [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
          expect_emitted_records_with_rows(delete_event, :delete, TEST_TABLE, 5324, "",
            [{"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
        end
      end

      # A rotate event without a "binlog_file" entry is expected to leave
      # the plugin in the same state as receiving no rotate event at all.
      context 'when received rotate event with missing binlog file' do
        before do
          Test.configure_plugin(plugin, TEST_CONFIG)
          plugin.event_listener(rotate_event_corrupt)
        end

        it 'logs a warning and emits FET with a blank binlog file name, when it receives an insert event' do
          expected_rows = [
            {"1"=>"0SL00000001", "2"=>"foo"},
            {"1"=>"0SL00000002", "2"=>"var"},
            {"1"=>"0SL00000003", "2"=>"hoge"}
          ]
          expect($log).to receive(:warn).with("Binlog file name is empty. Rotate event not received!").once
          expect_emitted_records_with_rows(insert_event, :insert, TEST_TABLE, 628, "", expected_rows)
        end
      end

      # Examples with a per-table binlog position file recording
      # "mysql-bin.000048" at position 1010 for TEST_TABLE.
      context 'when per-table binlog file is present' do
        before do
          create_file(TEST_TABLE_BINLOG_POS, "mysql-bin.000048\t1010")
          Test.configure_plugin(plugin, TEST_CONFIG)
          plugin.event_listener(rotate_event)
        end

        # The unmodified insert fixture sits at position 628 (667 - 39).
        it 'emits no record when it receives an event whose binlog position is less than per-table binlog position' do
          expect_no_emitted_record(insert_event)
        end

        it 'deletes the per-table-binlog file (if there is one) as soon as a record with bigger binlog position is received' do
          event = insert_event
          # 1250 - 39 (event_length) = 1211, which is past 1010.
          event['next_position'] = 1250
          expect_emitted_records_with_rows(event, :insert, TEST_TABLE, 1211, "mysql-bin.000048",
            [{"1"=>"0SL00000001", "2"=>"foo"}, {"1"=>"0SL00000002", "2"=>"var"}, {"1"=>"0SL00000003", "2"=>"hoge"}])
          # File.exist? instead of File.exists?, which Ruby 3.2 removed.
          expect(File.exist?(TEST_TABLE_BINLOG_POS)).to be(false)
        end

        it 'emits no record when it receives an event whose binlog position is equal to per-table binlog position' do
          event = insert_event
          # 1049 - 39 (event_length) = 1010, exactly the recorded position.
          event['next_position'] = 1049
          expect_no_emitted_record(event)
        end
      end
    end
  end

end