require 'date'
require 'flydata-core/errors'

module FlydataCore
module TableDef

# Builds SQL text (DDL plus control-table bookkeeping statements) for Redshift
# from a FlyData table definition hash.
#
# The table definition hash read by this class has the following keys:
#   :table_name      - name of the table
#   :default_charset - value stored as the 'cs' attribute in the ctl tables table
#   :columns         - array of column hashes with the keys :column, :type and
#                      optionally :not_null, :default, :primary_key, :comment,
#                      :charset
class RedshiftTableDef
  # FlyData type name => information used to build the Redshift column type.
  #   :type          - Redshift type text
  #   :use_params    - when true, the "(...)" parameters of the source type are
  #                    appended to the Redshift type
  #   :max_size      - upper bound(s) applied to the parameters, by position
  #   :default_value - default used when a NOT NULL column is added via ALTER TABLE
  #   :unsigned      - not read anywhere in this class
  TYPE_MAP_F2R = {
    'binary' => {type: 'varchar', use_params: true, default_value: ''},
    'bit' => {type: 'bigint', default_value: '0'},
    'char' => {type: 'char', use_params: true, default_value: ''},
    'date' => {type: 'date', default_value: '0000-01-01'},
    'datetime' => {type: 'timestamp', default_value: '0000-01-01'},
    'enum' => {type: 'varchar encode bytedict', default_value: ''},
    'float4' => {type: 'float4', default_value: '0'},
    'float4 unsigned' => {type: 'float4', default_value: '0'},
    'float8' => {type: 'float8', default_value: '0'},
    'float8 unsigned' => {type: 'float8', default_value: '0'},
    'int1' => {type: 'int2', default_value: '0'},
    'int1 unsigned' => {type: 'int2', unsigned: true, default_value: '0'},
    'int2' => {type: 'int2', default_value: '0'},
    'int2 unsigned' => {type: 'int4', unsigned: true, default_value: '0'},
    'int3' => {type: 'int4', default_value: '0'},
    'int3 unsigned' => {type: 'int4', unsigned: true, default_value: '0'},
    'int4' => {type: 'int4', default_value: '0'},
    'int4 unsigned' => {type: 'int8', unsigned: true, default_value: '0'},
    'int8' => {type: 'int8', default_value: '0'},
    'int8 unsigned' => {type: 'numeric(20,0)', unsigned: true, default_value: '0'},
    'numeric' => {type: 'numeric', use_params: true, max_size: [38,37], default_value: '0'},
    'numeric unsigned' => {type: 'numeric', use_params: true, max_size: [38,37], default_value: '0'},
    'set' => {type: 'varchar encode bytedict', default_value: ''},
    'text' => {type: 'varchar(max)', default_value: ''},
    'time' => {type: 'timestamp', default_value: '0000-01-01'},
    'varbinary' => {type: 'varchar', use_params: true, max_size: 65535, default_value: ''},
    'varchar' => {type: 'varchar', use_params: true, max_size: 65535, default_value: ''},
    'year' => {type: 'date', default_value: '0001-01-01'},
  }

  # Generates the full SQL script for a table definition.
  #
  # flydata_tabledef - table definition hash (see the class comment)
  # options          - :flydata_ctl_table (default true) adds the CREATE
  #                    statements for the schema and the control tables;
  #                    :ctl_only skips the CREATE TABLE / COMMENT statements of
  #                    the table itself; :schema_name qualifies table names.
  #
  # Returns the SQL as a String. Any StandardError raised while generating the
  # SQL is re-raised as TableDefError.
  def self.from_flydata_tabledef(flydata_tabledef, options = {})
    # Apply the default on a copy so the caller's hash is never modified
    # (it may be shared or frozen).
    options = { flydata_ctl_table: true }.merge(options)
    schema_name = options[:schema_name]

    begin
      tabledef = ""
      tabledef += create_schema_sql(schema_name) if options[:flydata_ctl_table] && !schema_name.to_s.empty?
      tabledef += create_flydata_ctl_table_sql(schema_name) if options[:flydata_ctl_table]
      tabledef += create_table_sql(flydata_tabledef, schema_name) unless options[:ctl_only]
      tabledef += comment_sql(flydata_tabledef, schema_name) unless options[:ctl_only]
      tabledef += flydata_ctl_sql(flydata_tabledef, schema_name)
      tabledef
    rescue StandardError
      # Catch errors from generating schema. Generally an unsupported data type
      raise TableDefError, {error: "errors generating schema. Please contact us for further instructions", table: flydata_tabledef[:table_name]}
    end
  end

  FLYDATA_CTL_COLUMNS_TABLE = "flydata_ctl_columns"
  FLYDATA_CTL_TABLES_TABLE = "flydata_ctl_tables"
  CREATE_SCHEMA_SQL = <<EOS
