require File.join(File.dirname(__FILE__), "service") module Charrington class AlterTable # This service will add columns to an existing table dynamically based on finding new keys in the JSON structure. # This is potentially called from Insert when an insert fails. include Service attr_reader :connection, :event, :table_name, :columns attr_accessor :column_types Error = Class.new(StandardError) AlterFailed = Class.new(Error) def initialize(connection, event, table_name, columns) @connection = connection @event = event @table_name = table_name @columns = columns @column_types = [] end def call set_column_types alter_table true rescue => e raise AlterFailed, e.message ensure @column_types.clear if @column_types.is_a? Array end private def alter_table execute("ALTER TABLE IF EXISTS #{table_name} #{columns_fragment}") end def columns_fragment column_types.map do |column| "ADD COLUMN IF NOT EXISTS #{column}" end.join(",") end def set_column_types (columns - current_table_columns).each_with_index do |key, idx| case event[key] when Time, LogStash::Timestamp column_types << "#{key} TIMESTAMP" when Date column_types << "#{key} DATE" when Integer column_types << "#{key} BIGINT" when BigDecimal column_types << "#{key} DECIMAL" when Float column_types << "#{key} DOUBLE PRECISION" when true, false column_types << "#{key} BOOLEAN" else column_types << "#{key} VARCHAR" end end end def current_table_columns sql = "SELECT * FROM #{table_name} LIMIT 1;" rs = executeQuery(sql) meta_data = rs.getMetaData() column_count = meta_data.getColumnCount() (1..column_count).map {|i| meta_data.getColumnName(i) } end def execute(sql) stmt = connection.prepareStatement(prep_sql(sql)) stmt.execute() rescue Java::OrgPostgresqlUtil::PSQLException => e # @logger.error("#{e.message}") ensure stmt.close unless stmt.nil? end def executeQuery(sql) stmt = connection.createStatement() stmt.executeQuery(prep_sql(sql)) rescue Java::OrgPostgresqlUtil::PSQLException => e # @logger.error("#{e.message}") ensure stmt.close unless stmt.nil? end def prep_sql(sql) sql.gsub(/\s+/, " ").strip end end end