module Fluent

require 'fluent/plugin/in_mysql_binlog'
require 'binlog'
require 'kodama'

# Load client library(flydata-agent/lib)
lib = File.expand_path(File.join(File.dirname(__FILE__), '../../'))
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'flydata'
require 'flydata/sync_file_manager'
require 'flydata-core/mysql/command_generator'
require 'flydata/fluent-plugins/preference'
require 'flydata/fluent-plugins/mysql/binlog_position_file'
require 'flydata/fluent-plugins/mysql/binlog_record_dispatcher'
require 'flydata/fluent-plugins/mysql/context'
require 'flydata/fluent-plugins/idle_event_detector'
require 'flydata/fluent-plugins/mysql/table_meta'
require 'flydata/mysql/table_ddl'
require 'flydata-core/fluent/config_helper'

#Monkey-patch fluentd class (EngineClass) to support shutdown for input plugin.
#This will be called when USR1 signal is received
class EngineClass
  # Asks every registered source to shut down, one thread per source, and
  # blocks until all of those threads have finished.
  #
  # A source whose #shutdown raises a StandardError is logged as a warning and
  # does not prevent the remaining sources from being shut down.
  #
  # Returns the array of (already joined) shutdown threads.
  def shutdown_source
    shutdown_threads = @sources.map do |source|
      Thread.new(source) do |src|
        begin
          src.shutdown
        rescue => e
          $log.warn "unexpected error while shutting down", error_class: e.class, error: e
          $log.warn_backtrace
        end
      end
    end
    shutdown_threads.each(&:join)
  end
end


