require 'fluent/plugin/in_mysql_binlog' require 'binlog' require_relative 'binlog_position' module Mysql class BinlogRecordHandler TABLE_NAME = :table_name # A Flydata JSON tag to specify a table name TYPE = :type SEQ = :seq RESPECT_ORDER = :respect_order SRC_POS = :src_pos TABLE_REV = :table_rev def initialize(context) @context = context @first_empty_binlog = true # Load per-table binlog position @table_binlog_pos = {} @context.tables.each do |table_name| table_binlog_content = @context.sync_fm.get_table_binlog_pos(table_name) if table_binlog_content @table_binlog_pos[table_name] = BinLogPosition.new(table_binlog_content) end end end private def supported_database @context.database end def acceptable_db?(record) supported_database == record["db_name"] end def acceptable_table?(record, table) acceptable = @context.tables.include?(table) if acceptable and @table_binlog_pos[record['table_name']] if @table_binlog_pos[record['table_name']] >= BinLogPosition.new( "#{@context.current_binlog_file}\t#{record['next_position'] - record['event_length']}") acceptable = false else @context.sync_fm.delete_table_binlog_pos(record['table_name']) @table_binlog_pos.delete(record['table_name']) end end acceptable end def emit_record(type, record, opt = {}) return unless acceptable_db?(record) return unless record["table_name"].nil? or acceptable_table?(record, record["table_name"]) check_empty_binlog records = yield return if records.nil? # skip records = [records] unless records.kind_of?(Array) table = records.first[TABLE_NAME] || record['table_name'] raise "Missing table name. #{record}" if table.to_s.empty? return unless acceptable_table?(record, table) table_rev = @context.sync_fm.table_rev(table) position = record['next_position'] - record['event_length'] # Add common information to each record records.each do |r| if opt[:increment_table_rev] table_rev = @context.sync_fm.increment_table_rev(table, table_rev) end r[TYPE] = type r[RESPECT_ORDER] = true r[TABLE_NAME] = table r[SRC_POS] = "#{@context.current_binlog_file}\t#{position}" r[TABLE_REV] = table_rev end # Use binlog's timestamp timestamp = record["timestamp"].to_i records.each do |row| @context.sync_fm.increment_and_save_table_position(row[TABLE_NAME]) do |seq| row[SEQ] = seq Fluent::Engine.emit(@context.tag, timestamp, row) end end end def check_empty_binlog #Log one warning per consecutive records that have empty binlog filename if @context.current_binlog_file.to_s.empty? if @first_empty_binlog $log.warn "Binlog file name is empty. Rotate event not received!" @first_empty_binlog = false end else @first_empty_binlog = true end end end end