require 'fluent/plugin/in_mysql_binlog'
require 'binlog'
require 'flydata/mysql/binlog_position'
require 'flydata-core/record/record'

module Mysql
  # Base class for binlog event handlers.
  #
  # Filters incoming binlog events by database and table, skips events that
  # are at or before a table's saved binlog position, and hands the records
  # produced by the caller's block to FlydataJsonHandlerFactory.
  class BinlogRecordHandler
    # @param context [Object] sync context. This class reads #table_meta,
    #   #tables, #sync_fm, #database, #current_binlog_file and #omit_events
    #   from it.
    def initialize(context)
      @context = context
      @first_empty_binlog = true
      @table_meta = @context.table_meta

      # Load per-table binlog position
      @table_binlog_pos = {}
      @context.tables.each do |table_name|
        table_binlog_content = @context.sync_fm.get_table_binlog_pos(table_name)
        next unless table_binlog_content

        @table_binlog_pos[table_name] =
          Flydata::Mysql::BinLogPosition.new(table_binlog_content)
      end
    end

    private

    # @return [Object] the database configured on the context.
    def supported_database
      @context.database
    end

    # @param record [Hash] binlog event; 'db_name' is read.
    # @return [Boolean] true when the event belongs to the configured database.
    def acceptable_db?(record)
      supported_database == record["db_name"]
    end

    # Decides whether an event for +table+ should be processed.
    #
    # A table that is not in the context's table list is rejected. When a
    # per-table binlog position is stored for record['table_name'], the event
    # is rejected while that stored position is >= the event's own position
    # (next_position - event_length in the current binlog file). The first
    # event past the stored position clears it, both in the sync file manager
    # and in the in-memory cache.
    #
    # @param record [Hash] binlog event; 'table_name', 'next_position' and
    #   'event_length' are read.
    # @param table [String] table name checked against the context's tables.
    # @return [Boolean]
    def acceptable_table?(record, table)
      acceptable = @context.tables.include?(table)
      table_name = record['table_name']
      saved_pos = @table_binlog_pos[table_name]

      if acceptable && saved_pos
        event_pos = Flydata::Mysql::BinLogPosition.new(
          "#{@context.current_binlog_file}\t#{record['next_position'] - record['event_length']}")
        if saved_pos >= event_pos
          acceptable = false
        else
          @context.sync_fm.delete_table_binlog_pos(table_name)
          @table_binlog_pos.delete(table_name)
        end
      end

      acceptable
    end

    # @param type [Object] event type.
    # @param table [String] table name.
    # @return [Boolean] false only when the context's omit_events entry for
    #   +table+ exists and includes +type+.
    def acceptable_event?(type, table)
      omitted = @context.omit_events[table]
      omitted.nil? || !omitted.include?(type)
    end

    # Filters the event and, if accepted, builds a handler for the records
    # returned by the given block.
    #
    # @param type [Object] event type passed through to the handler.
    # @param record [Hash] binlog event.
    # @yieldparam opt [Hash] options the block may fill in; only
    #   :increment_table_rev is read afterwards.
    # @yieldreturn [Hash, Array<Hash>, nil] record(s) to emit, or nil to skip.
    # @return [Object, nil] the handler created by the factory, or nil when
    #   the event was filtered out or skipped.
    def emit_record(type, record)
      return unless acceptable_db?(record)
      return if record["table_name"] && !acceptable_table?(record, record["table_name"])

      check_empty_binlog

      opt = {}
      records = yield(opt) # The block may set options as necessary
      return if records.nil? # skip

      records = [records] unless records.is_a?(Array)

      FlydataJsonHandlerFactory.create(records, record, @context, type, opt[:increment_table_rev])
    end

    # Logs one warning per run of consecutive records that arrive while the
    # current binlog file name is empty. The flag is re-armed as soon as a
    # non-empty file name is seen.
    def check_empty_binlog
      unless @context.current_binlog_file.to_s.empty?
        @first_empty_binlog = true
        return
      end

      return unless @first_empty_binlog

      $log.warn "Binlog file name is empty. Rotate event not received!"
      @first_empty_binlog = false
    end
  end

  # Chooses a table-level or database-level handler for a batch of records.
  class FlydataJsonHandlerFactory
    # Builds the handler for +records+. The handlers do their work (record
    # annotation and emission) inside their constructors.
    #
    # @param records [Array<Hash>] records to emit; may be empty.
    # @param record [Hash] the binlog event the records came from.
    # @param context [Object] sync context.
    # @param type [Object] event type.
    # @param increment_table_rev [Object] truthy to bump the table revision.
    # @return [DatabaseFlydataJsonHandler, TableFlydataJsonHandler]
    def self.create(records, record, context, type, increment_table_rev)
      # An empty batch has no first record; fall back to the event's table
      # name instead of failing on nil.
      first_record = records.first || {}
      table = first_record[:table_name] || record['table_name']

      if table.nil?
        DatabaseFlydataJsonHandler.new(records, record, context, type)
      else
        TableFlydataJsonHandler.new(records, record, context, type, increment_table_rev)
      end
    end

    # Common base for the handlers: holds the context and the record keys.
    class FlydataJsonHandler
      TABLE_NAME = :table_name # A Flydata JSON tag to specify a table name
      TYPE = :type
      SEQ = :seq
      RESPECT_ORDER = :respect_order
      SRC_POS = :src_pos
      TABLE_REV = :table_rev
      V = :v # FlyData record format version
      DB_NAME = :db_name

      # Only +context+ is stored here; the remaining parameters exist so that
      # subclasses can call +super+ with their own argument list.
      def initialize(records, record, context, type, increment_table_rev=nil)
        @context = context
      end

      # No-op in the base class; subclasses override it.
      def emit(timestamp, row)
      end
    end

    # Annotates and emits records that belong to a table.
    class TableFlydataJsonHandler < FlydataJsonHandler
      # Annotates every record with type, ordering flag, table name, source
      # position, table revision and format version, then emits each one with
      # the binlog event's timestamp. Nothing is emitted when the event type
      # is listed in the context's omit_events for the table.
      #
      # @raise [RuntimeError] when no table name can be determined.
      def initialize(records, record, context, type, increment_table_rev=nil)
        super
        # Tolerate an empty batch: use the event's table name when there is
        # no first record to read it from.
        table = (records.first || {})[TABLE_NAME] || record['table_name']
        raise "Missing table name. #{record}" if table.to_s.empty?
        return unless acceptable_event?(type, table)

        table_rev = @context.table_revs[table]
        position = record['next_position'] - record['event_length']

        # Add common information to each record
        records.each do |r|
          if increment_table_rev
            # The revision is bumped once per record and cached on the context.
            table_rev = @context.sync_fm.increment_table_rev(table, table_rev)
            @context.table_revs[table] = table_rev
          end
          r[TYPE] = type
          r[RESPECT_ORDER] = true
          r[TABLE_NAME] = table
          r[SRC_POS] = "#{@context.current_binlog_file}\t#{position}"
          r[TABLE_REV] = table_rev
          r[V] = FlydataCore::Record::V2
        end

        # Use binlog's timestamp
        timestamp = record["timestamp"].to_i
        records.each { |row| emit(timestamp, row) }
      end

      # Emits +row+ under the context's tag, stamping it with the sequence
      # number yielded by the sync file manager for the row's table.
      def emit(timestamp, row)
        @context.sync_fm.increment_and_save_table_position(row[TABLE_NAME]) do |seq|
          row[SEQ] = seq
          Fluent::Engine.emit(@context.tag, timestamp, row)
        end
      end

      private

      # @return [Boolean] false only when the context's omit_events entry for
      #   +table+ exists and includes +type+.
      def acceptable_event?(type, table)
        omitted = @context.omit_events[table]
        omitted.nil? || !omitted.include?(type)
      end
    end

    # Handler for records that carry no table name (database-level events).
    class DatabaseFlydataJsonHandler < FlydataJsonHandler
      # Currently returns right after +super+ because #acceptable_event?
      # always answers false; the annotation and emission code below only
      # runs once that check is relaxed.
      def initialize(records, record, context, type, increment_table_rev=nil)
        super
        return unless acceptable_event?(type)

        position = record['next_position'] - record['event_length']

        # Add common information to each record
        records.each do |r|
          r[TYPE] = type
          r[RESPECT_ORDER] = true
          r[SRC_POS] = "#{@context.current_binlog_file}\t#{position}"
          r[V] = FlydataCore::Record::V2
        end

        # Use binlog's timestamp
        timestamp = record["timestamp"].to_i
        records.each { |row| emit(timestamp, row) }
      end

      # Emits +row+ under the context's tag.
      def emit(timestamp, row)
        Fluent::Engine.emit(@context.tag, timestamp, row)
      end

      private

      # TODO: No support on DS for database records currently so no record should be emitted
      def acceptable_event?(type)
        false
      end
    end
  end
end