require 'fiber'
require 'fileutils'
require 'msgpack'
require 'open3'
require 'mysql2'
require 'flydata/sync_file_manager'
require 'flydata/agent_errors'
require 'flydata/compatibility_check'
require 'flydata/table_def'
require 'flydata/output/forwarder'
require 'flydata/parser/mysql/dump_parser'
#require 'ruby-prof'

module Flydata
  module Command
    class Sync < Base
      include Helpers
      # NOTE(review): not referenced anywhere in the visible part of this file.
      INSERT_PROGRESS_INTERVAL = 1000

      # Status values written to the dump.pos file through
      # SyncFileManager#save_dump_pos:
      #   PARSING  - saved at every checkpoint while the dump is parsed and sent
      #   WAITING  - saved once the whole dump has been sent to the server
      #   COMPLETE - saved after the server finished processing the data
      STATUS_PARSING = 'PARSING'
      STATUS_WAITING = 'WAITING'
      STATUS_COMPLETE = 'COMPLETE'

      # Builds the option definitions for the sync run; `run` and its helpers
      # read them back through `opts` (skip_cleanup?, dump_file?).
      def self.slop
        Slop.new do
          on('c', 'skip-cleanup', 'Skip server cleanup')
          on('y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.')
          on('d', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.')
        end
      end

      # Entry point of the sync command.
      #
      # tables - optional table names. An empty list means a full initial
      #          sync; otherwise only the given tables are synced.
      #
      # Exits the process with status 1 when the agent (sender) process is
      # already running, after telling the user which command to run first.
      def run(*tables)
        if Flydata::Command::Sender.new.process_exist?
          hint = if tables.empty?
                   # full sync
                   "FlyData Agent is already running.  If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
                 else
                   # per-table sync
                   "Flydata Agent is already running.  If you'd like to Sync the table(s), run 'flydata sync:flush' first."
                 end
          puts hint
          exit 1
        end

        data_entry = load_sync_info(override_tables(retrieve_data_entry, tables))
        validate_initial_sync_status(data_entry, tables)
        # A per-table sync must drain what the agent has buffered before dumping.
        flush_buffer_and_stop unless data_entry['mysql_data_entry_preference']['initial_sync']
        sync_mysql_to_redshift(data_entry)
      end

      # Builds the option definitions for the `flush` command.
      def self.slop_flush
        Slop.new do
          on('y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.')
        end
      end

      # Flushes the client buffer, waits for the server to process the data,
      # stops the sender process and reports the result to the user.
      def flush
        flush_buffer_and_stop
        puts 'Buffers have been flushed and the sender process has been stopped.'
      end

      # Builds the option definitions for the `reset` command; `reset` reads
      # the `client` flag through `opts.client?`.
      def self.slop_reset
        Slop.new do
          on('c', 'client', 'Resets client only.')
          on('y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.')
        end
      end

      # Resets the sync state after asking the user for confirmation.
      #
      # tables - optional table names. When empty the whole sync is reset,
      #          including the binlog position file; otherwise only the
      #          per-table position/revision files are removed in addition to
      #          the dump related files.
      #
      # Stops the sender, waits for the server buffer to drain, cleans up the
      # server side (unless the `client` option is given) and deletes the
      # local sync state files that exist.
      def reset(*tables)
        msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
        return unless ask_yes_no("This resets the current sync#{msg}.  Are you sure?")
        sender = Flydata::Command::Sender.new
        sender.flush_client_buffer # TODO We should rather delete buffer files
        sender.stop

        de = retrieve_data_entry
        wait_for_server_buffer
        cleanup_sync_server(de, tables) unless opts.client?
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        delete_files = [
          sync_fm.dump_file_path,
          sync_fm.dump_pos_path,
          sync_fm.mysql_table_marshal_dump_path,
          sync_fm.sync_info_file,
          sync_fm.table_position_file_paths(*tables),
          sync_fm.table_rev_file_paths(*tables)
        ]
        delete_files << sync_fm.binlog_path if tables.empty?
        # The per-table entries are arrays, hence the flatten.
        delete_files.flatten.each do |path|
          # File.exist? instead of File.exists?, which was removed in Ruby 3.2.
          FileUtils.rm(path) if File.exist?(path)
        end
        puts "Reset completed successfully."
      end

      # Blocks while the server reports the 'processing' state, polling every
      # 10 seconds and printing any progress message the server returns.
      def wait_for_server_buffer
        puts "Waiting for the server buffer to get empty"
        loop do
          buffer_stat = check
          # `check` returns nil once the server reports completion.
          break unless buffer_stat && buffer_stat['state'] == 'processing'

          print_progress(buffer_stat)
          sleep 10
        end
      end

      # Polls the server every 10 seconds until `check` returns nil (complete),
      # printing a two-phase progress report: first "Uploading data to
      # Redshift...", then "Finishing data upload..." once the server state
      # becomes 'uploading'.
      def wait_for_server_data_processing
        upload_started = false
        puts "Uploading data to Redshift..."
        sleep 10
        while (buffer_stat = check)
          if !upload_started && buffer_stat['state'] == 'uploading'
            puts "  -> Done"
            upload_started = true
            puts "Finishing data upload..."
          end
          print_progress(buffer_stat)
          sleep 10
        end
        unless upload_started
          # The 'uploading' state was never observed (no data); print the
          # same messages so the output stays consistent.
          puts "  -> Done"
          puts "Finishing data upload..."
        end
        puts "  -> Done"
      end

      # Fetches the server buffer status for the current data entry, inside
      # `retry_on(RestClient::Exception)`.
      #
      # The block yields nil when the status has a truthy 'complete' value and
      # the status hash otherwise (presumably returned as-is by retry_on).
      def check
        data_entry = retrieve_data_entry
        retry_on(RestClient::Exception) do
          buffer_stat = do_check(data_entry)
          buffer_stat['complete'] ? nil : buffer_stat
        end
      end

      # Skips the initial sync by creating an empty binlog position file and
      # tells the user how to start continuous sync.
      #
      # The file is created with FileUtils.touch rather than a shell `touch`
      # so that paths containing spaces or shell metacharacters work and a
      # failure to create the file raises instead of being silently ignored.
      def skip
        de = retrieve_data_entry
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        binlog_path = sync_fm.binlog_path
        FileUtils.touch(binlog_path)
        puts "Created an empty binlog position file."
        puts "-> #{binlog_path}"
        puts "Run 'flydata start' to start continuous sync."
      end

      # Builds the option definitions for the `generate_table_ddl` command;
      # the `ctl-only` flag is read through `opts.ctl_only?`.
      def self.slop_generate_table_ddl
        Slop.new do
          on('c', 'ctl-only', 'Only generate FlyData Control definitions')
          on('y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.')
        end
      end

      # Runs the MySQL compatibility check and then prints the table DDL for
      # the data entry.
      #
      # tables - optional table names overriding the tables of the data entry.
      def generate_table_ddl(*tables)
        data_entry = retrieve_data_entry
        data_port = flydata.data_port.get
        preference = data_entry['mysql_data_entry_preference']
        Flydata::MysqlCompatibilityCheck.new(data_port, preference).check
        do_generate_table_ddl(override_tables(data_entry, tables))
      end

      private

      # Raises when a previous sync was interrupted in a state that cannot be
      # resumed: the saved status is PARSING but the dump file is missing
      # (which happens when the dump was streamed rather than written to disk).
      #
      # de     - data entry hash
      # tables - table names, used only to build the suggested reset command
      def validate_initial_sync_status(de, tables)
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        dump_pos_info = sync_fm.load_dump_pos
        fp = sync_fm.dump_file_path

        # status is parsing but dumpfile doesn't exist due to streaming -> raise error
        # (File.exist? instead of File.exists?, which was removed in Ruby 3.2)
        if dump_pos_info[:status] == STATUS_PARSING && !File.exist?(fp)
          raise "FlyData Sync was interrupted with invalid state. Run 'flydata sync:reset#{tables.empty? ? '' : ' ' + tables.join(' ')}' first."
        end
      end

      # Returns the first data entry, with the append-only tables merged into
      # the 'tables' preference (comma separated, duplicates removed).
      #
      # Raises when there is no data entry or when its type is not
      # 'RedshiftMysqlDataEntry'.
      def retrieve_data_entry
        de = retrieve_data_entries.first
        raise "There are no data entry." unless de
        case de['type']
        when 'RedshiftMysqlDataEntry'
          mp = de['mysql_data_entry_preference']
          if mp['tables_append_only']
            # 'tables' may be unset when every table is append-only; to_s
            # keeps the merge from failing on nil.
            mp['tables'] = (mp['tables'].to_s.split(",") + mp['tables_append_only'].split(",")).uniq.join(",")
          end
        else
          raise "No supported data entry. Only mysql-redshift sync is supported."
        end
        de
      end

      # Asks the server to clean up the sync resources of the data entry and
      # prints a dot every 5 seconds until the request has finished.
      #
      # de     - data entry hash (its 'id' is sent to the server)
      # tables - table names to clean up (default: empty list)
      def cleanup_sync_server(de, tables = [])
        print "Cleaning the server."
        cleanup_thread = Thread.new do
          begin
            flydata.data_entry.cleanup_sync(de['id'], tables)
          rescue RestClient::RequestTimeout
            # server is taking time to cleanup.  Try again
            retry
          end
        end
        # Thread#join(limit) returns nil while the thread is still running.
        print "." until cleanup_thread.join(5)
        puts
        puts "Done."
      end

      # Requests the buffer status of the data entry for the current
      # environment mode and returns the API result.
      def do_check(de)
        data_entry_api = flydata.data_entry
        data_entry_api.buffer_stat(de['id'], env_mode)
      end

      # Prints the 'message' of a buffer status hash; prints nothing when the
      # message is nil or empty.
      def print_progress(buffer_stat)
        progress_message = buffer_stat['message']
        return if progress_message.nil? || progress_message.empty?

        puts progress_message
      end

      # Shell command template formerly used by do_generate_table_ddl.  Kept so
      # that any external reference keeps resolving; do_generate_table_ddl now
      # passes the same arguments to mysqldump without going through a shell.
      DDL_DUMP_CMD_TEMPLATE = "MYSQL_PWD=\"%s\" mysqldump --protocol=tcp -d -h %s -P %s -u %s %s %s"

      # Runs `mysqldump -d` (schema only) for the tables of the data entry and
      # prints the Redshift DDL generated from every table definition found in
      # its output.
      #
      # de - data entry hash; reads 'schema_name' and the
      #      'mysql_data_entry_preference' settings (password, host, port,
      #      username, database, tables, initial_sync).
      #
      # Raises when mysqldump is not installed, when a required setting is
      # missing (nil or empty), or when mysqldump wrote anything but the
      # password warning to stderr.  Table definition errors are collected and
      # reported on $stderr after the dump has been processed.
      def do_generate_table_ddl(de)
        if `which mysqldump`.empty?
          raise "mysqldump is not installed.  mysqldump is required to run the command"
        end

        error_list = []

        schema_name = de['schema_name']

        mp = de['mysql_data_entry_preference']

        # to_s makes a missing (nil) setting fail with the intended message
        # instead of a NoMethodError.
        host = mp['host'].to_s
        raise "MySQL `host` is neither defined in the data entry nor the local config file" if host.empty?
        port = mp['port'].to_s
        port = '3306' if port.empty?
        username = mp['username'].to_s
        raise "MySQL `username` is neither defined in the data entry nor the local config file" if username.empty?
        database = mp['database'].to_s
        raise "`database` is neither defined in the data entry nor the local config file" if database.empty?
        tables = mp['tables'].to_s.split(/[,\s]+/).reject(&:empty?)
        raise "`tables` (or `tables_append_only`) is neither defined in the data entry nor the local config file" if tables.empty?

        # Pass the password through the environment and every value as its own
        # argument: no shell is involved, so quotes, `$`, spaces, etc. in the
        # settings can neither break nor alter the command.
        env = { 'MYSQL_PWD' => mp['password'].to_s }
        args = ['mysqldump', '--protocol=tcp', '-d', '-h', host, '-P', port, '-u', username, database, *tables]

        Open3.popen3(env, *args) do |stdin, stdout, stderr|
          stdin.close
          stdout.set_encoding("utf-8") # mysqldump output must be in UTF-8
          # The FlyData control table is emitted only once, with the first
          # table, and only for an initial sync.
          create_flydata_ctl_table = mp['initial_sync']
          while !stdout.eof?
            begin
              mysql_tabledef = Flydata::TableDef::MysqlTableDef.create(stdout)
            rescue TableDefError => e
              error_list << e.err_hash
              next
            end
            if mysql_tabledef.nil?
              # stream had no more create table definition
              break
            end
            flydata_tabledef = mysql_tabledef.to_flydata_tabledef
            puts Flydata::TableDef::RedshiftTableDef.from_flydata_tabledef(flydata_tabledef, flydata_ctl_table: create_flydata_ctl_table, schema_name: schema_name, ctl_only: opts.ctl_only?)
            create_flydata_ctl_table = false
          end
          errors = ""
          while !stderr.eof?
            line = stderr.gets.gsub('mysqldump: ', '')
            errors << line unless /Warning: Using a password on the command line interface can be insecure./ === line
          end
          raise errors unless errors.empty?
        end
        unless error_list.empty?
          $stderr.puts "We have noticed the following error(s):"
          error_list.group_by { |d| d[:error] }.each do |error, entries|
            $stderr.puts "The following table(s) have #{error}:"
            entries.each do |hash|
              $stderr.puts "  - #{hash[:table]}" if hash[:table]
            end
          end
          $stderr.puts "Please fix the above error(s) and try again."
        end
      end

      # Runs the whole initial sync for a data entry: dumps the MySQL tables,
      # parses the dump and sends it to the server, waits until the server has
      # processed the data and finally starts the agent (see `complete`).
      #
      # Raises when an initial sync is requested although a binlog position
      # file already exists.
      def sync_mysql_to_redshift(de)
        dp = flydata.data_port.get
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        mp = de['mysql_data_entry_preference']

        # Check client condition
        # (File.exist? instead of File.exists?, which was removed in Ruby 3.2)
        if File.exist?(sync_fm.binlog_path) && mp['initial_sync']
          raise "Already synchronized. If you want to do initial sync, run 'flydata sync:reset'"
        end

        # Copy template if not exists
        unless Flydata::Preference::DataEntryPreference.conf_exists?(de)
          Flydata::Command::Conf.new.copy_templates
        end

        generate_mysqldump(de, sync_fm, opts.dump_file?) do |mysqldump_io, db_bytesize|
          # Persist the sync mode and table list before parsing starts.
          sync_fm.save_sync_info(mp['initial_sync'], mp['tables'])
          parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, db_bytesize)
        end
        wait_for_mysqldump_processed(dp, de, sync_fm)
        complete
      end

      # Produces the mysqldump for the data entry and hands its IO to the block.
      #
      # de        - data entry hash
      # sync_fm   - SyncFileManager for the data entry
      # file_dump - true: dump into the dump file and read it back;
      #             false: stream the dump directly to the block
      # block     - called with the dump IO; in streaming mode it also
      #             receives the database size in bytes
      #
      # Returns without dumping when the saved status is WAITING or COMPLETE.
      # An existing non-empty dump file is reused in file mode.  Otherwise the
      # user is asked for confirmation before the dump starts; nil is returned
      # when the user declines.
      #
      # Raises when host, username or database is not configured.
      def generate_mysqldump(de, sync_fm, file_dump = true, &block)
        mp = de['mysql_data_entry_preference']

        # validate parameter
        %w(host username database).each do |k|
          if mp[k].to_s.empty?
            raise "'#{k}' is required. Set the value in the conf file " +
                  "-> #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          end
        end

        # Status is waiting or complete -> skip dump and parse
        dump_pos_info = sync_fm.load_dump_pos
        return if dump_pos_info[:status] == STATUS_WAITING || dump_pos_info[:status] == STATUS_COMPLETE

        # mysqldump file exists -> skip dump
        # (File.exist? instead of File.exists?, which was removed in Ruby 3.2)
        dp = flydata.data_port.get
        fp = sync_fm.dump_file_path
        if file_dump && File.exist?(fp) && File.size(fp) > 0
          puts "  -> Skip"
          return call_block_or_return_io(fp, &block)
        end

        tables = mp['tables']
        tables ||= '<all tables>'
        data_servers = mp['data_servers'] ? "\n  data servers: #{mp['data_servers']}" : ""

        confirmation_text = <<-EOM

