Sha256: 13b1260484f2828f067eeea43b056d6a90c5edf510751f1b723ef375af36ce02

Contents?: true

Size: 1.8 KB

Versions: 11

Compression:

Stored size: 1.8 KB

Contents

require 'flydata/fluent-plugins/mysql/binlog_query_handler'

module Mysql

# Handler for binlog query events that carry a table-level DDL statement
# (ALTER/CREATE/DROP/RENAME ... TABLE and TRUNCATE [TABLE]).  It derives the
# target database/table from the query text, since the record itself has no
# "table_name".
class DdlQueryHandler < BinlogQueryHandler
  # Extracts the (optionally db-qualified) table name from a DDL statement.
  # Capture 1 holds the name for ALTER/CREATE/DROP/RENAME, capture 2 for
  # TRUNCATE.
  #
  # * An optional "IF [NOT] EXISTS" clause is skipped so that it is not
  #   mistaken for the table name.
  # * The name stops at a space, ';', '(' or ',' so that statements such as
  #   "CREATE TABLE foo(id int)", "ALTER TABLE foo;" or "DROP TABLE a, b"
  #   yield a clean name (the first table when several are listed).
  #
  # Known limitation: quoted identifiers containing spaces are not supported.
  DDL_TABLE_QUERY = /^(?:(?:ALTER|CREATE|DROP|RENAME) +(?:\w+ +)*TABLE +(?:IF +(?:NOT +)?EXISTS +)?([^ ;(,]+)|TRUNCATE +(?:TABLE +)?([^ ;]+))/i

  # Fills in record['table_name'] from the query and delegates to the
  # superclass implementation, forwarding the caller's block.
  #
  # type   - event type, passed through to the superclass
  # record - binlog record Hash; mutated to include 'table_name'
  #
  # Yields the options object that the superclass passes to its block.
  def emit_record(type, record)
    # ddl event record doesn't have "table_name"
    record['table_name'] = table_info(record)[:table_name]
    super(type, record) { |opt| yield opt }
  end

  # Returns true when the database targeted by the statement equals
  # supported_database (provided outside this class).
  def acceptable_db?(record)
    supported_database == table_info(record)[:db_name]
  end

  # Parses record["normalized_query"] and returns
  #   { db_name: String-or-nil, table_name: String-or-nil }
  #
  # db_name defaults to record["db_name"] and is overridden when the table
  # name in the query is qualified ("db.table").  table_name stays nil when
  # the query is not a recognized table DDL statement.
  def table_info(record)
    info = { db_name: record["db_name"], table_name: nil }

    match = DDL_TABLE_QUERY.match(record["normalized_query"])
    return info unless match

    # Strip identifier quoting before looking for the db/table separator.
    # NOTE: a quoted identifier that itself contains '.' is split at the
    # first dot; this mirrors the long-standing behavior.
    qualified_name = (match[1] || match[2]).delete("`")
    db_part, separator, table_part = qualified_name.partition(".")

    if separator.empty?
      info[:table_name] = qualified_name
    else
      info[:db_name] = db_part
      info[:table_name] = table_part
    end
    info
  end
end

# DDL handler that adds nothing of its own: every method and constant is
# inherited unchanged from DdlQueryHandler.  Presumably the base class for
# handlers of DDL statements that target a single table -- confirm against
# the subclasses defined elsewhere.
class TableDdlQueryHandler < DdlQueryHandler
end

# DDL handler whose emit_record filters on the database only (via
# acceptable_db?) and emits the records itself instead of delegating to the
# superclass implementation.
class DatabaseDdlQueryHandler < DdlQueryHandler
  # Emits the record(s) produced by the given block to Fluent.
  #
  # type   - event type; stored in each emitted record under TYPE
  # record - binlog record Hash; supplies the database, binlog position and
  #          timestamp
  #
  # Yields an (initially empty) options Hash.  The block returns a single
  # record, an Array of records, or nil to skip the event.
  #
  # Nothing is emitted when the database is not acceptable, the block
  # returns nil, or acceptable_event?(type) is false.
  def emit_record(type, record)
    return unless acceptable_db?(record)

    check_empty_binlog

    opt = {}
    records = yield(opt) # The block may set options as necessary
    return if records.nil? # skip
    # Deliberately not Array(records): assuming records are Hashes, Array()
    # would break a single record apart into key/value pairs.
    records = [records] unless records.kind_of?(Array)

    return unless acceptable_event?(type)

    # Add common information to each record
    records.each do |r|
      r[TYPE] = type
      r[RESPECT_ORDER] = true
      r[SRC_POS] = binlog_pos(record)
      r[V] = FlydataCore::Record::V2
    end

    # Use binlog's timestamp
    timestamp = record["timestamp"].to_i
    records.each do |row|
      Fluent::Engine.emit(@context.tag, timestamp, row)
    end
  end
end

end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
flydata-0.6.11 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.10 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.9 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.8 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.7 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.6 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.5 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.4 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.3 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.2 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb
flydata-0.6.1 lib/flydata/fluent-plugins/mysql/ddl_query_handler.rb