Sha256: 7cb136f555fb78f57b1d1b2f5c5ee4975fb737e862c0f14be481d1a3ffc1d02d
Contents?: true
Size: 1.85 KB
Versions: 12
Compression:
Stored size: 1.85 KB
Contents
require 'flydata/fluent-plugins/mysql/alter_table_query_handler' require 'flydata/fluent-plugins/mysql/truncate_table_query_handler' module Mysql class BinlogQueryDispatcher def initialize @handlers = [] end def dispatch(record) normalize_query(record["query"]) do |query| @handlers.each do |handler| if (handler.pattern.match(query)) handler.process(record.merge({"normalized_query" => query})) break end end end end private def normalize_query(queries) queries = strip_comments(queries) queries = queries.gsub(/\s+/, ' ') # replace multiple spaces with a space queries.split(';').each do |query| query = query.lstrip yield(query + ";") if block_given? && !query.empty? end end def strip_comments(query) q = query.dup # \/\*.*?\*\/ /* */ style comment # `.*?` `resource_name` # '(?:\\.|.)*?' 'string' # "(?:\\.|.)*?" "string" # --\s+.*?(?:\n|$) -- style comment # #\s+.*?(?:\n|$) # style comment query.scan(/(\/\*.*?\*\/|`.*?`|'(?:\\.|.)*?'|"(?:\\.|.)*?"|--\s+.*?(?:\n|$)|#.*?(?:\n|$))/m) do |m| comment_or_quoted = m.first if comment_or_quoted.start_with?("/*") || comment_or_quoted.start_with?("--") || comment_or_quoted.start_with?("#") # comment. replace with spaces of the same length idx_from = $~.offset(0)[0] idx_to = $~.offset(0)[1] len = idx_to - idx_from q[idx_from...idx_to] = ' ' * len end end q end end class FlydataBinlogQueryDispatcher < BinlogQueryDispatcher def initialize(context) @handlers = [ AlterTableQueryHandler.new(context), TruncateTableQueryHandler.new(context), ] end end end
Version data entries
12 entries across 12 versions & 1 rubygems