# frozen_string_literal: true

require File.join(File.dirname(__FILE__), 'service')

module Charrington
  # Adds columns to an existing table dynamically, based on finding new keys
  # in the JSON structure of an event.
  # This is potentially called from Insert when an insert fails.
  #
  # Usage is a single #call per instance: it compares the requested column
  # names against the table's current columns and issues one
  # `ALTER TABLE ... ADD COLUMN` statement per missing column.
  class AlterRedshiftTable
    include Service
    include LogStash::Util::Loggable

    attr_reader :connection, :event, :table_name, :columns, :schema
    attr_accessor :column_types

    Error = Class.new(StandardError)
    AlterFailed = Class.new(Error)

    # @param connection [Object] JDBC-style connection; must respond to
    #   `createStatement` and `prepareStatement`
    # @param event [#to_hash] event whose values decide each new column's SQL type
    # @param schema [String] prefix interpolated directly before `table_name`
    #   (presumably it carries its own trailing separator — TODO confirm with caller)
    # @param table_name [String] table to alter
    # @param columns [Array<String>] column names the event needs
    def initialize(connection, event, schema, table_name, columns)
      @connection = connection
      @event = event.to_hash
      @schema = schema
      @table_name = table_name
      @columns = columns
      @column_types = []
    end

    # Works out which columns are missing and adds them.
    #
    # @return [true] when no exception reached this method
    # @raise [AlterFailed] wrapping the message of any StandardError raised
    #   while inspecting or altering the table
    def call
      set_column_types
      alter_table
      true
    rescue StandardError => e
      raise AlterFailed, e.message
    ensure
      # column_types is publicly writable, so only clear it when it is still an Array.
      @column_types.clear if @column_types.is_a? Array
    end

    private

    # Runs one ALTER TABLE statement per pending column definition.
    def alter_table
      execute_list(list_of_alter_table_stmts)
    end

    # Builds the ALTER TABLE statements from the collected column definitions.
    #
    # NOTE(review): schema, table and column names are interpolated unquoted;
    # column names originate from event keys — confirm they are sanitised upstream.
    #
    # @return [Array<String>]
    def list_of_alter_table_stmts
      column_types.map do |column|
        "ALTER TABLE #{schema}#{table_name} ADD COLUMN #{column}"
      end
    end

    # Appends a "<name> <SQL TYPE>" definition to column_types for every
    # requested column that the table does not have yet.
    def set_column_types
      (columns - current_table_columns).each do |key|
        logger.info "New column: #{key}, because of event: #{event}"
        column_types << "#{key} #{sql_type_for(event[key])}"
      end
    end

    # Maps a Ruby value from the event to the SQL type used for its new column.
    # Anything unrecognised (including nil) falls back to VARCHAR(512).
    #
    # @param value [Object]
    # @return [String]
    def sql_type_for(value)
      case value
      when Time, LogStash::Timestamp then 'TIMESTAMP'
      when Date then 'DATE'
      when Integer then 'BIGINT'
      when BigDecimal then 'DECIMAL'
      when Float then 'DOUBLE PRECISION'
      when true, false then 'BOOLEAN'
      else 'VARCHAR(512)'
      end
    end

    # Reads the table's current column names from the metadata of a one-row query.
    #
    # @return [Array<String>]
    # @raise [AlterFailed] when the query cannot be executed
    def current_table_columns
      sql = "SELECT * FROM #{schema}#{table_name} LIMIT 1;"
      # execute_query normalises the SQL itself, so it is passed through as-is.
      stmt, rs = execute_query(sql)
      meta_data = rs.getMetaData
      column_count = meta_data.getColumnCount
      (1..column_count).map { |i| meta_data.getColumnName(i) }
    ensure
      logger.debug "Within ensure block of current_table_columns in alter_redshift_table.rb and value of stmt.nil?: #{stmt.nil?}"
      stmt&.close
    end

    # Executes each statement in order, logging its index and SQL first.
    #
    # @param list_of_sql_stmts [Array<String>]
    def execute_list(list_of_sql_stmts)
      logger.info "Received list of sql statments to execute: #{list_of_sql_stmts}"
      list_of_sql_stmts.each_with_index do |sql, idx|
        logger.info "Executing ALTER TABLE statement with index #{idx} and sql of: #{sql}"
        execute(sql)
      end
    end

    # Executes a single statement and always closes it afterwards.
    #
    # NOTE(review): failures are logged and swallowed here, so #call still
    # returns true when an ALTER fails — presumably to tolerate a column that
    # was added concurrently; confirm before changing this to raise.
    #
    # @param sql [String]
    def execute(sql)
      stmt = connection.prepareStatement(prep_sql(sql))
      stmt.execute
    rescue Java::JavaSql::SQLException => e
      logger.error "Alter Redshift SQLException: #{e.message}, with SQL: #{sql}"
    rescue StandardError => e
      logger.error "Alter Redshift Unknown exception: #{e.message}, with SQL: #{sql}"
    ensure
      # Trace-level detail only; this runs on every call, successful or not.
      logger.debug "Within ensure block of execute in alter_redshift_table.rb and value of stmt.nil?: #{stmt.nil?}"
      stmt&.close
    end

    # Runs a query and hands back both the statement and its result set.
    #
    # The statement is only closed here if something goes wrong; otherwise the
    # caller is responsible for closing it when they are done with the result set.
    #
    # @param sql [String]
    # @return [Array] two elements: the open statement and the result set
    # @raise [AlterFailed] when the query fails. Raising (rather than returning
    #   nil) keeps the real cause in the error instead of a later failure on a
    #   nil result set.
    def execute_query(sql)
      stmt = connection.createStatement
      [stmt, stmt.execute_query(prep_sql(sql))]
    rescue Java::JavaSql::SQLException => e
      logger.error "execute query SQLException: #{e.message}, with SQL: #{sql}"
      stmt&.close
      raise AlterFailed, "execute query SQLException: #{e.message}"
    rescue StandardError => e
      logger.error "execute query Unknown exception: #{e.message}, with SQL: #{sql}"
      stmt&.close
      raise AlterFailed, "execute query Unknown exception: #{e.message}"
    end

    # Collapses every run of whitespace to a single space and trims the ends.
    #
    # @param sql [String]
    # @return [String]
    def prep_sql(sql)
      sql.gsub(/\s+/, ' ').strip
    end
  end
end