require File.join(File.dirname(__FILE__), "create_postgres_table")
require File.join(File.dirname(__FILE__), "create_redshift_table")
require File.join(File.dirname(__FILE__), "alter_postgres_table")
require File.join(File.dirname(__FILE__), "alter_redshift_table")
require File.join(File.dirname(__FILE__), "service")
require 'time'

module Charrington
  # Inserts one event into a table named after the event's "event" field and,
  # when that succeeds, copies the subset of the event's keys listed in the
  # transformer-specific tracks column list into the "tracks" table.
  #
  # This service assumes that the data is already clean and in a flattened hash format.
  # The Transform service should be called before calling this.
  #
  # When the insert fails with SQLSTATE 42P01 (missing table) or 42703
  # (missing column), the table is created or altered instead, and the result
  # of that operation is returned so the caller can decide whether to retry.
  class Insert
    include Service
    include LogStash::Util::Loggable

    attr_accessor :event, :should_retry
    attr_reader :connection, :schema, :table_name, :columns, :driver, :opts, :transformer, :tracks
    attr_reader :event_as_json_keyword, :enable_event_as_json_keyword

    # TODO create a current_table_columns (alter_postgres_table.rb) query on the tracks table to get the current columns
    # Keys copied into the tracks table when the transformer is 'redshift'.
    REDSHIFT_TRACKS_COLUMNS = %w(id app_name received_at uuid uuid_ts anonymous_id context_ip context_library_name context_library_version context_page_path context_page_referrer context_page_title context_page_url context_user_agent event event_text original_timestamp sent_at timestamp user_id user_uid context_campaign_medium context_campaign_name context_page_search context_campaign_source segment_dedupe_id context_campaign_content).freeze
    # Keys copied into the tracks table for any other transformer.
    POSTGRES_TRACKS_COLUMNS = %w(anonymous_user app_name event published_at session_ip session_library_name session_library_version session_page_path session_page_referrer session_page_search session_page_title session_page_url session_user_agent user_id user_uid).freeze

    # Keys whose values are parsed from strings and bound as java.sql.Timestamp.
    TIMESTAMP_KEYS = %w(published_at sent_at).freeze
    # SimpleDateFormat pattern used for TIMESTAMP_KEYS ('T' and 'Z' are literals).
    TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'".freeze
    # Integers inside this range are bound with setInt, others with setLong.
    JAVA_INT_RANGE = (-2_147_483_648..2_147_483_647).freeze

    Error = Class.new(StandardError)
    EventNil = Class.new(Error)
    TableNameNil = Class.new(Error)
    InsertFailed = Class.new(Error)

    # @param connection presumably a java.sql.Connection (prepareStatement is
    #   called on it) — confirm against callers
    # @param event [#to_hash] flattened event; its "event" value names the table
    # @param opts [Hash] reads :schema, :transformer, :driver,
    #   :enable_event_as_json_keyword and :event_as_json_keyword
    # @raise [EventNil] if event is nil
    # @raise [TableNameNil] if the "event" value is missing or blank
    def initialize(connection, event, opts = {})
      raise EventNil, "Event is nil" if event.nil?

      # tracks_columns (used by create_tracks) depends on the transformer,
      # so it must be assigned before the tracks subset is built.
      @transformer = opts[:transformer]
      @event = event.to_hash
      @tracks = create_tracks(@event)

      event_name = @event["event"].to_s.strip
      raise TableNameNil, "Table name is nil" if event_name.empty?

      @connection = connection
      # A nil/empty schema means table names are left unqualified.
      schema_name = opts[:schema].to_s
      @schema = schema_name.empty? ? '' : "#{schema_name}."
      @table_name = underscore(event_name)
      @columns = @event.keys
      @should_retry = false
      @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
      @event_as_json_keyword = opts[:event_as_json_keyword]
      @driver = opts[:driver]
      @opts = opts
    end

    # Inserts the event into its table, then attempts the tracks insert.
    #
    # @return the should_retry attribute: false (its initial value) after a
    #   successful insert, or the result of create_table / alter_table when the
    #   insert failed with SQLSTATE 42P01 / 42703.
    # @raise [InsertFailed] for any other SQLException or StandardError.
    def call
      self.logger.info "Attempting insert into table name: #{table_name}"
      insert_sql = insert_event_statement
      insert_stmt = connection.prepareStatement(insert_sql)
      self.logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}"

      insert_stmt = add_statement_event_params(insert_stmt, event)
      self.logger.info "Insert statement to be run is: #{insert_stmt.toString}"
      insert_stmt.execute

      self.logger.info "Attempting insert into tracks table"
      do_tracks_insert

      should_retry
    rescue Java::JavaSql::SQLException => e
      # Go through the writer so the attribute itself records the outcome
      # (a bare `should_retry =` would only create a local variable).
      case e.getSQLState()
      when "42P01"
        self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table"
        self.should_retry = create_table
      when "42703"
        self.logger.info "Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table"
        self.should_retry = alter_table
      else
        raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
      end
      should_retry
    rescue => e
      raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
    ensure
      insert_stmt.close unless insert_stmt.nil?
      cleanup
    end

    # Inserts the tracks subset of the event into "<schema>tracks".
    # SQLExceptions are logged rather than raised. The insert is skipped when
    # the event carries none of the tracks columns, because an empty column
    # list would produce invalid SQL.
    def do_tracks_insert
      if tracks.empty?
        self.logger.info "No tracks columns present on event, skipping tracks insert"
        return
      end

      tracks_sql = insert_tracks_statement
      tracks_stmt = connection.prepareStatement(tracks_sql)
      tracks_stmt = add_statement_event_params(tracks_stmt, tracks)
      self.logger.info "Insert tracks statment to be run: #{tracks_stmt.toString}"
      tracks_stmt.execute
    rescue Java::JavaSql::SQLException => e
      self.logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{e.message} #{tracks_sql}")
    ensure
      tracks_stmt.close unless tracks_stmt.nil?
    end

    private

    # Tracks column whitelist for the configured transformer.
    def tracks_columns
      is_redshift_transform? ? REDSHIFT_TRACKS_COLUMNS : POSTGRES_TRACKS_COLUMNS
    end

    def is_redshift?
      driver == 'redshift'
    end

    def is_postgres?
      driver == 'postgres'
    end

    def is_postgres_transform?
      transformer == 'postgres'
    end

    def is_redshift_transform?
      transformer == 'redshift'
    end

    # Delegates to the driver-specific table creator and returns its result;
    # returns nil when the driver is neither 'postgres' nor 'redshift'.
    def create_table
      if is_postgres?
        Charrington::CreatePostgresTable.call(connection, event, schema, table_name, columns, opts)
      elsif is_redshift?
        Charrington::CreateRedshiftTable.call(connection, event, schema, table_name, columns, opts)
      end
    end

    # Delegates to the driver-specific table alterer and returns its result;
    # returns nil when the driver is neither 'postgres' nor 'redshift'.
    def alter_table
      if is_postgres?
        Charrington::AlterPostgresTable.call(connection, event, schema, table_name, columns)
      elsif is_redshift?
        Charrington::AlterRedshiftTable.call(connection, event, schema, table_name, columns)
      end
    end

    # Empties the column list and tracks hash once the call is finished.
    def cleanup
      @columns.clear if clearable(@columns)
      @tracks.clear if clearable(@tracks)
    end

    ### Set Variables

    # Returns a new hash holding only the event entries whose keys are tracks
    # columns. A new hash is built (rather than clone + delete) so a frozen
    # event is handled too.
    def create_tracks(event)
      event.select { |key, _value| tracks_columns.include?(key) }
    end

    # "(col_a, col_b, ...)" for the event table insert; memoized.
    def columns_text
      @columns_text ||= arr_to_csv(columns)
    end

    # "(col_a, col_b, ...)" for the tracks insert; memoized.
    def tracks_columns_text
      @tracks_columns_text ||= arr_to_csv(tracks.keys)
    end

    # One '?' bind placeholder per column.
    def value_placeholders(columns)
      Array.new(columns.length, '?')
    end

    def placeholder_text(columns)
      arr_to_csv(value_placeholders(columns))
    end

    # NOTE(review): column names are interpolated into the SQL unescaped; this
    # assumes the Transform service has already sanitized the event keys.
    def insert_event_statement
      "INSERT INTO #{schema}#{table_name} #{columns_text} VALUES #{placeholder_text(columns)}"
    end

    def insert_tracks_statement
      "INSERT INTO #{schema}tracks #{tracks_columns_text} VALUES #{placeholder_text(tracks.keys)}"
    end

    # Binds each value of +map+ to +stmt+, in key order, picking the JDBC
    # setter from the value's Ruby type. Returns +stmt+.
    def add_statement_event_params(stmt, map)
      map.each_with_index do |(key, value), idx|
        pos = idx + 1 # JDBC parameter indexes are 1-based

        if TIMESTAMP_KEYS.include?(key)
          # A nil value becomes a NULL typed as TIMESTAMP instead of crashing the parser.
          timestamp = value.nil? ? nil : parse_timestamp(value)
          stmt.setTimestamp(pos, timestamp)
          next
        end

        case value
        when Time
          stmt.setString(pos, value.strftime(STRFTIME_FMT))
        when LogStash::Timestamp
          stmt.setString(pos, value.time.strftime(STRFTIME_FMT))
        when Integer
          if JAVA_INT_RANGE.cover?(value)
            stmt.setInt(pos, value)
          else
            stmt.setLong(pos, value)
          end
        when BigDecimal
          stmt.setBigDecimal(pos, value.to_java)
        when Float
          # setFloat would narrow Ruby's 64-bit Float to a 32-bit Java float.
          stmt.setDouble(pos, value)
        when String
          stmt.setString(pos, value[0, 254]) # truncate at 254 string characters
        when Array, Hash
          stmt.setString(pos, value.to_json)
        when true, false
          stmt.setBoolean(pos, value)
        else
          stmt.setString(pos, nil)
        end
      end
      stmt
    end

    # Parses a "yyyy-MM-ddTHH:mm:ssZ" string into a java.sql.Timestamp.
    # A fresh SimpleDateFormat is built per call because that class is not
    # thread-safe.
    # NOTE(review): the 'Z' is matched as a literal, so the time is interpreted
    # in the JVM default time zone rather than UTC — confirm this is intended.
    def parse_timestamp(value)
      parsed = java.text.SimpleDateFormat.new(TIMESTAMP_FORMAT).parse(value)
      java.sql.Timestamp.new(parsed.getTime)
    end

    ### Helpers

    # ["a", "b"] -> "(a, b)"
    def arr_to_csv(arr)
      "(#{arr.join(', ')})"
    end

    def clearable(obj)
      obj.is_a?(Hash) || obj.is_a?(Array)
    end

    # Converts an event name into a snake_case identifier containing only
    # [a-z0-9_], with no leading or trailing underscores.
    def underscore(str)
      str.
        gsub(/::/, '/').
        gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2').
        gsub(/([a-z\d])([A-Z])/, '\1_\2').
        downcase.
        gsub(/[^a-z0-9]+/, "_").
        gsub(/\A_+/, "").
        gsub(/_+\z/, "")
    end

    ### SQL

    # Runs +sql+ (whitespace collapsed) and logs, rather than raises, any
    # PSQLException / SQLException.
    def execute(connection, sql)
      statement = connection.prepareStatement( sql.gsub(/\s+/, " ").strip )
      statement.execute()
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      self.logger.error "PSQLException: #{e.message}, with SQL: #{sql}"
    rescue Java::JavaSql::SQLException => e
      self.logger.error "Redshift SQLException: #{e.message}, with SQL: #{sql}"
    ensure
      statement.close unless statement.nil?
    end
  end
end