CREATE SCHEMA IF NOT EXISTS "%s";
EOS
  CREATE_FLYDATA_CTL_COLUMNS_SQL = <<EOS
CREATE TABLE IF NOT EXISTS %s(
  id integer NOT NULL IDENTITY(1,1),
  table_name varchar(128) NOT NULL,
  column_name varchar(128) NOT NULL,
  src_data_type varchar(1024) NOT NULL,
  revision int NOT NULL DEFAULT 1,
  ordinal_position int NOT NULL,
  PRIMARY KEY(id)
) DISTKEY(table_name) SORTKEY(table_name);
EOS
  CREATE_FLYDATA_CTL_TABLES_SQL = <<EOS
CREATE TABLE IF NOT EXISTS %s(
  id integer NOT NULL IDENTITY(1,1),
  table_name varchar(128) NOT NULL,
  attribute varchar(128) NOT NULL,
  value varchar(max),
  created_at timestamp DEFAULT SYSDATE,
  PRIMARY KEY(id)
) DISTKEY(table_name) SORTKEY(table_name);
EOS
  CREATE_FLYDATA_CTL_TABLE_SQL = "#{CREATE_FLYDATA_CTL_COLUMNS_SQL}#{CREATE_FLYDATA_CTL_TABLES_SQL}"


  # Returns the double-quoted table name, prefixed with the double-quoted
  # schema name when schema_name is not blank.
  def self.table_name_for_ddl(table_name, schema_name)
    schema_name.to_s.empty? ? "\"#{table_name}\"" : "\"#{schema_name}\".\"#{table_name}\""
  end

  # Returns the quoted name of a control table.
  #
  # ctl_table_type - :columns (default) or :tables
  #
  # Raises ArgumentError for any other ctl_table_type; previously such a value
  # silently produced an empty table name.
  def self.flydata_ctl_table_for_ddl(schema_name, ctl_table_type = :columns)
    table_name = case ctl_table_type
                 when :columns then FLYDATA_CTL_COLUMNS_TABLE
                 when :tables  then FLYDATA_CTL_TABLES_TABLE
                 else
                   raise ArgumentError, "Unknown ctl table type '#{ctl_table_type.inspect}'"
                 end
    table_name_for_ddl(table_name, schema_name)
  end

  # Returns the CREATE SCHEMA statement for schema_name.
  def self.create_schema_sql(schema_name)
    CREATE_SCHEMA_SQL % schema_name
  end

  # Returns the CREATE TABLE statements for both control tables.
  def self.create_flydata_ctl_table_sql(schema_name)
    # No drop table here intentionally because losing the data is fatal.
    columns_tbl = flydata_ctl_table_for_ddl(schema_name, :columns)
    tables_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
    CREATE_FLYDATA_CTL_TABLE_SQL % [columns_tbl, tables_tbl]
  end

  # Returns the CREATE TABLE statement for the control "tables" table only.
  def self.create_flydata_ctl_tables_sql(schema_name)
    tables_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
    CREATE_FLYDATA_CTL_TABLES_SQL % [tables_tbl]
  end

  CREATE_TABLE_SQL = <<EOS
