module Fluent require 'fluent/plugin/in_mysql_binlog' require 'binlog' require 'kodama' require File.dirname(__FILE__) + '/preference' require File.expand_path(File.join(File.dirname(__FILE__), '../../flydata')) require 'flydata/sync_file_manager' class MysqlBinlogFlydataInput < MysqlBinlogInput include MysqlBinlogFlydataInputPreference Plugin.register_input('mysql_binlog_flydata', self) def initialize super end config_param :database, :string config_param :tables, :string def configure(conf) super unless File.exists?(@position_file) raise "No position file(#{@position_file}). Initial synchronization is required before starting." end load_custom_conf $log.info "mysql host:\"#{@host}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\"" @tables = @tables.split(/,\s*/) @sync_fm = Flydata::FileUtil::SyncFileManager.new(nil) # Passing nil for data_entry as this class does not use methods which require data_entry @record_handler = FlydataMysqlBinlogRecordHandler.new( database: @database, tables: @tables, tag: @tag, sync_fm: @sync_fm) end def start super positions_path = @sync_fm.table_positions_dir_path Dir.mkdir positions_path unless File.exists? positions_path end def event_listener(event) begin @record_handler.dispatch(event) rescue Exception => e position = File.open(@position_file) {|f| f.read } $log.error "error occured while processing #{event.event_type} event at #{position}" $log.error e.message $log.error e.backtrace.join("\n") # Not reraising a StandardError because the underlying code can't handle an error well. raise unless e.kind_of?(StandardError) end end end class MysqlBinlogRecordHandler def dispatch(event) method_name = "on_#{event.event_type.downcase}" if self.respond_to?(method_name) # TODO to_hash method call below can fail if event.event_type is # "Update_rows". This seems to be a bug of ruby-binlog. The bug must # be fixed when we support record update. record = MysqlBinlogInput::BinlogUtil.to_hash(event) self.send(method_name, record) else # $log.trace "Unhandled type: #{record["event_type"]}" end end end class FlydataMysqlBinlogRecordHandler < MysqlBinlogRecordHandler TABLE_NAME = 'table_name' # A Flydata JSON tag to specify a table name TYPE = 'type' ROW = 'row' SEQ = 'seq' RESPECT_ORDER = 'respect_order' INTEGER_TYPES = {'TINY' => 1, 'SHORT' => 2, 'INT24' => 3, 'LONG' => 4, 'LONGLONG' => 8 } SIGNLESS_INTEGER_PREFIX = '0SL' def initialize(opts) mandatory_opts = [:database, :tables, :tag, :sync_fm] missing_opts = mandatory_opts - opts.keys unless (missing_opts.empty?) raise "Mandatory option(s) are missing: #{missing_opts.join(', ')}" end @database = opts[:database] @tables = opts[:tables] @tag = opts[:tag] @sync_fm = opts[:sync_fm] @query_handler = FlydataMysqlBinlogQueryHandler.new(record_handler: self) end def on_write_rows(record) emit_insert(record) end def on_update_rows(record) emit_update(record) end def on_delete_rows(record) emit_delete(record) end def on_query(record) @query_handler.dispatch(record) end def on_table_changed(table) $log.trace "Table #{table} has changed. Reloading the table column" end private def acceptable?(record) (@database == record["db_name"]) and @tables.include?(record["table_name"]) end def emit_insert(record) emit_record(:insert, record) end def emit_delete(record) emit_record(:delete, record) end def emit_update(record) emit_record(:update, record) do |row| row.last # For update, row has two arrays (old and new values) Use new values end end def emit_record(type, record) return unless acceptable?(record) table = record['table_name'] records = record["rows"].collect do |row| row = yield(row) if block_given? # Give the caller a chance to generate the correct row { TYPE => type, TABLE_NAME => table, RESPECT_ORDER => true, # Continuous sync needs record order to be kept ROW => row.each.with_index(1).inject({}) do |h, (v, i)| if v.kind_of?(String) v = v.encode('utf-16', :undef => :replace, :invalid => :replace).encode('utf-8') end h[i.to_s] = v h end } end encode_signless_integer(records, record["columns"]) # Use binlog's timestamp timestamp = record["timestamp"].to_i records.each do |row| @sync_fm.increment_and_save_table_position(row[TABLE_NAME]) do |seq| row[SEQ] = seq Engine.emit(@tag, timestamp, row) end end end private def encode_signless_integer(records, column_types) records.each do |record| record[ROW].keys.each do |position| index = position.to_i - 1 column_type = column_types[index] if INTEGER_TYPES.keys.include?(column_type) # It's a signless integer. intval = record[ROW][position] next unless (intval.kind_of?(Numeric) || intval =~ /^-?[\d]+$/) width = INTEGER_TYPES[column_type] * 2 # * 2 because a single byte requires two characters (e.g. ff) signless_val = SIGNLESS_INTEGER_PREFIX signless_val += sprintf("%0#{width}x", intval).gsub(/\.\.f/, 'f' * width).slice(-width..-1) record[ROW][position] = signless_val end end end end end class MysqlBinlogQueryHandler def initialize @mapping_table = [] end def dispatch(record) @mapping_table.each do |pattern, method_name| query = normalize_query(record["query"]) if (pattern.match(query)) if (self.respond_to?(method_name)) self.send(method_name, record, query) else raise "method '#{method_name}' is not defined in #{self.class.name} although its matching pattern is defined" end break end end end private def normalize_query(query) query = strip_comments(query) end def strip_comments(query) query = query.gsub(/--\s.*\n/, ' ') # -- style comments query = query.gsub(/\/\*[^\*].*\*\//, ' ') # /* */ style comments query = query.gsub(/\s+/, ' ') # replace multiple spaces with a space end end class FlydataMysqlBinlogQueryHandler < MysqlBinlogQueryHandler def initialize(opts) mandatory_opts = [:record_handler] missing_opts = mandatory_opts - opts.keys unless missing_opts.empty? raise "mandatory options are missing: #{missing_opts.join(", ")}" end @opts = opts @mapping_table = [ [/^alter table/i, :on_alter_table], ] end def on_alter_table(record, query) m = /alter table\s+(?