Sha256: 9be6f072d9efa504fda4a4006220f86c9b1720b8833c3004432c1ac9df5f15eb

Contents?: true

Size: 1.79 KB

Versions: 91

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

describe Deimos::KafkaListener do
  include_context 'with widgets'

  prepend_before(:each) do
    producer_class = Class.new(Deimos::Producer) do
      schema 'MySchema'
      namespace 'com.my-namespace'
      topic 'my-topic'
      key_config none: true
    end
    stub_const('MyProducer', producer_class)
  end

  before(:each) do
    Deimos.configure do |c|
      c.producers.backend = :kafka
      c.schema.backend = :avro_local
    end
    allow_any_instance_of(Kafka::Cluster).to receive(:add_target_topics)
    allow_any_instance_of(Kafka::Cluster).to receive(:partitions_for).
      and_raise(Kafka::Error)
  end

  describe '.send_produce_error' do
    let(:payloads) do
      [{ 'test_id' => 'foo', 'some_int' => 123 },
       { 'test_id' => 'bar', 'some_int' => 124 }]
    end

    it 'should listen to publishing errors and republish as Deimos events' do
      allow(Deimos::Producer).to receive(:descendants).and_return([MyProducer])
      Deimos.subscribe('produce_error') do |event|
        expect(event.payload).to include(
          producer: MyProducer,
          topic: 'my-topic',
          payloads: payloads
        )
      end
      expect(Deimos.config.metrics).to receive(:increment).
        with('publish_error', tags: %w(topic:my-topic), by: 2)
      expect { MyProducer.publish_list(payloads) }.to raise_error(Kafka::DeliveryFailed)
    end

    it 'should not send any notifications when producer is not found' do
      Deimos.subscribe('produce_error') do |_|
        raise 'OH NOES'
      end
      allow(Deimos::Producer).to receive(:descendants).and_return([])
      expect(Deimos.config.metrics).not_to receive(:increment).with('publish_error', anything)
      expect { MyProducer.publish_list(payloads) }.to raise_error(Kafka::DeliveryFailed)
    end
  end
end

Version data entries

91 entries across 91 versions & 2 rubygems

Version Path
deimos-ruby-1.24.2 spec/kafka_listener_spec.rb
deimos-ruby-1.24.1 spec/kafka_listener_spec.rb
deimos-ruby-1.24.0 spec/kafka_listener_spec.rb
deimos-ruby-1.23.3 spec/kafka_listener_spec.rb
deimos-ruby-1.23.2 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta6 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta5 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta4 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta3 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta2 spec/kafka_listener_spec.rb
deimos-ruby-1.23.1.pre.beta1 spec/kafka_listener_spec.rb
deimos-ruby-1.23.0 spec/kafka_listener_spec.rb
deimos-ruby-1.22.5 spec/kafka_listener_spec.rb
deimos-ruby-1.22.4 spec/kafka_listener_spec.rb
deimos-ruby-1.22.3 spec/kafka_listener_spec.rb
deimos-ruby-1.22.2 spec/kafka_listener_spec.rb
deimos-ruby-1.22.1 spec/kafka_listener_spec.rb
deimos-ruby-1.22 spec/kafka_listener_spec.rb
deimos-ruby-1.20.1 spec/kafka_listener_spec.rb
deimos-ruby-1.20.0 spec/kafka_listener_spec.rb