# Fluentd input plugin that follows a MySQL binary log through Kodama and hands
# each event to a record dispatcher.
#
# NOTE(review): several expressions in this file look truncated (a bare `$`
# where a logger call is expected, assignments without a right-hand side,
# blocks without a receiver). Each spot is flagged below; the code tokens are
# left exactly as found and should be restored from version control.
module Fluent
  require 'fluent/plugin/in_mysql_binlog'
  require 'binlog'
  require 'kodama'

  # Load client library(flydata-agent/lib)
  lib = File.expand_path(File.join(File.dirname(__FILE__), '../../'))
  $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
  require 'flydata'
  require 'flydata/sync_file_manager'
  require 'flydata-core/mysql/command_generator'
  require 'flydata/fluent-plugins/preference'
  require 'flydata/fluent-plugins/mysql/binlog_position_file'
  require 'flydata/fluent-plugins/mysql/binlog_record_dispatcher'
  require 'flydata/fluent-plugins/mysql/context'
  require 'flydata/fluent-plugins/idle_event_detector'
  require 'flydata/fluent-plugins/mysql/table_meta'
  require 'flydata/mysql/table_ddl'
  require 'flydata-core/fluent/config_helper'

  #Monkey-patch fluentd class (EngineClass) to support shutdown for input plugin.
  #This will be called when USR1 signal is received
  class EngineClass
    #Send shutdown to all the sources
    #
    # Each source's #shutdown is called inside a begin/rescue so that a failure
    # is logged as a warning instead of propagating; every element produced by
    # the outer block is then joined.
    def shutdown_source
      # NOTE(review): the receiver of this block and the expression in front of
      # `do` appear to be missing — presumably the sources are mapped to one
      # thread each (the results are `join`ed below). Confirm against history.
      {|s| do
          begin
            s.shutdown
          rescue => e
            $log.warn "unexpected error while shutting down", :error_class=>e.class, :error=>e
            $log.warn_backtrace
          end
        end
      }.each {|t| t.join }
    end
  end

  # Input plugin registered under the name 'mysql_binlog_flydata'. It extends
  # MysqlBinlogInput with table filtering, SSL options, idle-event detection
  # and a USR1 signal handler that shuts down sources and flushes buffers.
  class MysqlBinlogFlydataInput < MysqlBinlogInput
    include MysqlBinlogFlydataInputPreference
    Plugin.register_input('mysql_binlog_flydata', self)

    # Runs the parent initializer, then installs the USR1 trap defined in
    # #install_custom_signal_handler.
    def initialize
      super
      install_custom_signal_handler
    end

    config_param :database, :string
    # Comma-separated table names (split with /,\s*/ in #configure).
    config_param :tables, :string
    # Comma-separated table names for which :delete and :truncate_table events
    # are recorded in @omit_events (see #configure).
    config_param :tables_append_only, :string
    # The four values below are handed to the idle event detector in #configure.
    config_param :initial_idle_interval, :integer, :default => 30 # 30 sec
    config_param :continuous_idle_interval, :integer, :default => 600 # 10 min
    config_param :idle_timeout, :time, :default => 1800 # 30 min
    config_param :check_interval, :integer, :default => 5 # 5 sec
    # Escaped CA certificate content; when non-blank it is unescaped and saved
    # to a file whose path is used as ssl_ca.
    config_param :ssl_ca_content, :string, :default => ''
    config_param :ssl_cipher, :string, :default => nil
    # Secondary ssl cipher is for a workaround to handle 'dh key too small' error
    # If mysql-server is v5.6.25 or older, and client openssl is
    # 1.0.1f-1ubuntu-2.15(for ubuntu) or later, client will get the above error
    # because the key length returned from mysql-server is only 512 bits.
    #
    # In case of dh key error, we retry with non-DH cipher.
    #
    #   * SECURITY IMPROVEMENT: reject dh keys smaller than 768 bits
    #     (reference link not present in this copy of the file)
    #
    # Supported ssl cipher list for mysql(openssl)
    #   (reference link not present in this copy of the file)
    #
    # NOTE(review): the entry "SRP-RSA-AES-128-CBC-S" differs from its siblings,
    # which all end in "-SHA" — possibly a typo; verify against the OpenSSL
    # cipher list before changing it.
    NON_DH_SSL_CIPHER = "AES256-GCM-SHA384:AES256-SHA:AES256-SHA256:CAMELLIA256-SHA:DES-CBC3-SHA:PSK-3DES-EDE-CBC-SHA:PSK-AES256-CBC-SHA:SRP-DSS-3DES-EDE-CBC-SHA:SRP-DSS-AES-128-CBC-SHA:SRP-DSS-AES-256-CBC-SHA:SRP-RSA-3DES-EDE-CBC-SHA:SRP-RSA-AES-128-CBC-S:SRP-RSA-AES-256-CBC-SHA"
    config_param :secondary_ssl_cipher, :string, :default => NON_DH_SSL_CIPHER

    # Prepares everything #start and #run rely on:
    #   * requires an existing binlog position file (raises otherwise),
    #   * writes the SSL CA content to a file when one is configured,
    #   * builds @db_opts and probes the server version via the mysql CLI,
    #   * turns the table parameters into @tables / @omit_events and drops
    #     tables reported by get_new_table_list(@tables, "pos"),
    #   * sets @context, @record_dispatcher and @idle_event_detector.
    #
    # conf - the plugin configuration, passed to the parent via `super`.
    #
    # Raises RuntimeError when the position file does not exist.
    def configure(conf)
      super

      # NOTE(review): the right-hand side of this assignment appears to be
      # missing; as written, the `unless` expression itself is what is assigned.
      # Presumably a position-file object built from @position_file — confirm.
      @binlog_position_file =
      unless @binlog_position_file.exists?
        raise "No position file(#{@position_file}). Initial synchronization is required before starting."
      end
      # NOTE(review): the right-hand side of `@sync_fm =` appears to be missing
      # (only its trailing comment survived). Presumably a sync file manager
      # constructed with nil — confirm.
      @sync_fm = # Passing nil for data_entry as this class does not use methods which require data_entry
      @sent_position_file_path = @sync_fm.sent_binlog_path(@position_file)

      load_custom_conf

      # SSL configuration
      unless @ssl_ca_content.to_s.strip.empty?
        @ssl_ca_path = @sync_fm.ssl_ca_path(@position_file)
        @sync_fm.save_ssl_ca(FlydataCore::Fluent::ConfigHelper.unescape_conf(@ssl_ca_content), @ssl_ca_path)
      end

      # Db access opts
      @db_opts = { host: @host, port: @port, username: @username, password: @password, database: @database, ssl_ca: @ssl_ca_path }

      # NOTE(review): the bare `$` in front of the strings below (and in other
      # methods of this class) is not a valid call — presumably a logger call
      # whose name was lost. Confirm against history.
      $ "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
      $ "mysql client version: #{`mysql -V`}"
      # Runs `select version();` through the generated mysql command line and
      # captures stdout and stderr together.
      server_msg = `echo 'select version();' | #{FlydataCore::Mysql::CommandGenerator.generate_mysql_cmd(@db_opts)} 2>&1`
      if ($?.exitstatus == 0)
        $ "mysql server version: #{server_msg.strip}"
      else
        # NOTE(review): this literal contains a raw line break after
        # "server... " — possibly accidental; kept as found.
        err_msg = "Failed to access mysql server... 
