# Fluentd input plugin 'mysql_binlog_flydata'.
#
# Defines Fluent::MysqlBinlogFlydataInput, a subclass of MysqlBinlogInput.
# Events handed to #event_listener are forwarded to a
# Flydata::SourceMysql::PluginSupport::FlydataBinlogRecordDispatcher, and an
# IdleEventDetector reports when no binary log event has arrived for a while.
#
# NOTE(review): the copy of this file under review is damaged in the middle of
# the class (see the "DAMAGED SOURCE" markers below). Restore the missing text
# from version control before relying on that section.
require_relative 'flydata_plugin_ext/base'

module Fluent

  require 'fluent/plugin/in_mysql_binlog'
  require 'binlog'
  require 'kodama'
  require 'flydata/sync_file_manager'
  require 'flydata-core/mysql/command_generator'
  require 'flydata/fluent-plugins/flydata_plugin_ext/preference'
  require 'flydata/fluent-plugins/flydata_plugin_ext/idle_event_detector'
  require 'flydata/fluent-plugins/flydata_plugin_ext/flydata_sync'
  require 'flydata/source_mysql/plugin_support/source_position_file'
  require 'flydata/source_mysql/plugin_support/binlog_record_dispatcher'
  require 'flydata/source_mysql/plugin_support/context'
  require 'flydata/source_mysql/table_meta'
  require 'flydata/source_mysql/table_ddl'
  require 'flydata-core/fluent/config_helper'
  require 'flydata-core/mysql/ssl'

  # Input plugin registered with Fluentd under the name 'mysql_binlog_flydata'.
  class MysqlBinlogFlydataInput < MysqlBinlogInput
    include MysqlBinlogFlydataInputPreference
    include FlydataSync
    Plugin.register_input('mysql_binlog_flydata', self)

    SOURCE_POSITION_FILE_CLASS = Flydata::SourceMysql::PluginSupport::SourcePositionFile

    config_param :database, :string
    # The four values below are passed, in this order of declaration, to
    # IdleEventDetector.new in #configure.
    config_param :initial_idle_interval, :integer, :default => 30 # 30 sec
    config_param :continuous_idle_interval, :integer, :default => 600 # 10 min
    config_param :idle_timeout, :time, :default => 1800 # 30 min
    config_param :check_interval, :integer, :default => 5 # 5 sec
    # CA certificate content; when non-blank it is unescaped and saved to a
    # file in #configure.
    config_param :ssl_ca_content, :string, :default => ''
    config_param :ssl_cipher, :string, :default => nil
    # Secondary ssl cipher is for a workaround to handle 'dh key too small' error
    # If mysql-server is v5.6.25 or older, and client openssl is
    # 1.0.1f-1ubuntu-2.15(for ubuntu) or later, client will get the above error
    # because the key length returned from mysql-server is only 512 bits.
    #
    # In case of dh key error, we retry with non-DH cipher.
    #
    # * SECURITY IMPROVEMENT: reject dh keys smaller than 768 bits
    #   https://launchpad.net/ubuntu/+source/openssl/1.0.1f-1ubuntu2.15
    #
    # Supported ssl cipher list for mysql(openssl)
    #   https://dev.mysql.com/doc/refman/5.6/en/ssl-options.html
    config_param :secondary_ssl_cipher, :string, default: FlydataCore::Mysql::Ssl::NON_DH_SSL_CIPHER

    # Configures the plugin: saves the SSL CA (if given), builds the DB access
    # options, logs the MySQL client/server versions and creates the context,
    # record dispatcher and idle event detector.
    #
    # Reads @sync_fm, @source_position_file, @host, @port, @username,
    # @password, @tables, @tag, @omit_events and @table_revs, none of which
    # are assigned in this method -- presumably set by `super` or the included
    # modules; verify against those.
    #
    # @param conf the Fluentd configuration element (forwarded to `super`)
    def configure(conf)
      super

      # SSL configuration
      unless @ssl_ca_content.to_s.strip.empty?
        @ssl_ca_path = @sync_fm.ssl_ca_path(@source_position_file.path)
        @sync_fm.save_ssl_ca(FlydataCore::Fluent::ConfigHelper.unescape_conf(@ssl_ca_content), @ssl_ca_path)
      end

      # Db access opts
      # (@ssl_ca_path stays nil when no CA content was configured)
      @db_opts = { host: @host, port: @port, username: @username, password: @password, database: @database, ssl_ca: @ssl_ca_path, ssl_cipher: @ssl_cipher }

      $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
      # Shells out to the `mysql` command line client for the version strings.
      $log.info "mysql client version: #{`mysql -V`}"
      server_msg = `echo 'select version();' | #{FlydataCore::Mysql::CommandGenerator.generate_mysql_cmd(@db_opts)} 2>&1`
      if ($? && $?.exitstatus == 0)
        $log.info "mysql server version: #{server_msg.strip}"
      else
        # A failed version query is only reported; configuration continues.
        err_msg = "Failed to access mysql server... #{server_msg.strip}"
        $log.error err_msg
        $stderr.puts err_msg
        #exit 1 # causes retry loop
      end

      table_meta = Flydata::SourceMysql::TableMeta.new(@db_opts.merge(tables: @tables))

      # Set context
      @context = Flydata::SourceMysql::PluginSupport::Context.new(
        database: @database, tables: @tables, tag: @tag, sync_fm: @sync_fm, omit_events: @omit_events,
        table_meta: table_meta, table_revs: @table_revs, dbconf: @db_opts
      )
      @record_dispatcher = Flydata::SourceMysql::PluginSupport::FlydataBinlogRecordDispatcher.new(@context)
      @idle_event_detector = IdleEventDetector.new(@initial_idle_interval, @continuous_idle_interval, @check_interval, @idle_timeout)
    end

    # Starts the plugin (via `super`) and the idle event detector. The
    # detector's callback logs each reported reason; on :event_idle_timeout it
    # additionally sends SIGHUP to the parent process.
    def start
      super
      @idle_event_detector.start do |reason, timestamp|
        case reason
        when :event_not_coming
          $log.warn "No binary log event since #{timestamp}"
        when :event_still_not_coming
          $log.warn "No binary log event since #{timestamp}"
        when :event_arrived_finally
          $log.info "Binary log event has come at #{timestamp}"
        when :event_idle_timeout
          $log.warn "No binary log event since #{timestamp}. Restarting the process."
          Process.kill(:HUP, Process.ppid)
        end
      end
    rescue Binlog::Error
      if (/basic_string::_M_replace_aux/ === $!.to_s)
        # TODO Fix the root cause in mysql-replication-listener
        #
        # NOTE(review): DAMAGED SOURCE STARTS HERE.
        # In the copy under review, the text between `$log.error <` and the
        # lone `e` on the line below is missing (it looks as if a `<...>` span
        # was stripped during extraction). The lost text presumably held the
        # rest of this error message, the end of #start, and the `def` line
        # and opening of the next method -- including whatever binds `e` and
        # initialises `retried` and `current_ssl_cipher`. The surviving tokens
        # are kept exactly as found and are NOT valid as written.
        $log.error < e

        # NOTE(review): from here to "DAMAGED SOURCE ENDS" is the tail of a
        # later method whose beginning is in the lost span. Judging only by
        # the surviving lines, `e` is a rescued error and this chain decides
        # how to react to it:
        #   * 'dh key too small', not yet retried, secondary cipher set:
        #     switch current_ssl_cipher to @secondary_ssl_cipher and retry once
        #   * 'binlog file ... does not exist': log and SIGTERM the parent
        #   * Kodama::ConnectionEstablishError: log and SIGTERM the parent
        #   * anything else: re-raise
            if /dh key too small/.match(e.to_s) && !retried && !@secondary_ssl_cipher.to_s.empty?
              retried = true
              current_ssl_cipher = @secondary_ssl_cipher
              $log.warn("Retry with secondary ssl cipher list due to '#{e}' - secondary_ssl_cipher: '#{@secondary_ssl_cipher}'")
              retry
            elsif /binlog file.*does not exist/.match(e.to_s)
              $log.error("#{e.to_s}. Sync must be reset. Terminating the agent.")
              Process.kill(:TERM, Process.ppid)
            elsif e.kind_of?(Kodama::ConnectionEstablishError)
              $log.error("#{e.to_s}. Connection to MySQL server could not be established. Terminating the agent.")
              Process.kill(:TERM, Process.ppid)
            else
              raise e
            end
          end
        end
    rescue => e
      # HACK: mysql-replication-listener has a network connection leak bug which doesn't release a connection
      # to MySQL. Rather than fixing the bug, restarting the fluentd process for now.
      $log.warn "kodama died with an error. Restart the process after #{@retry_wait} seconds. error: #{e.class.to_s} '#{e.to_s}'\n#{e.backtrace.join("\n")}"
      sleep @retry_wait
      Process.kill(:HUP, Process.ppid) # Send SIGHUP to the supervisor to restart fluentd
    rescue SignalException
      # Signals are logged at debug level and propagated unchanged.
      $log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}"
      raise
    rescue Exception
      # Anything that is not a StandardError or a signal: log and propagate.
      $log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
      raise
    end
    # NOTE(review): DAMAGED SOURCE ENDS. The methods below are intact.

    # Creates a Kodama client for the given connection options, installs $log
    # as its logger, yields the client to the block for further set-up and
    # then calls Kodama::Client#start on it.
    #
    # @param options connection options passed to Kodama::Client.mysql_url
    # @yield [client] the newly created Kodama::Client, before it is started
    def start_kodama(options, &block)
      @kodama_client = Kodama::Client.new(Kodama::Client.mysql_url(options))
      @kodama_client.logger = $log
      block.call(@kodama_client)
      @kodama_client.start
    end

    # Handles one binlog event: tells the idle event detector that an event
    # arrived, then hands the event to the record dispatcher.
    #
    # Any exception is logged together with the position read from the source
    # position file and then re-raised.
    #
    # @param event the binlog event (must respond to #event_type)
    def event_listener(event)
      @idle_event_detector.notify
      @record_dispatcher.dispatch(event)
    rescue Exception => e
      position = @source_position_file.read
      $log.error "error occurred while processing #{event.event_type} event at #{position}, event_class:#{event.class.to_s}\n#{e.message}\n#{$!.backtrace.join("\n")}"
      raise
    end

    # Stops the plugin. Does nothing when process_aborted? is true. Otherwise
    # stops the idle event detector, asks Kodama to stop if @thread is alive
    # (joining the thread only once Kodama reports it is safe to stop), and
    # finally closes @sync_fm.
    def shutdown
      return if process_aborted?
      @idle_event_detector.stop
      if @thread and @thread.alive?
        $log.info "Requesting stop Kodama"
        begin
          @kodama_client.stop_request
          if wait_till_safe_to_stop
            @thread.join
            $log.info "Kodama has stopped successfully"
          else
            # The thread is not joined in this case; shutdown proceeds anyway.
            $log.error "Unable to stop Kodama"
          end
        rescue => e
          # Errors while stopping are logged and do not abort the shutdown.
          $log.warn "an error occurred during Kodama shutdown. error:'#{e.to_s}'\n#{e.backtrace.join("\n")}"
        end
      end
      @sync_fm.close
    end

    # Polls the Kodama client until it reports that it is safe to stop.
    # Checks up to 5 times and sleeps 3 seconds after every unsuccessful check.
    #
    # @return [Boolean] true as soon as safe_to_stop? is truthy, false if all
    #   checks fail
    def wait_till_safe_to_stop
      retry_count = 5
      1.upto(retry_count) do |i|
        return true if @kodama_client.safe_to_stop?
        sleep 3
      end
      false
    end
  end

