spec/integration/bunny/bunny_adapter_spec.rb in message-driver-1.0.0 vs spec/integration/bunny/bunny_adapter_spec.rb in message-driver-1.0.1

- old
+ new

@@ -378,9 +378,85 @@ end.to raise_error MessageDriver::Error, /QueueDestination/ end end end + context 'publisher confirmations in a consumer', :aggregate_failures do + let(:source_queue) { adapter_context.create_destination('subscriptions_example_queue') } + let(:destination) { adapter_context.create_destination('confirms_destination_queue') } + let(:body) { 'Testing the QueueDestination' } + let(:headers) { { 'foo' => 'bar' } } + let(:properties) { { persistent: false } } + let(:error_handler) { double('error_handler', call: nil) } + + let(:subscription) { adapter_context.subscribe(source_queue, {ack: :auto, error_handler: error_handler}, &consumer) } + let(:subscription_channel) { subscription.sub_ctx.channel } + + before do + allow(error_handler).to receive(:call) do |err, _msg| + puts err.inspect + end + destination.purge + source_queue.purge + subscription + allow(subscription_channel).to receive(:wait_for_confirms).and_call_original + end + + after do + subscription.unsubscribe + end + + context 'when messages are sent during the transaction' do + let(:consumer) do + ->(msg) do + MessageDriver::Client.with_message_transaction(type: :confirm_and_wait) do + destination.publish(msg.body, msg.headers, msg.properties) + end + end + end + + it 'publishes and waits for confirmation before "committing" the transaction' do + expect { + source_queue.publish(body, headers, properties) + pause_if_needed + }.to change { destination.message_count }.from(0).to(1) + + expect(subscription_channel).to be_using_publisher_confirms + expect(subscription_channel.unconfirmed_set).to be_empty + + expect(adapter_context.channel).not_to be_using_publisher_confirms + expect(adapter_context.channel.unconfirmed_set).to be_nil + + expect(error_handler).not_to have_received(:call) + expect(subscription_channel).to have_received(:wait_for_confirms) + end + end + + context 'when no messages are sent during the transaction' do + let(:consumer) do + ->(msg) do + MessageDriver::Client.with_message_transaction(type: :confirm_and_wait) do + # do nothing + end + end + end + + it 'publishes and waits for confirmation before "committing" the transaction' do + source_queue.publish(body, headers, properties) + pause_if_needed + + expect(subscription.sub_ctx.channel).not_to be_using_publisher_confirms + expect(subscription.sub_ctx.channel.unconfirmed_set).to be_nil + + expect(adapter_context.channel).not_to be_using_publisher_confirms + expect(adapter_context.channel.unconfirmed_set).to be_nil + + expect(error_handler).not_to have_received(:call) + expect(subscription_channel).not_to have_received(:wait_for_confirms) + end + end + end + context 'during a transaction with a transactional context' do let(:channel) { adapter_context.ensure_channel } let(:destination) { MessageDriver::Client.dynamic_destination('tx.test.queue') } before do adapter_context.ensure_transactional_channel