require 'date'
require 'flydata-core/errors'

module FlydataCore
  module TableDef
    # Builds Redshift SQL text (DDL for the target table plus bookkeeping
    # statements for the flydata_ctl_* tables) from a FlyData table definition
    # hash. Every method is a class method that returns a String (or nil);
    # nothing here executes SQL.
    class RedshiftTableDef
      # FlyData type name => Redshift type info.
      #   :type          Redshift type emitted in DDL
      #   :use_params    when true, "(...)" parameters of the source type are
      #                  carried over to the Redshift type
      #   :max_size      per-parameter upper bound(s) applied to those parameters
      #   :default_value value used when a NOT NULL column is added through
      #                  ALTER TABLE without an explicit default
      #   :unsigned      not read anywhere in this class
      TYPE_MAP_F2R = {
        'binary' => {type: 'varchar', use_params: true, default_value: ''},
        'bit' => {type: 'bigint', default_value: '0'},
        'char' => {type: 'char', use_params: true, default_value: ''},
        'date' => {type: 'date', default_value: '0000-01-01'},
        'datetime' => {type: 'timestamp', default_value: '0000-01-01'},
        'enum' => {type: 'varchar encode bytedict', default_value: ''},
        'float4' => {type: 'float4', default_value: '0'},
        'float4 unsigned' => {type: 'float4', default_value: '0'},
        'float8' => {type: 'float8', default_value: '0'},
        'float8 unsigned' => {type: 'float8', default_value: '0'},
        'int1' => {type: 'int2', default_value: '0'},
        'int1 unsigned' => {type: 'int2', unsigned: true, default_value: '0'},
        'int2' => {type: 'int2', default_value: '0'},
        'int2 unsigned' => {type: 'int4', unsigned: true, default_value: '0'},
        'int3' => {type: 'int4', default_value: '0'},
        'int3 unsigned' => {type: 'int4', unsigned: true, default_value: '0'},
        'int4' => {type: 'int4', default_value: '0'},
        'int4 unsigned' => {type: 'int8', unsigned: true, default_value: '0'},
        'int8' => {type: 'int8', default_value: '0'},
        'int8 unsigned' => {type: 'numeric(20,0)', unsigned: true, default_value: '0'},
        'numeric' => {type: 'numeric', use_params: true, max_size: [38,37], default_value: '0'},
        'numeric unsigned' => {type: 'numeric', use_params: true, max_size: [38,37], default_value: '0'},
        'set' => {type: 'varchar encode bytedict', default_value: ''},
        'text' => {type: 'varchar(max)', default_value: ''},
        'time' => {type: 'timestamp', default_value: '0000-01-01'},
        'varbinary' => {type: 'varchar', use_params: true, max_size: 65535, default_value: ''},
        'varchar' => {type: 'varchar', use_params: true, max_size: 65535, default_value: ''},
        'year' => {type: 'date', default_value: '0001-01-01'},
      }

      # Generates the full SQL script for one table.
      #
      # flydata_tabledef - Hash with :table_name, :columns and :default_charset.
      # options          - Hash:
      #                    :flydata_ctl_table  emit CREATE statements for the
      #                                        control tables (default: true)
      #                    :schema_name        target schema; empty/nil means none
      #                    :ctl_only           skip CREATE TABLE / COMMENT for the
      #                                        table itself
      #
      # Returns the SQL String.
      # Raises TableDefError when any StandardError occurs while generating.
      def self.from_flydata_tabledef(flydata_tabledef, options = {})
        # Merge into a fresh hash so the caller's options are never modified.
        options = {flydata_ctl_table: true}.merge(options)
        schema_name = options[:schema_name]

        begin
          tabledef = ""
          tabledef += create_schema_sql(schema_name) if options[:flydata_ctl_table] && !schema_name.to_s.empty?
          tabledef += create_flydata_ctl_table_sql(schema_name) if options[:flydata_ctl_table]
          tabledef += create_table_sql(flydata_tabledef, schema_name) unless options[:ctl_only]
          tabledef += comment_sql(flydata_tabledef, schema_name) unless options[:ctl_only]
          tabledef += flydata_ctl_sql(flydata_tabledef, schema_name)
        rescue
          # Catch errors from generating schema. Generally an unsupported data type
          raise TableDefError, {error: "errors generating schema. Please contact us for further instructions", table: flydata_tabledef[:table_name]}
        end
      end

      FLYDATA_CTL_COLUMNS_TABLE = "flydata_ctl_columns"
      FLYDATA_CTL_TABLES_TABLE = "flydata_ctl_tables"

      CREATE_SCHEMA_SQL = <<EOS
