require 'fluent/plugin/in_mysql_binlog' require 'binlog' require 'flydata-core/record/record' require 'flydata-core/mysql/binlog_pos' module Mysql class BinlogRecordHandler TABLE_NAME = :table_name # A Flydata JSON tag to specify a table name TYPE = :type SEQ = :seq RESPECT_ORDER = :respect_order SRC_POS = :src_pos TABLE_REV = :table_rev V = :v # FlyData record format version def initialize(context) @context = context @first_empty_binlog = true @table_meta = @context.table_meta # Load per-table binlog position @table_binlog_pos = {} @context.tables.each do |table_name| table_binlog_pos_str = @context.sync_fm.get_table_source_raw_pos(table_name) if table_binlog_pos_str @table_binlog_pos[table_name] = FlydataCore::Mysql::BinlogPos.new(table_binlog_pos_str) end end end private def binlog_pos(record) "#{@context.current_binlog_file}\t#{record['next_position'] - record['event_length']}" end def supported_database @context.database end def acceptable_db?(record) supported_database == record["db_name"] end def acceptable_table?(record, table) acceptable = @context.tables.include?(table) if acceptable and @table_binlog_pos[record['table_name']] if @table_binlog_pos[record['table_name']] >= FlydataCore::Mysql::BinlogPos.new(binlog_pos(record)) acceptable = false else @context.sync_fm.delete_table_source_pos(record['table_name']) @table_binlog_pos.delete(record['table_name']) end end acceptable end def acceptable_event?(type, table) @context.omit_events[table].nil? || !@context.omit_events[table].include?(type) end def emit_record(type, record) return unless acceptable_db?(record) return unless record["table_name"].nil? or acceptable_table?(record, record["table_name"]) check_empty_binlog opt = {} records = yield(opt) # The block may set options as necessary return if records.nil? # skip records = [records] unless records.kind_of?(Array) table = records.first[TABLE_NAME] || record['table_name'] raise "Missing table name. #{record}" if table.to_s.empty? return unless acceptable_table?(record, table) && acceptable_event?(type, table) table_rev = @context.table_revs[table] # Add common information to each record records.each do |r| if opt[:increment_table_rev] table_rev = @context.sync_fm.increment_table_rev(table, table_rev) @context.table_revs[table] = table_rev end r[TYPE] = type r[RESPECT_ORDER] = true r[TABLE_NAME] = table r[SRC_POS] = binlog_pos(record) r[TABLE_REV] = table_rev r[V] = FlydataCore::Record::V2 end # Use binlog's timestamp timestamp = record["timestamp"].to_i records.each do |row| @context.sync_fm.increment_and_save_table_position(row[TABLE_NAME]) do |seq| row[SEQ] = seq Fluent::Engine.emit(@context.tag, timestamp, row) end end end def check_empty_binlog #Log one warning per consecutive records that have empty binlog filename if @context.current_binlog_file.to_s.empty? if @first_empty_binlog $log.warn "Binlog file name is empty. Rotate event not received!" @first_empty_binlog = false end else @first_empty_binlog = true end end end end