spec/utils/db_producer_spec.rb in deimos-ruby-1.1.0.pre.beta2 vs spec/utils/db_producer_spec.rb in deimos-ruby-1.2.0.pre.beta1
- old
+ new
@@ -91,10 +91,68 @@
expect(phobos_producer).to have_received(:publish_list).with(['A'] * 100).once
expect(phobos_producer).to have_received(:publish_list).with(['A'] * 10).once
expect(phobos_producer).to have_received(:publish_list).with(['A']).once
end
+
+ describe '#compact_messages' do
+ let(:batch) do
+ [
+ {
+ key: 1,
+ topic: 'my-topic',
+ message: 'AAA'
+ },
+ {
+ key: 2,
+ topic: 'my-topic',
+ message: 'BBB'
+ },
+ {
+ key: 1,
+ topic: 'my-topic',
+ message: 'CCC'
+ }
+ ].map { |h| Deimos::KafkaMessage.create!(h) }
+ end
+
+ let(:deduped_batch) { batch[1..2] }
+
+ it 'should dedupe messages when :all is set' do
+ Deimos.configure { |c| c.db_producer.compact_topics = :all }
+ expect(producer.compact_messages(batch)).to eq(deduped_batch)
+ end
+
+ it 'should dedupe messages when topic is included' do
+ Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic my-topic2) }
+ expect(producer.compact_messages(batch)).to eq(deduped_batch)
+ end
+
+ it 'should not dedupe messages when topic is not included' do
+ Deimos.configure { |c| c.db_producer.compact_topics = %w(my-topic3 my-topic2) }
+ expect(producer.compact_messages(batch)).to eq(batch)
+ end
+
+ it 'should not dedupe messages without keys' do
+ unkeyed_batch = [
+ {
+ key: nil,
+ topic: 'my-topic',
+ message: 'AAA'
+ },
+ {
+ key: nil,
+ topic: 'my-topic',
+ message: 'BBB'
+ }
+ ].map { |h| Deimos::KafkaMessage.create!(h) }
+ Deimos.configure { |c| c.db_producer.compact_topics = :all }
+ expect(producer.compact_messages(unkeyed_batch)).to eq(unkeyed_batch)
+ Deimos.configure { |c| c.db_producer.compact_topics = [] }
+ end
+
+ end
end
describe '#process_topic' do
before(:each) do
producer.id = 'abc'
@@ -117,10 +175,11 @@
end
expect(Deimos::KafkaTopicInfo).to receive(:lock).
with('my-topic', 'abc').and_return(true)
expect(producer).to receive(:retrieve_messages).ordered.
and_return(messages[0..1])
+ expect(producer).to receive(:send_pending_metrics).twice
expect(producer).to receive(:produce_messages).ordered.with([
{
payload: 'mess1',
key: nil,
partition_key: 'key1',
@@ -191,17 +250,71 @@
expect(producer).to receive(:produce_messages).and_raise('OH NOES')
expect(producer).to receive(:retrieve_messages).and_return(messages)
expect(Deimos::KafkaTopicInfo).to receive(:register_error)
expect(Deimos::KafkaMessage.count).to eq(4)
- Deimos.subscribe('db_producer.produce') do |event|
+ subscriber = Deimos.subscribe('db_producer.produce') do |event|
expect(event.payload[:exception_object].message).to eq('OH NOES')
expect(event.payload[:messages]).to eq(messages)
end
producer.process_topic('my-topic')
+ # don't delete for regular errors
+ expect(Deimos::KafkaMessage.count).to eq(4)
+ Deimos.unsubscribe(subscriber)
+ end
+
+ it 'should delete messages on buffer overflow' do
+ messages = (1..4).map do |i|
+ Deimos::KafkaMessage.create!(
+ id: i,
+ topic: 'my-topic',
+ message: "mess#{i}",
+ partition_key: "key#{i}"
+ )
+ end
+
+ expect(Deimos::KafkaTopicInfo).to receive(:lock).
+ with('my-topic', 'abc').and_return(true)
+ expect(producer).to receive(:produce_messages).and_raise(Kafka::BufferOverflow)
+ expect(producer).to receive(:retrieve_messages).and_return(messages)
+ expect(Deimos::KafkaTopicInfo).to receive(:register_error)
+
+ expect(Deimos::KafkaMessage.count).to eq(4)
+ producer.process_topic('my-topic')
expect(Deimos::KafkaMessage.count).to eq(0)
end
+ end
+
+ describe '#send_pending_metrics' do
+ it 'should use the first created_at for each topic' do |example|
+ # sqlite date-time strings don't work correctly
+ next if example.metadata[:db_config][:adapter] == 'sqlite3'
+
+ freeze_time do
+ (1..2).each do |i|
+ Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
+ created_at: (3 + i).minutes.ago)
+ Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
+ created_at: (2 + i).minutes.ago)
+ Deimos::KafkaMessage.create!(topic: "topic#{i}", message: nil,
+ created_at: (1 + i).minute.ago)
+ end
+ allow(Deimos.config.metrics).to receive(:gauge)
+ producer.send_pending_metrics
+ expect(Deimos.config.metrics).to have_received(:gauge).twice
+ expect(Deimos.config.metrics).to have_received(:gauge).
+ with('pending_db_messages_max_wait', 4.minutes.to_i, tags: ['topic:topic1'])
+ expect(Deimos.config.metrics).to have_received(:gauge).
+ with('pending_db_messages_max_wait', 5.minutes.to_i, tags: ['topic:topic2'])
+ end
+ end
+
+ it 'should send 0 if no messages' do
+ expect(Deimos.config.metrics).to receive(:gauge).
+ with('pending_db_messages_max_wait', 0)
+ producer.send_pending_metrics
+ end
end
example 'Full integration test' do
(1..4).each do |i|
(1..2).each do |j|