FlyData Sync will start synchonizing the following database tables
  host:         #{mp['host']}
  port:         #{mp['port']}
  username:     #{mp['username']}
  database:     #{mp['database']}
  tables:       #{tables}#{data_servers}
EOM
        confirmation_text << <<-EOM if file_dump
  dump file:    #{fp}

Dump file saves contents of your tables temporarily.  Make sure you have enough disk space.
EOM
        print confirmation_text

        if ask_yes_no('Start Sync?')
          puts "Checking database size(not same as mysqldump data size)..."
          db_bytesize = Flydata::Parser::Mysql::DatabaseSizeCheck.new(mp).get_db_bytesize
          puts "  -> #{as_size(db_bytesize)} (#{db_bytesize} byte)"
          puts "Exporting data from database."
          puts "This process can take hours depending on data size and load on your database.  Please be patient..."
          Flydata::MysqlCompatibilityCheck.new(dp, mp, fp).check
          if file_dump
            Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData.
              new(mp).dump(fp)
            puts "  -> Done"
            call_block_or_return_io(fp, &block)
          else
            Flydata::Parser::Mysql::MysqlDumpGeneratorNoMasterData.
              new(mp).dump {|io| block.call(io, db_bytesize)}
          end
        else
          newline
          puts "You can change the dump file path with 'mysqldump_path' property in the following conf file."
          puts
          puts "    #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          puts
          return nil
        end
      end

      # With a block: opens the file, passes the IO to the block, closes the IO
      # afterwards (even when the block raises) and returns nil.
      # Without a block: returns the opened IO; the caller has to close it.
      def call_block_or_return_io(fp, &block)
        return open_file_io(fp) unless block

        dump_io = open_file_io(fp)
        begin
          block.call(dump_io)
        ensure
          # Errors raised while closing are deliberately ignored.
          dump_io.close rescue nil
        end
        nil
      end

      # Opens the file read-only with UTF-8 as its external encoding and
      # returns the IO; the caller is responsible for closing it.
      def open_file_io(file_path)
        File.open(file_path, 'r:utf-8')
      end

      # Parses the mysqldump stream and sends every inserted record to the
      # FlyData server through a forwarder, saving a checkpoint to the
      # dump.pos file whenever the parser reports one so that an interrupted
      # run can resume.
      #
      # mysqldump_io - IO of the dump (file or stream)
      # dp           - data port hash (server_port, servers, ssl_enabled)
      # de           - data entry hash
      # sync_fm      - SyncFileManager for the data entry
      # db_bytesize  - database size in bytes (not used by this method)
      #
      # When the run is not a resume, the server side resources are cleaned up
      # first unless the `skip-cleanup` option is given.  After a successful
      # parse the status WAITING is saved together with the binlog position
      # returned by the parser.
      #
      # Checkpoint
      # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120;
      #   <- checkpoint(after binlog)
      #...
      #CREATE TABLE `accounts` (
      #...
      #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8;
      #   <- checkpoint(after create table)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #   <- checkpoint(when buffered data is sent to server)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #
      #...
      #UNLOCK TABLES;
      #   <- checkpoint
      #...
      #CREATE TABLE ...
      def parse_mysqldump_and_send(mysqldump_io, dp, de, sync_fm, db_bytesize = nil)
        # Prepare forwarder
        de_tag_name = de["tag_name#{env_suffix}"]
        server_port = dp['server_port']
        # Servers configured on the data entry take precedence over the data port.
        servers = if de['mysql_data_entry_preference']['data_servers']
                    de['mysql_data_entry_preference']['data_servers'].split(',')
                  else
                    dp["servers#{env_suffix}"].map{|s| "#{s}:#{server_port}"}
                  end
        forwarder_type = de['mysql_data_entry_preference']['forwarder'] ||
          (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder')
        forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers)

        # Load dump.pos file for resume
        dump_pos_info = sync_fm.load_dump_pos
        option = dump_pos_info || {}
        if option[:table_name] && option[:last_pos].to_i != -1
          puts "Resuming... Last processed table: #{option[:table_name]}"
        else
          #If its a new sync, ensure server side resources are clean
          cleanup_sync_server(de, de['mysql_data_entry_preference']['tables'].split(',')) unless opts.skip_cleanup?
        end
        puts "Sending data to FlyData Server..."

        bench_start_time = Time.now

        # Start parsing dump file
        tmp_num_inserted_record = 0
        dump_fp = sync_fm.dump_file_path
        # 1 is a placeholder size for the case where no dump file exists
        # (File.exist? instead of File.exists?, which was removed in Ruby 3.2).
        dump_file_size = File.exist?(dump_fp) ? File.size(dump_fp) : 1
        binlog_pos = Flydata::Parser::Mysql::MysqlDumpParser.new(option).parse(
          mysqldump_io,
          # create table
          Proc.new { |mysql_table|
            tmp_num_inserted_record = 0
            # dump mysql_table for resume
            #TODO: make it option
            sync_fm.save_mysql_table_marshal_dump(mysql_table)
          },
          # insert record
          Proc.new { |mysql_table, values_set|
            mysql_table_name = mysql_table.table_name
            records = values_set.map do |values|
              json = generate_json(mysql_table, values)
              {table_name: mysql_table_name, log: json}
            end
            ret = forwarder.emit(records)
            tmp_num_inserted_record += 1
            ret
          },
          # checkpoint
          Proc.new { |mysql_table, last_pos, bytesize, binlog_pos, state, substate|
            # flush if buffer records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              forwarder.flush # send buffer data to the server before checkpoint
            end

            # show the current progress
            if last_pos.to_i == -1   # stream dump
              puts "  -> #{as_size(bytesize)} (#{bytesize} byte) completed..."
            else
              puts "  -> #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) completed..."
            end

            # save check point
            table_name = mysql_table.nil? ? '' : mysql_table.table_name
            sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate)
          }
        )
        forwarder.close
        puts "  -> Done"
        sync_fm.save_dump_pos(STATUS_WAITING, '', dump_file_size, binlog_pos)

        if ENV['FLYDATA_BENCHMARK']
          bench_end_time = Time.now
          elapsed_time = bench_end_time.to_i - bench_start_time.to_i
          puts "Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}"
        end
      end

      # Waits until the server has processed the sent dump data, then records
      # the binlog position for the synced tables and marks the dump COMPLETE.
      #
      # Does nothing when the FLYDATA_BENCHMARK environment variable is set or
      # when the saved status is not WAITING.
      def wait_for_mysqldump_processed(dp, de, sync_fm)
        return if ENV['FLYDATA_BENCHMARK']

        pos_info = sync_fm.load_dump_pos
        return if pos_info[:status] != STATUS_WAITING

        saved_binlog_pos = pos_info[:binlog_pos]
        wait_for_server_data_processing

        # Table names are handed over as one space separated string.
        table_list = de['mysql_data_entry_preference']['tables'].split(',').join(' ')
        sync_fm.save_table_binlog_pos(table_list, saved_binlog_pos)
        sync_fm.save_dump_pos(STATUS_COMPLETE, '', -1, saved_binlog_pos)
      end

      # Final message printed by `complete`.  It is a format string with two
      # %s placeholders, filled in this order: the Redshift console URL and
      # the FlyData dashboard URL.
      ALL_DONE_MESSAGE_TEMPLATE =  <<-EOM

