require File.join(File.dirname(__FILE__), "service") module Charrington class AlterRedshiftTable # This service will add columns to an existing table dynamically based on finding new keys in the JSON structure. # This is potentially called from Insert when an insert fails. include Service include LogStash::Util::Loggable attr_reader :connection, :event, :table_name, :columns, :schema attr_accessor :column_types Error = Class.new(StandardError) AlterFailed = Class.new(Error) def initialize(connection, event, schema, table_name, columns) @connection = connection @event = event.to_hash @schema = schema @table_name = table_name @columns = columns @column_types = [] end def call set_column_types alter_table true rescue => e raise AlterFailed, e.message ensure @column_types.clear if @column_types.is_a? Array end private def alter_table execute("ALTER TABLE #{schema}#{table_name} #{columns_fragment}") end def columns_fragment column_types.map do |column| "ADD COLUMN #{column}" end.join(",") end def set_column_types (columns - current_table_columns).each_with_index do |key, idx| self.logger.info "New column: #{key}, because of event: #{event}" case event[key] when Time, LogStash::Timestamp column_types << "#{key} TIMESTAMP" when Date column_types << "#{key} DATE" when Integer column_types << "#{key} BIGINT" when BigDecimal column_types << "#{key} DECIMAL" when Float column_types << "#{key} DOUBLE PRECISION" when true, false column_types << "#{key} BOOLEAN" else column_types << "#{key} VARCHAR(512)" end end end def current_table_columns sql = "SELECT * FROM #{schema}#{table_name} LIMIT 1;" stmt, rs = executeQuery(prep_sql(sql)) meta_data = rs.getMetaData() stmt.close unless stmt.nil? column_count = meta_data.getColumnCount() (1..column_count).map {|i| meta_data.getColumnName(i) } ensure stmt.close unless stmt.nil? end def execute(sql) stmt = connection.prepareStatement(prep_sql(sql)) stmt.execute() rescue Java::JavaSql::SQLException => e puts "Alter Redshift SQLException: #{e.message}" self.logger.info "Alter Redshift SQLException: #{e.message}, with SQL: #{sql}" rescue => e puts "Alter Redshift Unknown exception: #{e.message}, with SQL: #{sql}" self.logger.info "Alter Redshift Unknown exception: #{e.message}" ensure stmt.close unless stmt.nil? end def executeQuery(sql) stmt = connection.createStatement() # only close the statement if something goes wrong # otherwise, the caller is responsible for closing the # statement when they are doen with the result set return stmt, stmt.executeQuery(prep_sql(sql)) rescue Java::JavaSql::SQLException => e puts "execute query SQLException: #{e.message}" self.logger.info "execute query SQLException: #{e.message}, with SQL: #{sql}" stmt.close unless stmt.nil? # @logger.error("#{e.message}") rescue => e puts "execute query Unknown exception: #{e.message}" self.logger.info "execute query Unknown exception: #{e.message}, with SQL: #{sql}" stmt.close unless stmt.nil? end def prep_sql(sql) sql.gsub(/\s+/, " ").strip end end end