require "integration_spec_helper" require 'net/http' RSpec.describe NulogyMessageBusProducer::RepopulateReplicationSlots do let(:tenant_id) { SecureRandom.uuid } let(:number_of_messages) { 100 } let(:kafka_bootstrap_servers) { "host.docker.internal:39092" } let(:kafka_connect) { KafkaConnect.new("http://localhost:8083", "ruby_specs") } let(:replication_slot_name) { "rspec_replication_slot" } let(:topic_name) { "repopulate_replication_slot_tests" } before do NulogyMessageBusProducer.config.register_schema( schema: "NulogyMessageBusProducer::Specs::TestSchema", key: "test" ) cleanup_everything end it "generates events" do Kafka.create_topic(topic_name) consumer = Kafka.setup_kafka_consumer(topic_name) without_transaction do create_subscription(event_type: "testCreated", topic_name: topic_name) number_of_messages.times { |n| create_event(uuid(n)) } configure_debezium NulogyMessageBusProducer::RepopulateReplicationSlots.repopulate end message_payloads = Kafka.wait_for_messages(consumer).map(&:payload) matcher = number_of_messages.times.map { |n| include(uuid(n)) } expect(message_payloads.count).to eq(number_of_messages) expect(message_payloads).to match(matcher) end def cleanup_everything truncate_db Kafka.delete_topic(topic_name) rescue nil kafka_connect.delete wait_for_replication_slot_cleanup(replication_slot_name) end def create_event(entity_uuid) trigger_event("testCreated", uuid: entity_uuid, tenant_id: tenant_id) end def configure_debezium config = build_debezium_config response = kafka_connect.configure(config) expect(response.code).to eq("201"), <<~MESSAGE Creating a Debezium config in Kafka Connect has failed. HTTP Request returned: Code: #{response.code} #{JSON.parse(response.body).pretty_inspect} #{config.pretty_inspect} MESSAGE wait_for_tasks_to_start(kafka_connect) wait_for_replication_slot(replication_slot_name) end def wait_for_tasks_to_start(kafka_connect) SpecUtils.wait_for(attempts: 10, interval: 0.5) do tasks = kafka_connect.status[:tasks] next false if tasks.blank? expect(tasks.all? { |task| task[:state] == "RUNNING" }).to eq(true), <<~MESSAGE Expected the Kafka Connect tasks to be running. Instead found: #{tasks.pretty_inspect} MESSAGE end end def build_debezium_config db_config = Rails.configuration.database_configuration[Rails.env] events_table = NulogyMessageBusProducer::PublicSubscriptionEvent.table_name { "bootstrap.servers": kafka_bootstrap_servers, "database.dbname": db_config["database"], "database.hostname": db_config["host"] == "localhost" ? 
"host.docker.internal" : db_config["host"], "database.password": db_config["password"], "database.port": db_config["port"] || 5432, "database.server.name": "test-environment", "database.user": db_config["username"], "slot.name": replication_slot_name, "behavior.on.null.values": "delete", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.initial.statements": "DO $$BEGIN IF not exists(select from pg_publication where pubname = 'debezium_public_events') THEN CREATE PUBLICATION debezium_public_events FOR TABLE #{events_table} WITH (publish = 'insert');; END IF;; END$$;", "errors.log.enable": "true", "errors.log.include.messages": "true", "heartbeat.interval.ms": "30000", "plugin.name": "pgoutput", "publication.name": "debezium_public_events", "slot.drop.on.stop": "true", "snapshot.mode": "never", "table.whitelist": "public.#{events_table}", "transforms": "requireTopicName,unwrap,extractTopicName,extractPartitionKey,removeFields", "transforms.requireTopicName.type": "io.confluent.connect.transforms.Filter$Value", "transforms.requireTopicName.filter.condition": "$.after.topic_name", "transforms.requireTopicName.filter.type": "include", "transforms.requireTopicName.missing.or.null.behavior": "exclude", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "true", "transforms.extractTopicName.type": "io.confluent.connect.transforms.ExtractTopic$Value", "transforms.extractTopicName.field": "topic_name", "transforms.extractPartitionKey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.extractPartitionKey.fields": "partition_key", "transforms.removeFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.removeFields.blacklist": "topic_name,partition_key", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false" } end end