class MysqlBinlogFlydataInput < MysqlBinlogInput
  include MysqlBinlogFlydataInputPreference
  Plugin.register_input('mysql_binlog_flydata', self)

  # Runs the parent class initializer and then installs this plugin's USR1
  # trap (see #install_custom_signal_handler), so the signal handler is
  # registered as soon as the plugin instance is created.
  def initialize
    super
    install_custom_signal_handler
  end

  # MySQL database name; used for the DB connection options and the sync context.
  config_param :database, :string
  # Comma-separated list of table names to watch (split on /,\s*/ in #configure).
  config_param :tables, :string
  # Comma-separated list of tables that are also watched, but for which
  # :delete and :truncate_table events are registered as omitted in #configure.
  config_param :tables_append_only, :string
  # The four settings below are passed, in this order, to IdleEventDetector.new:
  # initial_idle_interval, continuous_idle_interval, check_interval, idle_timeout.
  # NOTE(review): their exact semantics are defined by IdleEventDetector - confirm there.
  config_param :initial_idle_interval, :integer, :default => 30     # 30 sec
  config_param :continuous_idle_interval, :integer, :default => 600 # 10 min
  config_param :idle_timeout, :time, :default => 1800               # 30 min
  config_param :check_interval, :integer, :default => 5             # 5 sec
  # CA certificate content. When non-blank it is unescaped and saved to a file
  # whose path is then used as the ssl_ca option.
  config_param :ssl_ca_content, :string, :default => ''
  # Cipher list applied to the binlog client when SSL is enabled and this value is non-empty.
  config_param :ssl_cipher, :string, :default => nil

  # Secondary ssl cipher is a workaround to handle the 'dh key too small' error.
  # If mysql-server is v5.6.25 or older, and the client openssl is
  # 1.0.1f-1ubuntu2.15 (for ubuntu) or later, the client will get the above error
  # because the key length returned from mysql-server is only 512 bits.
  #
  # In case of dh key error, we retry with non-DH cipher.
  #
  # * SECURITY IMPROVEMENT: reject dh keys smaller than 768 bits
  # https://launchpad.net/ubuntu/+source/openssl/1.0.1f-1ubuntu2.15
  #
  # Supported ssl cipher list for mysql(openssl)
  # https://dev.mysql.com/doc/refman/5.6/en/ssl-options.html

  # Colon-separated OpenSSL cipher list that contains no DH key-exchange
  # ciphers. It is the default for the secondary_ssl_cipher setting, which is
  # used to retry the connection after a 'dh key too small' error.
  #
  # Kept as a word list so each cipher name can be reviewed on its own; the
  # previous single-string form had a truncated entry ("SRP-RSA-AES-128-CBC-S").
  NON_DH_SSL_CIPHER = %w[
    AES256-GCM-SHA384
    AES256-SHA
    AES256-SHA256
    CAMELLIA256-SHA
    DES-CBC3-SHA
    PSK-3DES-EDE-CBC-SHA
    PSK-AES256-CBC-SHA
    SRP-DSS-3DES-EDE-CBC-SHA
    SRP-DSS-AES-128-CBC-SHA
    SRP-DSS-AES-256-CBC-SHA
    SRP-RSA-3DES-EDE-CBC-SHA
    SRP-RSA-AES-128-CBC-SHA
    SRP-RSA-AES-256-CBC-SHA
  ].join(":")
  config_param :secondary_ssl_cipher, :string, :default => NON_DH_SSL_CIPHER

  # Reads the plugin configuration and prepares everything #start and #run need.
  #
  # In order, this method:
  #   1. runs the parent configuration and verifies the binlog position file exists,
  #   2. creates the sync file manager and resolves the sent-position file path,
  #   3. loads custom preferences (load_custom_conf),
  #   4. stores the SSL CA content to a file when one is configured,
  #   5. builds the DB connection options and logs client/server versions,
  #   6. turns the comma-separated table settings into arrays, dropping tables
  #      reported by SyncFileManager#get_new_table_list,
  #   7. builds the context, the record dispatcher and the idle event detector.
  #
  # conf - the configuration element handed over by fluentd.
  #
  # Raises RuntimeError when the position file does not exist.
  def configure(conf)
    super
    @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file)
    raise "No position file(#{@position_file}). Initial synchronization is required before starting." unless @binlog_position_file.exists?

    # nil is passed for data_entry because none of the methods used here need it.
    @sync_fm = Flydata::SyncFileManager.new(nil)
    @sent_position_file_path = @sync_fm.sent_binlog_path(@position_file)

    load_custom_conf

    # SSL configuration: persist the CA content so it can be referenced by path.
    ca_content_given = !@ssl_ca_content.to_s.strip.empty?
    if ca_content_given
      @ssl_ca_path = @sync_fm.ssl_ca_path(@position_file)
      unescaped_ca = FlydataCore::Fluent::ConfigHelper.unescape_conf(@ssl_ca_content)
      @sync_fm.save_ssl_ca(unescaped_ca, @ssl_ca_path)
    end

    # Options used for every direct access to the database.
    @db_opts = {
      host: @host,
      port: @port,
      username: @username,
      password: @password,
      database: @database,
      ssl_ca: @ssl_ca_path,
    }

    $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
    $log.info "mysql client version: #{`mysql -V`}"

    # Probe the server; a failure is reported but deliberately not fatal
    # (exiting here causes a retry loop).
    server_msg = `echo 'select version();' | #{FlydataCore::Mysql::CommandGenerator.generate_mysql_cmd(@db_opts)} 2>&1`
    probe_status = $?.exitstatus
    if probe_status == 0
      $log.info "mysql server version: #{server_msg.strip}"
    else
      err_msg = "Failed to access mysql server... #{server_msg.strip}"
      $log.error err_msg
      $stderr.puts err_msg
    end

    # Append-only tables are watched too, minus their delete/truncate events.
    list_separator = /,\s*/
    @tables = @tables.split(list_separator)
    @omit_events = {}
    @tables_append_only.split(list_separator).each do |append_only_table|
      @tables.push(append_only_table) unless @tables.include?(append_only_table)
      @omit_events[append_only_table] = [:delete, :truncate_table]
    end

    # Remove tables that do not have pos files
    unwatched_tables = @sync_fm.get_new_table_list(@tables, "pos")
    @tables -= unwatched_tables
    $log.info "Not watching these tables: #{unwatched_tables.join(", ")}"

    table_meta = Flydata::Mysql::TableMeta.new(@db_opts.merge(tables: @tables))

    table_revs = tables.each_with_object({}) do |table_name, revisions|
      revisions[table_name] = @sync_fm.table_rev(table_name)
    end

    # Shared state handed to the record dispatcher.
    @context = Mysql::Context.new(
      database: @database,
      tables: @tables,
      tag: @tag,
      sync_fm: @sync_fm,
      omit_events: @omit_events,
      table_meta: table_meta,
      table_revs: table_revs,
    )
    @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context)
    @idle_event_detector = IdleEventDetector.new(@initial_idle_interval, @continuous_idle_interval, @check_interval, @idle_timeout)
  end

  # Starts the plugin: runs the parent #start, starts the idle event detector
  # and makes sure the table positions directory exists.
  #
  # The idle event detector block receives a reason and a timestamp:
  #   :event_not_coming / :event_still_not_coming - logged as a warning
  #   :event_arrived_finally                      - logged as info
  #   :event_idle_timeout                         - logged as an error, then
  #                                                 SIGHUP is sent to the parent process
  #
  # Every exception is logged and re-raised; Binlog::Error gets a dedicated
  # message for the known mysql-replication-listener failure.
  def start
    super
    @idle_event_detector.start do |reason, timestamp|
      case reason
      when :event_not_coming, :event_still_not_coming
        $log.warn "No binary log event since #{timestamp}"
      when :event_arrived_finally
        $log.info "Binary log event has come at #{timestamp}"
      when :event_idle_timeout
        $log.error "No binary log event since #{timestamp}. Restart the process."
        Process.kill(:HUP, Process.ppid)
      end
    end

    positions_path = @context.sync_fm.table_positions_dir_path
    # File.exists? was removed in Ruby 3.2; File.exist? is available on every version.
    Dir.mkdir positions_path unless File.exist? positions_path
  rescue Binlog::Error => e
    if /basic_string::_M_replace_aux/ === e.to_s
      # TODO Fix the root cause in mysql-replication-listener
      $log.error <<EOS
