require 'msgpack'
require 'open3'
require 'flydata/sync_file_manager'
#require 'ruby-prof'

module Flydata
  module Command
    class Sync < Base
      include Helpers
      # Create tables on Redshift while parsing when this env flag is set.
      CREATE_TABLE_OPTION = !!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
      INSERT_PROGRESS_INTERVAL = 1000

      # for dump.pos file
      STATUS_PARSING = 'PARSING'
      STATUS_COMPLETE = 'COMPLETE'

      # Entry point of 'flydata sync'. Runs the initial sync for the first
      # registered data entry. Only mysql -> redshift sync is supported.
      def run
        de = retrieve_data_entries.first
        raise "There are no data entry." unless de
        case de['type']
        when 'RedshiftMysqlDataEntry'
          sync_mysql_to_redshift(de)
        else
          raise "No supported data entry. Only mysql-redshift sync is supported."
        end
      end

      # Deletes all local sync state files (dump file, dump/table positions,
      # binlog position, marshaled table info) so initial sync can restart
      # from scratch.
      def reset
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        [sync_fm.dump_file_path, sync_fm.dump_pos_path, sync_fm.binlog_path, sync_fm.mysql_table_marshal_dump_path, sync_fm.table_position_file_paths].flatten.each do |path|
          # File.exist? - File.exists? was deprecated and removed in Ruby 3.2
          FileUtils.rm(path) if File.exist?(path)
        end
      end

      # Returns true when FlyData reports no remaining buffered data for the
      # data entry, false while data is still being processed.
      def check
        de = retrieve_data_entries.first
        ret = do_check(de)
        if ret['complete']
          puts "No buffer data on FlyData. #{ret.inspect}"
          true
        else
          puts "Now processing data on FlyData. #{ret.inspect}"
          false
        end
      end

      # Finalizes the initial sync: saves the binlog position captured during
      # the dump and starts the continuous sync sender.
      # Raises unless the saved dump.pos status is COMPLETE.
      def complete
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        info = sync_fm.load_dump_pos
        if info[:status] == STATUS_COMPLETE
          sync_fm.save_binlog(info[:binlog_pos])
          Flydata::Command::Sender.new.start
        else
          raise "Initial sync status is not complete. Try running 'flydata sync'."
        end
      end

      # skip initial sync
      # Creates an empty binlog position file so continuous sync can start
      # without performing the initial dump.
      def skip
        de = retrieve_data_entries.first
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
        binlog_path = sync_fm.binlog_path
        `touch #{binlog_path}`
        puts "Created an empty binlog position file."
        puts "-> #{binlog_path}"
        puts "Run 'flydata start' to start continuous sync."
      end

      private

      # Asks the FlyData API for the buffer status of the data entry.
      def do_check(de)
        flydata.data_entry.buffer_stat(de['id'], env_mode)
      end

      # Orchestrates the initial sync: refuses to run when a binlog position
      # already exists, copies conf templates if needed, then generates and
      # parses the mysqldump file.
      def sync_mysql_to_redshift(de)
        dp = flydata.data_port.get
        sync_fm = Flydata::FileUtil::SyncFileManager.new(de)

        # Check client condition
        if File.exist?(sync_fm.binlog_path)
          raise "Already synchronized. If you want to do initial sync, delete #{sync_fm.binlog_path}."
        end

        # Copy template if not exists
        unless Flydata::Preference::DataEntryPreference.conf_exists?(de)
          Flydata::Command::Conf.new.copy_templates
        end

        if generate_mysqldump(de, sync_fm)
          parse_mysqldump(dp, de, sync_fm)
        end
      end

      # Runs mysqldump for the data entry and returns the dump file path, or
      # nil when the user declined. An existing non-empty dump file is reused
      # unless overwrite is true.
      def generate_mysqldump(de, sync_fm, overwrite = false)

        # validate parameter
        %w(host username database).each do |k|
          if de['mysql_data_entry_preference'][k].to_s.empty?
            raise "'#{k}' is required. Set the value in the conf file " +
                  "-> #{Flydata::Preference::DataEntryPreference.conf_path(de)}"
          end
        end

        puts "Running mysqldump... host:#{de['mysql_data_entry_preference']['host']} " +
             "username:#{de['mysql_data_entry_preference']['username']} " +
             "database:#{de['mysql_data_entry_preference']['database']}"
        if de['mysql_data_entry_preference']['data_servers']
          puts "Send to Custom Data Servers: #{de['mysql_data_entry_preference']['data_servers']}"
        end

        if de['mysql_data_entry_preference']['tables']
          puts "  target tables: #{de['mysql_data_entry_preference']['tables']}"
        else
          puts "  target tables: <all-tables>"
        end

        fp = sync_fm.dump_file_path
        if File.exist?(fp) and File.size(fp) > 0 and not overwrite
          puts "  -> Skip"
          return fp
        end

        puts "[Confirm] mysqldump path: #{fp}"
        if ask_yes_no('OK?')
          Flydata::Mysql::MysqlDumpGenerator.new(de['mysql_data_entry_preference']).dump(fp)
        else
          newline
          puts "You can change the mysqldump path with 'mysqldump_path' in the conf file."
          puts "Edit '#{Flydata::Preference::DataEntryPreference.conf_path(de)}'"
          return nil
        end
        puts "  -> Done"
        fp
      end

      # Checkpoint
      # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000215', MASTER_LOG_POS=120;
      #   <- checkpoint(after binlog)
      #...
      #CREATE TABLE `accounts` (
      #...
      #) ENGINE=InnoDB AUTO_INCREMENT=71 DEFAULT CHARSET=utf8;
      #   <- checkpoint(after create table)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #   <- checkpoint(when buffered data is sent to server)
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #INSERT INTO `accounts` values (x,x,x),(y,y,y),....();
      #
      #...
      #UNLOCK TABLES;
      #   <- checkpoint
      #...
      #CREATE TABLE ...
      #
      # Parses the mysqldump file with MysqlDumpParser, streaming records to
      # the FlyData servers via a forwarder and saving checkpoints so an
      # interrupted run can resume. Blocks until the server reports all data
      # processed (unless FLYDATA_BENCHMARK is set).
      def parse_mysqldump(dp, de, sync_fm)
        puts "Parsing mysqldump file..."

        # Prepare forwarder
        de_tag_name = de["tag_name#{env_suffix}"]
        server_port = dp['server_port']
        servers = if de['mysql_data_entry_preference']['data_servers']
                    de['mysql_data_entry_preference']['data_servers'].split(',')
                  else
                    dp["servers#{env_suffix}"].collect{|s| "#{s}:#{server_port}"}
                  end
        forwarder_type = de['mysql_data_entry_preference']['forwarder'] ||
          (dp['ssl_enabled'] ? 'sslforwarder' : 'tcpforwarder')
        forwarder = Flydata::Output::ForwarderFactory.create(forwarder_type, de_tag_name, servers)

        # Load dump.pos file for resume
        dump_pos_info = sync_fm.load_dump_pos
        option = dump_pos_info || {}
        if option[:table_name]
          puts "Resuming... Last processed table: #{option[:table_name]}"
        end

        bench_start_time = Time.now

        # Start parsing dump file
        tmp_num_inserted_record = 0
        dump_fp = sync_fm.dump_file_path
        dump_file_size = File.size(dump_fp)
        binlog_pos = Flydata::Mysql::MysqlDumpParser.new(dump_fp, option).parse(
          # create table
          Proc.new { |mysql_table|
            redshift_table = Flydata::Mysql::RedshiftTableAdapter.new(mysql_table)
            mysql_table.set_adapter(:redshift, redshift_table)

            tmp_num_inserted_record = 0

            if CREATE_TABLE_OPTION
              print "- Creating table: #{redshift_table.table_name}"
              sql = redshift_table.create_table_sql
              ret = flydata.redshift_cluster.run_query(sql)
              if ret['message'].index('ERROR:')
                if ret['message'].index('already exists')
                  puts " -> Skip"
                else
                  raise "Failed to create table. error=#{ret['message']}"
                end
              else
                puts " -> OK"
              end
            else
              puts "- Parsing table: #{mysql_table.table_name}"
            end

            # dump mysql_table for resume
            sync_fm.save_mysql_table_marshal_dump(mysql_table)
          },
          # insert record
          Proc.new { |mysql_table, values_set|
            mysql_table_name = mysql_table.table_name
            records = values_set.collect do |values|
              json = generate_json(mysql_table, values)
              {table_name: mysql_table_name, log: json}
            end
            ret = forwarder.emit(records)
            tmp_num_inserted_record += 1
            print '.'
            ret
          },
          # checkpoint
          Proc.new { |mysql_table, last_pos, binlog_pos, state, substate|
            # flush if buffer records exist
            if tmp_num_inserted_record > 0 && forwarder.buffer_record_count > 0
              puts
              forwarder.flush # send buffer data to the server before checkpoint
            end

            # show the current progress
            puts "     #{(last_pos.to_f/dump_file_size * 100).round(1)}% (#{last_pos}/#{dump_file_size}) #{Time.now.to_i - bench_start_time.to_i}sec"

            # save check point
            table_name = mysql_table.nil? ? '' : mysql_table.table_name
            sync_fm.save_dump_pos(STATUS_PARSING, table_name, last_pos, binlog_pos, state, substate)
          }
        )
        forwarder.close

        if ENV['FLYDATA_BENCHMARK']
          puts "Done!"
          bench_end_time = Time.now
          elapsed_time = bench_end_time.to_i - bench_start_time.to_i
          puts "Elapsed:#{elapsed_time}sec start:#{bench_start_time} end:#{bench_end_time}"
          return true
        end

        # wait until finish
        puts "Start waiting until all data is processed on FlyData..."
        sleep 10
        until check
          sleep 10
        end

        sync_fm.save_dump_pos(STATUS_COMPLETE, '', dump_file_size, binlog_pos)
        puts "Congratulations! All data is processed on FlyData. Please check tables and data on your Redshift Cluster."
        puts "After checking, run 'flydata sync:complete' to start continuously synchronization."
      end

      # Builds a JSON object mapping column names to the row's values,
      # relying on Ruby hashes preserving the column insertion order.
      def generate_json(mysql_table, values)
        h = {}
        mysql_table.columns.each_key.with_index do |k, i|
          h[k] = values[i]
        end
        h.to_json
      end
    end
  end

  module Output
    class ForwarderFactory

      # Builds a forwarder instance for the given key.
      # nil or "tcpforwarder" => TcpForwarder, "sslforwarder" => SslForwarder.
      # Any other key raises a RuntimeError.
      def self.create(forwarder_key, tag, servers, options = {})
        case forwarder_key
        when nil, "tcpforwarder"
          puts "Creating TCP connection"
          TcpForwarder.new(tag, servers, options)
        when "sslforwarder"
          puts "Creating SSL connection"
          SslForwarder.new(tag, servers, options)
        else
          raise "Unsupported Forwarding type #{forwarder_key}"
        end
      end

    end
    class TcpForwarder
      # Leading byte of the forward-protocol message (msgpack 2-element array header).
      FORWARD_HEADER = [0x92].pack('C')
      BUFFER_SIZE = 1024 * 1024 * 32 # 32M
      DEFAULT_SEND_TIMEOUT = 60 # 1 minute
      # Misspelled constant kept as an alias so any external references keep working.
      DEFUALT_SEND_TIMEOUT = DEFAULT_SEND_TIMEOUT
      RETRY_INTERVAL = 2 # base seconds; backoff grows exponentially per retry
      RETRY_LIMIT = 10

      # tag: forward-protocol tag. servers: non-empty array of "host:port" strings.
      # options: :buffer_size_limit overrides BUFFER_SIZE.
      def initialize(tag, servers, options = {})
        @tag = tag
        unless servers and servers.kind_of?(Array) and not servers.empty?
          raise "Servers must not be empty."
        end
        @servers = servers
        @server_index = 0
        set_options(options)
        reset
      end

      def set_options(options)
        if options[:buffer_size_limit]
          @buffer_size_limit = options[:buffer_size_limit]
        else
          @buffer_size_limit = BUFFER_SIZE
        end
      end

      attr_reader :buffer_record_count, :buffer_size

      # Buffers msgpack-encoded [time, record] events and sends them once the
      # buffer exceeds the size limit. Returns the send result, or false when
      # the data was only buffered.
      def emit(records, time = Time.now.to_i)
        records = [records] unless records.kind_of?(Array)
        records.each do |record|
          event_data = [time,record].to_msgpack
          @buffer_records << event_data
          @buffer_record_count += 1
          @buffer_size += event_data.bytesize
        end
        if @buffer_size > @buffer_size_limit
          send
        else
          false
        end
      end

      #TODO retry logic
      # Sends the buffered events to the next server, retrying with
      # exponential backoff up to RETRY_LIMIT times. Clears the buffer on
      # success. Returns false when there was nothing to send.
      # NOTE: shadows Kernel#send for instances of this class.
      def send
        if @buffer_size > 0
          puts "  -> Sending #{@buffer_record_count} records #{@buffer_size} bytes"
        else
          return false
        end
        if ENV['FLYDATA_BENCHMARK']
          reset
          return true
        end
        sock = nil
        retry_count = 0
        begin
          sock = connect(pickup_server)

          # Write header
          sock.write FORWARD_HEADER
          # Write tag
          sock.write @tag.to_msgpack
          # Write records: 0xdb + 4-byte big-endian length, then the packed events
          sock.write [0xdb, @buffer_records.bytesize].pack('CN')
          StringIO.open(@buffer_records) do |i|
            FileUtils.copy_stream(i, sock)
          end
        rescue => e
          retry_count += 1
          if retry_count > RETRY_LIMIT
            puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
            raise e
          end
          puts "! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}"
          wait_time = RETRY_INTERVAL ** retry_count
          puts "  Now waiting for next retry. time=#{wait_time}sec"
          sleep wait_time
          retry
        ensure
          if sock
            sock.close rescue nil
          end
        end
        reset
        true
      end

      #TODO: Check server status
      # Round-robin selection over @servers.
      def pickup_server
        ret_server = @servers[@server_index]
        @server_index += 1
        if @server_index >= (@servers.count)
          @server_index = 0
        end
        ret_server
      end

      # Opens a TCP connection to "host:port" with SO_LINGER and a send
      # timeout configured.
      def connect(server)
        host, port = server.split(':')
        sock = TCPSocket.new(host, port.to_i)

        # Set options
        opt = [1, DEFAULT_SEND_TIMEOUT].pack('I!I!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
        opt = [DEFAULT_SEND_TIMEOUT, 0].pack('L!L!')
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

        sock
      end

      # Empties the event buffer and resets the counters.
      def reset
        @buffer_records = ''
        @buffer_record_count = 0
        @buffer_size = 0
      end

      def flush
        send
      end

      def close
        flush
      end
    end
    class SslForwarder < TcpForwarder
      # Same wire protocol as TcpForwarder, but the TCP socket from the
      # superclass is wrapped in a verifying SSL socket.
      def connect(server)
        plain_sock = super
        ssl_sock = OpenSSL::SSL::SSLSocket.new(plain_sock, ssl_ctx_with_verification)
        # Closing the SSL socket also closes the underlying TCP socket
        ssl_sock.sync_close = true
        ssl_sock.connect
        ssl_sock
      end

      private

      # Builds an SSL context that verifies the peer certificate against the
      # system's default CA store.
      def ssl_ctx_with_verification
        store = OpenSSL::X509::Store.new
        store.set_default_paths
        ctx = OpenSSL::SSL::SSLContext.new
        ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        ctx.cert_store = store
        ctx
      end
    end
  end

  module Redshift
    module Util
      # Redshift identifiers are limited in length.
      MAX_TABLENAME_LENGTH = 127
      REDSHIFT_RESERVED_WORDS = %w[
aes128 aes256 all allowoverwrite analyse analyze and any array
as asc authorization backup between binary blanksasnull both
bytedict case cast check collate column constraint create
credentials cross current_date current_time current_timestamp
current_user current_user_id default deferrable deflate defrag
delta delta32k desc disable distinct do else emptyasnull enable
encode encrypt encryption end except explicit false for foreign
freeze from full globaldict256 globaldict64k grant group gzip having
identity ignore ilike in initially inner intersect into is isnull
join leading left like limit localtime localtimestamp lun luns
minus mostly13 mostly32 mostly8 natural new not notnull null nulls
off offline offset old on only open or order outer overlaps parallel
partition percent placing primary raw readratio recover references
rejectlog resort restore right select session_user similar some
sysdate system table tag tdes text255 text32k then to top trailing
true truncatecolumns union unique user using verbose wallet when
where with without]
      # Create a symbol-keyed hash for performance
      REDSHIFT_RESERVED_WORDS_HASH = REDSHIFT_RESERVED_WORDS.inject({}) {|h, word| h[word.to_sym] = true; h}

      REDSHIFT_SYSTEM_COLUMNS = %w[oid tableoid xmin cmin xmax cmax ctid]
      REDSHIFT_SYSTEM_COLUMNS_HASH = REDSHIFT_SYSTEM_COLUMNS.inject({}) {|h, word| h[word.to_sym] = true; h}

      # Converts key into a valid Redshift identifier of the given type
      # (:table or :column): lowercases, replaces invalid characters with
      # '_', prefixes reserved/leading-digit names with '_'.
      # Returns nil when the converted name is too long.
      # Results are memoized per type.
      def convert_to_valid_name(key, type = :table)
        @memo ||= { table:{}, column:{} }
        key_sym = key.to_sym
        return @memo[type][key_sym] if @memo[type][key_sym]

        name = key.downcase.gsub(/[^a-z0-9_$]/, '_')
        name = "_#{name}" if is_redshift_reserved_word?(name, type) or name =~ /^[0-9$]/
        if name.length > MAX_TABLENAME_LENGTH
          name = nil
        end
        # Memoize under the name type. This previously wrote to
        # @memo[key_sym], so the cache lookup above never hit.
        @memo[type][key_sym] = name
        name
      end

      # True when name is a Redshift reserved word; for :column names,
      # system column names (oid, xmin, ...) are also treated as reserved.
      def is_redshift_reserved_word?(name, type = :table)
        return false unless name
        return true if REDSHIFT_RESERVED_WORDS_HASH[name.to_sym] == true

        case type
        when :table
          false
        when :column
          REDSHIFT_SYSTEM_COLUMNS_HASH[name.to_sym] == true
        else
          false
        end
      end
    end
  end

  module Mysql
    class MysqlTable
      # Value object describing a MySQL table: its name, column definitions
      # (hash keyed by column name) and the list of primary key columns.
      # Adapters for other systems (e.g. :redshift) can be attached.
      def initialize(name, cols = {}, pks = [])
        @table_name = name
        @columns = cols
        @primary_keys = pks
        @adapters = {}
      end

      attr_accessor :table_name, :columns, :primary_keys

      # Registers a column definition hash under its :column_name.
      def add_column(column)
        @columns.store(column[:column_name], column)
      end

      # Attaches an adapter object under the given key.
      def set_adapter(key, adapter)
        @adapters.store(key, adapter)
      end

      # Returns the adapter registered under key, or nil.
      def adapter(key)
        @adapters[key]
      end
    end

    class RedshiftTableAdapter
      include Flydata::Redshift::Util
      # Adapts a MysqlTable for Redshift: converts the table name and column
      # names into valid Redshift identifiers (via the Util mixin) and maps
      # each MySQL column type onto a Redshift column type.
      def initialize(mysql_table)
        @table_name = convert_to_valid_name(mysql_table.table_name)
        set_columns(mysql_table.columns)
        @primary_keys = mysql_table.primary_keys
      end

      attr_reader :table_name, :columns, :primary_keys

      # Builds a "CREATE TABLE <name> (...);" statement for this table,
      # appending a primary key clause when primary keys are present.
      # NOTE(review): primary key names are emitted as-is, not run through
      # convert_to_valid_name - confirm they match the converted column names.
      def create_table_sql
        col_def = @columns.inject([]) { |list, (cn, column)|
          list << build_column_def(column)
          list
        }
        if @primary_keys.count > 0
          col_def << "primary key (#{@primary_keys.join(',')})"
        end
        <<EOT
CREATE TABLE #{@table_name} (#{col_def.join(',')});
EOT
      end

      private

      # Replaces each column's key and :column_name with a Redshift-safe name
      # and converts its type info via convert_column_format_type.
      # NOTE(review): two source columns whose converted names collide will
      # overwrite each other in the resulting hash.
      def set_columns(columns)
        @columns = {}
        columns.each do |k, column|
          new_k = convert_to_valid_name(k, :column)
          new_column = column.dup
          new_column[:column_name] = new_k
          @columns[new_k] = convert_column_format_type(new_column)
        end
      end

      # Mysql Field Types
      # http://help.scibit.com/mascon/masconMySQL_Field_Types.html
      # Returns a copy of the column hash with :format_type translated to its
      # Redshift equivalent. :format_type_str is removed and recomputed only
      # for types that carry a size/precision (decimal, char, varchar, and
      # the text/blob families, which become bounded varchars).
      def convert_column_format_type(column)
        ret_c = {}.merge(column)
        ret_c.delete(:format_type_str)
        ret_c[:format_type] = case column[:format_type]
                                   when 'tinyint'
                                     'smallint'
                                   when 'smallint'
                                     # unsigned range exceeds the signed type, so widen
                                     column[:unsigned] ? 'integer' : 'smallint'
                                   when 'mediumint'
                                     'integer'
                                   when 'int', 'integer'
                                     column[:unsigned] ? 'bigint' : 'integer'
                                   when 'bigint'
                                     # max unsigned bigint is 18446744073709551615
                                     column[:unsigned] ? 'decimal(20,0)' : 'bigint'
                                   when 'float'
                                     'real'
                                   when 'double', 'double precision', 'real'
                                     'double precision'
                                   when 'decimal', 'numeric'
                                     ret_c[:format_type_str] = "decimal(#{column[:decimal_precision]},#{column[:decimal_scale]})"
                                     'decimal'
                                   when 'date'
                                     'date'
                                   when 'datetime'
                                     'timestamp'
                                   when 'time'
                                     'timestamp' #TODO: redshift does not support time only column type
                                   when 'year'
                                     'smallint'
                                   when 'char'
                                     ret_c[:format_type_str] = "char(#{column[:format_size]})"
                                     'char'
                                   when 'varchar'
                                     ret_c[:format_type_str] = "varchar(#{column[:format_size]})"
                                     'varchar'
                                   when 'tinyblob','tinytext'
                                     ret_c[:format_size] = 255
                                     ret_c[:format_type_str] = "varchar(#{ret_c[:format_size]})"
                                     'varchar'
                                   when 'blob','text', 'mediumblob', 'mediumtext', 'longblob', 'longtext'
                                     ret_c[:format_size] = 65535 #TODO: review
                                     ret_c[:format_type_str] = "varchar(#{ret_c[:format_size]})"
                                     'varchar'
                                   else
                                     #TODO: discuss
                                     'varchar'
                                   end
        ret_c
      end

      # Renders one column definition: "<name> <type>" plus "not null" or a
      # default clause. A nil default becomes the literal "default null";
      # other defaults are emitted as quoted strings.
      def build_column_def(column)
        format_type = column[:format_type]
        # Prefer the sized type string (e.g. "varchar(255)") when present
        format_type = column[:format_type_str] if column[:format_type_str]
        def_str = "#{column[:column_name]} #{format_type}"
        if column[:not_null]
          def_str << " not null"
        elsif column.has_key?(:default)
          val = column[:default]
          val = val.nil? ? 'null' : "'#{val}'"
          def_str << " default #{val}"
        end
        def_str
      end

    end

    class MysqlDumpGenerator
      # host, port, username, password, database, tables
      # --master-data=2 records the binlog position as a comment in the dump,
      # which the parser later extracts.
      MYSQL_DUMP_CMD_TEMPLATE = "mysqldump -h %s -P %s -u%s %s --skip-lock-tables --single-transaction --flush-logs --hex-blob --master-data=2 %s %s"

      # conf: hash with 'host', 'port', 'username', 'database', optional
      # 'password' and optional 'tables' (comma separated list).
      # NOTE(review): password and other values are interpolated unescaped
      # into a shell command and are visible in the process list; consider
      # --defaults-extra-file or shell escaping.
      def initialize(conf)
        password = conf['password'].to_s.empty? ? "" : "-p#{conf['password']}"
        tables = if conf['tables']
                   conf['tables'].split(',').join(' ')
                 else
                   ''
                 end
        @dump_cmd = MYSQL_DUMP_CMD_TEMPLATE %
          [conf['host'], conf['port'], conf['username'], password, conf['database'], tables]
      end

      # Runs mysqldump, redirecting stdout into file_path. Returns true on
      # success; on failure prints diagnostics, removes the partial file and
      # raises. Also raises when the resulting file is missing or empty.
      def dump(file_path)
        cmd = "#{@dump_cmd} > #{file_path}"
        o, e, s = Open3.capture3(cmd)
        # Show stderr except mysqldump's routine "Warning:" lines
        e.to_s.each_line {|l|  puts l unless /^Warning:/ =~ l } unless e.to_s.empty?
        unless s.exitstatus == 0
          # File.exist? - File.exists? was deprecated and removed in Ruby 3.2
          if File.exist?(file_path)
            File.open(file_path, 'r') {|f| f.each_line{|l| puts l}}
            FileUtils.rm(file_path)
          end
          raise "Failed to run mysqldump command."
        end
        unless File.exist?(file_path)
          raise "mysqldump file does not exist. Something wrong..."
        end
        if File.size(file_path) == 0
          raise "mysqldump file is empty. Something wrong..."
        end
        true
      end
    end

    class MysqlDumpParser

      # States of the dump-file state machine.
      module State
        START = 'START'
        CREATE_TABLE = 'CREATE_TABLE'
        CREATE_TABLE_COLUMNS = 'CREATE_TABLE_COLUMNS'
        CREATE_TABLE_CONSTRAINTS = 'CREATE_TABLE_CONSTRAINTS'
        INSERT_RECORD = 'INSERT_RECORD'
        PARSING_INSERT_RECORD = 'PARSING_INSERT_RECORD'
      end

      # Binlog position ({binfile:, pos:}) captured from the dump header.
      attr_accessor :binlog_pos

      # file_path: mysqldump output file (must exist).
      # option: resume info - :binlog_pos, :last_pos, :state, :substate,
      # :mysql_table (as saved by the sync file manager's dump.pos).
      def initialize(file_path, option = {})
        @file_path = file_path
        raise "Dump file does not exist. file_path:#{file_path}" unless File.exist?(file_path)
        @binlog_pos = option[:binlog_pos]
        @option = option
      end

      # Walks the dump file with a state machine, invoking:
      #   create_table_block.call(mysql_table) - when a table def is complete
      #   insert_record_block.call(mysql_table, values_set) - per INSERT;
      #     a truthy return triggers a checkpoint
      #   check_point_block.call(table, file_pos, binlog_pos, state, substate)
      # Resumes from @option[:last_pos] when present.
      # Returns the binlog position found in the dump header.
      def parse(create_table_block, insert_record_block, check_point_block)
        invalid_file = false
        current_state = State::START
        substate = nil

        # Look for the master position header produced by --master-data=2.
        state_start = Proc.new do |f|
          line = f.readline.strip
          # -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000002', MASTER_LOG_POS=120;
          m = /^\-\- CHANGE MASTER TO MASTER_LOG_FILE='(?<binfile>[^']+)', MASTER_LOG_POS=(?<pos>\d+)/.match(line)
          if m
            @binlog_pos = {binfile: m[:binfile], pos: m[:pos].to_i}
            current_state = State::CREATE_TABLE
            check_point_block.call(nil, f.pos, @binlog_pos, current_state)
          end
        end

        current_table = nil
        # Skip lines until the next CREATE TABLE statement.
        state_create_table = Proc.new do |f|
          line = f.readline.strip
          # CREATE TABLE `active_admin_comments` (
          m = /^CREATE TABLE `(?<table_name>[^`]+)`/.match(line)
          if m
            current_table = MysqlTable.new(m[:table_name])
            current_state = State::CREATE_TABLE_COLUMNS
          end
        end

        # Collect primary keys until the closing ")" of the CREATE TABLE,
        # then report the finished table and checkpoint.
        state_create_table_constraints = Proc.new do |f|
          line = f.readline.strip
          #  PRIMARY KEY (`id`),
          if line.start_with?(')')
            create_table_block.call(current_table)
            current_state = State::INSERT_RECORD
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          elsif m = /^PRIMARY KEY \((?<primary_keys>[^\)]+)\)/.match(line)
            current_table.primary_keys = m[:primary_keys].split(',').collect do |pk_str|
              pk_str[1..-2]
            end
          end
        end

        # Parse one backtick-quoted column definition line; any other line
        # is rewound and handed to the constraints state.
        state_create_table_columns = Proc.new do |f|
          start_pos = f.pos
          line = f.readline.strip
          #  `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
          if line.start_with?("\`")
            column = {}

            # parse column line
            line = line[0..-2] if line.end_with?(',')
            items = line.split
            column[:column_name] = items.shift[1..-2]
            column[:format_type_str] = format_type_str = items.shift
            pos = format_type_str.index('(')
            if pos
              # sized type, e.g. varchar(255) or decimal(10,2)
              ft = column[:format_type] = format_type_str[0..pos-1]
              if ft == 'decimal'
                precision, scale = format_type_str[pos+1..-2].split(',').collect{|v| v.to_i}
                column[:decimal_precision] = precision
                column[:decimal_scale] = scale
              else
                column[:format_size] = format_type_str[pos+1..-2].to_i
              end
            else
              column[:format_type] = format_type_str
            end
            while (item = items.shift) do
              case item
              when 'DEFAULT'
                value = items.shift
                value = value.start_with?('\'') ? value[1..-2] : value
                value = nil if value == 'NULL'
                column[:default] = value
              when 'NOT'
                # After shifting 'NOT', the candidate 'NULL' token is at
                # items[0]. (This previously checked items[1], so NOT NULL
                # was never recorded.)
                if items[0] == 'NULL'
                  items.shift
                  column[:not_null] = true
                end
              when 'unsigned'
                column[:unsigned] = true
              else
                #ignore other options
              end
            end

            current_table.add_column(column)
          else
            current_state = State::CREATE_TABLE_CONSTRAINTS
            f.pos = start_pos
            state_create_table_constraints.call(f)
          end
        end

        # Detect the start of an INSERT statement. An UNLOCK statement ends
        # the current table's data section and checkpoints.
        state_insert_record = Proc.new do |f|
          original_pos = f.pos
          command = f.read(6)
          if command == 'INSERT'
            current_state = State::PARSING_INSERT_RECORD
          else
            f.pos = original_pos
            f.readline
            if command == 'UNLOCK'
              current_state = State::CREATE_TABLE
              check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
            end
          end
        end

        # Parse the VALUES lists of one INSERT statement and hand the rows
        # to the caller; checkpoint when the caller reports a send.
        state_parsing_insert_record = Proc.new do |f|
          values_set = InsertParser.new(f).parse
          current_state = State::INSERT_RECORD
          if insert_record_block.call(current_table, values_set)
            check_point_block.call(current_table, f.pos, @binlog_pos, current_state)
          end
        end

        # Start reading file from top
        File.open(@file_path, 'r') do |f|
          last_saved_pos = 0

          # resume
          if @option[:last_pos]
            f.pos = @option[:last_pos].to_i
            current_state = @option[:state]
            substate = @option[:substate]
            current_table = @option[:mysql_table]
          end

          until f.eof? do
            case current_state
            when State::START
              state_start.call(f)
            when State::CREATE_TABLE
              state_create_table.call(f)
            when State::CREATE_TABLE_COLUMNS
              state_create_table_columns.call(f)
            when State::CREATE_TABLE_CONSTRAINTS
              state_create_table_constraints.call(f)
            when State::INSERT_RECORD
              state_insert_record.call(f)
            when State::PARSING_INSERT_RECORD
              state_parsing_insert_record.call(f)
            end
          end
        end
        @binlog_pos
      end

      # Parses the "(v1,v2,...),(...);" tail of an INSERT statement into an
      # array of value arrays. Handles quoted values containing commas and
      # escape sequences; the bare literal NULL becomes nil.
      class InsertParser
        #INSERT INTO `data_entries` VALUES (2,2,'access_log'), (2,3,'access_log2');
        module State
          IN_VALUE = 'IN_VALUE'
          NEXT_VALUES = 'NEXT_VALUES'
        end

        # file: IO positioned just after the literal "INSERT".
        def initialize(file)
          @file = file
          @values = []
          @values_set = []
        end

        # Profiling helper; no-op unless ruby-prof is loaded.
        def start_ruby_prof
          RubyProf.start if defined?(RubyProf) and not RubyProf.running?
        end

        # Writes an HTML profile report when ruby-prof was running.
        def stop_ruby_prof
          if defined?(RubyProf) and RubyProf.running?
            result = RubyProf.stop
            #printer = RubyProf::GraphPrinter.new(result)
            printer = RubyProf::GraphHtmlPrinter.new(result)
            #printer.print(STDOUT)
            printer.print(File.new("ruby-prof-out-#{Time.now.to_i}.html", "w"), :min_percent => 3)
          end
        end

        # Reads the rest of the INSERT line and parses it.
        # Returns an array of row-value arrays.
        def parse
          start_ruby_prof
          bench_start_time = Time.now
          target_line = @file.readline
          _parse(target_line)
        ensure
          stop_ruby_prof
          if ENV['FLYDATA_BENCHMARK']
            puts " -> time:#{Time.now.to_f - bench_start_time.to_f} size:#{target_line.size}"
          end
        end

        private

        # Tokenizes on ',' and re-joins tokens that were split inside quoted
        # strings, tracking tuple boundaries via the IN_VALUE/NEXT_VALUES
        # state machine.
        def _parse(target_line)
          target_line = target_line.strip
          start_index = target_line.index('(')
          # keep from the first '(' up to, but excluding, the trailing ';'
          target_line = target_line[start_index..-2]
          items = target_line.split(',')
          index = 0
          cur_state = State::NEXT_VALUES

          loop do
            case cur_state
            when State::NEXT_VALUES
              chars = items[index]
              break unless chars
              # drop the '(' that opens this values tuple
              items[index] = chars[1..-1]
              cur_state = State::IN_VALUE
            when State::IN_VALUE
              chars = items[index]
              index += 1
              if chars.start_with?("'")
                # single item (not last item)
                if chars.end_with?("'") and !last_char_escaped?(chars)
                  @values << replace_escape_char(chars[1..-2])
                # single item (last item)
                elsif chars.end_with?("')") and !last_char_escaped?(chars[0..-2])
                  @values << replace_escape_char(chars[1..-3])
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                # multi items
                else
                  # the quoted value contained commas; rejoin the split tokens
                  cur_value = chars[1..-1]
                  loop do
                    next_chars = items[index]
                    index += 1
                    if next_chars.end_with?('\'') and !last_char_escaped?(next_chars)
                      cur_value << ','
                      cur_value << next_chars[0..-2]
                      @values << replace_escape_char(cur_value)
                      break
                    elsif next_chars.end_with?("')") and !last_char_escaped?(next_chars[0..-2])
                      cur_value << ','
                      cur_value << next_chars[0..-3]
                      @values << replace_escape_char(cur_value)
                      @values_set << @values
                      @values = []
                      cur_state = State::NEXT_VALUES
                      break
                    else
                      cur_value << ','
                      cur_value << next_chars
                    end
                  end
                end
              else
                # unquoted value (number or NULL)
                if chars.end_with?(')')
                  chars = chars[0..-2]
                  @values << (chars == 'NULL' ? nil : chars)
                  @values_set << @values
                  @values = []
                  cur_state = State::NEXT_VALUES
                else
                  @values << (chars == 'NULL' ? nil : chars)
                end
              end
            else
              raise "Invalid state: #{cur_state}"
            end
          end
          return @values_set
        end

        ESCAPE_HASH_TABLE = {"\\\\" => "\\", "\\'" => "'", "\\n" => "\n",  "\\r" => "\r"}

        # Replaces mysqldump escape sequences with their literal characters.
        def replace_escape_char(original)
          original.gsub(/\\\\|\\'|\\n|\\r/, ESCAPE_HASH_TABLE)
        end

        # This method assume that the last character is '(single quotation)
        # abcd\' -> true
        # abcd\\' -> false (back slash escape back slash)
        # abcd\\\' -> true
        def last_char_escaped?(text)
          flag = false
          (text.length - 2).downto(0) do |i|
            if text[i] == '\\'
              flag = !flag
            else
              break
            end
          end
          flag
        end
      end
    end
  end
end