# frozen_string_literal: true

each_db_config(Deimos::Utils::DbProducer) do
  let(:producer) do
    producer = described_class.new(logger)
    allow(producer).to receive(:sleep)
    allow(producer).to receive(:producer).and_return(phobos_producer)
    producer
  end

  let(:logger) { nil }

  let(:phobos_producer) do
    pp = instance_double(Phobos::Producer::PublicAPI)
    allow(pp).to receive(:publish_list)
    pp
  end

  before(:each) do
    stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 2)
    stub_const('Deimos::Utils::DbProducer::DELETE_BATCH_SIZE', 1)
  end

  specify '#process_next_messages' do
    expect(producer).to receive(:retrieve_topics).and_return(%w(topic1 topic2))
    expect(producer).to receive(:process_topic).twice
    expect(Deimos::KafkaTopicInfo).to receive(:ping_empty_topics).with(%w(topic1 topic2))
    expect(producer).to receive(:sleep).with(0.5)
    producer.process_next_messages
  end

  specify '#retrieve_topics' do
    (1..3).each do |i|
      Deimos::KafkaMessage.create!(topic: "topic#{i}", key: 'blergkey', message: 'blerg')
    end
    expect(producer.retrieve_topics).
      to contain_exactly('topic1', 'topic2', 'topic3')
  end

  specify '#retrieve_messages' do
    (1..3).each do |i|
      Deimos::KafkaMessage.create!(topic: 'topic1', message: 'blah', key: "key#{i}")
      Deimos::KafkaMessage.create!(topic: 'topic2', message: 'blah', key: "key#{i}")
    end
    stub_const('Deimos::Utils::DbProducer::BATCH_SIZE', 5)
    producer.current_topic = 'topic1'
    messages = producer.retrieve_messages
    expect(messages.size).to eq(3)
    expect(messages).to all(be_a_kind_of(Deimos::KafkaMessage))
  end

  describe '#produce_messages' do

    it 'should produce normally' do
      batch = ['A'] * 1000
      expect(phobos_producer).to receive(:publish_list).with(batch).once
      expect(Deimos.config.metrics).to receive(:increment).
        with('publish', tags: %w(status:success topic:), by: 1000).once
      producer.produce_messages(batch)
    end

    it 'should split the batch size on buffer overflow' do
      class_producer = double(Phobos::Producer::ClassMethods::PublicAPI, # rubocop:disable RSpec/VerifiedDoubles
                              sync_producer_shutdown: nil)
      allow(producer.class).to receive(:producer).and_return(class_producer)
      expect(class_producer).to receive(:sync_producer_shutdown).twice
      count = 0
      allow(phobos_producer).to receive(:publish_list) do
        count += 1
        raise Kafka::BufferOverflow if count < 3
      end
      allow(Deimos.config.metrics).to receive(:increment)
      batch = ['A'] * 1000
      producer.produce_messages(batch)
      expect(phobos_producer).to have_received(:publish_list).with(batch)
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100)
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).exactly(100).times
      expect(Deimos.config.metrics).to have_received(:increment).
        with('publish', tags: %w(status:success topic:), by: 10).exactly(100).times
    end

    it "should raise an error if it can't split any more" do
      allow(phobos_producer).to receive(:publish_list) do
        raise Kafka::BufferOverflow
      end
      expect(Deimos.config.metrics).not_to receive(:increment)
      batch = ['A'] * 1000
      expect { producer.produce_messages(batch) }.to raise_error(Kafka::BufferOverflow)
      expect(phobos_producer).to have_received(:publish_list).with(batch)
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).once
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).once
      expect(phobos_producer).to have_received(:publish_list).with(['A']).once
    end
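    # When a single oversized message forces repeated splits, sub-batches that
    # already published successfully must not be sent again.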
    it 'should not resend batches of sent messages' do
      allow(phobos_producer).to receive(:publish_list) do |group|
        raise Kafka::BufferOverflow if group.any?('A') && group.size >= 1000
        raise Kafka::BufferOverflow if group.any?('BIG') && group.size >= 10
      end
      allow(Deimos.config.metrics).to receive(:increment)
      batch = ['A'] * 450 + ['BIG'] * 550
      producer.produce_messages(batch)
      expect(phobos_producer).to have_received(:publish_list).with(batch)
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).exactly(4).times
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 50 + ['BIG'] * 50)
      expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).exactly(5).times
      expect(phobos_producer).to have_received(:publish_list).with(['BIG'] * 1).exactly(550).times
      expect(Deimos.config.metrics).to have_received(:increment).
        with('publish', tags: %w(status:success topic:), by: 100).exactly(4).times
      expect(Deimos.config.metrics).to have_received(:increment).
        with('publish', tags: %w(status:success topic:), by: 10).exactly(5).times
      expect(Deimos.config.metrics).to have_received(:increment).
        with('publish', tags: %w(status:success topic:), by: 1).exactly(550).times
    end

    describe '#compact_messages' do
      let(:batch) do
        [
          { key: 1, topic: 'my-topic', message: 'AAA' },
          { key: 2, topic: 'my-topic', message: 'BBB' },
          { key: 1, topic: 'my-topic', message: 'CCC' }
        ].map { |h| Deimos::KafkaMessage.create!(h) }
      end

      let(:deduped_batch) { batch[1..2] }

      it 'should dedupe messages when :all is set' do
        Deimos.configure { |c| c.db_producer.compact_topics = :all }
        expect(producer.compact_messages(batch)).to eq(deduped_batch)
      end

      it 'should dedupe messages when topic is included' do
        Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic my-topic2) }
        expect(producer.compact_messages(batch)).to eq(deduped_batch)
      end

      it 'should not dedupe messages when topic is not included' do
        Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic3 my-topic2) }
        expect(producer.compact_messages(batch)).to eq(batch)
      end

      it 'should not dedupe messages without keys' do
        unkeyed_batch = [
          { key: nil, topic: 'my-topic', message: 'AAA' },
          { key: nil, topic: 'my-topic', message: 'BBB' }
        ].map { |h| Deimos::KafkaMessage.create!(h) }
        Deimos.configure { |c| c.db_producer.compact_topics = :all }
        expect(producer.compact_messages(unkeyed_batch)).to eq(unkeyed_batch)
        Deimos.configure { |c| c.db_producer.compact_topics = [] }
      end

      it 'should compact messages when all messages are unique' do
        Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic my-topic2) }
        expect(producer.compact_messages(deduped_batch)).to eq(deduped_batch)
      end
    end
  end

  describe '#process_topic' do
    before(:each) do
      producer.id = 'abc'
    end

    it 'should do nothing if lock fails' do
      expect(Deimos::KafkaTopicInfo).to receive(:lock).
        with('my-topic', 'abc').and_return(false)
      expect(producer).not_to receive(:retrieve_messages)
      producer.process_topic('my-topic')
    end
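    # Happy path: with BATCH_SIZE stubbed to 2, four pending messages go out
    # in two batches; the producer heartbeats after each batch and clears the
    # lock once the topic is drained.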
    it 'should complete successfully' do
      messages = (1..4).map do |i|
        Deimos::KafkaMessage.new(
          topic: 'my-topic',
          message: "mess#{i}",
          partition_key: "key#{i}"
        )
      end
      expect(Deimos::KafkaTopicInfo).to receive(:lock).
        with('my-topic', 'abc').and_return(true)
      expect(producer).to receive(:retrieve_messages).ordered.
        and_return(messages[0..1])
      expect(producer).to receive(:send_pending_metrics).twice
      expect(producer).to receive(:produce_messages).ordered.with([
        { payload: 'mess1', key: nil, partition_key: 'key1', topic: 'my-topic' },
        { payload: 'mess2', key: nil, partition_key: 'key2', topic: 'my-topic' }
      ])
      expect(Deimos.config.metrics).to receive(:increment).ordered.with(
        'db_producer.process',
        tags: %w(topic:my-topic),
        by: 2
      )
      expect(producer).to receive(:retrieve_messages).ordered.
        and_return(messages[2..3])
      expect(producer).to receive(:produce_messages).ordered.with([
        { payload: 'mess3', partition_key: 'key3', key: nil, topic: 'my-topic' },
        { payload: 'mess4', partition_key: 'key4', key: nil, topic: 'my-topic' }
      ])
      expect(Deimos.config.metrics).to receive(:increment).ordered.with(
        'db_producer.process',
        tags: %w(topic:my-topic),
        by: 2
      )
      expect(producer).to receive(:retrieve_messages).ordered.
        and_return([])
      expect(Deimos::KafkaTopicInfo).to receive(:heartbeat).
        with('my-topic', 'abc').twice
      expect(Deimos::KafkaTopicInfo).to receive(:clear_lock).
        with('my-topic', 'abc').once
      producer.process_topic('my-topic')
    end

    it 'should register an error if it gets an error' do
      allow(producer).to receive(:shutdown_producer)
      expect(producer).to receive(:retrieve_messages).and_raise('OH NOES')
      expect(Deimos::KafkaTopicInfo).to receive(:register_error).
        with('my-topic', 'abc')
      expect(producer).not_to receive(:produce_messages)
      producer.process_topic('my-topic')
      expect(producer).to have_received(:shutdown_producer)
    end

    it 'should move on if it gets a partial batch' do
      expect(producer).to receive(:retrieve_messages).ordered.
        and_return([Deimos::KafkaMessage.new(
          topic: 'my-topic',
          message: 'mess1'
        )])
      expect(producer).to receive(:produce_messages).once
      producer.process_topic('my-topic')
    end

    it 'should notify on error' do
      messages = (1..4).map do |i|
        Deimos::KafkaMessage.create!(
          id: i,
          topic: 'my-topic',
          message: "mess#{i}",
          partition_key: "key#{i}"
        )
      end
      expect(Deimos::KafkaTopicInfo).to receive(:lock).
        with('my-topic', 'abc').and_return(true)
      expect(producer).to receive(:produce_messages).and_raise('OH NOES')
      expect(producer).to receive(:retrieve_messages).and_return(messages)
      expect(Deimos::KafkaTopicInfo).to receive(:register_error)

      expect(Deimos::KafkaMessage.count).to eq(4)
      subscriber = Deimos.subscribe('db_producer.produce') do |event|
        expect(event.payload[:exception_object].message).to eq('OH NOES')
        expect(event.payload[:messages]).to eq(messages)
      end
      producer.process_topic('my-topic')
      # don't delete for regular errors
      expect(Deimos::KafkaMessage.count).to eq(4)
      Deimos.unsubscribe(subscriber)
    end
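    # A transient 'Lock wait timeout' while deleting published messages should
    # retry the delete rather than publish the batch a second time.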
    it 'should retry deletes and not re-publish' do
      messages = (1..4).map do |i|
        Deimos::KafkaMessage.create!(
          id: i,
          topic: 'my-topic',
          message: "mess#{i}",
          partition_key: "key#{i}"
        )
      end
      (5..8).each do |i|
        Deimos::KafkaMessage.create!(
          id: i,
          topic: 'my-topic2',
          message: "mess#{i}",
          partition_key: "key#{i}"
        )
      end

      raise_error = true
      expect(Deimos::KafkaMessage).to receive(:where).exactly(5).times.and_wrap_original do |m, *args|
        if raise_error
          raise_error = false
          raise 'Lock wait timeout'
        end
        m.call(*args)
      end

      expect(Deimos::KafkaTopicInfo).to receive(:lock).
        with('my-topic', 'abc').and_return(true)
      expect(producer).to receive(:retrieve_messages).ordered.and_return(messages)
      expect(producer).to receive(:retrieve_messages).ordered.and_return([])
      expect(phobos_producer).to receive(:publish_list).once.with(messages.map(&:phobos_message))

      expect(Deimos::KafkaMessage.count).to eq(8)
      producer.process_topic('my-topic')
      expect(Deimos::KafkaMessage.count).to eq(4)
    end

    it 'should re-raise misc errors on delete' do
      messages = (1..3).map do |i|
        Deimos::KafkaMessage.create!(
          id: i,
          topic: 'my-topic',
          message: "mess#{i}",
          partition_key: "key#{i}"
        )
      end
      expect(Deimos::KafkaMessage).to receive(:where).once.and_raise('OH NOES')
      expect { producer.delete_messages(messages) }.to raise_exception('OH NOES')
    end

    context 'with buffer overflow exception' do
      let(:messages) do
        (1..4).map do |i|
          Deimos::KafkaMessage.create!(
            id: i,
            key: i,
            topic: 'my-topic',
            message: { message: "mess#{i}" },
            partition_key: "key#{i}"
          )
        end
      end

      let(:logger) do
        logger = instance_double(Logger)
        allow(logger).to receive(:error)
        logger
      end

      let(:message_producer) do
        Deimos.config.schema.backend = :mock
        Deimos::ActiveRecordProducer.topic('my-topic')
        Deimos::ActiveRecordProducer.key_config
        Deimos::ActiveRecordProducer
      end

      around(:each) do |example|
        config = Deimos::ActiveRecordProducer.config.clone
        backend = Deimos.config.schema.backend
        example.run
      ensure
        Deimos::ActiveRecordProducer.instance_variable_set(:@config, config)
        Deimos.config.schema.backend = backend
      end

      before(:each) do
        message_producer
        (5..8).each do |i|
          Deimos::KafkaMessage.create!(
            id: i,
            topic: 'my-topic2',
            message: "mess#{i}",
            partition_key: "key#{i}"
          )
        end
        allow(Deimos::KafkaTopicInfo).to receive(:lock).
          with('my-topic', 'abc').and_return(true)
        allow(producer).to receive(:produce_messages).and_raise(Kafka::BufferOverflow)
        allow(producer).to receive(:retrieve_messages).and_return(messages)
        allow(Deimos::KafkaTopicInfo).to receive(:register_error)
      end

      it 'should delete messages on buffer overflow' do
        expect(Deimos::KafkaMessage.count).to eq(8)
        producer.process_topic('my-topic')
        expect(Deimos::KafkaMessage.count).to eq(4)
      end

      it 'should notify on buffer overflow' do
        subscriber = Deimos.subscribe('db_producer.produce') do |event|
          expect(event.payload[:exception_object].message).to eq('Kafka::BufferOverflow')
          expect(event.payload[:messages]).to eq(messages)
        end
        producer.process_topic('my-topic')
        Deimos.unsubscribe(subscriber)
        expect(logger).to have_received(:error).with('Message batch too large, deleting...')
        expect(logger).to have_received(:error).with(
          [
            { key: '1', payload: 'payload-decoded' },
            { key: '2', payload: 'payload-decoded' },
            { key: '3', payload: 'payload-decoded' },
            { key: '4', payload: 'payload-decoded' }
          ]
        )
      end

      context 'with exception on error logging attempt' do
        let(:message_producer) do
          Deimos::ActiveRecordProducer.topic('my-topic')
          Deimos::ActiveRecordProducer
        end

        it 'should notify on buffer overflow disregarding decoding exception' do
          subscriber = Deimos.subscribe('db_producer.produce') do |event|
            expect(event.payload[:exception_object].message).to eq('Kafka::BufferOverflow')
            expect(event.payload[:messages]).to eq(messages)
          end
          producer.process_topic('my-topic')
          Deimos.unsubscribe(subscriber)
          expect(logger).to have_received(:error).with('Message batch too large, deleting...')
          expect(logger).to have_received(:error).with(
            'Large message details logging failure: '\
            'No key config given - if you are not decoding keys, please use `key_config plain: true`'
          )
        end
      end
    end
  end
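  # Per topic, metrics should report how long the oldest pending message has
  # been waiting (capped by the last time the topic was processed) and how
  # many messages are pending.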
  describe '#send_pending_metrics' do
    it 'should use the first created_at for each topic' do |example|
      # sqlite date-time strings don't work correctly
      next if example.metadata[:db_config][:adapter] == 'sqlite3'

      freeze_time do
        (1..2).each do |i|
          Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
                                       created_at: (3 + i).minutes.ago)
          Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
                                       created_at: (2 + i).minutes.ago)
          Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
                                       created_at: (1 + i).minute.ago)
        end
        Deimos::KafkaTopicInfo.create!(topic: 'topic1', last_processed_at: 6.minutes.ago)
        Deimos::KafkaTopicInfo.create!(topic: 'topic2', last_processed_at: 3.minutes.ago)
        Deimos::KafkaTopicInfo.create!(topic: 'topic3', last_processed_at: 5.minutes.ago)
        allow(Deimos.config.metrics).to receive(:gauge)
        producer.send_pending_metrics
        expect(Deimos.config.metrics).to have_received(:gauge).exactly(6).times
        # topic1 has the earliest message 4 minutes ago and last processed 6
        # minutes ago, so the most amount of time we've seen nothing is 4 minutes
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_max_wait', 4.minutes.to_i, tags: ['topic:topic1'])
        # topic2 has earliest message 5 minutes ago and last processed 3 minutes
        # ago, so we should give it 3 minutes
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_max_wait', 3.minutes.to_i, tags: ['topic:topic2'])
        # topic3 has no messages, so should get 0
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_max_wait', 0, tags: ['topic:topic3'])
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_count', 3, tags: ['topic:topic1'])
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_count', 3, tags: ['topic:topic2'])
        expect(Deimos.config.metrics).to have_received(:gauge).
          with('pending_db_messages_count', 0, tags: ['topic:topic3'])
      end
    end
  end

  example 'Full integration test' do
    (1..4).each do |i|
      (1..2).each do |j|
        Deimos::KafkaMessage.create!(topic: "topic#{j}",
                                     message: "mess#{i}",
                                     partition_key: "key#{i}")
        Deimos::KafkaMessage.create!(topic: "topic#{j + 2}",
                                     key: "key#{i}",
                                     partition_key: "key#{i}",
                                     message: "mess#{i}")
      end
    end

    allow(producer).to receive(:produce_messages)

    producer.process_next_messages
    expect(Deimos::KafkaTopicInfo.count).to eq(4)
    topics = Deimos::KafkaTopicInfo.select('distinct topic').map(&:topic)
    expect(topics).to contain_exactly('topic1', 'topic2', 'topic3', 'topic4')
    expect(Deimos::KafkaMessage.count).to eq(0)

    expect(producer).to have_received(:produce_messages).with([
      { payload: 'mess1', partition_key: 'key1', key: nil, topic: 'topic1' },
      { payload: 'mess2', key: nil, partition_key: 'key2', topic: 'topic1' }
    ])
    expect(producer).to have_received(:produce_messages).with([
      { payload: 'mess3', key: nil, partition_key: 'key3', topic: 'topic1' },
      { payload: 'mess4', key: nil, partition_key: 'key4', topic: 'topic1' }
    ])
    expect(producer).to have_received(:produce_messages).with([
      { payload: 'mess1', key: 'key1', partition_key: 'key1', topic: 'topic3' },
      { payload: 'mess2', partition_key: 'key2', key: 'key2', topic: 'topic3' }
    ])
    expect(producer).to have_received(:produce_messages).with([
      { payload: 'mess3', key: 'key3', partition_key: 'key3', topic: 'topic3' },
      { payload: 'mess4', partition_key: 'key4', key: 'key4', topic: 'topic3' }
    ])
  end
end