CREATE SCHEMA IF NOT EXISTS "%s";
EOS

      CREATE_FLYDATA_CTL_COLUMNS_SQL = <<EOS
CREATE TABLE IF NOT EXISTS %s(
  id integer NOT NULL IDENTITY(1,1),
  table_name varchar(128) NOT NULL,
  column_name varchar(128) NOT NULL,
  src_data_type varchar(1024) NOT NULL,
  revision int NOT NULL DEFAULT 1,
  ordinal_position int NOT NULL,
  PRIMARY KEY(id)
) DISTKEY(table_name) SORTKEY(table_name);
EOS

      CREATE_FLYDATA_CTL_TABLES_SQL = <<EOS
CREATE TABLE IF NOT EXISTS %s(
  id integer NOT NULL IDENTITY(1,1),
  table_name varchar(128) NOT NULL,
  attribute varchar(128) NOT NULL,
  value varchar(max),
  created_at timestamp DEFAULT SYSDATE,
  PRIMARY KEY(id)
) DISTKEY(table_name) SORTKEY(table_name);
EOS

      CREATE_FLYDATA_CTL_TABLE_SQL = "#{CREATE_FLYDATA_CTL_COLUMNS_SQL}#{CREATE_FLYDATA_CTL_TABLES_SQL}"

      # Returns the double-quoted table identifier, prefixed with the
      # double-quoted schema when schema_name is not empty.
      def self.table_name_for_ddl(table_name, schema_name)
        if schema_name.to_s.empty?
          "\"#{table_name}\""
        else
          "\"#{schema_name}\".\"#{table_name}\""
        end
      end

      # Returns the quoted identifier of a control table.
      # ctl_table_type is :columns (default) or :tables; any other value leaves
      # the table name nil.
      def self.flydata_ctl_table_for_ddl(schema_name, ctl_table_type = :columns)
        table_name = case ctl_table_type
                     when :columns; FLYDATA_CTL_COLUMNS_TABLE
                     when :tables; FLYDATA_CTL_TABLES_TABLE
                     end
        table_name_for_ddl(table_name, schema_name)
      end

      # Returns the CREATE SCHEMA statement for schema_name.
      def self.create_schema_sql(schema_name)
        CREATE_SCHEMA_SQL % schema_name
      end

      # Returns the CREATE TABLE IF NOT EXISTS statements for both control tables.
      def self.create_flydata_ctl_table_sql(schema_name)
        # No drop table here intentionally because losing the data is fatal.
        columns_tbl = flydata_ctl_table_for_ddl(schema_name, :columns)
        tables_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
        CREATE_FLYDATA_CTL_TABLE_SQL % [columns_tbl, tables_tbl]
      end

      # Returns the CREATE TABLE IF NOT EXISTS statement for flydata_ctl_tables only.
      def self.create_flydata_ctl_tables_sql(schema_name)
        tables_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
        CREATE_FLYDATA_CTL_TABLES_SQL % [tables_tbl]
      end

      CREATE_TABLE_SQL = <<EOS