DROP TABLE IF EXISTS %s;
CREATE TABLE %s (
%s
)%s;
EOS

  # Returns the DROP TABLE + CREATE TABLE statements for the table definition,
  # including the primary key clause and DISTKEY/SORTKEY when the table has
  # primary key columns.
  def self.create_table_sql(flydata_tabledef, schema_name)
    lines = flydata_tabledef[:columns].map { |column| column_def_sql(column) }
    pk_def = primary_key_sql(flydata_tabledef)
    lines << pk_def if pk_def

    contents = lines.join(",\n")

    # nil when there is no primary key; "%s" then renders it as an empty string.
    dk_sk_def = distkey_sortkey_sql(flydata_tabledef)

    redshift_tbl = table_name_for_ddl(flydata_tabledef[:table_name], schema_name)
    CREATE_TABLE_SQL % [redshift_tbl, redshift_tbl, contents, dk_sk_def]
  end

  # Returns the column definition line for one column hash.
  #
  # column - hash with :column, :type and optionally :not_null, :default
  # opt    - :for => :alter_table adds a type-specific DEFAULT to NOT NULL
  #          columns that have no explicit default
  #
  # Raises RuntimeError when the column type is not in TYPE_MAP_F2R.
  def self.column_def_sql(column, opt = {})
    type = column[:type]
    params = nil
    if type =~ /\((.*?)\)/
      # Split "name(params) suffix" into the type name "name suffix" and its
      # parameters.
      match = Regexp.last_match
      type = match.pre_match + match.post_match
      params = match[1]
    end

    type_info = TYPE_MAP_F2R[type]
    raise "Unsupported type '#{column[:type]}'" if type_info.nil?

    rs_type = if type_info[:use_params] && params
                params = check_and_replace_max(params, Array(type_info[:max_size])) if type_info[:max_size]
                type_info[:type] + "(#{params})"
              else
                type_info[:type]
              end
    line = %Q|  "#{column[:column]}" #{rs_type}|
    line += " NOT NULL" if column[:not_null]
    if column.has_key?(:default)
      val = replace_default_value(type, type_info[:type], column[:default])
      line += " DEFAULT #{val}"
    elsif column[:not_null] && opt[:for] == :alter_table
      # Redshift doesn't allow adding a not null column without default value
      # Add a default value
      line += " DEFAULT '#{type_info[:default_value]}'"
    end
# Commented out because no IDENTITY column must be used for a replicated table.
# Values come from the master.
#    line += " IDENTITY(1, 1)" if (column[:auto_increment])

    line
  end

  NULL_STR = "NULL"

  # Converts a source default value into the text placed after DEFAULT.
  #
  # flydata_type  - source type name without parameters (e.g. 'year')
  # redshift_type - :type value from TYPE_MAP_F2R
  # default_value - source default; nil yields NULL_STR
  #
  # Returns a String, or an Integer for bit/hex literals (b'..', x'..', 0b.., 0x..).
  # Raises RuntimeError when a YEAR default cannot be parsed as a date.
  def self.replace_default_value(flydata_type, redshift_type, default_value)
    return NULL_STR if default_value.nil?
    if flydata_type.start_with?('year')
      # Stringify first so a non-String default (e.g. Integer 2015) is
      # validated the same way as '2015'.
      value = convert_year_into_date(remove_single_quote(default_value).to_s)
      begin
        Date.parse(value)
      rescue
        raise "default value of YEAR type must be 2 or 4-digit, value:'#{default_value}'"
      end
    end

    case redshift_type
    when 'timestamp'
      # to_s keeps non-String defaults (e.g. an Integer epoch) from raising
      # NoMethodError on upcase.
      if default_value.to_s.upcase == "CURRENT_TIMESTAMP"
        'SYSDATE'
      else
        "'#{self.parse_timestamp(remove_single_quote(default_value))}'"
      end
    when 'date'
      "'#{self.parse_date(remove_single_quote(default_value))}'"
    else
      if !default_value.kind_of?(String)
        "'#{default_value}'"
      elsif /^b'.+'$/.match(default_value)
        # String#oct honors the 0b/0x prefix, so this yields the numeric value.
        "0b#{default_value[2..-2]}".oct
      elsif /^[xX]'.+'$/.match(default_value)
        "0x#{default_value[2..-2]}".oct
      elsif /^0[bx].+/.match(default_value)
        default_value.oct
      elsif /^'.*'$/.match(default_value)
        # Already quoted
        default_value
      else
        "'#{default_value}'"
      end
    end
  end

  # Strips one pair of enclosing single quotes from a String value.
  # Non-String values and unquoted strings are returned unchanged.
  def self.remove_single_quote(value)
    return value unless value.kind_of?(String)
    /^'.*'$/.match(value) ? value[1..-2] : value
  end

  # Returns the PRIMARY KEY clause, or nil when no column is a primary key.
  def self.primary_key_sql(flydata_tabledef)
    pks = primary_keys(flydata_tabledef)
    pks.empty? ? nil : "  PRIMARY KEY (#{pks.join(',')})"
  end

  # Returns the names of the columns flagged with :primary_key.
  def self.primary_keys(flydata_tabledef)
    flydata_tabledef[:columns].select { |col| col[:primary_key] }.map { |col| col[:column] }
  end

  # Returns the DISTKEY/SORTKEY clause built from the primary key columns
  # (first key as DISTKEY, all keys as SORTKEY), or nil without primary keys.
  def self.distkey_sortkey_sql(flydata_tabledef)
    pks = primary_keys(flydata_tabledef)
    return nil if pks.empty?
    " DISTKEY(#{pks.first}) SORTKEY(#{pks.join(',')})"
  end

  # Returns one COMMENT ON COLUMN statement per column that has a :comment,
  # or an empty string when none has.
  #
  # NOTE(review): the comment text is interpolated as is (no call to escape);
  # presumably it arrives already escaped - confirm against the producer.
  def self.comment_sql(flydata_tabledef, schema_name)
    sql = ""
    flydata_tabledef[:columns].each do |col|
      next unless col[:comment]

      sql += <<EOS
