require 'flydata/source_mysql/plugin_support/binlog_record_handler'

module Flydata
  module SourceMysql
    module PluginSupport
      # Converts MySQL binlog row events (write / delete / update rows) into
      # FlyData row records and passes them to +emit_record+, which is
      # inherited from BinlogRecordHandler (not visible here).
      class DmlRecordHandler < BinlogRecordHandler
        ROW = :row
        OLD = :old

        # MySQL integer column types mapped to their storage size in bytes.
        INTEGER_TYPES = {
          'TINY' => 1,
          'SHORT' => 2,
          'INT24' => 3,
          'LONG' => 4,
          'LONGLONG' => 8,
        }.freeze

        # Prefix marking an integer value that has been rewritten as a
        # fixed-width two's-complement hex string (see encode_signless_integer).
        SIGNLESS_INTEGER_PREFIX = '0SL'.freeze

        # Converts one binlog rows event and emits it.
        #
        # @param record [Hash] binlog event; this class reads its "rows" and
        #   "columns" entries.
        # @param type [Symbol] :write_rows, :delete_rows or :update_rows.
        # @raise [RuntimeError] for any other +type+.
        def process(record, type)
          case type
          when :write_rows
            emit_insert(record)
          when :delete_rows
            emit_delete(record)
          when :update_rows
            emit_update(record)
          else
            raise "Invalid type:#{type}"
          end
        end

        private

        def emit_insert(record)
          emit_rows(:insert, record)
        end

        def emit_delete(record)
          emit_rows(:delete, record)
        end

        # For update, each row is a two-element array: [old_values, new_values].
        def emit_update(record)
          emit_rows(:update, record) do |row|
            { OLD => row.first, ROW => row.last }
          end
        end

        # Builds one converted entry per element of record["rows"] and returns
        # the list from the block given to emit_record.
        #
        # Without a block, each raw row is treated as the new-values array
        # (ROW). With a block, the caller maps a raw row to a Hash of
        # { kind => values_array } (e.g. { OLD => ..., ROW => ... }).
        def emit_rows(type, record)
          emit_record(type, record) do |_opt|
            record['rows'].map do |row|
              row_values = block_given? ? yield(row) : { ROW => row }
              row_kinds = row_values.each_with_object({}) do |(kind, values), converted|
                converted[kind] = convert_to_flydata_row_format(values)
              end
              encode_signless_integer(row_kinds, record['columns'], row_kinds.keys)
              row_kinds
            end
          end
        end

        # Converts a positional values array into a Hash keyed by the 1-based
        # column position as a String ("1", "2", ...).
        #
        # String values go through encode_row_value. When the result is tagged
        # BINARY (i.e. it was hex-encoded), the position is also recorded under
        # 'attrs' => { position => { 'enc' => 'b' } }.
        def convert_to_flydata_row_format(row)
          row.each.with_index(1).each_with_object({}) do |(value, position), converted|
            key = position.to_s
            if value.is_a?(String)
              value = encode_row_value(value)
              if value.encoding == Encoding::BINARY
                attrs = (converted['attrs'] ||= {})
                (attrs[key] ||= {})['enc'] = 'b'
              end
            end
            converted[key] = value
          end
        end

        # Returns +value+ as a UTF-8 string when its bytes are valid UTF-8.
        # Otherwise returns the FlyData binary format: "0x" followed by an
        # upper-case hex dump of the bytes, tagged BINARY.
        #
        # The caller's string is never re-tagged in place; a copy is made
        # when its encoding has to change (frozen input is therefore safe).
        def encode_row_value(value)
          utf8 = value.encoding == Encoding::UTF_8 ? value : value.dup.force_encoding(Encoding::UTF_8)
          return utf8 if utf8.valid_encoding?

          hex = '0x' + utf8.unpack('H*').first.upcase
          # Tagged 'binary' to differentiate from UTF-8. Downstream code may rely on it.
          hex.force_encoding(Encoding::BINARY)
        end

        # Rewrites integer-typed column values, in place, as
        # SIGNLESS_INTEGER_PREFIX + a fixed-width two's-complement hex string.
        # The width is the column's byte size * 2.
        #
        # @param row_kinds [Hash] { kind => converted row Hash }, mutated in place.
        # @param column_types [Array] per-column type names; positions in the
        #   converted rows are 1-based indexes into it (presumably the binlog
        #   event's "columns" list — confirm against the parser).
        # @param row_types [Array] keys of +row_kinds+ to process.
        def encode_signless_integer(row_kinds, column_types, row_types)
          row_types.each do |row_type|
            values = row_kinds[row_type]
            values.keys.each do |position|
              # Skip non-positional keys such as 'attrs'. "attrs".to_i - 1 is -1,
              # which would otherwise select the last column's type.
              next unless position =~ /\A\d+\z/

              column_type = column_types[position.to_i - 1]
              byte_size = INTEGER_TYPES[column_type]
              next unless byte_size

              intval = values[position]
              next unless integer_like?(intval)

              width = byte_size * 2 # a single byte requires two hex characters (e.g. ff)
              begin
                values[position] = SIGNLESS_INTEGER_PREFIX + twos_complement_hex(intval, width)
              rescue => e
                $log.debug "failed to encode signless integer. - exception:`#{e.class}` " \
                           "record:`#{values[position]}` column_type:`#{column_type}` " \
                           "width:`#{width}` intval:`#{intval}`"
                raise
              end
            end
          end
        end

        # True for Numeric values and for whole-string decimal integers
        # such as "123" or "-5".
        def integer_like?(value)
          return true if value.is_a?(Numeric)

          value.is_a?(String) && !(value =~ /\A-?\d+\z/).nil?
        end

        # Returns the low +width+ hex digits of +value+'s two's-complement bit
        # pattern, zero-padded. Examples: (-1, 2) => "ff", (-255, 4) => "ff01",
        # (255, 4) => "00ff".
        def twos_complement_hex(value, width)
          # Parse decimal strings with an explicit base. format("%x", str)
          # auto-detects the base, so "010" would read as octal 8 and "09"
          # would raise.
          int = value.is_a?(String) ? Integer(value, 10) : Integer(value)
          format("%0#{width}x", int & ((1 << (width * 4)) - 1))
        end
      end
    end
  end
end