require "spec_helper" describe Rdkafka::Producer do let(:producer) { rdkafka_config.producer } it "should require a topic" do expect { producer.produce( payload: "payload", key: "key" ) }.to raise_error ArgumentError, "missing keyword: topic" end it "should produce a message" do # Produce a message handle = producer.produce( topic: "produce_test_topic", payload: "payload", key: "key" ) # Should be pending at first expect(handle.pending?).to be true # Check delivery handle and report report = handle.wait(5) expect(handle.pending?).to be false expect(report).not_to be_nil expect(report.partition).to eq 1 expect(report.offset).to be >= 0 # Close producer producer.close # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.partition).to eq 1 expect(message.payload).to eq "payload" expect(message.key).to eq "key" # Since api.version.request is on by default we will get # the message creation timestamp if it's not set. expect(message.timestamp).to be > 1505069891557 end it "should produce a message with a specified partition" do # Produce a message handle = producer.produce( topic: "produce_test_topic", payload: "payload partition", key: "key partition", partition: 1 ) report = handle.wait(5) # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.partition).to eq 1 expect(message.key).to eq "key partition" end it "should produce a message with utf-8 encoding" do handle = producer.produce( topic: "produce_test_topic", payload: "Τη γλώσσα μου έδωσαν ελληνική", key: "key utf8" ) report = handle.wait(5) # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.partition).to eq 1 expect(message.payload.force_encoding("utf-8")).to eq "Τη γλώσσα μου έδωσαν ελληνική" expect(message.key).to eq "key utf8" end it "should produce a message with a timestamp" do handle = producer.produce( topic: "produce_test_topic", payload: "payload timestamp", key: "key timestamp", timestamp: 1505069646000 ) report = handle.wait(5) # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.partition).to eq 2 expect(message.key).to eq "key timestamp" expect(message.timestamp).to eq 1505069646000 end it "should produce a message with nil key" do handle = producer.produce( topic: "produce_test_topic", payload: "payload no key" ) report = handle.wait(5) # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.key).to be_nil expect(message.payload).to eq "payload no key" end it "should produce a message with nil payload" do handle = producer.produce( topic: "produce_test_topic", key: "key no payload" ) report = handle.wait(5) # Consume message and verify it's content message = wait_for_message( topic: "produce_test_topic", delivery_report: report ) expect(message.key).to eq "key no payload" expect(message.payload).to be_nil end it "should raise an error when producing fails" do expect(Rdkafka::Bindings).to receive(:rd_kafka_producev).and_return(20) expect { producer.produce( topic: "produce_test_topic", key: "key error" ) }.to raise_error Rdkafka::RdkafkaError end it "should raise a timeout error when waiting too long" do handle = producer.produce( topic: "produce_test_topic", payload: "payload timeout", key: "key timeout" ) expect { handle.wait(0) }.to raise_error Rdkafka::Producer::DeliveryHandle::WaitTimeoutError end end