spec/utils/db_producer_spec.rb in deimos-kafka-1.0.0.pre.beta19 vs spec/utils/db_producer_spec.rb in deimos-kafka-1.0.0.pre.beta20
- old
+ new
@@ -2,16 +2,20 @@
each_db_config(Deimos::Utils::DbProducer) do
let(:producer) do
producer = described_class.new
allow(producer).to receive(:sleep)
- phobos_producer = instance_double(Phobos::Producer::PublicAPI)
- allow(phobos_producer).to receive(:publish_list)
allow(producer).to receive(:producer).and_return(phobos_producer)
producer
end
+ let(:phobos_producer) do
+ pp = instance_double(Phobos::Producer::PublicAPI)
+ allow(pp).to receive(:publish_list)
+ pp
+ end
+
before(:each) do
stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 2)
end
specify '#process_next_messages' do
@@ -40,9 +44,52 @@
stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 5)
producer.current_topic = 'topic1'
messages = producer.retrieve_messages
expect(messages.size).to eq(3)
expect(messages).to all(be_a_kind_of(Deimos::KafkaMessage))
+ end
+
+ describe '#produce_messages' do
+
+ it 'should produce normally' do
+ batch = ['A'] * 1000
+ expect(phobos_producer).to receive(:publish_list).with(batch).once
+ expect(Deimos.config.metrics).to receive(:increment).with('publish',
+ tags: %w(status:success topic:),
+ by: 1000).once
+ producer.produce_messages(batch)
+ end
+
+ it 'should split the batch size on buffer overflow' do
+ count = 0
+ allow(phobos_producer).to receive(:publish_list) do
+ count += 1
+ raise Kafka::BufferOverflow if count < 3
+ end
+ allow(Deimos.config.metrics).to receive(:increment)
+ batch = ['A'] * 1000
+ producer.produce_messages(batch)
+ expect(phobos_producer).to have_received(:publish_list).with(batch)
+ expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100)
+ expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).exactly(100).times
+ expect(Deimos.config.metrics).to have_received(:increment).with('publish',
+ tags: %w(status:success topic:),
+ by: 10).exactly(100).times
+ end
+
+ it "should raise an error if it can't split any more" do
+ allow(phobos_producer).to receive(:publish_list) do
+ raise Kafka::BufferOverflow
+ end
+ expect(Deimos.config.metrics).not_to receive(:increment)
+ batch = ['A'] * 1000
+ expect { producer.produce_messages(batch) }.to raise_error(Kafka::BufferOverflow)
+ expect(phobos_producer).to have_received(:publish_list).with(batch)
+ expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).once
+ expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).once
+ expect(phobos_producer).to have_received(:publish_list).with(['A']).once
+
+ end
end
describe '#process_topic' do
before(:each) do
producer.id = 'abc'