require File.join(File.dirname(__FILE__), "create_postgres_table")
require File.join(File.dirname(__FILE__), "create_redshift_table")
require File.join(File.dirname(__FILE__), "alter_postgres_table")
require File.join(File.dirname(__FILE__), "alter_redshift_table")
require File.join(File.dirname(__FILE__), "service")

module Charrington
  # Inserts one event into the table named after the event's "event" field,
  # using a JDBC prepared statement. When the insert fails because the table
  # or a column is missing, it delegates to the create/alter table services
  # and reports their result so the caller can decide whether to retry.
  class Insert
    # This service assumes that the data is already clean and in a flattened hash format.
    # The Transform service should be called before calling this.
    include Service
    include LogStash::Util::Loggable

    attr_accessor :event, :should_retry
    attr_reader :connection, :schema, :table_name, :columns, :driver
    attr_reader :event_as_json_keyword, :enable_event_as_json_keyword

    Error = Class.new(StandardError)
    EventNil = Class.new(Error)
    TableNameNil = Class.new(Error)
    InsertFailed = Class.new(Error)

    # @param connection a JDBC connection (must respond to +prepareStatement+)
    # @param event the event to insert; must respond to +to_hash+ and carry
    #   a non-blank "event" key, which becomes the table name
    # @param opts [Hash] options read here: +:schema+, +:driver+,
    #   +:enable_event_as_json_keyword+, +:event_as_json_keyword+
    # @raise [EventNil] when +event+ is nil
    # @raise [TableNameNil] when the "event" value is missing or blank
    def initialize(connection, event, opts = {})
      raise EventNil, "Event is nil" if event.nil?

      @event = event.to_hash
      # Read from the normalised hash so the name and columns always come
      # from the same object that is later used to bind values.
      event_name = @event["event"].to_s.downcase.strip
      raise TableNameNil, "Table name is nil" if event_name.empty?

      @connection = connection
      # to_s guards against a missing :schema option (opts defaults to {}).
      schema_name = opts[:schema].to_s
      @schema = schema_name.empty? ? '' : "#{schema_name}."
      # Collapse every run of characters outside [a-z0-9] into one underscore.
      @table_name = event_name.gsub(/[^a-z0-9]+/, "_")
      @columns = @event.keys
      @should_retry = false
      @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
      @event_as_json_keyword = opts[:event_as_json_keyword]
      @driver = opts[:driver]
    end

    # Prepares and executes the INSERT for the event.
    #
    # @return the current +should_retry+ value: the initial +false+ after a
    #   successful insert, or whatever the create/alter table service returned
    #   when the insert failed with SQL state 42P01 / 42703
    # @raise [InsertFailed] for any other SQLException or StandardError
    def call
      logger.info "Attempting insert into table name: #{table_name}"
      insert_stmt = insert_statement
      logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}"
      stmt = connection.prepareStatement(insert_stmt)
      stmt = add_statement_event_params(stmt)
      logger.info "Insert statement to be run is: #{insert_stmt}"
      stmt.execute
      should_retry
    rescue Java::JavaSql::SQLException => e
      case e.getSQLState
      when "42P01"
        logger.info "Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table"
        # Assign through the accessor; a bare `should_retry = ...` would only
        # create a local variable and leave the instance state untouched.
        self.should_retry = create_table
      when "42703"
        logger.info "Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table"
        self.should_retry = alter_table
      else
        raise InsertFailed, "Charrington: Rescue from SQLException #{e.message}"
      end
      should_retry
    rescue => e
      # NOTE(review): the message says SQLException although this branch
      # handles every other StandardError; text kept as-is for compatibility.
      raise InsertFailed, "Charrington: Rescue from SQLException #{e.message}"
    ensure
      stmt.close unless stmt.nil?
      cleanup
    end

    private

    # Delegates table creation to the service matching +driver+.
    # Returns nil when the driver is neither "postgres" nor "redshift".
    def create_table
      if driver == "postgres"
        Charrington::CreatePostgresTable.call(connection, event, schema, table_name, columns)
      elsif driver == "redshift"
        Charrington::CreateRedshiftTable.call(connection, event, schema, table_name, columns)
      end
    end

    # Delegates table alteration to the service matching +driver+.
    # Returns nil when the driver is neither "postgres" nor "redshift".
    def alter_table
      if driver == "postgres"
        Charrington::AlterPostgresTable.call(connection, event, schema, table_name, columns)
      elsif driver == "redshift"
        Charrington::AlterRedshiftTable.call(connection, event, schema, table_name, columns)
      end
    end

    # Empties the column list once the insert attempt is finished.
    def cleanup
      @columns.clear if clearable(@columns)
    end

    ### Set Variables

    # Parenthesised, comma-separated column list (memoised).
    def columns_text
      @columns_text ||= arr_to_csv(columns)
    end

    # One "?" placeholder per column.
    def value_placeholders
      Array.new(columns.length, '?')
    end

    # Parenthesised, comma-separated placeholder list.
    def insert_values
      arr_to_csv(value_placeholders)
    end

    # Builds the INSERT text. Values are bound through placeholders, but the
    # schema, table and column names are interpolated without quoting.
    # NOTE(review): column names come from the event keys - confirm they are
    # sanitised upstream before reaching this point.
    def insert_statement
      "INSERT INTO #{schema}#{table_name} #{columns_text} VALUES #{insert_values}"
    end

    # Column names wrapped in square brackets.
    def prepared_statement
      columns.map { |column| "[#{column}]" }
    end

    # Binds each column's event value to the statement (JDBC positions are
    # 1-based) using a setter chosen by the value's Ruby type. Any value not
    # matched below, including nil, is bound as a NULL string.
    #
    # STRFTIME_FMT is not defined in this file and must be resolvable from
    # this class's constant lookup scope.
    #
    # @return the same statement that was passed in
    def add_statement_event_params(stmt)
      columns.each_with_index do |key, idx|
        pos = idx + 1
        value = event[key]

        case value
        when Time
          stmt.setString(pos, value.strftime(STRFTIME_FMT))
        when LogStash::Timestamp
          stmt.setString(pos, value.time.strftime(STRFTIME_FMT))
        when Integer
          # Values outside the signed 32-bit range need the long setter.
          if value > 2147483647 || value < -2147483648
            stmt.setLong(pos, value)
          else
            stmt.setInt(pos, value)
          end
        when BigDecimal
          stmt.setBigDecimal(pos, value.to_java)
        when Float
          # NOTE(review): Ruby Floats are double precision while setFloat
          # binds a single-precision value; confirm whether setDouble is
          # wanted before changing, since column types are created elsewhere.
          stmt.setFloat(pos, value)
        when String
          stmt.setString(pos, value)
        when Array, Hash
          stmt.setString(pos, value.to_json)
        when true, false
          stmt.setBoolean(pos, value)
        else
          stmt.setString(pos, nil)
        end
      end
      stmt
    end

    ### Helpers

    # Joins the elements with ", " and wraps the result in parentheses.
    def arr_to_csv(arr)
      "(#{arr.join(', ')})"
    end

    # True when +obj+ is a Hash or an Array.
    def clearable(obj)
      obj.is_a?(Hash) || obj.is_a?(Array)
    end

    ### SQL

    # Runs +sql+ (whitespace collapsed to single spaces) on +connection+.
    # SQL exceptions are printed and logged rather than raised, so the call
    # returns the logger's result in that case.
    def execute(connection, sql)
      statement = connection.prepareStatement(sql.gsub(/\s+/, " ").strip)
      statement.execute()
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      puts "PSQLException: #{e.message}"
      logger.info "PSQLException: #{e.message}, with SQL: #{sql}"
    rescue Java::JavaSql::SQLException => e
      puts "Redshift SQLException: #{e.message}"
      logger.info "Redshift SQLException: #{e.message}, with SQL: #{sql}"
    ensure
      statement.close unless statement.nil?
    end
  end
end