require 'flydata/fluent-plugins/mysql/binlog_record_handler'

module Mysql
  # Handles row-level DML binlog events (write/update/delete rows) and turns
  # each affected row into a hash of column-position => value pairs.
  #
  # Emission itself is delegated to +emit_record+, which is not defined in this
  # class (presumably inherited from BinlogRecordHandler -- confirm there).
  class DmlRecordHandler < BinlogRecordHandler
    # Key under which a row's (new) values are stored.
    ROW = :row
    # Key under which an updated row's previous values are stored.
    OLD = :old

    # Storage width in bytes for each integer column type name.
    INTEGER_TYPES = {
      'TINY'     => 1,
      'SHORT'    => 2,
      'INT24'    => 3,
      'LONG'     => 4,
      'LONGLONG' => 8
    }.freeze

    # Marker prepended to the fixed-width hex form of an integer column value.
    SIGNLESS_INTEGER_PREFIX = '0SL'.freeze

    # Dispatches a binlog row event to the matching emitter.
    #
    # @param record [Hash] the event; 'table_name', 'rows' and 'columns' are read
    # @param type [Symbol] :write_rows, :delete_rows or :update_rows
    # @raise [RuntimeError] when +type+ is none of the above
    def process(record, type)
      case type
      when :write_rows
        emit_insert(record)
      when :delete_rows
        emit_delete(record)
      when :update_rows
        emit_update(record)
      else
        raise "Invalid type:#{type}"
      end
    end

    private

    # Emits every row of +record+ as an insert.
    def emit_insert(record)
      emit_rows(:insert, record)
    end

    # Emits every row of +record+ as a delete.
    def emit_delete(record)
      emit_rows(:delete, record)
    end

    # Emits every row of +record+ as an update, keeping both value sets.
    def emit_update(record)
      emit_rows(:update, record) do |row|
        # For update, row has two arrays (old and new values)
        { OLD => row.first, ROW => row.last }
      end
    end

    # Converts each entry of record["rows"] and hands the resulting array back
    # to +emit_record+ as the value of its block.
    #
    # Without a block every row becomes { ROW => values }. A caller-supplied
    # block receives the raw row and returns the hash of value sets to convert
    # instead (used by updates to carry OLD as well as ROW).
    #
    # @param type [Symbol] DML kind forwarded to +emit_record+
    # @param record [Hash] the binlog event
    # @yieldparam row [Array] one raw entry of record["rows"]
    # @yieldreturn [Hash] value-set key (ROW/OLD) => array of column values
    def emit_rows(type, record)
      emit_record(type, record) do |_opt|
        table_name = record['table_name']
        record["rows"].collect do |row|
          # Give the caller a chance to generate the correct row values
          row_values = block_given? ? yield(row) : { ROW => row }
          row_types = row_values.keys
          row_kinds = row_types.each_with_object({}) do |kind, converted|
            converted[kind] = convert_to_flydata_row_format(table_name, row_values[kind])
          end
          encode_signless_integer(row_kinds, record["columns"], row_types)
          row_kinds
        end
      end
    end

    # Turns an array of column values into a hash keyed by the 1-based column
    # position as a String ("1", "2", ...). String values are transcoded to
    # UTF-8 via +encode_row_value+; every other value is stored untouched.
    #
    # @param table_name [String] table the row belongs to
    # @param row [Array] column values in column order
    # @return [Hash{String => Object}]
    def convert_to_flydata_row_format(table_name, row)
      row.each.with_index(1).each_with_object({}) do |(value, position), converted|
        value = encode_row_value(table_name, value) if value.kind_of?(String)
        converted[position.to_s] = value
      end
    end

    # Returns +value+ as UTF-8 with invalid or undefined characters replaced.
    #
    # When @table_meta (not set in this class; presumably provided by the
    # parent -- confirm) lists an :encoding for the table, the string is
    # transcoded from that encoding. Otherwise it is round-tripped through
    # UTF-16 so that the replacement options are applied on the way.
    def encode_row_value(table_name, value)
      if src_encoding = @table_meta[table_name][:encoding]
        value.encode('utf-8', src_encoding, :undef => :replace, :invalid => :replace)
      else
        value.encode('utf-16', :undef => :replace, :invalid => :replace).encode('utf-8')
      end
    end

    # Rewrites, in place, every value that sits in an integer-typed column as
    # SIGNLESS_INTEGER_PREFIX followed by its fixed-width two's-complement hex.
    #
    # Only Numeric values and Strings made of an optional minus sign and
    # decimal digits are rewritten; anything else (e.g. nil) is left as is.
    #
    # @param record [Hash] value-set key => { position String => value }
    # @param column_types [Array<String>] type name per column, 0-based
    # @param row_types [Array] the keys of +record+ to process
    def encode_signless_integer(record, column_types, row_types)
      row_types.each do |row_type|
        values = record[row_type]
        values.keys.each do |position|
          # Positions are 1-based strings; column_types is a 0-based array.
          byte_width = INTEGER_TYPES[column_types[position.to_i - 1]]
          next unless byte_width

          intval = values[position]
          next unless intval.kind_of?(Numeric) ||
                      (intval.kind_of?(String) && intval =~ /^-?[\d]+$/)

          # * 2 because a single byte requires two characters (e.g. ff)
          values[position] = SIGNLESS_INTEGER_PREFIX + signless_hex(intval, byte_width * 2)
        end
      end
    end

    # Formats +value+ as exactly +width+ lowercase hex digits.
    #
    # Digit strings are parsed as base 10 first: handing a String directly to
    # sprintf lets it auto-detect a radix prefix, so "010" would be read as
    # octal 8 and "08" would raise.
    #
    # Ruby renders a negative number under %x as "..f" followed by its
    # significant digits. That marker is expanded to +width+ f's and the last
    # +width+ characters are kept, giving the two's-complement form.
    def signless_hex(value, width)
      number = value.kind_of?(String) ? Integer(value, 10) : value
      sprintf("%0#{width}x", number).gsub(/\.\.f/, 'f' * width).slice(-width..-1)
    end
  end
end