require_relative 'binlog_record_handler' module Mysql class DmlRecordHandler < BinlogRecordHandler ROW = :row INTEGER_TYPES = {'TINY' => 1, 'SHORT' => 2, 'INT24' => 3, 'LONG' => 4, 'LONGLONG' => 8 } SIGNLESS_INTEGER_PREFIX = '0SL' def process(record, type) case type when :write_rows emit_insert(record) when :delete_rows emit_delete(record) when :update_rows emit_update(record) else raise "Invalid type:#{type}" end end private def emit_insert(record) emit_rows(:insert, record) end def emit_delete(record) emit_rows(:delete, record) end def emit_update(record) emit_rows(:update, record) do |row| row.last # For update, row has two arrays (old and new values) Use new values end end def emit_rows(type, record) emit_record(type, record) do |opt| table_name = record['table_name'] records = record["rows"].collect do |row| row = yield(row) if block_given? # Give the caller a chance to generate the correct row { ROW => convert_to_flydata_row_format(table_name, row) } end encode_signless_integer(records, record["columns"]) records end end def convert_to_flydata_row_format(table_name, row) row.each.with_index(1).inject({}) do |h, (v, i)| if v.kind_of?(String) v = encode_row_value(table_name, v) end h[i.to_s] = v h end end def encode_row_value(table_name, value) if src_encoding = @table_meta[table_name][:encoding] value.encode('utf-8', src_encoding, :undef => :replace, :invalid => :replace) else value.encode('utf-16', :undef => :replace, :invalid => :replace).encode('utf-8') end end def encode_signless_integer(records, column_types) records.each do |record| record[ROW].keys.each do |position| index = position.to_i - 1 column_type = column_types[index] if INTEGER_TYPES.keys.include?(column_type) # It's a signless integer. intval = record[ROW][position] next unless (intval.kind_of?(Numeric) || intval =~ /^-?[\d]+$/) width = INTEGER_TYPES[column_type] * 2 # * 2 because a single byte requires two characters (e.g. ff) signless_val = SIGNLESS_INTEGER_PREFIX signless_val += sprintf("%0#{width}x", intval).gsub(/\.\.f/, 'f' * width).slice(-width..-1) record[ROW][position] = signless_val end end end end end end