spec/spec_helper.rb in rdkafka-0.7.0 vs spec/spec_helper.rb in rdkafka-0.8.0.beta.1

- old
+ new

@@ -5,10 +5,18 @@ require "pry" require "rspec" require "rdkafka" +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic consume_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic empty_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic load_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic produce_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic rake_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic watermarks_test_topic` +`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 25 --if-not-exists --topic partitioner_test_topic` + def rdkafka_config(config_overrides={}) config = { :"api.version.request" => false, :"broker.version.fallback" => "1.0", :"bootstrap.servers" => "localhost:9092", @@ -23,25 +31,26 @@ end config.merge!(config_overrides) Rdkafka::Config.new(config) end -def native_client +def new_native_client config = rdkafka_config config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer) end -def new_native_topic(topic_name="topic_name") +def new_native_topic(topic_name="topic_name", native_client: ) Rdkafka::Bindings.rd_kafka_topic_new( native_client, topic_name, nil ) end def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer: nil) - consumer = rdkafka_config.consumer if consumer.nil? + new_consumer = !!consumer + consumer ||= rdkafka_config.consumer consumer.subscribe(topic) timeout = Time.now.to_i + timeout_in_seconds loop do if timeout <= Time.now.to_i raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message" @@ -51,9 +60,11 @@ message.partition == delivery_report.partition && message.offset == delivery_report.offset return message end end +ensure + consumer.close if new_consumer end def wait_for_assignment(consumer) 10.times do break if !consumer.assignment.empty?