Congratulations! FlyData has started synchronizing your database tables.

What's next?

    - Check data on Redshift (%s)
    - Check your FlyData usage on the FlyData Dashboard (%s)
    - To manage the FlyData Agent, use the 'flydata' command (type 'flydata' for help)
    - If you encounter an issue,
        please check our documentation (https://www.flydata.com/docs/) or
        contact our customer support team (support@flydata.com)

Thank you for using FlyData!
      EOM

      # Finishes the initial sync: saves the binlog position (initial sync
      # only), moves/resets the per-table files, backs up the dump directory,
      # starts the agent and prints the closing message.
      #
      # Raises when the saved dump status is not COMPLETE.
      def complete
        de = load_sync_info(retrieve_data_entry)
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        info = sync_fm.load_dump_pos
        unless info[:status] == STATUS_COMPLETE
          raise "Initial sync status is not complete. Try running 'flydata sync'."
        end

        preference = de['mysql_data_entry_preference']
        puts "Starting FlyData Agent..."
        sync_fm.save_binlog(info[:binlog_pos]) if preference['initial_sync']
        sync_fm.move_table_binlog_files(preference['tables'].split(','))
        sync_fm.reset_table_position_files(preference['tables'].split(','))
        sync_fm.backup_dump_dir
        Flydata::Command::Sender.new.start(quiet: true)
        puts "  -> Done"

        dashboard_url = "#{flydata.flydata_api_host}/dashboard"
        redshift_console_url = "#{flydata.flydata_api_host}/redshift_clusters/query/new"
        puts format(ALL_DONE_MESSAGE_TEMPLATE, redshift_console_url, dashboard_url)
      end

      # Builds the JSON document of one record: every column name of the table
      # (in column order) is paired with the value at the same position.
      #
      # mysql_table - object whose `columns` responds to each_key
      # values      - record values, indexed by column position
      def generate_json(mysql_table, values)
        record = mysql_table.columns.each_key.each_with_index.each_with_object({}) do |(column, idx), row|
          row[column] = values[idx]
        end
        record.to_json
      end

      # Marks the data entry as an initial (full) sync when no tables are
      # given; otherwise replaces its table list with the given tables.
      # Mutates and returns the data entry.
      def override_tables(de, tables)
        preference = de['mysql_data_entry_preference']
        full_sync = tables.empty?
        preference['initial_sync'] = full_sync
        preference['tables'] = tables.join(',') unless full_sync
        de
      end

      # Overrides 'initial_sync' and 'tables' of the data entry with the values
      # saved in the sync info file, if any.  Mutates and returns the data entry.
      def load_sync_info(de)
        saved = Flydata::FileUtil::SyncFileManager.new(de).load_sync_info
        return de if saved.nil?

        preference = de['mysql_data_entry_preference']
        preference['initial_sync'] = saved[:initial_sync]
        preference['tables'] = saved[:tables]
        de
      end

      # Flushes the client buffer, waits until the server has processed the
      # data and then stops the sender process quietly.
      def flush_buffer_and_stop
        agent = Flydata::Command::Sender.new
        agent.flush_client_buffer
        wait_for_server_data_processing
        agent.stop(quiet: true)
      end
    end
  end
end