DROP TABLE IF EXISTS %s;
CREATE TABLE %s (
%s
)%s;
EOS

      # Returns DROP TABLE + CREATE TABLE statements for the table, including
      # the primary key line and DISTKEY/SORTKEY clause when primary keys exist.
      def self.create_table_sql(flydata_tabledef, schema_name)
        lines = flydata_tabledef[:columns].map { |column| column_def_sql(column) }
        pk_def = primary_key_sql(flydata_tabledef)
        lines << pk_def if pk_def

        contents = lines.join(",\n")

        dk_sk_def = distkey_sortkey_sql(flydata_tabledef)

        table_name = flydata_tabledef[:table_name]
        redshift_tbl = table_name_for_ddl(table_name, schema_name)
        CREATE_TABLE_SQL % [redshift_tbl, redshift_tbl, contents, dk_sk_def]
      end

      # Returns one column definition line.
      #
      # column - Hash with :column, :type and optionally :not_null, :default.
      # opt    - Hash; for: :alter_table adds the type's fallback default to a
      #          NOT NULL column that has no explicit default.
      #
      # Raises RuntimeError when the type is not in TYPE_MAP_F2R.
      def self.column_def_sql(column, opt = {})
        type = column[:type]
        params = nil
        # Split "name(params) suffix" into the bare type name and its parameters.
        if (type_match = /\((.*?)\)/.match(type))
          type = type_match.pre_match + type_match.post_match
          params = type_match[1]
        end

        type_info = TYPE_MAP_F2R[type]
        raise "Unsupported type '#{column[:type]}'" if type_info.nil?

        rs_type = if type_info[:use_params] && params
                    params = check_and_replace_max(params, Array(type_info[:max_size])) if type_info[:max_size]
                    type_info[:type] + "(#{params})"
                  else
                    type_info[:type]
                  end
        line = %Q| "#{column[:column]}" #{rs_type}|
        line += " NOT NULL" if column[:not_null]
        if column.has_key?(:default)
          val = replace_default_value(type, type_info[:type], column[:default])
          line += " DEFAULT #{val}"
        elsif column[:not_null] && opt[:for] == :alter_table
          # Redshift doesn't allow adding a not null column without default value
          # Add a default value
          line += " DEFAULT '#{type_info[:default_value]}'"
        end
        # Commented out because no IDENTITY column must be used for a replicated table.
        # Values come from the master.
        # line += " IDENTITY(1, 1)" if (column[:auto_increment])

        line
      end

      NULL_STR = "NULL"

      # Converts a source default value into the text placed after DEFAULT.
      #
      # flydata_type  - FlyData type name without parameters.
      # redshift_type - mapped Redshift type (TYPE_MAP_F2R[...][:type]).
      # default_value - source default; nil yields NULL.
      #
      # Returns a String, or an Integer for b'..'/x'..'/0b../0x.. literals.
      # Raises RuntimeError for a YEAR default that is not a parsable date
      # after conversion.
      def self.replace_default_value(flydata_type, redshift_type, default_value)
        return NULL_STR if default_value.nil?

        if flydata_type.start_with?('year')
          # to_s so a numeric default (e.g. 2000) is validated the same way
          # parse_date later converts it.
          value = convert_year_into_date(remove_single_quote(default_value).to_s)
          begin
            Date.parse(value)
          rescue
            raise "default value of YEAR type must be 2 or 4-digit, value:'#{default_value}'"
          end
        end

        case redshift_type
        when 'timestamp'
          # to_s keeps non-String defaults (parse_timestamp accepts Integers)
          # from failing on #upcase.
          if default_value.to_s.upcase == "CURRENT_TIMESTAMP"
            'SYSDATE'
          else
            "'#{self.parse_timestamp(remove_single_quote(default_value))}'"
          end
        when 'date'
          "'#{self.parse_date(remove_single_quote(default_value))}'"
        else
          if !default_value.kind_of?(String)
            "'#{default_value}'"
          elsif /^b'.+'$/.match(default_value)
            # String#oct honours the 0b/0x prefix, so this yields the integer value.
            "0b#{default_value[2..-2]}".oct
          elsif /^[xX]'.+'$/.match(default_value)
            "0x#{default_value[2..-2]}".oct
          elsif /^0[bx].+/.match(default_value)
            default_value.oct
          elsif /^'.*'$/.match(default_value)
            default_value
          else
            "'#{default_value}'"
          end
        end
      end

      # Strips one pair of enclosing single quotes from a String; any other
      # value is returned unchanged.
      def self.remove_single_quote(value)
        return value unless value.kind_of?(String)
        /^'.*'$/.match(value) ? value[1..-2] : value
      end

      # Returns the PRIMARY KEY line, or nil when no column is a primary key.
      def self.primary_key_sql(flydata_tabledef)
        pks = primary_keys(flydata_tabledef)
        pks.empty? ? nil : " PRIMARY KEY (#{pks.join(',')})"
      end

      # Returns the names of the columns flagged :primary_key, in column order.
      def self.primary_keys(flydata_tabledef)
        flydata_tabledef[:columns].select { |col| col[:primary_key] }.map { |col| col[:column] }
      end

      # Returns the DISTKEY/SORTKEY clause (first primary key as DISTKEY, all
      # primary keys as SORTKEY), or nil when there is no primary key.
      def self.distkey_sortkey_sql(flydata_tabledef)
        pks = primary_keys(flydata_tabledef)
        return nil if pks.empty?
        " DISTKEY(#{pks.first}) SORTKEY(#{pks.join(',')})"
      end

      # Returns one COMMENT ON COLUMN statement per column that has a :comment.
      # NOTE(review): the comment text is interpolated as-is; confirm that
      # callers supply it already escaped for a SQL string literal.
      def self.comment_sql(flydata_tabledef, schema_name)
        sql = ""
        flydata_tabledef[:columns].each do |col|
          next unless col[:comment]

          sql += <<EOS
