Sha256: c66c8763dbed20a18b15b9fcacd037c6eaa8e4118bde3b6359e57d8146272c20
Contents?: true
Size: 1.53 KB
Versions: 34
Compression:
Stored size: 1.53 KB
Contents
require 'fluent/plugin/in_mysql_binlog'
require 'binlog'
require 'flydata/source_mysql/plugin_support/dml_record_handler'
require 'flydata/source_mysql/plugin_support/binlog_query_dispatcher'

module Flydata
  module SourceMysql
    module PluginSupport
      # Routes a binlog event to a handler method named after the event's
      # type. An event whose type is "Foo_bar" is delivered to #on_foo_bar,
      # provided the receiver responds to that method. Subclasses supply the
      # on_* handlers.
      class BinlogRecordDispatcher
        # Converts the event to a hash record and passes it to the matching
        # on_<downcased event type> handler.
        #
        # @param event an object responding to #event_type (a String)
        # @return the handler's return value, or nil when no handler exists
        def dispatch(event)
          handler = "on_#{event.event_type.downcase}"

          # Event types without a matching handler are ignored. A trace log
          # of the unhandled type was once sketched here but is not enabled:
          #   $log.trace "Unhandled type: #{event.event_type}"
          return unless respond_to?(handler)

          # TODO to_hash method call below can fail if event.event_type is
          # "Update_rows". This seems to be a bug of ruby-binlog. The bug must
          # be fixed when we support record update.
          send(handler, Fluent::MysqlBinlogInput::BinlogUtil.to_hash(event))
        end
      end

      # Dispatcher that handles rotate, row-level (write/update/delete) and
      # query events. Row events are delegated to a DmlRecordHandler and
      # query events to a FlydataBinlogQueryDispatcher.
      class FlydataBinlogRecordDispatcher < BinlogRecordDispatcher
        # @param context shared context object; its cur_src_pos_file is reset
        #   to an empty string before the collaborators are built, and the
        #   same context is handed to each of them
        def initialize(context)
          context.cur_src_pos_file = ""
          @context = context
          @query_dispatcher = FlydataBinlogQueryDispatcher.new(context)
          @dml_record_handler = DmlRecordHandler.new(context)
        end

        # Stores the record's "binlog_file" value on the context as the
        # current source position file.
        def on_rotate(record)
          @context.cur_src_pos_file = record["binlog_file"]
        end

        # Hands the record to the DML handler tagged as :write_rows.
        def on_write_rows(record)
          @dml_record_handler.process(record, :write_rows)
        end

        # Hands the record to the DML handler tagged as :update_rows.
        def on_update_rows(record)
          @dml_record_handler.process(record, :update_rows)
        end

        # Hands the record to the DML handler tagged as :delete_rows.
        def on_delete_rows(record)
          @dml_record_handler.process(record, :delete_rows)
        end

        # Forwards the record to the query dispatcher.
        def on_query(record)
          @query_dispatcher.dispatch(record)
        end
      end
    end
  end
end
Version data entries
34 entries across 34 versions & 1 rubygems