spec/spec_helper.rb in rdkafka-0.10.0 vs spec/spec_helper.rb in rdkafka-0.11.0
- old
+ new
@@ -6,31 +6,61 @@
end
require "pry"
require "rspec"
require "rdkafka"
+require "timeout"
-def rdkafka_config(config_overrides={})
- config = {
+def rdkafka_base_config
+ {
:"api.version.request" => false,
:"broker.version.fallback" => "1.0",
:"bootstrap.servers" => "localhost:9092",
- :"group.id" => "ruby-test-#{Random.new.rand(0..1_000_000)}",
- :"auto.offset.reset" => "earliest",
- :"enable.partition.eof" => false
}
+end
+
+def rdkafka_config(config_overrides={})
+ # Generate the base config
+ config = rdkafka_base_config
+ # Merge overrides
+ config.merge!(config_overrides)
+ # Return it
+ Rdkafka::Config.new(config)
+end
+
+def rdkafka_consumer_config(config_overrides={})
+ # Generate the base config
+ config = rdkafka_base_config
+ # Add consumer specific fields to it
+ config[:"auto.offset.reset"] = "earliest"
+ config[:"enable.partition.eof"] = false
+ config[:"group.id"] = "ruby-test-#{Random.new.rand(0..1_000_000)}"
+ # Enable debug mode if required
+ if ENV["DEBUG_CONSUMER"]
+ config[:debug] = "cgrp,topic,fetch"
+ end
+ # Merge overrides
+ config.merge!(config_overrides)
+ # Return it
+ Rdkafka::Config.new(config)
+end
+
+def rdkafka_producer_config(config_overrides={})
+ # Generate the base config
+ config = rdkafka_base_config
+ # Enable debug mode if required
if ENV["DEBUG_PRODUCER"]
config[:debug] = "broker,topic,msg"
- elsif ENV["DEBUG_CONSUMER"]
- config[:debug] = "cgrp,topic,fetch"
end
+ # Merge overrides
config.merge!(config_overrides)
+ # Return it
Rdkafka::Config.new(config)
end
def new_native_client
- config = rdkafka_config
+ config = rdkafka_consumer_config
config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer)
end
def new_native_topic(topic_name="topic_name", native_client: )
Rdkafka::Bindings.rd_kafka_topic_new(
@@ -40,11 +70,11 @@
)
end
def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer: nil)
new_consumer = !!consumer
- consumer ||= rdkafka_config.consumer
+ consumer ||= rdkafka_consumer_config.consumer
consumer.subscribe(topic)
timeout = Time.now.to_i + timeout_in_seconds
loop do
if timeout <= Time.now.to_i
raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message"
@@ -73,10 +103,13 @@
sleep 1
end
end
RSpec.configure do |config|
+ config.filter_run focus: true
+ config.run_all_when_everything_filtered = true
+
config.before(:suite) do
admin = rdkafka_config.admin
{
consume_test_topic: 3,
empty_test_topic: 3,
@@ -92,7 +125,15 @@
rescue Rdkafka::RdkafkaError => ex
raise unless ex.message.match?(/topic_already_exists/)
end
end
admin.close
+ end
+
+ config.around(:each) do |example|
+ # Timeout specs after a minute. If they take longer
+ # they are probably stuck
+ Timeout::timeout(60) do
+ example.run
+ end
end
end