# Fluentd input plugin registered as 'mysql_binlog_flydata'
# (Fluent::MysqlBinlogFlydataInput, a subclass of MysqlBinlogInput), plus a
# monkey patch that adds EngineClass#shutdown_source.
#
# Summary of what the visible code does:
# - EngineClass#shutdown_source calls `shutdown` on every entry of @sources,
#   each in its own thread, logs (without re-raising) any StandardError, and
#   joins all the threads. It is called from the USR1 trap installed by
#   install_custom_signal_handler, right before Fluent::Engine.flush!.
# - configure raises unless the binlog position file exists; saves the SSL CA
#   content to a file when ssl_ca_content is not blank; logs the mysql client
#   and server versions by shelling out to `mysql`; adds every
#   tables_append_only entry to the watched tables while omitting :delete and
#   :truncate_table events for it; and stops watching the tables returned by
#   get_new_table_list(@tables, "pos").
# - start refuses to run while the lock file (Flydata::FLYDATA_LOCK) exists:
#   it sets @abort, sends TERM to the parent process and returns.
# - shutdown returns immediately when @abort has been defined; otherwise it
#   stops the idle event detector, requests a Kodama stop, polls
#   safe_to_stop? up to 5 times with 3-second sleeps, kills @thread only when
#   that poll succeeds, and finally closes @sync_fm.
#
# NOTE(review): this copy has lost its line breaks, and the text between
# `rescue Binlog::Error` in `start` and `... < e if /dh key too small/` looks
# truncated (presumably a heredoc and the opening of another method are
# missing) -- restore it from version control before changing any code here.
module Fluent require 'fluent/plugin/in_mysql_binlog' require 'binlog' require 'kodama' # Load client library(flydata-agent/lib) lib = File.expand_path(File.join(File.dirname(__FILE__), '../../')) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'flydata' require 'flydata/sync_file_manager' require 'flydata-core/mysql/command_generator' require 'flydata/fluent-plugins/preference' require 'flydata/fluent-plugins/mysql/binlog_position_file' require 'flydata/fluent-plugins/mysql/binlog_record_dispatcher' require 'flydata/fluent-plugins/mysql/context' require 'flydata/fluent-plugins/idle_event_detector' require 'flydata/fluent-plugins/mysql/table_meta' require 'flydata/mysql/table_ddl' require 'flydata-core/fluent/config_helper' require 'flydata-core/mysql/ssl' #Monkey-patch fluentd class (EngineClass) to support shutdown for input plugin. #This will be called when USR1 signal is received class EngineClass #Send shutdown to all the sources def shutdown_source @sources.map {|s| Thread.new do begin s.shutdown rescue => e $log.warn "unexpected error while shutting down", :error_class=>e.class, :error=>e $log.warn_backtrace end end }.each {|t| t.join } end end class MysqlBinlogFlydataInput < MysqlBinlogInput include MysqlBinlogFlydataInputPreference Plugin.register_input('mysql_binlog_flydata', self) def initialize super install_custom_signal_handler end config_param :database, :string config_param :tables, :string config_param :tables_append_only, :string config_param :initial_idle_interval, :integer, :default => 30 # 30 sec config_param :continuous_idle_interval, :integer, :default => 600 # 10 min config_param :idle_timeout, :time, :default => 1800 # 30 min config_param :check_interval, :integer, :default => 5 # 5 sec config_param :ssl_ca_content, :string, :default => '' config_param :ssl_cipher, :string, :default => nil # Secondary ssl cipher is for a workaround to handle 'dh key too small' error # If mysql-server is v5.6.25 or older, and client openssl 
is # 1.0.1f-1ubuntu-2.15(for ubuntu) or later, client will get the above error because. # the key length returned from mysql-server is only 512 bits. # # In case of dh key error, we retry with non-DH cipher. # # * SECURITY IMPROVEMENT: reject dh keys smaller than 768 bits # https://launchpad.net/ubuntu/+source/openssl/1.0.1f-1ubuntu2.15 # # Supported ssl cipher list for mysql(openssl) # https://dev.mysql.com/doc/refman/5.6/en/ssl-options.html config_param :secondary_ssl_cipher, :string, :default => FlydataCore::Mysql::Ssl::NON_DH_SSL_CIPHER def configure(conf) super @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file) unless @binlog_position_file.exists? raise "No position file(#{@position_file}). Initial synchronization is required before starting." end @sync_fm = Flydata::SyncFileManager.new(nil) # Passing nil for data_entry as this class does not use methods which require data_entry @sent_position_file_path = @sync_fm.sent_binlog_path(@position_file) load_custom_conf # SSL configuration unless @ssl_ca_content.to_s.strip.empty? @ssl_ca_path = @sync_fm.ssl_ca_path(@position_file) @sync_fm.save_ssl_ca(FlydataCore::Fluent::ConfigHelper.unescape_conf(@ssl_ca_content), @ssl_ca_path) end # Db access opts @db_opts = { host: @host, port: @port, username: @username, password: @password, database: @database, ssl_ca: @ssl_ca_path, ssl_cipher: @ssl_cipher } $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\"" $log.info "mysql client version: #{`mysql -V`}" server_msg = `echo 'select version();' | #{FlydataCore::Mysql::CommandGenerator.generate_mysql_cmd(@db_opts)} 2>&1` if ($? && $?.exitstatus == 0) $log.info "mysql server version: #{server_msg.strip}" else err_msg = "Failed to access mysql server... 
#{server_msg.strip}" $log.error err_msg $stderr.puts err_msg #exit 1 # causes retry loop end @tables = @tables.split(/,\s*/) @omit_events = Hash.new @tables_append_only.split(/,\s*/).each do |table| @tables << table unless @tables.include?(table) @omit_events[table] = [:delete, :truncate_table] end # Remove tables that do not have pos files new_tables = @sync_fm.get_new_table_list(@tables, "pos") @tables -= new_tables $log.info "Not watching these tables: #{new_tables.join(", ")}" table_meta = Flydata::Mysql::TableMeta.new(@db_opts.merge(tables: @tables)) table_revs = tables.inject({}) do |h, table_name| h[table_name] = @sync_fm.table_rev(table_name) h end # Set context @context = Mysql::Context.new( database: @database, tables: @tables, tag: @tag, sync_fm: @sync_fm, omit_events: @omit_events, table_meta: table_meta, table_revs: table_revs, ) @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context) @idle_event_detector = IdleEventDetector.new(@initial_idle_interval, @continuous_idle_interval, @check_interval, @idle_timeout) @lock_file = Flydata::FLYDATA_LOCK end def start if File.exists?(@lock_file) $log.error "Previous process was terminated abnormally. To start, remove the lock file after checking data integrity." @abort = true Process.kill(:TERM, Process.ppid) return end super @idle_event_detector.start do |reason, timestamp| case reason when :event_not_coming $log.warn "No binary log event since #{timestamp}" when :event_still_not_coming $log.warn "No binary log event since #{timestamp}" when :event_arrived_finally $log.info "Binary log event has come at #{timestamp}" when :event_idle_timeout $log.warn "No binary log event since #{timestamp}. Restarting the process." Process.kill(:HUP, Process.ppid) end end positions_path = @context.sync_fm.table_positions_dir_path Dir.mkdir positions_path unless File.exists? 
positions_path rescue Binlog::Error if (/basic_string::_M_replace_aux/ === $!.to_s) # TODO Fix the root cause in mysql-replication-listener $log.error < e if /dh key too small/.match(e.to_s) && !retried && !@secondary_ssl_cipher.to_s.empty? retried = true current_ssl_cipher = @secondary_ssl_cipher $log.warn("Retry with secondary ssl cipher list due to '#{e}' - secondary_ssl_cipher: '#{@secondary_ssl_cipher}'") retry elsif /binlog file.*does not exist/.match(e.to_s) $log.error("#{e.to_s}. Sync must be reset. Terminating the agent.") Process.kill(:TERM, Process.ppid) else raise e end ensure if !transaction_broken && # leave the lock file when a transaction is broken File.exists?(@lock_file) && Process.pid == File.open(@lock_file, "r") {|f| f.read}.to_i File.delete(@lock_file) end end rescue => e # HACK: mysql-replication-listener has a network connection leak bug which doesn't release a connection # to MySQL. Rather than fixing the bug, restarting the fluentd process for now. $log.warn "kodama died with an error. Restart the process after #{@retry_wait} seconds. error: #{e.class.to_s} '#{e.to_s}'\n#{e.backtrace.join("\n")}" sleep @retry_wait Process.kill(:HUP, Process.ppid) # Send SIGHUP to the supervisor to restart fluentd rescue SignalException $log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}" raise rescue Exception $log.error "unexpected fatal error. 
exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}" raise end def start_kodama(options, &block) @kodama_client = Kodama::Client.new(Kodama::Client.mysql_url(options)) @kodama_client.logger = $log block.call(@kodama_client) @kodama_client.start end def event_listener(event) @idle_event_detector.notify @record_dispatcher.dispatch(event) rescue Exception => e position = @binlog_position_file.read $log.error "error occurred while processing #{event.event_type} event at #{position}, event_class:#{event.class.to_s}\n#{e.message}\n#{$!.backtrace.join("\n")}" raise end def shutdown return if instance_variable_defined? :@abort @idle_event_detector.stop if @thread and @thread.alive? $log.info "Requesting stop Kodama" begin @kodama_client.stop_request if wait_till_safe_to_stop $log.info "Killing Kodama client" Thread.kill(@thread) else $log.error "Unable to stop Kodama" end rescue => e $log.warn "an error occurred during Kodama shutdown. error:'#{e.to_s}'\n#{e.backtrace.join("\n")}" end end @sync_fm.close end def wait_till_safe_to_stop retry_count = 5 1.upto(retry_count) do |i| return true if @kodama_client.safe_to_stop? sleep 3 end false end #Hack: All that has been added here is `Fluent::Engine.shutdown_source`. This should be in #fluentd's supervisor#install_main_process_signal_handlers def install_custom_signal_handler trap :USR1 do $log.debug "fluentd main process get SIGUSR1" $log.info "force flushing buffered events" #@log.reopen! # Creating new thread due to mutex can't lock # in main thread during trap context Thread.new { begin Fluent::Engine.shutdown_source Fluent::Engine.flush! $log.debug "flushing thread: flushed" rescue Exception => e $log.warn "flushing thread error: #{e}" end }.run end end end end # HACK # Monkey patch the class to manage string's character encoding. 
# Wrap Binlog::Client#wait_for_next_event so that every row event it returns
# reports its string column values with the "binary" encoding. The previous
# implementation is captured once as an UnboundMethod and invoked from the
# replacement.
module Binlog
  class Client
    original_wait = instance_method(:wait_for_next_event)

    define_method(:wait_for_next_event) do
      next_event = original_wait.bind(self).call

      if next_event.is_a?(Binlog::RowEvent)
        # Override #rows on this one event object only.
        class << next_event
          def rows
            raw_rows = super
            # HACK
            # ruby-binlog has no knowledge about the encoding of the strings
            # it returns, so every value that supports it is tagged as binary.
            raw_rows.map do |row|
              row.map do |column|
                if column.is_a?(Array)
                  # An update carries two rows, so the column is itself a list.
                  column.map do |item|
                    item.force_encoding("binary") if item.respond_to?(:force_encoding)
                    item
                  end
                else
                  column.force_encoding("binary") if column.respond_to?(:force_encoding)
                  column
                end
              end
            end
          end
        end
      end

      next_event
    end
  end
end

# HACK
# Give Kodama::Client a read/write `logger` attribute so that the logger it
# uses can be replaced from outside.
module Kodama
  Client.class_eval { attr_accessor :logger }
end