Sha256: d6a7afa62fa46b3678421fac11712c27e099a5d7eb04e708a5a8d14c1e58c4d7
Contents?: true
Size: 1.55 KB
Versions: 3
Compression:
Stored size: 1.55 KB
Contents
# frozen_string_literal: true

require 'spec_helper'
require 'basquiat/adapters/rabbitmq_adapter'

# Exercises the RabbitMQ adapter when it is configured with the 'basic_ack'
# requeue strategy. The examples publish through the adapter and then inspect
# the session queue, so they presumably need a reachable RabbitMQ broker.
describe 'Requeue Strategies' do
  let(:adapter) { Basquiat::Adapters::RabbitMq.new }

  # Broker host and port are read from the environment and fall back to
  # localhost:5672 when the variables are unset.
  let(:base_options) do
    {
      connection: {
        hosts: [ENV.fetch('BASQUIAT_RABBITMQ_1_PORT_5672_TCP_ADDR', 'localhost')],
        port: ENV.fetch('BASQUIAT_RABBITMQ_1_PORT_5672_TCP_PORT', 5672)
      },
      publisher: { persistent: true },
      requeue: { enabled: true, strategy: 'basic_ack' }
    }
  end

  before { adapter.adapter_options(base_options) }

  # NOTE(review): remove_queues_and_exchanges is not defined in this file;
  # presumably it comes from spec_helper -- confirm.
  after { remove_queues_and_exchanges(adapter) }

  describe 'BasicAcknowledge' do
    it 'certify that BasicAcknowledge is being used' do
      expect(adapter.strategy.class).to eq Basquiat::Adapters::RabbitMq::BasicAcknowledge
    end

    it 'acks a message by default' do
      # The handler never acks explicitly; the queue is expected to drain anyway.
      adapter.subscribe_to('some.event', ->(_) { 'Everything is AWESOME!' })
      adapter.listen(block: false)

      adapter.publish('some.event', data: 'stupid message')
      sleep 0.3 # Wait for the listening thread.

      expect(adapter.session.queue.message_count).to eq(0)
      expect(adapter.session.queue).not_to have_unacked_messages
    end

    it 'support declared acks' do
      # Here the handler acknowledges the message itself.
      adapter.subscribe_to('some.event', ->(msg) { msg.ack })
      adapter.listen(block: false)

      adapter.publish('some.event', data: 'stupid message')
      sleep 0.3 # Wait for the listening thread.

      expect(adapter.session.queue.message_count).to eq(0)
      expect(adapter.session.queue).not_to have_unacked_messages
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems