spec/spec_helper.rb in rdkafka-0.3.0 vs spec/spec_helper.rb in rdkafka-0.3.1
- old
+ new
@@ -1,10 +1,15 @@
+require "simplecov"
+SimpleCov.start do
+ add_filter "/spec/"
+end
+
require "pry"
require "rspec"
require "rdkafka"
-def rdkafka_config
+def rdkafka_config(config_overrides={})
config = {
:"bootstrap.servers" => "localhost:9092",
:"group.id" => "ruby-test-#{Random.new.rand(0..1_000_000)}",
:"auto.offset.reset" => "earliest",
:"enable.partition.eof" => false
@@ -12,18 +17,27 @@
if ENV["DEBUG_PRODUCER"]
config[:debug] = "broker,topic,msg"
elsif ENV["DEBUG_CONSUMER"]
config[:debug] = "cgrp,topic,fetch"
end
+ config.merge!(config_overrides)
Rdkafka::Config.new(config)
end
def native_client
config = rdkafka_config
config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer)
end
+def new_native_topic(topic_name="topic_name")
+ Rdkafka::Bindings.rd_kafka_topic_new(
+ native_client,
+ topic_name,
+ nil
+ )
+end
+
def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, config: nil)
config = rdkafka_config if config.nil?
consumer = config.consumer
consumer.subscribe(topic)
timeout = Time.now.to_i + timeout_in_seconds
@@ -32,10 +46,10 @@
raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message"
end
message = consumer.poll(100)
if message &&
message.partition == delivery_report.partition &&
- message.offset == delivery_report.offset - 1
+ message.offset == delivery_report.offset
return message
end
end
ensure
consumer.commit