module Fluent require 'fluent/plugin/in_mysql_binlog' require 'binlog' require 'kodama' # Load client library(flydata/cli/lib) lib = File.absolute_path(File.dirname(__FILE__) + '/../..') $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'flydata' require 'flydata/sync_file_manager' require_relative 'preference' require_relative 'mysql/binlog_position_file' require_relative 'mysql/binlog_record_dispatcher' require_relative 'mysql/context' require_relative 'idle_event_detector' require_relative 'mysql/table_meta' #Monkey-patch fluentd class (EngineClass) to support shutdown for input plugin. #This will be called when USR1 signal is received class EngineClass #Send shutdown to all the sources def shutdown_source @sources.map {|s| Thread.new do begin s.shutdown rescue => e $log.warn "unexpected error while shutting down", :error_class=>e.class, :error=>e $log.warn_backtrace end end }.each {|t| t.join } end end class MysqlBinlogFlydataInput < MysqlBinlogInput include MysqlBinlogFlydataInputPreference Plugin.register_input('mysql_binlog_flydata', self) def initialize super install_custom_signal_handler end config_param :database, :string config_param :tables, :string config_param :tables_append_only, :string config_param :initial_idle_interval, :integer, :default => 30 config_param :continuous_idle_interval, :integer, :default => 600 config_param :check_interval, :integer, :default => 5 def configure(conf) super @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file) unless @binlog_position_file.exists? raise "No position file(#{@position_file}). Initial synchronization is required before starting." end load_custom_conf $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\"" $log.info "mysql client version: #{`mysql -V`}" server_version = `echo 'select version();' | MYSQL_PWD="#{@password}" mysql -h #{@host} --port #{@port} -u #{@username} 2>/dev/null` $log.info "mysql server version: #{server_version}" @tables = @tables.split(/,\s*/) @omit_events = Hash.new @tables_append_only.split(/,\s*/).each do |table| @tables << table unless @tables.include?(table) @omit_events[table] = [:delete] end sync_fm = Flydata::FileUtil::SyncFileManager.new(nil) # Passing nil for data_entry as this class does not use methods which require data_entry table_meta = Mysql::TableMeta.new( mysql_url: mysql_url, database: @database, tables: @tables) table_meta.update @context = Mysql::Context.new( database: @database, tables: @tables, tag: @tag, sync_fm: sync_fm, omit_events: @omit_events, table_meta: table_meta, ) @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context) @idle_event_detector = IdleEventDetector.new(@initial_idle_interval, @continuous_idle_interval, @check_interval) end def start super @idle_event_detector.start do |reason, timestamp| case reason when :event_not_coming $log.warn "No binary log event since #{timestamp}" when :event_still_not_coming $log.warn "No binary log event since #{timestamp}" when :event_arrived_finally $log.info "Binary log event has come at #{timestamp}" end end positions_path = @context.sync_fm.table_positions_dir_path Dir.mkdir positions_path unless File.exists? positions_path rescue Binlog::Error if (/basic_string::_M_replace_aux/ === $!.to_s) # TODO Fix the root cause in mysql-replication-listener $log.error < e position = @binlog_position_file.read $log.error "error occured while processing #{event.event_type} event at #{position}\n#{e.message}\n#{$!.backtrace.join("\n")}" # Not reraising a StandardError because the underlying code can't handle an error well. raise unless e.kind_of?(StandardError) end def shutdown @idle_event_detector.stop if @thread and @thread.alive? $log.info "Requesting stop Kodama" @kodama_client.stop_request if wait_till_safe_to_stop $log.info "Killing Kodama client" Thread.kill(@thread) else $log.error "Unable to stop Kodama" end end end def wait_till_safe_to_stop retry_count = 5 1.upto(retry_count) do |i| return true if @kodama_client.safe_to_stop? sleep 3 end false end #Hack: All that has been added here is `Fluent::Engine.shutdown_source`. This should be in #fluentd's supervisor#install_main_process_signal_handlers def install_custom_signal_handler trap :USR1 do $log.debug "fluentd main process get SIGUSR1" $log.info "force flushing buffered events" #@log.reopen! # Creating new thread due to mutex can't lock # in main thread during trap context Thread.new { begin Fluent::Engine.shutdown_source Fluent::Engine.flush! $log.debug "flushing thread: flushed" rescue Exception => e $log.warn "flushing thread error: #{e}" end }.run end end end end # HACK # Monkey patch the class to manage string's character encoding. module Binlog class Client old_method = instance_method(:wait_for_next_event) define_method(:wait_for_next_event) do event = old_method.bind(self).() if (event.kind_of?(Binlog::RowEvent)) class << event def rows rs = super # HACK # Assuming all string values are UTF-8 # To make this right, MySQL client's encoding must be set to UTF-8 # But how? new_rs = rs.collect {|row| row.collect{|value| if (value.kind_of?(Array)) # Update has two rows in it value.collect{|val| val.force_encoding("UTF-8") if val.respond_to?(:force_encoding); val} else value.force_encoding("UTF-8") if value.respond_to?(:force_encoding); value end } } new_rs end end end event end end end # HACK # Monkey patch so that we can replace Kodama's logger module Kodama Client.class_eval do attr_accessor :logger end end