a mysql-replication-listener error.  This could have been caused by one of the following reasons.
  - Failed on connect: Your host is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
EOS
    else
      $log.error "unexpected mysql-replication-listener error. exception: #{e.class.to_s}, error: #{e.to_s}\n#{e.backtrace.join("\n")}"
    end
    raise
  rescue Exception => e
    # Log-and-reraise only: nothing is swallowed here.
    $log.error "unexpected fatal error. exception: #{e.class.to_s}, error: #{e.to_s}\n#{e.backtrace.join("\n")}"
    raise
  end

  # Main loop of the plugin.
  #
  # First refreshes the table meta data and runs TableDdl.migrate_tables,
  # dispatching every event it yields through the record dispatcher. Then it
  # starts the Kodama binlog client (see #start_kodama) configured with the
  # position files, SSL settings, retry settings, log level and one listener
  # per entry in @listen_events.
  #
  # Error handling:
  #   * Binlog::Error containing 'dh key too small' - retried once with
  #     @secondary_ssl_cipher (only if that setting is non-empty).
  #   * any other StandardError - logged, then after @retry_wait seconds SIGHUP
  #     is sent to the parent process so the supervisor restarts fluentd.
  #   * SignalException - logged at debug level and re-raised.
  #   * any other Exception - logged as fatal and re-raised.
  #
  # NOTE(review): `mysql_url` is not defined in this file; presumably it is
  # provided by the parent class - confirm.
  def run
    @context.table_meta.update
    Flydata::Mysql::TableDdl.migrate_tables(@context.tables, @db_opts,
                                            @context.sync_fm, @position_file,
                                            @context) do |event|
      @record_dispatcher.dispatch(event)
    end

    # The cipher list in effect for the current attempt; replaced by the
    # secondary list when the first attempt fails with a DH key error.
    current_ssl_cipher = @ssl_cipher
    # Guards the `retry` below so the fallback is attempted at most once.
    retried = false
    begin
      start_kodama(mysql_url) do |c|
        c.binlog_position_file = @position_file
        # SSL is only configured when a CA file was saved and the client supports it.
        if @ssl_ca_path.to_s != '' && c.respond_to?(:ssl_ca=)
          $log.info "SSL is enabled. (ssl_ca: #{@ssl_ca_path})"
          c.ssl_ca = @ssl_ca_path
          unless current_ssl_cipher.to_s.empty?
            $log.info "SSL cipher is set. (ssl_cipher: #{current_ssl_cipher})"
            c.ssl_cipher = current_ssl_cipher
          end
        end

        # The sent-position feature is optional on the client side.
        if c.respond_to?(:sent_binlog_position_file=)
          $log.info "Sent position feature is enabled. sent_position_file:#{@sent_position_file_path}"
          c.sent_binlog_position_file = @sent_position_file_path
        end

        # If a position file cannot be read, the inline rescue makes the
        # expression evaluate to the IOError class, so "IOError" is logged
        # in place of the position.
        $log.info("Binlog position - resume_pos:'#{IO.read(@position_file) rescue IOError}' " +
                                    "sent_pos:'#{IO.read(@sent_position_file_path) rescue IOError}'")

        c.connection_retry_limit = @retry_limit
        c.connection_retry_wait = @retry_wait
        c.log_level = @log_level.to_sym
        # Register #event_listener for every configured binlog event type.
        @listen_events.each do |event_type|
          $log.trace { "registered binlog event listener '#{event_type}'" }
          c.send("on_#{event_type}", &method(:event_listener))
        end
      end
    rescue Binlog::Error => e
      # One-shot fallback to the non-DH cipher list; everything else is
      # passed on to the method-level rescue clauses below.
      if /dh key too small/.match(e.to_s) && !retried && !@secondary_ssl_cipher.to_s.empty?
        retried = true
        current_ssl_cipher = @secondary_ssl_cipher
        $log.warn("Retry with secondary ssl cipher list due to '#{e}' - secondary_ssl_cipher: '#{@secondary_ssl_cipher}'")
        retry
      else
        raise e
      end
    end

  rescue => e
    # HACK: mysql-replication-listener has a network connection leak bug which doesn't release a connection
    # to MySQL.  Rather than fixing the bug, restarting the fluentd process for now.
    $log.warn "kodama died with an error.  Restart the process after #{@retry_wait} seconds.  error: #{e.class.to_s} '#{e.to_s}'\n#{e.backtrace.join("\n")}"
    sleep @retry_wait
    Process.kill(:HUP, Process.ppid) # Send SIGHUP to the supervisor to restart fluentd
  rescue SignalException
    # Signals are expected during shutdown; keep the log quiet and propagate.
    $log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}"
    raise
  rescue Exception
    # Anything that is neither a StandardError nor a signal is logged and propagated.
    $log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
    raise
  end

  # Creates the Kodama client, lets the caller configure it and starts it.
  #
  # options - whatever Kodama::Client.mysql_url accepts to build the connection URL.
  # block   - called with the new client before it is started.
  #
  # The client is kept in @kodama_client so that #shutdown can stop it later.
  # Returns the result of Kodama::Client#start.
  def start_kodama(options, &block)
    client = Kodama::Client.new(Kodama::Client.mysql_url(options))
    @kodama_client = client
    client.logger = $log
    block.call(client)
    client.start
  end

  # Callback registered for every binlog event type.
  #
  # Notifies the idle event detector that an event arrived and hands the event
  # to the record dispatcher.
  #
  # A StandardError raised while processing is logged together with the
  # current binlog position and then suppressed, because the underlying code
  # can't handle an error well. Any other Exception is logged and re-raised.
  #
  # event - the binlog event; must respond to #event_type.
  def event_listener(event)
    @idle_event_detector.notify
    @record_dispatcher.dispatch(event)
  rescue Exception => e
    # Reading the position is best effort: if it fails, the failure must not
    # escape from this handler (that would leak a StandardError to the caller
    # and hide the original error).
    position = begin
      @binlog_position_file.read
    rescue StandardError => read_error
      "(position unavailable: #{read_error.class}: #{read_error.message})"
    end
    $log.error "error occured while processing #{event.event_type} event at #{position}\n#{e.message}\n#{e.backtrace.join("\n")}"
    # Not reraising a StandardError because the underlying code can't handle an error well.
    raise e unless e.kind_of?(StandardError)
  end

  # Stops the plugin.
  #
  # Stops the idle event detector, then, if the worker thread is still alive,
  # asks the Kodama client to stop and kills the thread once the client reports
  # that stopping is safe. When it never becomes safe, the thread is left
  # running and an error is logged. The sync file manager is closed at the end.
  #
  # Returns the result of closing the sync file manager.
  def shutdown
    @idle_event_detector.stop

    worker_alive = @thread && @thread.alive?
    if worker_alive
      $log.info "Requesting stop Kodama"
      @kodama_client.stop_request

      if wait_till_safe_to_stop
        $log.info "Killing Kodama client"
        @thread.kill
      else
        $log.error "Unable to stop Kodama"
      end
    end

    @sync_fm.close
  end

  # Polls the Kodama client until it reports that it is safe to stop.
  #
  # The client is asked up to 5 times, with a 3 second pause after every
  # negative answer.
  #
  # Returns true as soon as the client is safe to stop, false when all
  # attempts were used up.
  def wait_till_safe_to_stop
    5.times.any? do
      safe = @kodama_client.safe_to_stop?
      sleep 3 unless safe
      safe
    end
  end

  #Hack: All that has been added here is `Fluent::Engine.shutdown_source`. This should be in
  #fluentd's supervisor#install_main_process_signal_handlers
  # Installs the USR1 signal handler.
  #
  # On USR1 the handler shuts down all sources (Fluent::Engine.shutdown_source)
  # and then flushes buffered events (Fluent::Engine.flush!). Any exception
  # raised by that work is logged as a warning.
  #
  # Returns whatever Kernel#trap returns (the previously installed handler).
  def install_custom_signal_handler
    trap(:USR1) do
      $log.debug "fluentd main process get SIGUSR1"
      $log.info "force flushing buffered events"

      # The work runs in its own thread because a mutex can't be locked
      # in the main thread while in trap context.
      flush_thread = Thread.new do
        begin
          Fluent::Engine.shutdown_source
          Fluent::Engine.flush!
          $log.debug "flushing thread: flushed"
        rescue Exception => e
          $log.warn "flushing thread error: #{e}"
        end
      end
      flush_thread.run
    end
  end
