spec/consumer_spec.rb in deimos-ruby-1.24.2 vs spec/consumer_spec.rb in deimos-ruby-2.0.0.pre.alpha1
- old
+ new
@@ -2,37 +2,48 @@
# :nodoc:
# rubocop:disable Metrics/ModuleLength
module ConsumerTest
describe Deimos::Consumer, 'Message Consumer' do
+ let(:use_schema_classes) { false }
+ let(:reraise_errors) { false }
prepend_before(:each) do
# :nodoc:
consumer_class = Class.new(described_class) do
- schema 'MySchema'
- namespace 'com.my-namespace'
- key_config field: 'test_id'
# :nodoc:
- def fatal_error?(_exception, payload, _metadata)
- payload.to_s == 'fatal'
+ def fatal_error?(_exception, messages)
+ messages.payloads.first&.dig(:test_id) == ['fatal']
end
# :nodoc:
- def consume(_payload, _metadata)
- raise 'This should not be called unless call_original is set'
+ def consume_message(message)
+ message.payload
end
end
stub_const('ConsumerTest::MyConsumer', consumer_class)
+ route_usc = use_schema_classes
+ route_rre = reraise_errors
+ Karafka::App.routes.redraw do
+ topic 'my_consume_topic' do
+ schema 'MySchema'
+ namespace 'com.my-namespace'
+ key_config field: 'test_id'
+ consumer consumer_class
+ use_schema_classes route_usc
+ reraise_errors route_rre
+ end
+ end
end
describe 'consume' do
SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes|
+ let(:use_schema_classes) { use_schema_classes }
context "with Schema Class consumption #{setting}" do
before(:each) do
Deimos.configure do |config|
- config.schema.use_schema_classes = use_schema_classes
config.schema.use_full_namespace = true
end
end
it 'should consume a message' do
@@ -43,120 +54,102 @@
expect(payload['some_int']).to eq(123)
end
end
it 'should consume a nil message' do
- test_consume_message(MyConsumer, nil) do |payload, _metadata|
- expect(payload).to be_nil
+ test_consume_message(MyConsumer, nil, key: 'foo') do
+ expect(messages).to be_empty
end
end
- it 'should consume a message idempotently' do
- # testing for a crash and re-consuming the same message/metadata
- key = { 'test_id' => 'foo' }
- test_metadata = { key: key }
- allow_any_instance_of(MyConsumer).to(receive(:decode_key)) do |_instance, k|
- k['test_id']
- end
- MyConsumer.new.around_consume({ 'test_id' => 'foo',
- 'some_int' => 123 }, test_metadata) do |_payload, metadata|
- expect(metadata[:key]).to eq('foo')
- end
- MyConsumer.new.around_consume({ 'test_id' => 'foo',
- 'some_int' => 123 }, test_metadata) do |_payload, metadata|
- expect(metadata[:key]).to eq('foo')
- end
- end
-
it 'should consume a message on a topic' do
test_consume_message('my_consume_topic',
{ 'test_id' => 'foo',
'some_int' => 123 }) do |payload, _metadata|
expect(payload['test_id']).to eq('foo')
expect(payload['some_int']).to eq(123)
end
end
it 'should fail on invalid message' do
- test_consume_invalid_message(MyConsumer, { 'invalid' => 'key' })
+ expect { test_consume_message(MyConsumer, { 'invalid' => 'key' }) }.
+ to raise_error(Avro::SchemaValidator::ValidationError)
end
it 'should fail if reraise is false but fatal_error is true' do
- Deimos.configure { |config| config.consumers.reraise_errors = false }
- test_consume_invalid_message(MyConsumer, 'fatal')
+ expect { test_consume_message(MyConsumer, {test_id: 'fatal'}) }.
+ to raise_error(Avro::SchemaValidator::ValidationError)
end
it 'should fail if fatal_error is true globally' do
- Deimos.configure do |config|
- config.consumers.fatal_error = proc { true }
- config.consumers.reraise_errors = false
- end
- test_consume_invalid_message(MyConsumer, { 'invalid' => 'key' })
+ set_karafka_config(:fatal_error, proc { true })
+ expect { test_consume_message(MyConsumer, { 'invalid' => 'key' }) }.
+ to raise_error(Avro::SchemaValidator::ValidationError)
end
it 'should fail on message with extra fields' do
- test_consume_invalid_message(MyConsumer,
+ allow_any_instance_of(Deimos::SchemaBackends::AvroValidation).
+ to receive(:coerce) { |_, m| m.with_indifferent_access }
+ expect { test_consume_message(MyConsumer,
{ 'test_id' => 'foo',
'some_int' => 123,
- 'extra_field' => 'field name' })
+ 'extra_field' => 'field name' }) }.
+ to raise_error(Avro::SchemaValidator::ValidationError)
end
it 'should not fail when before_consume fails without reraising errors' do
- Deimos.configure { |config| config.consumers.reraise_errors = false }
+ set_karafka_config(:reraise_errors, false)
expect {
test_consume_message(
MyConsumer,
{ 'test_id' => 'foo',
- 'some_int' => 123 },
- skip_expectation: true
- ) { raise 'OH NOES' }
+ 'some_int' => 123 }) { raise 'OH NOES' }
}.not_to raise_error
end
it 'should not fail when consume fails without reraising errors' do
- Deimos.configure { |config| config.consumers.reraise_errors = false }
+ set_karafka_config(:reraise_errors, false)
+ allow(Deimos::ProducerMiddleware).to receive(:call) { |m| m[:payload] = m[:payload].to_json; m }
expect {
test_consume_message(
MyConsumer,
- { 'invalid' => 'key' },
- skip_expectation: true
- )
+ { 'invalid' => 'key' })
}.not_to raise_error
end
-
- it 'should call original' do
- expect {
- test_consume_message(MyConsumer,
- { 'test_id' => 'foo', 'some_int' => 123 },
- call_original: true)
- }.to raise_error('This should not be called unless call_original is set')
- end
end
end
context 'with overriden schema classes' do
before(:each) do
+ set_karafka_config(:use_schema_classes, true)
Deimos.configure do |config|
- config.schema.use_schema_classes = true
config.schema.use_full_namespace = true
end
end
prepend_before(:each) do
consumer_class = Class.new(described_class) do
- schema 'MyUpdatedSchema'
- namespace 'com.my-namespace'
- key_config field: 'test_id'
-
# :nodoc:
- def consume(_payload, _metadata)
- raise 'This should not be called unless call_original is set'
+ def consume_message(message)
+ message.payload
end
end
stub_const('ConsumerTest::MyConsumer', consumer_class)
+ Deimos.config.schema.use_schema_classes = true
+ Karafka::App.routes.redraw do
+ topic 'my_consume_topic' do
+ schema 'MyUpdatedSchema'
+ namespace 'com.my-namespace'
+ key_config field: 'test_id'
+ consumer consumer_class
+ end
+ end
end
+ after(:each) do
+ Karafka::App.routes.clear
+ end
it 'should consume messages' do
test_consume_message('my_consume_topic',
{ 'test_id' => 'foo',
'some_int' => 1 }) do |payload, _metadata|
@@ -167,95 +160,8 @@
end
end
end
- describe 'decode_key' do
-
- it 'should use the key field in the value if set' do
- # actual decoding is disabled in test
- expect(MyConsumer.new.decode_key('test_id' => '123')).to eq('123')
- expect { MyConsumer.new.decode_key(123) }.to raise_error(NoMethodError)
- end
-
- it 'should use the key schema if set' do
- consumer_class = Class.new(described_class) do
- schema 'MySchema'
- namespace 'com.my-namespace'
- key_config schema: 'MySchema_key'
- end
- stub_const('ConsumerTest::MySchemaConsumer', consumer_class)
- expect(MyConsumer.new.decode_key('test_id' => '123')).to eq('123')
- expect { MyConsumer.new.decode_key(123) }.to raise_error(NoMethodError)
- end
-
- it 'should not decode if plain is set' do
- consumer_class = Class.new(described_class) do
- schema 'MySchema'
- namespace 'com.my-namespace'
- key_config plain: true
- end
- stub_const('ConsumerTest::MyNonEncodedConsumer', consumer_class)
- expect(MyNonEncodedConsumer.new.decode_key('123')).to eq('123')
- end
-
- it 'should error with nothing set' do
- consumer_class = Class.new(described_class) do
- schema 'MySchema'
- namespace 'com.my-namespace'
- end
- stub_const('ConsumerTest::MyErrorConsumer', consumer_class)
- expect { MyErrorConsumer.new.decode_key('123') }.
- to raise_error('No key config given - if you are not decoding keys, please use `key_config plain: true`')
- end
-
- end
-
- describe 'timestamps' do
- before(:each) do
- # :nodoc:
- consumer_class = Class.new(described_class) do
- schema 'MySchemaWithDateTimes'
- namespace 'com.my-namespace'
- key_config plain: true
-
- # :nodoc:
- def consume(_payload, _metadata)
- raise 'This should not be called unless call_original is set'
- end
- end
- stub_const('ConsumerTest::MyConsumer', consumer_class)
- end
-
- it 'should consume a message' do
- expect(Deimos.config.metrics).to receive(:histogram).twice
- test_consume_message('my_consume_topic',
- { 'test_id' => 'foo',
- 'some_int' => 123,
- 'updated_at' => Time.now.to_i,
- 'timestamp' => 2.minutes.ago.to_s }) do |payload, _metadata|
- expect(payload['test_id']).to eq('foo')
- end
- end
-
- it 'should fail nicely when timestamp wrong format' do
- expect(Deimos.config.metrics).to receive(:histogram).twice
- test_consume_message('my_consume_topic',
- { 'test_id' => 'foo',
- 'some_int' => 123,
- 'updated_at' => Time.now.to_i,
- 'timestamp' => 'dffdf' }) do |payload, _metadata|
- expect(payload['test_id']).to eq('foo')
- end
- test_consume_message('my_consume_topic',
- { 'test_id' => 'foo',
- 'some_int' => 123,
- 'updated_at' => Time.now.to_i,
- 'timestamp' => '' }) do |payload, _metadata|
- expect(payload['test_id']).to eq('foo')
- end
- end
-
- end
end
end
# rubocop:enable Metrics/ModuleLength