# frozen_string_literal: true

require File.join(File.dirname(__FILE__), 'create_postgres_table')
require File.join(File.dirname(__FILE__), 'create_redshift_table')
require File.join(File.dirname(__FILE__), 'alter_postgres_table')
require File.join(File.dirname(__FILE__), 'alter_redshift_table')
require File.join(File.dirname(__FILE__), 'service')
require 'time'

module Charrington
  # This service assumes that the data is already clean and in a flattened hash format.
  # The Transform service should be called before calling this.
  #
  # It inserts one event into the table named after the (underscored) value of
  # the event's 'event' field, then inserts the subset of the event matching the
  # tracks columns into the `tracks` table. When the first insert fails with SQL
  # state 42P01 or 42703, it delegates to the create-table / alter-table service
  # for the configured driver and reports that result through `should_retry`.
  class Insert # rubocop:disable Metrics/ClassLength
    include Service
    include LogStash::Util::Loggable

    attr_accessor :event, :should_retry
    attr_reader :connection, :schema, :table_name, :columns, :driver, :opts, :transformer, :tracks,
                :event_as_json_keyword, :enable_event_as_json_keyword

    # TODO: create a current_table_columns (alter_postgres_table.rb) query on the tracks table
    # to get the current columns
    REDSHIFT_TRACKS_COLUMNS = %w[
      id action app_name received_at uuid uuid_ts anonymous_id context_ip
      context_library_name context_library_version context_page_path
      context_page_referrer context_page_title context_page_url context_user_agent
      event event_text original_timestamp sent_at timestamp user_id user_uid
      context_campaign_medium context_campaign_name context_page_search
      context_campaign_source segment_dedupe_id context_campaign_content
    ].freeze
    POSTGRES_TRACKS_COLUMNS = %w[
      anonymous_user app_name event published_at session_ip session_library_name
      session_library_version session_page_path session_page_referrer
      session_page_search session_page_title session_page_url session_user_agent
      user_id user_uid
    ].freeze
    # Keys whose values are bound with setTimestamp instead of by Ruby type.
    TIMESTAMP_COLUMNS = %w[published_at sent_at original_timestamp received_at timestamp].freeze

    Error = Class.new(StandardError)
    EventNil = Class.new(Error)
    TableNameNil = Class.new(Error)
    InsertFailed = Class.new(Error)

    # @param connection [Object] JDBC connection; must respond to `prepareStatement`
    # @param event [#to_hash, #[], #keys] the event to insert
    # @param opts [Hash] reads :transformer, :schema, :driver,
    #   :enable_event_as_json_keyword and :event_as_json_keyword
    # @raise [EventNil] when event is nil
    # @raise [TableNameNil] when the event's 'event' field is blank
    def initialize(connection, event, opts = {})
      raise EventNil, 'Event is nil' if event.nil?

      # @transformer must be set before create_tracks, which picks the tracks
      # column list based on it.
      @transformer = opts[:transformer]
      @event = event.to_hash
      @tracks = create_tracks(@event)
      event_name = event['event'].to_s.strip
      raise TableNameNil, 'Table name is nil' if event_name.empty?

      @connection = connection
      # to_s keeps a missing :schema from raising NoMethodError on nil.
      schema_name = opts[:schema].to_s
      @schema = schema_name.empty? ? '' : "#{schema_name}."
      @table_name = underscore(event_name)
      @columns = event.keys.map { |x| underscore(x) }
      @should_retry = false
      @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
      @event_as_json_keyword = opts[:event_as_json_keyword]
      @driver = opts[:driver]
      @opts = opts
    end

    # Runs the event insert followed by the tracks insert.
    #
    # @return [Object] the current `should_retry` value: the initial `false` after
    #   a successful insert, otherwise whatever the create/alter table service
    #   returned (presumably truthy when a retry makes sense - confirm against
    #   those services)
    # @raise [InsertFailed] on any other SQLException or StandardError
    def call
      logger.info "Attempting insert into table name: #{table_name}"
      insert_sql = insert_event_statement
      insert_stmt = connection.prepareStatement(insert_sql)
      logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}"
      insert_stmt = add_statement_event_params(insert_stmt, event)
      logger.info "Insert statement to be run is: #{insert_stmt.toString}"
      insert_stmt.execute

      logger.info 'Attempting insert into tracks table'
      do_tracks_insert
      should_retry
    rescue Java::JavaSql::SQLException => e
      case e.getSQLState
      when '42P01'
        logger.info 'Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table'
        # `self.` is required: a bare `should_retry = ...` would only create a
        # local variable and leave the accessor untouched.
        self.should_retry = create_table
      when '42703'
        logger.info 'Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table'
        self.should_retry = alter_table
      else
        raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
      end
      should_retry
    rescue StandardError => e
      raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
    ensure
      insert_stmt&.close
      cleanup
    end

    # Inserts the tracks subset of the event into the `tracks` table.
    # Best effort: a SQLException is logged and not re-raised.
    def do_tracks_insert
      tracks_sql = insert_tracks_statement
      tracks_stmt = connection.prepareStatement(tracks_sql)
      tracks_stmt = add_statement_event_params(tracks_stmt, tracks)
      logger.info "Insert tracks statment to be run: #{tracks_stmt.toString}"
      tracks_stmt.execute
    rescue Java::JavaSql::SQLException => e
      logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{e.message} #{tracks_sql}")
    ensure
      tracks_stmt&.close
    end

    private

    # Column whitelist for the tracks table, chosen by the configured transformer.
    def tracks_columns
      redshift_transform? ? REDSHIFT_TRACKS_COLUMNS : POSTGRES_TRACKS_COLUMNS
    end

    def redshift?
      driver == 'redshift'
    end

    def postgres?
      driver == 'postgres'
    end

    def postgres_transform?
      transformer == 'postgres'
    end

    def redshift_transform?
      transformer == 'redshift'
    end

    # Delegates to the create-table service for the configured driver.
    # Returns nil when the driver is neither 'postgres' nor 'redshift'.
    def create_table
      if postgres?
        Charrington::CreatePostgresTable.call(connection, event, schema, table_name, columns, opts)
      elsif redshift?
        Charrington::CreateRedshiftTable.call(connection, event, schema, table_name, columns, opts)
      end
    end

    # Delegates to the alter-table service for the configured driver.
    # Returns nil when the driver is neither 'postgres' nor 'redshift'.
    def alter_table
      if postgres?
        Charrington::AlterPostgresTable.call(connection, event, schema, table_name, columns)
      elsif redshift?
        Charrington::AlterRedshiftTable.call(connection, event, schema, table_name, columns)
      end
    end

    # Empties the per-event collections; invoked from the `ensure` of #call.
    def cleanup
      @columns.clear if clearable(@columns)
      @tracks.clear if clearable(@tracks)
    end

    ### Set Variables

    # Builds the tracks row: the entries of `event` whose key is a tracks column.
    # Uses select rather than deleting keys from a copy while iterating over it.
    def create_tracks(event)
      event.select { |key, _value| tracks_columns.include?(key) }
    end

    def columns_text
      @columns_text ||= arr_to_csv(columns)
    end

    def tracks_columns_text
      @tracks_columns_text ||= arr_to_csv(tracks.keys)
    end

    # One '?' placeholder per column.
    def value_placeholders(columns)
      Array.new(columns.length, '?')
    end

    def placeholder_text(columns)
      arr_to_csv(value_placeholders(columns))
    end

    def insert_event_statement
      "INSERT INTO #{schema}#{table_name} #{columns_text} VALUES #{placeholder_text(columns)}"
    end

    def insert_tracks_statement
      "INSERT INTO #{schema}tracks #{tracks_columns_text} VALUES #{placeholder_text(tracks.keys)}"
    end

    # Binds every value of `map` to `stmt` in iteration order (1-based positions).
    # Keys listed in TIMESTAMP_COLUMNS are bound as timestamps; all other values
    # are bound according to their Ruby type, unknown types as a NULL string.
    #
    # @param stmt [Object] prepared statement exposing the JDBC set* methods
    # @param map [Hash] column name => value
    # @return [Object] the same stmt, with parameters bound
    def add_statement_event_params(stmt, map) # rubocop:disable Metrics/CyclomaticComplexity
      map.each.with_index(1) do |(key, value), pos| # rubocop:disable Metrics/BlockLength
        if TIMESTAMP_COLUMNS.include?(key)
          stmt.setTimestamp(pos, parse_timestamp(value))
          next
        end

        case value
        when Time
          stmt.setString(pos, value.strftime(STRFTIME_FMT))
        when LogStash::Timestamp
          stmt.setString(pos, value.time.strftime(STRFTIME_FMT))
        when Integer
          # Values outside the signed 32-bit range must be bound as long.
          if value > 2_147_483_647 || value < -2_147_483_648
            stmt.setLong(pos, value)
          else
            stmt.setInt(pos, value)
          end
        when BigDecimal
          stmt.setBigDecimal(pos, value.to_java)
        when Float
          # NOTE(review): Ruby Floats are double precision; setFloat may narrow
          # them - confirm against the column types before switching to setDouble.
          stmt.setFloat(pos, value)
        when String
          stmt.setString(pos, value[0, 254]) # truncate at 254 string characters
        when Array, Hash
          stmt.setString(pos, value.to_json)
        when true, false
          stmt.setBoolean(pos, value)
        else
          stmt.setString(pos, nil)
        end
      end
      stmt
    end

    ### Helpers

    # Parses a timestamp string with the default (fractional seconds) format,
    # falling back to the format without fractional seconds.
    # A ParseException from the fallback format propagates to the caller.
    def parse_timestamp(value)
      parse_date(value)
    rescue java.text.ParseException
      parse_date(value, "yyyy-MM-dd'T'HH:mm:ss'Z'")
    end

    # Parses `date` using a java.text.SimpleDateFormat pattern and wraps the
    # result in a java.sql.Timestamp.
    def parse_date(date, fmt = "yyyy-MM-dd'T'HH:mm:ss.S'Z'")
      format = java.text.SimpleDateFormat.new(fmt)
      parsed = format.parse(date)
      java.sql.Timestamp.new(parsed.getTime)
    end

    # Renders ['a', 'b'] as "(a, b)".
    def arr_to_csv(arr)
      "(#{arr.join(', ')})"
    end

    def clearable(obj)
      obj.is_a?(Hash) || obj.is_a?(Array)
    end

    # Converts a name to snake_case, replaces every run of characters outside
    # [a-z0-9] with a single underscore, strips leading/trailing underscores and
    # keeps at most the first 64 characters (presumably to stay within database
    # identifier length limits - confirm).
    def underscore(str)
      str
        .gsub(/::/, '/')
        .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
        .gsub(/([a-z\d])([A-Z])/, '\1_\2')
        .downcase
        .gsub(/[^a-z0-9]+/, '_')
        .gsub(/\A_+/, '')
        .gsub(/_+\z/, '')[0, 64]
    end

    ### SQL

    # Executes `sql` after collapsing its whitespace. SQL errors are logged and
    # swallowed; the statement is always closed.
    def execute(connection, sql)
      statement = connection.prepareStatement(sql.gsub(/\s+/, ' ').strip)
      statement.execute
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      logger.error "PSQLException: #{e.message}, with SQL: #{sql}"
    rescue Java::JavaSql::SQLException => e
      logger.error "Redshift SQLException: #{e.message}, with SQL: #{sql}"
    ensure
      statement&.close
    end
  end
end