# frozen_string_literal: true

require 'logstash/devutils/rspec/spec_helper'
require 'logstash/outputs/charrington'
require 'stud/temporary'
require 'java'
require 'logstash_output_charrington_test_jars'
require 'pathname'
require 'securerandom'

java_import java.util.ArrayList
java_import java.nio.file.Paths

# Raise the cap on how many characters RSpec prints for a formatted object.
RSpec::Support::ObjectFormatter.default_instance.max_formatted_output_length = 80000

# Shared context exposing `run_pipeline`, which builds a pipeline from the
# including group's `config` and runs it.
RSpec.shared_context 'pipeline' do
  let(:run_pipeline) do
    pipeline = new_pipeline_from_string(config)
    pipeline.run
  end
end

# Shared context providing a pooled JDBC connection to a PostgreSQL database
# plus small helpers for issuing SQL from specs.
RSpec.shared_context 'postgres' do # rubocop:disable Metrics/BlockLength
  # Configures the Hikari data source used by every SQL helper below.
  # Stores the JDBC URL in @url and the data source in @connection_manager.
  #
  # @param host [String] database host name
  # @param _port [Object] accepted but not used; the JDBC URL is built without a port
  # @param database [String] database name
  # @param user [String] database user
  # @param password [String] database password
  def start_database(host, _port, database, user, password)
    @url = "jdbc:postgresql://#{host}/#{database}?user=#{user}&password=#{password}"

    # setup connection manager
    @connection_manager = Java::ComZaxxerHikari::HikariDataSource.new
    @connection_manager.setDriverClassName('org.postgresql.Driver')
    @connection_manager.setUsername(user)
    @connection_manager.setPassword(password)
    @connection_manager.setJdbcUrl(@url)
  end

  # Connection settings come from the environment, with defaults for each value.
  before(:all) do
    database_host = ENV.fetch('DATABASE_HOST', 'postgres')
    database_port = ENV.fetch('DATABASE_PORT', 5432)
    database_name = ENV.fetch('DATABASE_NAME', 'winston')
    database_user = ENV.fetch('DATABASE_USER', 'postgres')
    database_password = ENV.fetch('DATABASE_PASSWORD', 'postgres')
    start_database(database_host, database_port, database_name, database_user, database_password)
  end

  # Shut the pool down once the group has finished so that every example group
  # including this context does not leave its own connection pool open.
  after(:all) do
    @connection_manager.close unless @connection_manager.nil?
  end

  # Path to the PostgreSQL JDBC driver jar; DRIVER_JAR_PATH overrides the
  # vendored default located under the current working directory.
  let(:driver_path) do
    ENV.fetch('DRIVER_JAR_PATH',
              Pathname.new("#{Dir.pwd}/vendor/jar-dependencies/test-jars/postgresql-42.2.5.jar").to_s)
  end

  # Runs +sql+ and returns its rows (see #execute).
  def query(sql)
    execute(sql, results: true)
  end

  # Runs a statement that produces no rows (see #execute).
  def create(sql)
    execute(sql)
  end

  # Runs a statement that produces no rows (see #execute).
  def insert(sql)
    execute(sql)
  end

  # Executes +sql+ through a prepared statement on a pooled connection.
  #
  # @param sql [String] statement to run
  # @param results [Boolean] when true, run as a query and collect its rows
  # @return [Array<Hash{Symbol => String}>] one hash per row, keyed by column
  #   name, with every value read via getString, when +results+ is true
  # @return [nil] when +results+ is false and the statement succeeded
  # @return [false] when a StandardError was raised; the error is printed
  def execute(sql, results: false)
    conn = @connection_manager.getConnection
    stmt = conn.prepareStatement(sql)

    unless results
      stmt.execute
      return nil
    end

    rs = stmt.execute_query
    meta = rs.getMetaData
    column_count = meta.getColumnCount

    # Accumulate into a dedicated local instead of overwriting the `results` flag.
    rows = []
    while rs.next
      row = (1..column_count).each_with_object({}) do |index, columns|
        columns[meta.getColumnName(index).to_sym] = rs.getString(index)
      end
      rows << row
    end
    rows
  rescue StandardError => e
    puts "Error executing query. sql=#{sql} #{e.message}"
    false
  ensure
    # Return the statement and connection even when the statement failed.
    stmt.close unless stmt.nil? || stmt.isClosed
    conn.close unless conn.nil? || conn.isClosed
  end

  # Creates the `tracks` table if it does not exist yet.
  #
  # @param schema [String] optional schema name; when non-empty the table name
  #   is qualified as "<schema>.tracks"
  # @return [nil, false] the result of #create
  def create_tracks_table(schema = '')
    qualifier = schema.empty? ? schema : "#{schema}."
    sql = <<-SQL
      CREATE TABLE IF NOT EXISTS #{qualifier}tracks
      (
        id VARCHAR(512) NOT NULL CONSTRAINT tracks_pkey PRIMARY KEY,
        action VARCHAR(512),
        anonymous_id VARCHAR(512),
        app_name VARCHAR(512),
        context_campaign_content VARCHAR(512),
        context_campaign_medium VARCHAR(512),
        context_campaign_name VARCHAR(512),
        context_campaign_source VARCHAR(512),
        context_ip VARCHAR(512),
        context_library_name VARCHAR(512),
        context_library_version VARCHAR(512),
        context_page_path VARCHAR(512),
        context_page_referrer VARCHAR(512),
        context_page_search VARCHAR(512),
        context_page_title VARCHAR(512),
        context_page_url VARCHAR(512),
        context_user_agent VARCHAR(512),
        event VARCHAR(512),
        event_text VARCHAR(512),
        original_timestamp TIMESTAMP,
        received_at TIMESTAMP,
        segment_dedupe_id VARCHAR(512),
        sent_at TIMESTAMP,
        timestamp TIMESTAMP,
        user_id VARCHAR(512),
        user_uid VARCHAR(512),
        uuid bigint,
        uuid_ts TIMESTAMP DEFAULT ('now'::text)::TIMESTAMP without TIME ZONE
      )
    SQL
    create(sql)
  end

  # Drops +table+ if it exists.
  #
  # @return [Boolean] see #execute_update
  def drop_table(table)
    execute_update("DROP TABLE IF EXISTS #{table}")
  end

  # Runs the given DDL statement.
  #
  # @return [Boolean] see #execute_update
  def create_table(sql)
    execute_update(sql)
  end

  # Executes +sql+ with Statement#executeUpdate on a pooled connection.
  #
  # @param sql [String] statement to run
  # @return [Boolean] true on success; false when a StandardError was raised
  #   (the error is printed)
  def execute_update(sql)
    conn = @connection_manager.getConnection
    stmt = conn.createStatement
    stmt.executeUpdate(sql)
    true
  rescue StandardError => e
    puts "Error executing update. sql=#{sql} #{e.message}"
    false
  ensure
    # Return the statement and connection even when the update failed.
    stmt.close unless stmt.nil? || stmt.isClosed
    conn.close unless conn.nil? || conn.isClosed
  end
end