#{server_msg.strip}"
        $log.error err_msg
        $stderr.puts err_msg
        #exit 1 # causes retry loop
      end

      @tables = @tables.split(/,\s*/)
      # NOTE(review): @omit_events is indexed inside the block before this
      # assignment completes — presumably it was initialised to an empty hash
      # on a separate statement that has been lost. Confirm.
      @omit_events = @tables_append_only.split(/,\s*/).each do |table|
        @tables << table unless @tables.include?(table)
        @omit_events[table] = [:delete, :truncate_table]
      end

      # Remove tables that do not have pos files
      new_tables = @sync_fm.get_new_table_list(@tables, "pos")
      @tables -= new_tables
      $ "Not watching these tables: #{new_tables.join(", ")}"

      # NOTE(review): truncated expression (unbalanced parentheses) —
      # presumably a table-meta object built from @db_opts and @tables.
      table_meta = @tables))
      # Maps each table name to the value of @sync_fm.table_rev for it.
      table_revs = tables.inject({}) do |h, table_name|
        h[table_name] = @sync_fm.table_rev(table_name)
        h
      end

      # Set context
      # NOTE(review): the constructor call that received these keyword
      # arguments appears to be missing.
      @context = database: @database, tables: @tables, tag: @tag,
                           sync_fm: @sync_fm, omit_events: @omit_events,
                           table_meta: table_meta, table_revs: table_revs,
                         )
      # NOTE(review): two assignments appear fused here with their right-hand
      # sides partly missing — presumably a record dispatcher built from
      # @context and an idle event detector built from the four interval
      # settings. Confirm.
      @record_dispatcher = @idle_event_detector =, @continuous_idle_interval, @check_interval, @idle_timeout)
    end

    # Starts the parent plugin and the idle event detector, then makes sure the
    # table positions directory exists.
    #
    # The detector callback logs each reason it reports; on
    # :event_idle_timeout it also sends HUP to the parent process.
    # Every exception is logged and re-raised.
    def start
      super
      @idle_event_detector.start do |reason, timestamp|
        case reason
        when :event_not_coming
          $log.warn "No binary log event since #{timestamp}"
        when :event_still_not_coming
          $log.warn "No binary log event since #{timestamp}"
        when :event_arrived_finally
          $ "Binary log event has come at #{timestamp}"
        when :event_idle_timeout
          $log.error "No binary log event since #{timestamp}. Restart the process."
          Process.kill(:HUP, Process.ppid)
        end
      end
      positions_path = @context.sync_fm.table_positions_dir_path
      Dir.mkdir positions_path unless File.exists? positions_path
    rescue Binlog::Error
      if (/basic_string::_M_replace_aux/ === $!.to_s)
        # TODO Fix the root cause in mysql-replication-listener
        # NOTE(review): the line breaks and indentation inside this heredoc
        # were reconstructed; verify against history.
        $log.error <<EOS
