lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.2 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.3

- old
+ new

@@ -300,10 +300,11 @@ super(adapter) @is_transactional = false @rollback_only = false @need_channel_reset = false @in_transaction = false + @require_commit = false end def create_destination(name, dest_options = {}, message_props = {}) dest = case type = dest_options.delete(:type) when :exchange @@ -328,20 +329,20 @@ end @in_transaction = true @in_confirms_transaction = true if options[:type] == :confirm_and_wait end - def commit_transaction(channel_commit = false) - if !in_transaction? && !channel_commit + def commit_transaction + if !in_transaction? && !@require_commit fail MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" end begin if @in_confirms_transaction wait_for_confirms(@channel) unless @rollback_only else - if is_transactional? && valid? && !@need_channel_reset + if is_transactional? && valid? && !@need_channel_reset && @require_commit handle_errors do if @rollback_only @channel.tx_rollback else @channel.tx_commit @@ -351,10 +352,11 @@ end ensure @rollback_only = false @in_transaction = false @in_confirms_transaction = false + @require_commit = false end end def wait_for_confirms(channel) # FIXME: make the thread-safety of this better once https://github.com/ruby-amqp/bunny/issues/227 is fixed @@ -413,11 +415,11 @@ ch.ack(message.delivery_tag) end end def nack_message(message, options = {}) - requeue = options[:requeue].is_a?(FalseClass) ? false : true + requeue = options.fetch(:requeue, true) with_channel(true) do |ch| ch.reject(message.delivery_tag, requeue) end end @@ -469,28 +471,43 @@ @need_channel_reset = true @rollback_only = true if in_transaction? raise MessageDriver::ConnectionError.new(e.to_s, e) end + def ensure_channel + @channel = adapter.connection.create_channel if @channel.nil? + reset_channel if @need_channel_reset + @channel + end + + def ensure_transactional_channel + ensure_channel + make_channel_transactional + end + + def make_channel_transactional + unless is_transactional? + @channel.tx_select + @is_transactional = true + end + end + def with_channel(require_commit = true) fail MessageDriver::TransactionRollbackOnly if @rollback_only fail MessageDriver::Error, 'this adapter context is not valid!' unless valid? - @channel = adapter.connection.create_channel if @channel.nil? - reset_channel if @need_channel_reset + ensure_channel + @require_commit ||= require_commit if in_transaction? if @in_confirms_transaction @channel.confirm_select unless @channel.using_publisher_confirmations? else - unless is_transactional? - @channel.tx_select - @is_transactional = true - end + make_channel_transactional end end handle_errors do result = yield @channel - commit_transaction(true) if require_commit && is_transactional? && !in_transaction? + commit_transaction if require_commit && is_transactional? && !in_transaction? result end end def args_to_message(delivery_info, properties, payload, destination) @@ -502,9 +519,10 @@ def reset_channel unless @channel.open? @channel = adapter.connection.create_channel @is_transactional = false @rollback_only = true if in_transaction? + @require_commit end @need_channel_reset = false end end