end

# HACK
# Monkey patch the class to manage string's character encoding.
module Binlog
  class Client
    # Keep a handle on the unpatched implementation so that the wrapper
    # defined below can delegate to it.
    unpatched_wait = instance_method(:wait_for_next_event)

    # Wrapper around the original wait_for_next_event. The event is returned
    # as-is unless it is a Binlog::RowEvent; in that case #rows is overridden
    # on that one event object so that every string in the result is marked
    # as binary.
    define_method(:wait_for_next_event) do
      received = unpatched_wait.bind(self).call
      next received unless received.kind_of?(Binlog::RowEvent)

      def received.rows
        # HACK
        # ruby-binlog has no knowledge about the encoding of strings, so the
        # encoding of each string is set to binary (in place). Values that do
        # not respond to force_encoding are passed through untouched.
        as_binary = lambda do |cell|
          cell.force_encoding("binary") if cell.respond_to?(:force_encoding)
          cell
        end

        super.map do |row|
          row.map do |value|
            if value.kind_of?(Array)
              # Update has two rows in it
              value.map { |cell| as_binary.call(cell) }
            else
              as_binary.call(value)
            end
          end
        end
      end

      received
    end
  end
end

# HACK
# Monkey patch Kodama::Client with a logger accessor so that the logger Kodama
# uses can be replaced from outside.
module Kodama
  Client.class_eval { attr_accessor :logger }
end