Sha256: 8f5a4c850d8fbee0b7fcdfdae953c9bbd10ff61f8e880c9274b7ce1511ae94af
Contents?: true
Size: 1.58 KB
Versions: 37
Compression:
Stored size: 1.58 KB
Contents
require 'flydata/source_mysql/plugin_support/binlog_query_handler' module Flydata module SourceMysql module PluginSupport class DdlQueryHandler < BinlogQueryHandler DDL_TABLE_QUERY = /^(?:(?:ALTER|CREATE|DROP|RENAME) +(?:\w+ +)*TABLE +([^ ]+)|TRUNCATE +(?:TABLE +)?([^ ;]+))/i def emit_record(type, record) # ddl event record doesn't have "table_name" record['table_name'] = table_info(record)[:table_name] super do |opt| yield opt end end def acceptable_db?(record) supported_database == table_info(record)[:db_name] end def table_info(record) table_info = { db_name: record["db_name"], table_name: nil } if DDL_TABLE_QUERY =~ record["normalized_query"] table_name_in_query = ($1 ? $1 : $2).tr("`", "") if (idx = table_name_in_query.index(".")) table_info[:db_name] = table_name_in_query[0...idx] table_info[:table_name] = table_name_in_query[idx+1..-1] else table_info[:table_name] = table_name_in_query end end table_info end end class TableDdlQueryHandler < DdlQueryHandler end class DatabaseDdlQueryHandler < DdlQueryHandler def emit_record(type, record) return unless acceptable_db?(record) check_empty_binlog opt = {} records = yield(opt) # The block may set options as necessary return if records.nil? # skip records = [records] unless records.kind_of?(Array) return unless acceptable_event?(type) emit_sync_records(records, opt.merge( timestamp: record["timestamp"].to_i, type: type, src_pos: binlog_pos(record))) end end end end end
Version data entries
37 entries across 37 versions & 1 rubygems