spec/utils/db_producer_spec.rb in deimos-ruby-1.1.0.pre.beta2 vs spec/utils/db_producer_spec.rb in deimos-ruby-1.2.0.pre.beta1

- old
+ new

@@ -91,10 +91,68 @@ expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).once expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).once expect(phobos_producer).to have_received(:publish_list).with(['A']).once end + + describe '#compact_messages' do + let(:batch) do + [ + { + key: 1, + topic: 'my-topic', + message: 'AAA' + }, + { + key: 2, + topic: 'my-topic', + message: 'BBB' + }, + { + key: 1, + topic: 'my-topic', + message: 'CCC' + } + ].map { |h| Deimos::KafkaMessage.create!(h) } + end + + let(:deduped_batch) { batch[1..2] } + + it 'should dedupe messages when :all is set' do + Deimos.configure { |c| c.db_producer.compact_topics = :all } + expect(producer.compact_messages(batch)).to eq(deduped_batch) + end + + it 'should dedupe messages when topic is included' do + Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic my-topic2) } + expect(producer.compact_messages(batch)).to eq(deduped_batch) + end + + it 'should not dedupe messages when topic is not included' do + Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic3 my-topic2) } + expect(producer.compact_messages(batch)).to eq(batch) + end + + it 'should not dedupe messages without keys' do + unkeyed_batch = [ + { + key: nil, + topic: 'my-topic', + message: 'AAA' + }, + { + key: nil, + topic: 'my-topic', + message: 'BBB' + } + ].map { |h| Deimos::KafkaMessage.create!(h) } + Deimos.configure { |c| c.db_producer.compact_topics = :all } + expect(producer.compact_messages(unkeyed_batch)).to eq(unkeyed_batch) + Deimos.configure { |c| c.db_producer.compact_topics = [] } + end + + end end describe '#process_topic' do before(:each) do producer.id = 'abc' @@ -117,10 +175,11 @@ end expect(Deimos::KafkaTopicInfo).to receive(:lock). with('my-topic', 'abc').and_return(true) expect(producer).to receive(:retrieve_messages).ordered. and_return(messages[0..1]) + expect(producer).to receive(:send_pending_metrics).twice expect(producer).to receive(:produce_messages).ordered.with([ { payload: 'mess1', key: nil, partition_key: 'key1', @@ -191,17 +250,71 @@ expect(producer).to receive(:produce_messages).and_raise('OH NOES') expect(producer).to receive(:retrieve_messages).and_return(messages) expect(Deimos::KafkaTopicInfo).to receive(:register_error) expect(Deimos::KafkaMessage.count).to eq(4) - Deimos.subscribe('db_producer.produce') do |event| + subscriber = Deimos.subscribe('db_producer.produce') do |event| expect(event.payload[:exception_object].message).to eq('OH NOES') expect(event.payload[:messages]).to eq(messages) end producer.process_topic('my-topic') + # don't delete for regular errors + expect(Deimos::KafkaMessage.count).to eq(4) + Deimos.unsubscribe(subscriber) + end + + it 'should delete messages on buffer overflow' do + messages = (1..4).map do |i| + Deimos::KafkaMessage.create!( + id: i, + topic: 'my-topic', + message: "mess#{i}", + partition_key: "key#{i}" + ) + end + + expect(Deimos::KafkaTopicInfo).to receive(:lock). + with('my-topic', 'abc').and_return(true) + expect(producer).to receive(:produce_messages).and_raise(Kafka::BufferOverflow) + expect(producer).to receive(:retrieve_messages).and_return(messages) + expect(Deimos::KafkaTopicInfo).to receive(:register_error) + + expect(Deimos::KafkaMessage.count).to eq(4) + producer.process_topic('my-topic') expect(Deimos::KafkaMessage.count).to eq(0) end + end + + describe '#send_pending_metrics' do + it 'should use the first created_at for each topic' do |example| + # sqlite date-time strings don't work correctly + next if example.metadata[:db_config][:adapter] == 'sqlite3' + + freeze_time do + (1..2).each do |i| + Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil, + created_at: (3 + i).minutes.ago) + Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil, + created_at: (2 + i).minutes.ago) + Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil, + created_at: (1 + i).minute.ago) + end + allow(Deimos.config.metrics).to receive(:gauge) + producer.send_pending_metrics + expect(Deimos.config.metrics).to have_received(:gauge).twice + expect(Deimos.config.metrics).to have_received(:gauge). + with('pending_db_messages_max_wait', 4.minutes.to_i, tags: ['topic:topic1']) + expect(Deimos.config.metrics).to have_received(:gauge). + with('pending_db_messages_max_wait', 5.minutes.to_i, tags: ['topic:topic2']) + end + end + + it 'should send 0 if no messages' do + expect(Deimos.config.metrics).to receive(:gauge). + with('pending_db_messages_max_wait', 0) + producer.send_pending_metrics + end end example 'Full integration test' do (1..4).each do |i| (1..2).each do |j|