require File.join(File.dirname(__FILE__), "service")

module Charrington
  # This service will create a table dynamically based on the JSON structure
  # of a single event. This is potentially called from Insert when an insert
  # fails.
  class CreateRedshiftTable
    include Service
    include LogStash::Util::Loggable

    attr_reader :connection, :event, :table_name, :columns, :schema, :opts
    attr_accessor :column_types

    Error = Class.new(StandardError)
    CreateFailed = Class.new(Error)

    # @param connection JDBC connection whose #prepareStatement is used to run
    #   the DDL (presumably a java.sql.Connection — confirm against caller)
    # @param event [#to_hash] event whose values determine each column's type
    # @param schema [String] text placed immediately before table_name in the
    #   DDL (no separator is added, so callers presumably pass e.g. "myschema."
    #   or "" — TODO confirm)
    # @param table_name [String] table to create
    # @param columns [Array<String>] column names to create, looked up in event
    # @param opts [Hash] extra options, exposed via #opts
    def initialize(connection, event, schema, table_name, columns, opts = {})
      @connection = connection
      @event = event.to_hash
      @schema = schema
      @table_name = table_name
      @columns = columns
      # Previously never assigned, so the public #opts reader always returned nil.
      @opts = opts
      @column_types = initial_columns
    end

    # Builds the column definitions and issues CREATE TABLE IF NOT EXISTS.
    #
    # @return [true] when no Ruby-level error was raised. SQL errors from the
    #   DDL itself are logged and swallowed by #execute, so true does not
    #   guarantee the table now exists.
    # @raise [CreateFailed] wrapping any StandardError raised while building
    #   or running the statement.
    def call
      set_column_types
      logger.info "Finished running set_column_types and now have column_types for create table of: #{column_types}"
      create_table
      true
    rescue => e
      raise CreateFailed, e.message
    ensure
      # The definitions list is consumed by this call; empty it afterwards.
      @column_types.clear if @column_types.is_a? Array
    end

    private

    # Appends one definition per entry in `columns` to column_types.
    #
    # A column whose name is already declared is skipped. This covers the
    # fixed initial columns (e.g. `timestamp`, `received_at`) and repeats
    # within `columns`. Redshift rejects a CREATE TABLE that names the same
    # column twice. Unquoted identifiers are folded to lowercase, so the
    # comparison is case-insensitive.
    #
    # https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
    def set_column_types
      declared = column_types.map { |definition| definition.split(/\s+/, 2).first.to_s.downcase }

      columns.each do |column|
        key = column.to_s.downcase
        next if declared.include?(key)

        column_types << column_definition(column)
        declared << key
      end
    end

    # Returns the DDL fragment for a single column.
    # "id" and "sent_at" have fixed definitions; every other column's type is
    # inferred from the event's value for that column.
    def column_definition(column)
      case column
      when "id"
        "#{column} VARCHAR(512) NOT NULL distkey CONSTRAINT #{table_name}_pkey primary key"
      when "sent_at"
        "#{column} TIMESTAMP"
      else
        "#{column} #{redshift_type(event[column])}"
      end
    end

    # Maps a Ruby/Logstash value to a Redshift column type.
    # Order matters: it matches the original precedence (e.g. Time and
    # Timestamp are checked before Date). Anything unrecognised, including
    # nil, becomes VARCHAR(512).
    def redshift_type(value)
      case value
      when Time, LogStash::Timestamp then "TIMESTAMP"
      when Date then "DATE"
      when Integer then "BIGINT"
      when BigDecimal then "DECIMAL"
      when Float then "DOUBLE PRECISION"
      when true, false then "BOOLEAN"
      else "VARCHAR(512)"
      end
    end

    # Column definitions present on every created table.
    def initial_columns
      [
        'original_timestamp TIMESTAMP DEFAULT GETDATE()',
        'received_at TIMESTAMP DEFAULT GETDATE()',
        'timestamp TIMESTAMP DEFAULT GETDATE()',
        'uuid_ts TIMESTAMP DEFAULT GETDATE()',
        "uuid bigint default \"identity\"(22828367, 2, '1,1'::text)"
      ]
    end

    def create_table
      execute(
        "CREATE TABLE IF NOT EXISTS #{schema}#{table_name} " \
        "(#{column_types.join(', ')}) diststyle key sortkey(received_at)"
      )
    end

    # Runs a single SQL statement with runs of whitespace collapsed.
    #
    # Best-effort: a Java SQLException is logged, with the SQL, and swallowed
    # rather than re-raised. The statement is always closed if it was prepared.
    def execute(sql)
      logger.info "Running sql of: #{sql}"
      statement = connection.prepareStatement(sql.gsub(/\s+/, " ").strip)
      statement.execute
    rescue Java::JavaSql::SQLException => e
      logger.error "Redshift SQLException: #{e.message}, with SQL: #{sql}"
    ensure
      statement.close if statement
    end
  end
end