COMMENT ON COLUMN #{table_name_for_ddl(flydata_tabledef[:table_name], schema_name)}."#{col[:column]}"
  IS '#{col[:comment]}';
EOS
    end
    sql
  end

  # Returns the statements that refresh both control tables for the table,
  # separated by a newline.
  def self.flydata_ctl_sql(flydata_tabledef, schema_name)
    flydata_ctl_columns_sql(flydata_tabledef, schema_name) + "\n" +
    flydata_ctl_tables_sql(flydata_tabledef, schema_name)
  end
  FLYDATA_CTL_COLUMNS_SQL = <<EOS
DELETE FROM %s WHERE table_name = '%s';
INSERT INTO %s (table_name, column_name, src_data_type, ordinal_position) VALUES
EOS

  # Returns the DELETE + INSERT statements that record each column's source
  # type (with " cs:<charset>" appended when the column has a :charset) and its
  # 1-based position in the control "columns" table.
  def self.flydata_ctl_columns_sql(flydata_tabledef, schema_name)
    flydata_ctl_tbl = flydata_ctl_table_for_ddl(schema_name, :columns)
    sql = FLYDATA_CTL_COLUMNS_SQL % [ flydata_ctl_tbl, flydata_tabledef[:table_name], flydata_ctl_tbl ]
    values = flydata_tabledef[:columns].each.with_index(1).map do |col, i|
      charset = col[:charset] ? " cs:#{col[:charset]}" : ""
      "('#{flydata_tabledef[:table_name]}', '#{col[:column]}', '#{escape(col[:type])}#{charset}', #{i})"
    end
    sql + values.join(",\n") + ';'
  end

  FLYDATA_CTL_TABLES_SQL = <<EOS
