require 'logstash/devutils/rspec/spec_helper'
require 'logstash/outputs/charrington'
require 'stud/temporary'
require 'java'
require 'logstash-output-charrington_test_jars'
require 'pathname'
require 'securerandom'
java_import java.util.ArrayList
java_import java.nio.file.Paths

# Raise RSpec's truncation limit so long failure messages are printed in full.
RSpec::Support::ObjectFormatter.default_instance.max_formatted_output_length = 80_000

RSpec.configure do |c|
end

# Provides `run_pipeline`, which builds a pipeline from the including spec's
# `config` and runs it.
RSpec.shared_context 'pipeline' do
  let(:run_pipeline) do
    pipeline = new_pipeline_from_string(config)
    pipeline.run
  end
end

# Provides a PostgreSQL connection pool plus small SQL helpers for specs.
RSpec.shared_context 'postgres' do
  # Starts (or connects to) a PostgreSQL database and builds the Hikari
  # connection pool stored in @connection_manager. The JDBC URL is kept in @url.
  #
  # @param embedded [Boolean] true to launch an embedded PostgreSQL instance,
  #   false to connect to an already running server
  # @param host [String] database host
  # @param port [Integer] database port (used in both modes)
  # @param database [String] database name
  # @param user [String] database user
  # @param password [String] database password
  def start_database(embedded = true, host = 'localhost', port = 57354, database = 'winston', user = 'testuser', password = 'password')
    if embedded
      # avoid archive extraction every time
      config = Java::RuYandexQatoolsEmbedPostgresql::EmbeddedPostgres::cachedRuntimeConfig(Paths::get('/tmp/charrington-test-db-cache'))
      db = Java::RuYandexQatoolsEmbedPostgresql::EmbeddedPostgres.new
      @url = db.start(config, host, port, database, user, password, ArrayList.new(["-E", "SQL_ASCII", "--locale=C", "--lc-collate=C", "--lc-ctype=C"]))
    else
      # Include the port explicitly; previously it was dropped from the URL, so
      # any non-default port passed by the caller was silently ignored.
      @url = "jdbc:postgresql://#{host}:#{port}/#{database}?user=#{user}&password=#{password}"
    end

    # setup connection manager
    @connection_manager = Java::ComZaxxerHikari::HikariDataSource.new
    @connection_manager.setDriverClassName('org.postgresql.Driver')
    @connection_manager.setUsername(user)
    @connection_manager.setPassword(password)
    @connection_manager.setJdbcUrl(@url)
  end

  before(:all) do
    # When TEST_ENV is "ci" (case-insensitive) connect to an external server;
    # otherwise start the embedded database with the defaults.
    if ENV.fetch("TEST_ENV", "").to_s.casecmp?("ci")
      start_database(false, 'postgres', 5432, 'postgres', 'postgres', '')
    else
      start_database
    end
  end

  # Path to the PostgreSQL JDBC driver jar; DRIVER_JAR_PATH overrides the
  # vendored default.
  let(:driver_path) { ENV.fetch('DRIVER_JAR_PATH', Pathname.new("#{Dir.pwd}/vendor/jar-dependencies/test-jars/postgresql-42.2.5.jar").to_s) }

  # Runs +sql+ and returns its rows (see #execute), or false on error.
  def query(sql)
    execute(sql, true)
  end

  # Runs +sql+ without collecting results. Returns nil, or false on error.
  def create(sql)
    execute(sql, false)
  end

  # Runs +sql+ without collecting results. Returns nil, or false on error.
  def insert(sql)
    execute(sql)
  end

  # Executes +sql+ through a prepared statement on a pooled connection.
  #
  # @param sql [String] statement to run
  # @param results [Boolean] when true, run as a query and collect the rows
  # @return [Array<Hash>, nil, false] with +results+ true, an array with one
  #   hash per row (keys are column names as symbols, values are read with
  #   getString); nil when +results+ is false; false when an error is rescued
  #   (the error is printed to stdout)
  def execute(sql, results = false)
    conn = @connection_manager.getConnection
    stmt = conn.prepareStatement(sql)
    unless results
      stmt.execute
      return
    end

    rs = stmt.executeQuery
    meta = rs.getMetaData
    column_count = meta.getColumnCount
    rows = []
    while rs.next
      row = {}
      # JDBC column indexes are 1-based.
      (1..column_count).each do |i|
        row[meta.getColumnName(i).to_sym] = rs.getString(i)
      end
      rows << row
    end
    rows
  rescue => e
    puts "Error executing query. sql=#{sql} #{e.message}"
    false
  ensure
    # stmt/conn are nil if acquiring them raised.
    stmt.close if stmt && !stmt.isClosed
    conn.close if conn && !conn.isClosed
  end

  # Creates the `tracks` table if it does not already exist.
  #
  # @param schema [String] optional schema name; when empty the table name is
  #   left unqualified
  # @return [nil, false] the result of #create
  def create_tracks_table(schema = '')
    sql = <<-SQL
      CREATE TABLE IF NOT EXISTS #{ schema.empty? ? schema : schema + '.'}tracks
      (
        id VARCHAR(512) NOT NULL CONSTRAINT tracks_pkey PRIMARY KEY,
        anonymous_id VARCHAR(512),
        app_name VARCHAR(512),
        context_campaign_content VARCHAR(512),
        context_campaign_medium VARCHAR(512),
        context_campaign_name VARCHAR(512),
        context_campaign_source VARCHAR(512),
        context_ip VARCHAR(512),
        context_library_name VARCHAR(512),
        context_library_version VARCHAR(512),
        context_page_path VARCHAR(512),
        context_page_referrer VARCHAR(512),
        context_page_search VARCHAR(512),
        context_page_title VARCHAR(512),
        context_page_url VARCHAR(512),
        context_user_agent VARCHAR(512),
        event VARCHAR(512),
        event_text VARCHAR(512),
        original_timestamp TIMESTAMP,
        received_at TIMESTAMP,
        segment_dedupe_id VARCHAR(512),
        sent_at TIMESTAMP,
        timestamp TIMESTAMP,
        user_id VARCHAR(512),
        user_uid VARCHAR(512),
        uuid bigint,
        uuid_ts TIMESTAMP DEFAULT ('now'::text)::TIMESTAMP without TIME ZONE
      )
    SQL
    create(sql)
  end

  # Drops +table+ if it exists. Returns true on success, false on error.
  def drop_table(table)
    execute_update("DROP TABLE IF EXISTS #{table}")
  end

  # Runs the given DDL statement. Returns true on success, false on error.
  def create_table(sql)
    execute_update(sql)
  end

  # Runs +sql+ with Statement#executeUpdate on a pooled connection.
  #
  # @param sql [String] statement to run
  # @return [Boolean] true on success; false when an error is rescued (the
  #   error is printed to stdout)
  def execute_update(sql)
    conn = @connection_manager.getConnection
    stmt = conn.createStatement
    stmt.executeUpdate(sql)
    true
  rescue => e
    puts "Error executing update. sql=#{sql} #{e.message}"
    false
  ensure
    # stmt/conn are nil if acquiring them raised.
    stmt.close if stmt && !stmt.isClosed
    conn.close if conn && !conn.isClosed
  end
end