spec/rdkafka/consumer_spec.rb in rdkafka-0.13.0.beta.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.13.0.beta.2

- old
+ new

@@ -593,23 +593,36 @@ }.to raise_error Rdkafka::RdkafkaError end end describe "#poll with headers" do - it "should return message with headers" do + it "should return message with headers using string keys (when produced with symbol keys)" do report = producer.produce( topic: "consume_test_topic", key: "key headers", headers: { foo: 'bar' } ).wait message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report) expect(message).to be expect(message.key).to eq('key headers') - expect(message.headers).to include(foo: 'bar') + expect(message.headers).to include('foo' => 'bar') end + it "should return message with headers using string keys (when produced with string keys)" do + report = producer.produce( + topic: "consume_test_topic", + key: "key headers", + headers: { 'foo' => 'bar' } + ).wait + + message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report) + expect(message).to be + expect(message.key).to eq('key headers') + expect(message.headers).to include('foo' => 'bar') + end + it "should return message with no headers" do report = producer.produce( topic: "consume_test_topic", key: "key no headers", headers: nil @@ -698,11 +711,11 @@ def produce_n(n) handles = [] n.times do |i| handles << producer.produce( topic: topic_name, - payload: Time.new.to_f.to_s, + payload: i % 10 == 0 ? nil : Time.new.to_f.to_s, key: i.to_s, partition: 0 ) end handles.each(&:wait) @@ -962,22 +975,10 @@ notify_listener(listener) expect(listener.queue).to eq([:assigned, :revoked]) end end - - def notify_listener(listener) - # 1. subscribe and poll - consumer.subscribe("consume_test_topic") - wait_for_assignment(consumer) - consumer.poll(100) - - # 2. unsubscribe - consumer.unsubscribe - wait_for_unassignment(consumer) - consumer.close - end end context "methods that should not be called after a consumer has been closed" do before do consumer.close @@ -1012,7 +1013,65 @@ expect(consumer.closed?).to eq(false) consumer.finalizer.call("some-ignored-object-id") expect(consumer.closed?).to eq(true) + end + + context "when the rebalance protocol is cooperative" do + let(:consumer) do + config = rdkafka_consumer_config( + { + :"partition.assignment.strategy" => "cooperative-sticky", + :"debug" => "consumer", + } + ) + config.consumer_rebalance_listener = listener + config.consumer + end + + let(:listener) do + Struct.new(:queue) do + def on_partitions_assigned(consumer, list) + collect(:assign, list) + end + + def on_partitions_revoked(consumer, list) + collect(:revoke, list) + end + + def collect(name, list) + partitions = list.to_h.map { |key, values| [key, values.map(&:partition)] }.flatten + queue << ([name] + partitions) + end + end.new([]) + end + + it "should be able to assign and unassign partitions using the cooperative partition assignment APIs" do + notify_listener(listener) do + handles = [] + 10.times do + handles << producer.produce( + topic: "consume_test_topic", + payload: "payload 1", + key: "key 1", + partition: 0 + ) + end + handles.each(&:wait) + + consumer.subscribe("consume_test_topic") + # Check the first 10 messages. Then close the consumer, which + # should break the each loop. + consumer.each_with_index do |message, i| + expect(message).to be_a Rdkafka::Consumer::Message + break if i == 10 + end + end + + expect(listener.queue).to eq([ + [:assign, "consume_test_topic", 0, 1, 2], + [:revoke, "consume_test_topic", 0, 1, 2] + ]) + end end end