a mysql-replication-listener error. This could have been caused by one of the following reasons.
- Failed on connect: Your host is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
EOS
      else
        $log.error "unexpected mysql-replication-listener error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
      end
      raise
    rescue Exception
      # NOTE(review): this literal contains a raw line break after
      # "fatal error. " — possibly accidental; kept as found.
      $log.error "unexpected fatal error. 
exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
      raise
    end

    # Main loop of the plugin.
    #
    # Updates the table meta, lets TableDdl.migrate_tables dispatch the events
    # it yields, then starts Kodama with a configuration block that sets the
    # position file(s), optional SSL settings, retry settings, log level and
    # one `on_<event_type>` listener per entry of @listen_events.
    #
    # Error handling:
    #   * Binlog::Error matching "dh key too small" is retried once with
    #     @secondary_ssl_cipher (guarded by `retried`); other Binlog::Errors
    #     are re-raised into the method-level rescue.
    #   * Any StandardError is logged, followed by a sleep of @retry_wait
    #     seconds and a HUP to the parent process.
    #   * SignalException and other Exceptions are logged and re-raised.
    def run
      @context.table_meta.update
      Flydata::Mysql::TableDdl.migrate_tables(@context.tables, @db_opts, @context.sync_fm, @position_file, @context) do |event|
        @record_dispatcher.dispatch(event)
      end

      current_ssl_cipher = @ssl_cipher
      retried = false
      begin
        start_kodama(mysql_url) do |c|
          c.binlog_position_file = @position_file

          # SSL options are applied only when a CA path is set and the client
          # exposes the ssl_ca= writer.
          if @ssl_ca_path.to_s != '' && c.respond_to?(:ssl_ca=)
            $ "SSL is enabled. (ssl_ca: #{@ssl_ca_path})"
            c.ssl_ca = @ssl_ca_path
            unless current_ssl_cipher.to_s.empty?
              $ "SSL cipher is set. (ssl_cipher: #{current_ssl_cipher})"
              c.ssl_cipher = current_ssl_cipher
            end
          end

          if c.respond_to?(:sent_binlog_position_file=)
            $ "Sent position feature is enabled. sent_position_file:#{@sent_position_file_path}"
            c.sent_binlog_position_file = @sent_position_file_path
          end

          # NOTE(review): truncated statement — the call name, its opening
          # parenthesis and the expressions inside both interpolations appear
          # to be missing. Presumably it logs the contents of the two position
          # files, falling back to IOError when they cannot be read. Confirm.
          $"Binlog position - resume_pos:'#{ rescue IOError}' " + "sent_pos:'#{ rescue IOError}'")

          c.connection_retry_limit = @retry_limit
          c.connection_retry_wait = @retry_wait
          c.log_level = @log_level.to_sym
          @listen_events.each do |event_type|
            $log.trace { "registered binlog event listener '#{event_type}'" }
            c.send("on_#{event_type}", &method(:event_listener))
          end
        end
      rescue Binlog::Error => e
        if /dh key too small/.match(e.to_s) && !retried && !@secondary_ssl_cipher.to_s.empty?
          retried = true
          current_ssl_cipher = @secondary_ssl_cipher
          $log.warn("Retry with secondary ssl cipher list due to '#{e}' - secondary_ssl_cipher: '#{@secondary_ssl_cipher}'")
          retry
        else
          raise e
        end
      end
    rescue => e
      # HACK: mysql-replication-listener has a network connection leak bug which doesn't release a connection
      # to MySQL. Rather than fixing the bug, restarting the fluentd process for now.
      # NOTE(review): this literal contains a raw line break after
      # "seconds. " — possibly accidental; kept as found.
      $log.warn "kodama died with an error. Restart the process after #{@retry_wait} seconds. 
