spec/utils/db_producer_spec.rb in deimos-kafka-1.0.0.pre.beta19 vs spec/utils/db_producer_spec.rb in deimos-kafka-1.0.0.pre.beta20

- old
+ new

@@ -2,16 +2,20 @@ each_db_config(Deimos::Utils::DbProducer) do let(:producer) do producer = described_class.new allow(producer).to receive(:sleep) - phobos_producer = instance_double(Phobos::Producer::PublicAPI) - allow(phobos_producer).to receive(:publish_list) allow(producer).to receive(:producer).and_return(phobos_producer) producer end + let(:phobos_producer) do + pp = instance_double(Phobos::Producer::PublicAPI) + allow(pp).to receive(:publish_list) + pp + end + before(:each) do stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 2) end specify '#process_next_messages' do @@ -40,9 +44,52 @@ stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 5) producer.current_topic = 'topic1' messages = producer.retrieve_messages expect(messages.size).to eq(3) expect(messages).to all(be_a_kind_of(Deimos::KafkaMessage)) + end + + describe '#produce_messages' do + + it 'should produce normally' do + batch = ['A'] * 1000 + expect(phobos_producer).to receive(:publish_list).with(batch).once + expect(Deimos.config.metrics).to receive(:increment).with('publish', + tags: %w(status:success topic:), + by: 1000).once + producer.produce_messages(batch) + end + + it 'should split the batch size on buffer overflow' do + count = 0 + allow(phobos_producer).to receive(:publish_list) do + count += 1 + raise Kafka::BufferOverflow if count < 3 + end + allow(Deimos.config.metrics).to receive(:increment) + batch = ['A'] * 1000 + producer.produce_messages(batch) + expect(phobos_producer).to have_received(:publish_list).with(batch) + expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100) + expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).exactly(100).times + expect(Deimos.config.metrics).to have_received(:increment).with('publish', + tags: %w(status:success topic:), + by: 10).exactly(100).times + end + + it "should raise an error if it can't split any more" do + allow(phobos_producer).to receive(:publish_list) do + raise Kafka::BufferOverflow + end + expect(Deimos.config.metrics).not_to receive(:increment) + batch = ['A'] * 1000 + expect { producer.produce_messages(batch) }.to raise_error(Kafka::BufferOverflow) + expect(phobos_producer).to have_received(:publish_list).with(batch) + expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).once + expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).once + expect(phobos_producer).to have_received(:publish_list).with(['A']).once + + end end describe '#process_topic' do before(:each) do producer.id = 'abc'