Sha256: 9f70e16ea087056cb2f9c079e6510b2688432be789a0258328b01b198a6c5a29
Contents?: true
Size: 1.04 KB
Versions: 1
Compression:
Stored size: 1.04 KB
Contents
require "pry"
require "rspec"
require "rdkafka"

# Builds the Rdkafka::Config shared by the specs.
#
# Setting the DEBUG_PRODUCER or DEBUG_CONSUMER environment variable adds a
# :debug entry to the config; DEBUG_PRODUCER wins when both are set.
#
# @return [Rdkafka::Config]
def rdkafka_config
  config = {
    :"bootstrap.servers" => "localhost:9092",
    :"group.id" => "ruby_test",
    :"client.id" => "test",
    :"auto.offset.reset" => "earliest",
    :"enable.partition.eof" => false
  }
  if ENV["DEBUG_PRODUCER"]
    config[:debug] = "broker,topic,msg"
  elsif ENV["DEBUG_CONSUMER"]
    config[:debug] = "cgrp,topic,fetch"
  end
  Rdkafka::Config.new(config)
end

# Creates a native producer handle from the shared spec config.
#
# `native_kafka` and `native_config` are invoked through `send`, presumably
# because they are not part of Rdkafka::Config's public API.
#
# @return the value returned by Rdkafka::Config#native_kafka
def native_client
  config = rdkafka_config
  config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer)
end

# Polls `topic` until the message matching `delivery_report` is consumed.
#
# A message matches when its offset equals `delivery_report.offset - 1`.
#
# @param topic [String] topic to subscribe to
# @param delivery_report [#offset] delivery report of the produced message
# @param timeout_in_seconds [Numeric] how long to keep polling before giving up
# @return the first polled message with the expected offset
# @raise [RuntimeError] when no matching message arrives within the timeout
def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30)
  offset = delivery_report.offset - 1
  consumer = rdkafka_config.consumer
  consumer.subscribe(topic)
  # Use the monotonic clock so wall-clock adjustments cannot stretch or
  # shorten the timeout.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_in_seconds
  loop do
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message"
    end

    message = consumer.poll(100)
    return message if message && message.offset == offset
  end
ensure
  # `consumer` is still nil when an error was raised before it was assigned;
  # guard the close so that error is not masked by a NoMethodError on nil.
  consumer.close if consumer
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
rdkafka-0.1.10 | spec/spec_helper.rb |