require 'flydata/source_mysql/plugin_support/alter_table_query_handler' require 'flydata/source_mysql/plugin_support/truncate_table_query_handler' require 'flydata/source_mysql/plugin_support/drop_database_query_handler' module Flydata module SourceMysql module PluginSupport class BinlogQueryDispatcher def initialize @handlers = [] end def dispatch(record) normalize_query(record["query"]) do |query| @handlers.each do |handler| if (handler.pattern.match(query)) handler.process(record.merge({"normalized_query" => query})) break end end end end private PLACEMENT = '{}' def normalize_query(queries) q = queries.dup strings = [] # \/\*.*?\*\/ /* */ style comment # `.*?` `resource_name` # '(?:\\.|.)*?' 'string' # "(?:\\.|.)*?" "string" # --\s+.*?(?:\n|$) -- style comment # #\s+.*?(?:\n|$) # style comment q.gsub!(/(\/\*.*?\*\/|`.*?`|'(?:\\.|.)*?'|"(?:\\.|.)*?"|--\s+.*?(?:\n|$)|#.*?(?:\n|$))/m) do |m| comment_or_quoted = $& if comment_or_quoted.start_with?("/*") || comment_or_quoted.start_with?("--") || comment_or_quoted.start_with?("#") # comment. replace with a space ' ' else # string. save the original and replace with an temporary placement strings << comment_or_quoted PLACEMENT end end q = q.gsub(/\s+/, ' ') # replace multiple spaces with a space q.split(';').each do |query| query = query.lstrip query.gsub!(PLACEMENT) do |m| strings.shift end yield(query + ";") if block_given? && !query.empty? end end end class FlydataBinlogQueryDispatcher < BinlogQueryDispatcher def initialize(context) @handlers = [ AlterTableQueryHandler.new(context), TruncateTableQueryHandler.new(context), DropDatabaseQueryHandler.new(context), ] end end end end end