end


end


# HACK
# Monkey patch the class to manage string's character encoding.
module Binlog

class Client
  # Keep a handle on the original implementation so it can be wrapped.
  original_wait = instance_method(:wait_for_next_event)

  # Wraps the original #wait_for_next_event. Row events get a singleton #rows
  # that marks every string value as binary; all other events are returned
  # untouched.
  define_method(:wait_for_next_event) do
    event = original_wait.bind(self).call
    if event.kind_of?(Binlog::RowEvent)
      class << event
        # Returns the rows of the event with every value that responds to
        # #force_encoding switched (in place) to binary encoding.
        def rows
          # HACK
          # Set string encoding to binary because ruby-binlog has no knowledge
          # about the encoding of strings.
          as_binary = lambda do |item|
            item.force_encoding("binary") if item.respond_to?(:force_encoding)
            item
          end

          original_rows = super
          original_rows.map do |row|
            row.map do |entry|
              if entry.kind_of?(Array)
                # Update has two rows in it
                entry.map { |item| as_binary.call(item) }
              else
                as_binary.call(entry)
              end
            end
          end
        end
      end
    end
    event
  end
end

end

# HACK
# Monkey patch so that we can replace Kodama's logger
module Kodama

# Reopens the already loaded Kodama::Client and adds a `logger` reader and
# writer, so that a logger object can be assigned from outside (this file
# assigns $log to it when the client is created).
# NOTE(review): whether Kodama itself reads @logger is not visible here - confirm.
Client.class_eval do
  attr_accessor :logger
end

end