spec/integration/bunny/bunny_adapter_spec.rb in message-driver-1.0.0 vs spec/integration/bunny/bunny_adapter_spec.rb in message-driver-1.0.1
- old
+ new
@@ -378,9 +378,85 @@
end.to raise_error MessageDriver::Error, /QueueDestination/
end
end
end
+ context 'publisher confirmations in a consumer', :aggregate_failures do
+ let(:source_queue) { adapter_context.create_destination('subscriptions_example_queue') }
+ let(:destination) { adapter_context.create_destination('confirms_destination_queue') }
+ let(:body) { 'Testing the QueueDestination' }
+ let(:headers) { { 'foo' => 'bar' } }
+ let(:properties) { { persistent: false } }
+ let(:error_handler) { double('error_handler', call: nil) }
+
+ let(:subscription) { adapter_context.subscribe(source_queue, {ack: :auto, error_handler: error_handler}, &consumer) }
+ let(:subscription_channel) { subscription.sub_ctx.channel }
+
+ before do
+ allow(error_handler).to receive(:call) do |err, _msg|
+ puts err.inspect
+ end
+ destination.purge
+ source_queue.purge
+ subscription
+ allow(subscription_channel).to receive(:wait_for_confirms).and_call_original
+ end
+
+ after do
+ subscription.unsubscribe
+ end
+
+ context 'when messages are sent during the transaction' do
+ let(:consumer) do
+ ->(msg) do
+ MessageDriver::Client.with_message_transaction(type: :confirm_and_wait) do
+ destination.publish(msg.body, msg.headers, msg.properties)
+ end
+ end
+ end
+
+ it 'publishes and waits for confirmation before "committing" the transaction' do
+ expect {
+ source_queue.publish(body, headers, properties)
+ pause_if_needed
+ }.to change { destination.message_count }.from(0).to(1)
+
+ expect(subscription_channel).to be_using_publisher_confirms
+ expect(subscription_channel.unconfirmed_set).to be_empty
+
+ expect(adapter_context.channel).not_to be_using_publisher_confirms
+ expect(adapter_context.channel.unconfirmed_set).to be_nil
+
+ expect(error_handler).not_to have_received(:call)
+ expect(subscription_channel).to have_received(:wait_for_confirms)
+ end
+ end
+
+ context 'when no messages are sent during the transaction' do
+ let(:consumer) do
+ ->(msg) do
+ MessageDriver::Client.with_message_transaction(type: :confirm_and_wait) do
+ # do nothing
+ end
+ end
+ end
+
+ it 'publishes and waits for confirmation before "committing" the transaction' do
+ source_queue.publish(body, headers, properties)
+ pause_if_needed
+
+ expect(subscription.sub_ctx.channel).not_to be_using_publisher_confirms
+ expect(subscription.sub_ctx.channel.unconfirmed_set).to be_nil
+
+ expect(adapter_context.channel).not_to be_using_publisher_confirms
+ expect(adapter_context.channel.unconfirmed_set).to be_nil
+
+ expect(error_handler).not_to have_received(:call)
+ expect(subscription_channel).not_to have_received(:wait_for_confirms)
+ end
+ end
+ end
+
context 'during a transaction with a transactional context' do
let(:channel) { adapter_context.ensure_channel }
let(:destination) { MessageDriver::Client.dynamic_destination('tx.test.queue') }
before do
adapter_context.ensure_transactional_channel