require 'fluent/plugin/in_mysql_binlog'
require 'binlog'
require 'flydata/mysql/binlog_position'
require 'flydata-core/record/record'

module Mysql
  # Base handler that filters binlog records by database, table, per-table
  # binlog position and omitted event types, decorates the accepted records
  # with common FlyData fields, and emits them through Fluent::Engine.
  #
  # The +context+ object passed to #initialize must respond to the readers
  # used below: +table_meta+, +tables+, +sync_fm+, +database+, +omit_events+,
  # +table_revs+, +current_binlog_file+ and +tag+.
  class BinlogRecordHandler
    TABLE_NAME = :table_name # A Flydata JSON tag to specify a table name
    TYPE = :type
    SEQ = :seq
    RESPECT_ORDER = :respect_order
    SRC_POS = :src_pos
    TABLE_REV = :table_rev
    V = :v # FlyData record format version

    # @param context the sync context (see the class comment for the
    #   readers it must provide)
    def initialize(context)
      @context = context
      @first_empty_binlog = true
      @table_meta = @context.table_meta

      # Load per-table binlog position
      @table_binlog_pos = {}
      @context.tables.each do |table_name|
        table_binlog_content = @context.sync_fm.get_table_binlog_pos(table_name)
        next unless table_binlog_content

        @table_binlog_pos[table_name] =
          Flydata::Mysql::BinLogPosition.new(table_binlog_content)
      end
    end

    private

    # @return the database name configured on the context
    def supported_database
      @context.database
    end

    # @return [Boolean] true when the record's "db_name" equals the
    #   configured database
    def acceptable_db?(record)
      supported_database == record["db_name"]
    end

    # Decides whether an event for +table+ should be processed.
    #
    # A table that is not listed in the context is never acceptable. When a
    # per-table binlog position is stored for the record's table, the event
    # is rejected while that stored position is >= the event's position
    # ("<current binlog file>\t<next_position - event_length>"). Once an
    # event compares past the stored position, the stored position is
    # deleted (both from sync_fm and from the in-memory cache) and the event
    # is accepted.
    #
    # NOTE(review): the stored position is looked up by record['table_name'],
    # not by the +table+ argument, so records without a 'table_name' never
    # hit the per-table position check -- confirm this is intended.
    #
    # @param record the binlog record (string-keyed)
    # @param table the table name to check against the context's table list
    # @return [Boolean]
    def acceptable_table?(record, table)
      return false unless @context.tables.include?(table)

      source_table = record['table_name']
      resume_pos = @table_binlog_pos[source_table]
      return true unless resume_pos

      event_pos = Flydata::Mysql::BinLogPosition.new(
        "#{@context.current_binlog_file}\t#{record['next_position'] - record['event_length']}"
      )
      return false if resume_pos >= event_pos

      # The event is past the stored per-table position; the marker is no
      # longer needed.
      @context.sync_fm.delete_table_binlog_pos(source_table)
      @table_binlog_pos.delete(source_table)
      true
    end

    # @return [Boolean] false only when +type+ is listed in the context's
    #   omit_events entry for +table+
    def acceptable_event?(type, table)
      omitted = @context.omit_events[table]
      omitted.nil? || !omitted.include?(type)
    end

    # Filters, decorates and emits the record(s) produced by the given block.
    #
    # The block receives an options hash it may fill in (the only option read
    # here is :increment_table_rev) and returns nil (skip), a single record,
    # or an Array of records. Records are indexed with the symbol keys
    # defined by this class's constants.
    #
    # @param type the event type stored in each emitted record and checked
    #   against the omitted events
    # @param record the binlog record (string-keyed: "db_name", "table_name",
    #   "next_position", "event_length", "timestamp")
    # @raise [RuntimeError] when no table name can be determined
    def emit_record(type, record)
      return unless acceptable_db?(record)

      record_table = record["table_name"]
      return unless record_table.nil? || acceptable_table?(record, record_table)

      check_empty_binlog

      opt = {}
      records = yield(opt) # The block may set options as necessary
      return if records.nil? # skip

      # Do not use Array(records) here: it would turn a single Hash record
      # into an array of key/value pairs.
      records = [records] unless records.is_a?(Array)
      # Nothing to emit; also avoids calling [] on nil from records.first.
      return if records.empty?

      table = records.first[TABLE_NAME] || record_table
      raise "Missing table name. #{record}" if table.to_s.empty?
      return unless acceptable_table?(record, table) && acceptable_event?(type, table)

      table_rev = @context.table_revs[table]
      position = record['next_position'] - record['event_length']

      # Add common information to each record
      records.each do |r|
        if opt[:increment_table_rev]
          # The revision is bumped once per record, not once per call.
          table_rev = @context.sync_fm.increment_table_rev(table, table_rev)
          @context.table_revs[table] = table_rev
        end
        r[TYPE] = type
        r[RESPECT_ORDER] = true
        r[TABLE_NAME] = table
        r[SRC_POS] = "#{@context.current_binlog_file}\t#{position}"
        r[TABLE_REV] = table_rev
        r[V] = FlydataCore::Record::V2
      end

      # Use binlog's timestamp
      timestamp = record["timestamp"].to_i
      records.each do |row|
        @context.sync_fm.increment_and_save_table_position(row[TABLE_NAME]) do |seq|
          row[SEQ] = seq
          Fluent::Engine.emit(@context.tag, timestamp, row)
        end
      end
    end

    # Logs one warning per run of consecutive records that have an empty
    # binlog file name; the warning is re-armed as soon as a non-empty file
    # name is seen.
    def check_empty_binlog
      if @context.current_binlog_file.to_s.empty?
        if @first_empty_binlog
          $log.warn "Binlog file name is empty. Rotate event not received!"
          @first_empty_binlog = false
        end
      else
        @first_empty_binlog = true
      end
    end
  end
end