spec/spec_helper.rb in rdkafka-0.7.0 vs spec/spec_helper.rb in rdkafka-0.8.0.beta.1
- old
+ new
@@ -5,10 +5,18 @@
require "pry"
require "rspec"
require "rdkafka"
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic consume_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic empty_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic load_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic produce_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic rake_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --if-not-exists --topic watermarks_test_topic`
+`docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 25 --if-not-exists --topic partitioner_test_topic`
+
def rdkafka_config(config_overrides={})
config = {
:"api.version.request" => false,
:"broker.version.fallback" => "1.0",
:"bootstrap.servers" => "localhost:9092",
@@ -23,25 +31,26 @@
end
config.merge!(config_overrides)
Rdkafka::Config.new(config)
end
-def native_client
+def new_native_client
config = rdkafka_config
config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer)
end
-def new_native_topic(topic_name="topic_name")
+def new_native_topic(topic_name="topic_name", native_client: )
Rdkafka::Bindings.rd_kafka_topic_new(
native_client,
topic_name,
nil
)
end
def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer: nil)
- consumer = rdkafka_config.consumer if consumer.nil?
+ new_consumer = !!consumer
+ consumer ||= rdkafka_config.consumer
consumer.subscribe(topic)
timeout = Time.now.to_i + timeout_in_seconds
loop do
if timeout <= Time.now.to_i
raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message"
@@ -51,9 +60,11 @@
message.partition == delivery_report.partition &&
message.offset == delivery_report.offset
return message
end
end
+ensure
+ consumer.close if new_consumer
end
def wait_for_assignment(consumer)
10.times do
break if !consumer.assignment.empty?