COMMENT ON COLUMN #{table_name_for_ddl(flydata_tabledef[:table_name], schema_name)}."#{col[:column]}"
  IS '#{col[:comment]}';
EOS
        end
        sql
      end

      # Returns the statements that refresh this table's rows in both control tables.
      def self.flydata_ctl_sql(flydata_tabledef, schema_name)
        flydata_ctl_columns_sql(flydata_tabledef, schema_name) + "\n" +
          flydata_ctl_tables_sql(flydata_tabledef, schema_name)
      end

      FLYDATA_CTL_COLUMNS_SQL = <<EOS
DELETE FROM %s WHERE table_name = '%s';
INSERT INTO %s (table_name, column_name, src_data_type, ordinal_position) VALUES
EOS

      # Returns DELETE + INSERT statements recording each column's source type
      # (with " cs:<charset>" appended when the column has a charset) and its
      # 1-based ordinal position.
      def self.flydata_ctl_columns_sql(flydata_tabledef, schema_name)
        flydata_ctl_tbl = flydata_ctl_table_for_ddl(schema_name, :columns)
        sql = FLYDATA_CTL_COLUMNS_SQL % [ flydata_ctl_tbl, flydata_tabledef[:table_name], flydata_ctl_tbl ]
        values = flydata_tabledef[:columns].each.with_index(1).map do |col, i|
          charset = col[:charset] ? " cs:#{col[:charset]}" : ""
          "('#{flydata_tabledef[:table_name]}', '#{col[:column]}', '#{escape(col[:type])}#{charset}', #{i})"
        end
        sql += values.join(",\n") + ';'
        sql
      end

      FLYDATA_CTL_TABLES_SQL = <<EOS
