require 'fluent/plugin/in_mysql_binlog' require 'binlog' require 'flydata-core/mysql/binlog_pos' require 'flydata/plugin_support/sync_record_emittable' module Flydata module SourceMysql module PluginSupport class BinlogRecordHandler include Flydata::PluginSupport::SyncRecordEmittable def initialize(context) @context = context @first_empty_binlog = true @table_meta = @context.table_meta # Load per-table binlog position @table_binlog_pos = @context.table_binlog_pos end private def binlog_pos(record) "#{@context.cur_src_pos_file}\t#{record['next_position'] - record['event_length']}" end def supported_database @context.database end def acceptable_db?(record) supported_database == record["db_name"] end def acceptable_table?(record, table) acceptable = @context.tables.include?(table) if acceptable and @table_binlog_pos[record['table_name']] if @table_binlog_pos[record['table_name']] >= FlydataCore::Mysql::BinlogPos.new(binlog_pos(record)) acceptable = false else @context.sync_fm.delete_table_source_pos(record['table_name']) @table_binlog_pos.delete(record['table_name']) end end acceptable end def acceptable_event?(type, table) @context.omit_events[table].nil? || !@context.omit_events[table].include?(type) end def emit_record(type, record) return unless acceptable_db?(record) return unless record["table_name"].nil? or acceptable_table?(record, record["table_name"]) check_empty_binlog opt = {} records = yield(opt) # The block may set options as necessary return if records.nil? # skip records = [records] unless records.kind_of?(Array) table = records.first[:table_name] || record['table_name'] raise "Missing table name. #{record}" if table.to_s.empty? return unless acceptable_table?(record, table) && acceptable_event?(type, table) emit_sync_records(records, opt.merge( timestamp: record["timestamp"].to_i, type: type, table: table, src_pos: binlog_pos(record))) end def check_empty_binlog #Log one warning per consecutive records that have empty binlog filename if @context.cur_src_pos_file.to_s.empty? if @first_empty_binlog $log.warn "Binlog file name is empty. Rotate event not received!" @first_empty_binlog = false end else @first_empty_binlog = true end end end end end end