error: #{e.class.to_s} '#{e.to_s}'\n#{e.backtrace.join("\n")}"
      sleep @retry_wait
      Process.kill(:HUP, Process.ppid) # Send SIGHUP to the supervisor to restart fluentd
    rescue SignalException
      $log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}"
      raise
    rescue Exception
      $log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
      raise
    end

    # Sets up @kodama_client, points its logger at $log and starts it.
    #
    # options - connection options (callers in this file pass `mysql_url`).
    # block   - configuration block; #run passes one that expects the client.
    #
    # NOTE(review): the client construction and the invocation of `block`
    # appear to be missing — as written, `block` is never called and
    # @kodama_client is assigned the result of `logger = $log`. Confirm.
    def start_kodama(options, &block)
      @kodama_client = @kodama_client.logger = $log
      @kodama_client.start
    end

    # Listener registered for every binlog event type in #run: notifies the
    # idle event detector, then dispatches the event.
    #
    # Any exception is logged. StandardErrors are swallowed afterwards (see the
    # inline comment); everything else is re-raised.
    def event_listener(event)
      @idle_event_detector.notify
      @record_dispatcher.dispatch(event)
    rescue Exception => e
      # NOTE(review): the right-hand side of `position =` appears to be
      # missing, so `position` is still nil when interpolated into the message.
      # Presumably the current content of the position file — confirm.
      position = $log.error "error occured while processing #{event.event_type} event at #{position}\n#{e.message}\n#{$!.backtrace.join("\n")}"
      # Not reraising a StandardError because the underlying code can't handle an error well.
      raise unless e.kind_of?(StandardError)
    end

    # Stops the idle event detector and, when @thread is alive, asks Kodama to
    # stop. The thread is killed only after #wait_till_safe_to_stop returns
    # true; otherwise an error is logged. Finally closes @sync_fm.
    def shutdown
      @idle_event_detector.stop
      if @thread and @thread.alive?
        $ "Requesting stop Kodama"
        @kodama_client.stop_request
        if wait_till_safe_to_stop
          $ "Killing Kodama client"
          Thread.kill(@thread)
        else
          $log.error "Unable to stop Kodama"
        end
      end
      @sync_fm.close
    end

    # Polls @kodama_client.safe_to_stop? up to 5 times, sleeping 3 seconds
    # after each unsuccessful check.
    #
    # Returns true as soon as the client reports it is safe to stop, false if
    # it never does.
    def wait_till_safe_to_stop
      retry_count = 5
      1.upto(retry_count) do |i|
        return true if @kodama_client.safe_to_stop?
        sleep 3
      end
      false
    end

    #Hack: All that has been added here is `Fluent::Engine.shutdown_source`. This should be in
    #fluentd's supervisor#install_main_process_signal_handlers
    #
    # Traps USR1: shuts down all sources and flushes the engine, logging a
    # warning if either step raises.
    def install_custom_signal_handler
      trap :USR1 do
        $log.debug "fluentd main process get SIGUSR1"
        $ "force flushing buffered events"
        #@log.reopen!

        # Creating new thread due to mutex can't lock
        # in main thread during trap context
        # NOTE(review): the receiver of the block below appears to be missing —
        # presumably a new thread, given the comment above and the `.run` call.
        {
          begin
            Fluent::Engine.shutdown_source
            Fluent::Engine.flush!
            $log.debug "flushing thread: flushed"
          rescue Exception => e
            $log.warn "flushing thread error: #{e}"
          end
        }.run
      end
    end
  end
end

# HACK
# Monkey patch the class to manage string's character encoding.
# Wraps Binlog::Client#wait_for_next_event: whenever the returned event is a
# Binlog::RowEvent, its singleton class gets a #rows override that tags every
# string value as binary.
module Binlog
  class Client
    # Keep a handle on the unpatched implementation so the wrapper can
    # delegate to it.
    unpatched_wait = instance_method(:wait_for_next_event)

    define_method(:wait_for_next_event) do
      event = unpatched_wait.bind(self).call

      if event.kind_of?(Binlog::RowEvent)
        class << event
          # Returns the rows produced by the original #rows with every value
          # that responds to #force_encoding re-tagged as binary.
          def rows
            # HACK
            # Set string encoding to binary because ruby-binlog has no knowledge
            # about the encoding of strings.
            to_binary = lambda do |item|
              item.force_encoding("binary") if item.respond_to?(:force_encoding)
              item
            end

            raw_rows = super
            raw_rows.map do |row|
              row.map do |value|
                if value.kind_of?(Array)
                  # Update has two rows in it
                  value.map { |inner| to_binary.call(inner) }
                else
                  to_binary.call(value)
                end
              end
            end
          end
        end
      end

      event
    end
  end
end

# HACK
# Monkey patch so that we can replace Kodama's logger
module Kodama
  Client.class_eval { attr_accessor :logger }
end