DELETE FROM %s WHERE table_name = '%s';
INSERT INTO %s (table_name, attribute, value) VALUES
EOS

      # Returns DELETE + INSERT statements recording the table's default
      # charset ('cs') and revision 1.
      def self.flydata_ctl_tables_sql(flydata_tabledef, schema_name)
        flydata_ctl_tbl = flydata_ctl_table_for_ddl(schema_name, :tables)
        sql = FLYDATA_CTL_TABLES_SQL % [ flydata_ctl_tbl, flydata_tabledef[:table_name], flydata_ctl_tbl ]
        values = [
          "('#{flydata_tabledef[:table_name]}', 'cs', '#{escape(flydata_tabledef[:default_charset])}')",
          "('#{flydata_tabledef[:table_name]}', 'revision', 1)",
        ]
        sql += values.join(",\n") + ';'
        sql
      end

      # Prefixes every single quote in text with a backslash.
      def self.escape(text)
        text.gsub("'", "\\\\'")
      end

      # Caps each comma-separated parameter at the bound in the same position
      # of max_size_a; parameters without a bound, or within it, are kept as-is.
      # Returns the parameters re-joined with ",".
      def self.check_and_replace_max(params, max_size_a)
        capped = params.split(",").each_with_index.map do |param, i|
          max_size = max_size_a[i]
          (/\d+/.match(param) && max_size && param.to_i > max_size.to_i) ? max_size : param
        end
        capped.join(",")
      end

      APACHE_TIMESTAMP_REGEXP = Regexp.new('^(?<apache_time_format>\[[0-3]\d\/\D{3}\/[1-2]\d{3}:[0-2]\d:[0-5]\d:[0-5]\d ?[\+\-]\d{2}:?\d{2}\])$')
      TIME_REGEXP = Regexp.new('^(?<sign>-)?(?<hour>\d{2,3}):(?<minute>[0-5][0-9]):(?<second>[0-5][0-9](\.\d+)?)$')

      # Normalizes a timestamp-like value to 'YYYY-MM-DD HH:MM:SS.ffffff' in UTC.
      # Accepts a Unix epoch (Integer or all-digit String), the bracketed
      # Apache log format, an [-]HH:MM:SS time, or anything DateTime.parse reads.
      #
      # Returns the formatted String, or nil when the value is empty.
      # Raises ArgumentError for unparsable input other than MySQL's zero datetime.
      def self.parse_timestamp(value)
        value_str = value.to_s
        return nil if value_str.empty?

        if value.kind_of?(Integer) || /^\d+$/ === value_str
          # Unix epoch in UTC
          t = DateTime.strptime(value_str, '%s')
        elsif APACHE_TIMESTAMP_REGEXP.match(value_str)
          # apache time format
          t = DateTime.strptime(value_str, "[%d/%b/%Y:%H:%M:%S %Z]")
        elsif (time_match = TIME_REGEXP.match(value_str))
          t = convert_time_into_timestamp(time_match)
        else
          t = DateTime.parse(value_str)
        end
        t = t.new_offset(0) # Redshift Plug-in uses UTC
        t.strftime('%Y-%m-%d %H:%M:%S.%6N')
      rescue ArgumentError => ae
        # '0000-00-00 00:00:00' is valid for mysql datetime column
        if value_str.start_with?('0000-00-00 00:00:00')
          return '0001-01-01 00:00:00.000000'
        else
          raise ae
        end
      end

      STANDARD_DATETIME = DateTime.new(1, 1, 1)

      # Converts a TIME_REGEXP match into a DateTime offset from
      # STANDARD_DATETIME (0001-01-01) by the signed hours, minutes and seconds.
      def self.convert_time_into_timestamp(time_match)
        sign = time_match[:sign] ? -1 : 1
        STANDARD_DATETIME +
          sign * Rational(time_match[:hour].to_i, 24) +
          sign * Rational(time_match[:minute].to_i, 1440) +
          sign * Rational(time_match[:second].to_f, 86400)
      end

      # Normalizes a date-like value (including 2- or 4-digit years) to 'YYYY-MM-DD'.
      #
      # Returns the formatted String, or nil when value is nil.
      # Raises ArgumentError for unparsable input other than MySQL's zero date.
      def self.parse_date(value)
        return nil if value.nil?
        value_str = value.to_s
        dt = Date.parse(convert_year_into_date(value_str))
        dt.strftime('%Y-%m-%d')
      rescue ArgumentError => ae
        # '0000-00-00' is valid for mysql date column
        return '0001-01-01' if value_str == '0000-00-00'
        raise ae
      end

      # Expands a YEAR value into a date string: '0'/'0000' become the base date
      # '0001-01-01', 4-digit years become 'YYYY-01-01', 2-digit years are
      # prefixed with 19 (>= 70) or 20. Anything else is returned unchanged.
      def self.convert_year_into_date(value)
        case value
        when '0', '0000'
          '0001-01-01' # '0001-01-01' is the "base" date
        when /^\d{4}$/
          "#{value}-01-01"
        when /^\d{2}$/
          "#{value.to_i >= 70 ? "19" : "20"}#{value}-01-01"
        else
          value # Return the value as is
        end
      end
    end
  end
end