DELETE FROM %s WHERE table_name = '%s';
INSERT INTO %s (table_name, attribute, value) VALUES
EOS

  # Returns the DELETE + INSERT statements that record the table's 'cs'
  # (from :default_charset) and 'revision' (always 1) attributes in the
  # control "tables" table.
  def self.flydata_ctl_tables_sql(flydata_tabledef, schema_name)
    flydata_ctl_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
    sql = FLYDATA_CTL_TABLES_SQL % [ flydata_ctl_tbl, flydata_tabledef[:table_name], flydata_ctl_tbl ]
    values = [
      "('#{flydata_tabledef[:table_name]}', 'cs', '#{escape(flydata_tabledef[:default_charset])}')",
      "('#{flydata_tabledef[:table_name]}', 'revision', 1)",
    ]
    sql + values.join(",\n") + ';'
  end

  # Returns text with every single quote preceded by a backslash.
  def self.escape(text)
    # In a gsub replacement string "\\\\" stands for one literal backslash.
    text.gsub("'", "\\\\'")
  end

  # Caps comma-separated type parameters at the given maximums.
  #
  # params     - parameter text, e.g. "70000" or "40,2"
  # max_size_a - array of maximums, matched to the parameters by position
  #
  # Returns the parameters joined with ","; a parameter is replaced by its
  # maximum only when it contains digits and exceeds that maximum.
  def self.check_and_replace_max(params, max_size_a)
    capped = params.split(",").each_with_index.map do |param, i|
      max = max_size_a[i]
      exceeds = /\d+/.match(param) && max && param.to_i > max.to_i
      exceeds ? max : param
    end
    capped.join(",")
  end

  APACHE_TIMESTAMP_REGEXP = Regexp.new('^(?<apache_time_format>\[[0-3]\d\/\D{3}\/[1-2]\d{3}:[0-2]\d:[0-5]\d:[0-5]\d ?[\+\-]\d{2}:?\d{2}\])$')
  TIME_REGEXP = Regexp.new('^(?<sign>-)?(?<hour>\d{2,3}):(?<minute>[0-5][0-9]):(?<second>[0-5][0-9](\.\d+)?)$')

  # Converts a value into a UTC timestamp string 'YYYY-MM-DD HH:MM:SS.ffffff'.
  #
  # Accepted inputs: an Integer or all-digit string (Unix epoch), an Apache
  # log timestamp, a time of day matching TIME_REGEXP, or anything
  # DateTime.parse understands.
  #
  # Returns nil for an empty value. Raises ArgumentError for an unparsable
  # value, except that values starting with '0000-00-00 00:00:00' map to
  # '0001-01-01 00:00:00.000000'.
  def self.parse_timestamp(value)
    value_str = value.to_s
    return nil if value_str.empty?
    if value.kind_of?(Integer) || /^\d+$/ === value_str
      # Unix epoch in UTC
      t = DateTime.strptime(value_str, '%s')
    elsif APACHE_TIMESTAMP_REGEXP.match(value_str)
      # apache time format
      t = DateTime.strptime(value_str, "[%d/%b/%Y:%H:%M:%S %Z]")
    elsif time_match = TIME_REGEXP.match(value_str)
      t = convert_time_into_timestamp(time_match)
    else
      t = DateTime.parse(value_str)
    end
    t = t.new_offset(0) # Redshift Plug-in uses UTC
    t.strftime('%Y-%m-%d %H:%M:%S.%6N')
  rescue ArgumentError
    # '0000-00-00 00:00:00' is valid for mysql datetime column
    return '0001-01-01 00:00:00.000000' if value_str.start_with?('0000-00-00 00:00:00')
    raise
  end

  STANDARD_DATETIME = DateTime.new(1, 1, 1)

  # Converts a TIME_REGEXP match into a DateTime offset from STANDARD_DATETIME
  # by the (possibly negative) hours, minutes and seconds of the match.
  def self.convert_time_into_timestamp(time_match)
    sign = time_match[:sign] ? -1 : 1
    STANDARD_DATETIME.dup +
      sign * Rational(time_match[:hour].to_i, 24) +
      sign * Rational(time_match[:minute].to_i, 1440) +
      sign * Rational(time_match[:second].to_f, 86400)
  end

  # Converts a value into a date string 'YYYY-MM-DD'. Year-only values are
  # expanded by convert_year_into_date first.
  #
  # Returns nil for nil. Raises ArgumentError for an unparsable value, except
  # that '0000-00-00' maps to '0001-01-01'.
  def self.parse_date(value)
    return nil if value.nil?
    value_str = value.to_s
    dt = Date.parse(convert_year_into_date(value_str))
    dt.strftime('%Y-%m-%d')
  rescue ArgumentError
    # '0000-00-00' is valid for mysql date column
    return '0001-01-01' if value_str == '0000-00-00'
    raise
  end

  # Expands a year value into a date string for January 1st of that year.
  #
  # '0' and '0000' become '0001-01-01'; a 4-digit year is used as is; a
  # 2-digit year is prefixed with "19" when >= 70, otherwise with "20".
  # Any other value is returned unchanged.
  def self.convert_year_into_date(value)
    return '0001-01-01' if value == '0' || value == '0000' # '0001-01-01' is the "base" date

    case value
    when /^\d{4}$/
      "#{value}-01-01"
    when /^\d{2}$/
      "#{value.to_i >= 70 ? "19" : "20"}#{value}-01-01"
    else
      value # Return the value as is
    end
  end
end

end
end