spec/rdkafka/consumer_spec.rb in rdkafka-0.13.0.beta.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.13.0.beta.2
- old
+ new
@@ -593,23 +593,36 @@
}.to raise_error Rdkafka::RdkafkaError
end
end
describe "#poll with headers" do
- it "should return message with headers" do
+ it "should return message with headers using string keys (when produced with symbol keys)" do
report = producer.produce(
topic: "consume_test_topic",
key: "key headers",
headers: { foo: 'bar' }
).wait
message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report)
expect(message).to be
expect(message.key).to eq('key headers')
- expect(message.headers).to include(foo: 'bar')
+ expect(message.headers).to include('foo' => 'bar')
end
+ it "should return message with headers using string keys (when produced with string keys)" do
+ report = producer.produce(
+ topic: "consume_test_topic",
+ key: "key headers",
+ headers: { 'foo' => 'bar' }
+ ).wait
+
+ message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report)
+ expect(message).to be
+ expect(message.key).to eq('key headers')
+ expect(message.headers).to include('foo' => 'bar')
+ end
+
it "should return message with no headers" do
report = producer.produce(
topic: "consume_test_topic",
key: "key no headers",
headers: nil
@@ -698,11 +711,11 @@
def produce_n(n)
handles = []
n.times do |i|
handles << producer.produce(
topic: topic_name,
- payload: Time.new.to_f.to_s,
+ payload: i % 10 == 0 ? nil : Time.new.to_f.to_s,
key: i.to_s,
partition: 0
)
end
handles.each(&:wait)
@@ -962,22 +975,10 @@
notify_listener(listener)
expect(listener.queue).to eq([:assigned, :revoked])
end
end
-
- def notify_listener(listener)
- # 1. subscribe and poll
- consumer.subscribe("consume_test_topic")
- wait_for_assignment(consumer)
- consumer.poll(100)
-
- # 2. unsubscribe
- consumer.unsubscribe
- wait_for_unassignment(consumer)
- consumer.close
- end
end
context "methods that should not be called after a consumer has been closed" do
before do
consumer.close
@@ -1012,7 +1013,65 @@
expect(consumer.closed?).to eq(false)
consumer.finalizer.call("some-ignored-object-id")
expect(consumer.closed?).to eq(true)
+ end
+
+ context "when the rebalance protocol is cooperative" do
+ let(:consumer) do
+ config = rdkafka_consumer_config(
+ {
+ :"partition.assignment.strategy" => "cooperative-sticky",
+ :"debug" => "consumer",
+ }
+ )
+ config.consumer_rebalance_listener = listener
+ config.consumer
+ end
+
+ let(:listener) do
+ Struct.new(:queue) do
+ def on_partitions_assigned(consumer, list)
+ collect(:assign, list)
+ end
+
+ def on_partitions_revoked(consumer, list)
+ collect(:revoke, list)
+ end
+
+ def collect(name, list)
+ partitions = list.to_h.map { |key, values| [key, values.map(&:partition)] }.flatten
+ queue << ([name] + partitions)
+ end
+ end.new([])
+ end
+
+ it "should be able to assign and unassign partitions using the cooperative partition assignment APIs" do
+ notify_listener(listener) do
+ handles = []
+ 10.times do
+ handles << producer.produce(
+ topic: "consume_test_topic",
+ payload: "payload 1",
+ key: "key 1",
+ partition: 0
+ )
+ end
+ handles.each(&:wait)
+
+ consumer.subscribe("consume_test_topic")
+ # Check the first 10 messages. Then close the consumer, which
+ # should break the each loop.
+ consumer.each_with_index do |message, i|
+ expect(message).to be_a Rdkafka::Consumer::Message
+ break if i == 10
+ end
+ end
+
+ expect(listener.queue).to eq([
+ [:assign, "consume_test_topic", 0, 1, 2],
+ [:revoke, "consume_test_topic", 0, 1, 2]
+ ])
+ end
end
end