Sha256: 4c240510384c8af579c126c3aae0a657ab64e32dae1ef12c3e072cc80d21e086

Contents?: true

Size: 1.32 KB

Versions: 1

Compression:

Stored size: 1.32 KB

Contents

require 'spec_helper'
require 'basquiat/adapters/rabbitmq_adapter'

# Specs for the RabbitMQ adapter's default requeue strategy (basic
# acknowledge). Each example subscribes a handler to an event, starts a
# non-blocking listener, publishes one message and then checks that the queue
# holds neither ready nor unacknowledged messages.
#
# NOTE(review): these examples presumably need a reachable RabbitMQ broker at
# the host/port given in base_options — confirm against spec_helper.
describe 'Requeue Strategies' do
  let(:adapter) { Basquiat::Adapters::RabbitMq.new }

  # Broker address comes from the environment, falling back to localhost:5672.
  # NOTE(review): a port read from ENV is a String while the fallback is an
  # Integer — confirm the adapter accepts both.
  let(:base_options) do
    { servers:   [{ host: ENV.fetch('BASQUIAT_RABBITMQ_1_PORT_5672_TCP_ADDR') { 'localhost' },
                    port: ENV.fetch('BASQUIAT_RABBITMQ_1_PORT_5672_TCP_PORT') { 5672 } }],
      publisher: { persistent: true } }
  end

  # Seconds to pause after publishing so the listening thread can consume the
  # message before the queue is inspected.
  let(:listener_grace_period) { 0.7 }

  before(:each) { adapter.adapter_options(base_options) }
  after(:each) { remove_queues_and_exchanges(adapter) }

  # Subscribes +handler+ to 'some.event', starts a non-blocking listener,
  # publishes a single message and waits for the listening thread.
  def publish_and_consume_with(handler)
    adapter.subscribe_to('some.event', handler)
    adapter.listen(block: false)

    adapter.publish('some.event', data: 'stupid message')
    sleep listener_grace_period # Wait for the listening thread.
  end

  # Asserts that the adapter's queue has no ready messages and no messages
  # still awaiting acknowledgement.
  def expect_queue_to_be_drained
    expect(adapter.session.queue.message_count).to eq(0)
    expect(adapter.session.queue).to_not have_unacked_messages
  end

  describe 'BasicAcknowledge (aka the default)' do
    it 'acks a message by default' do
      # The handler never acks explicitly; the strategy is expected to do it.
      publish_and_consume_with(->(_) { 'Everything is AWESOME!' })

      expect_queue_to_be_drained
    end

    it 'supports declared acks' do
      # The handler acks the message itself.
      publish_and_consume_with(->(msg) { msg.ack })

      expect_queue_to_be_drained
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
basquiat-1.2.0 spec/lib/adapters/rabbitmq/requeue_strategies/basic_acknowledge_spec.rb