lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.2 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.3
- old
+ new
@@ -300,10 +300,11 @@
super(adapter)
@is_transactional = false
@rollback_only = false
@need_channel_reset = false
@in_transaction = false
+ @require_commit = false
end
def create_destination(name, dest_options = {}, message_props = {})
dest = case type = dest_options.delete(:type)
when :exchange
@@ -328,20 +329,20 @@
end
@in_transaction = true
@in_confirms_transaction = true if options[:type] == :confirm_and_wait
end
- def commit_transaction(channel_commit = false)
- if !in_transaction? && !channel_commit
+ def commit_transaction
+ if !in_transaction? && !@require_commit
fail MessageDriver::TransactionError,
"you can't finish the transaction unless you already in one!"
end
begin
if @in_confirms_transaction
wait_for_confirms(@channel) unless @rollback_only
else
- if is_transactional? && valid? && !@need_channel_reset
+ if is_transactional? && valid? && !@need_channel_reset && @require_commit
handle_errors do
if @rollback_only
@channel.tx_rollback
else
@channel.tx_commit
@@ -351,10 +352,11 @@
end
ensure
@rollback_only = false
@in_transaction = false
@in_confirms_transaction = false
+ @require_commit = false
end
end
def wait_for_confirms(channel)
# FIXME: make the thread-safety of this better once https://github.com/ruby-amqp/bunny/issues/227 is fixed
@@ -413,11 +415,11 @@
ch.ack(message.delivery_tag)
end
end
def nack_message(message, options = {})
- requeue = options[:requeue].is_a?(FalseClass) ? false : true
+ requeue = options.fetch(:requeue, true)
with_channel(true) do |ch|
ch.reject(message.delivery_tag, requeue)
end
end
@@ -469,28 +471,43 @@
@need_channel_reset = true
@rollback_only = true if in_transaction?
raise MessageDriver::ConnectionError.new(e.to_s, e)
end
+ def ensure_channel
+ @channel = adapter.connection.create_channel if @channel.nil?
+ reset_channel if @need_channel_reset
+ @channel
+ end
+
+ def ensure_transactional_channel
+ ensure_channel
+ make_channel_transactional
+ end
+
+ def make_channel_transactional
+ unless is_transactional?
+ @channel.tx_select
+ @is_transactional = true
+ end
+ end
+
def with_channel(require_commit = true)
fail MessageDriver::TransactionRollbackOnly if @rollback_only
fail MessageDriver::Error, 'this adapter context is not valid!' unless valid?
- @channel = adapter.connection.create_channel if @channel.nil?
- reset_channel if @need_channel_reset
+ ensure_channel
+ @require_commit ||= require_commit
if in_transaction?
if @in_confirms_transaction
@channel.confirm_select unless @channel.using_publisher_confirmations?
else
- unless is_transactional?
- @channel.tx_select
- @is_transactional = true
- end
+ make_channel_transactional
end
end
handle_errors do
result = yield @channel
- commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
+ commit_transaction if require_commit && is_transactional? && !in_transaction?
result
end
end
def args_to_message(delivery_info, properties, payload, destination)
@@ -502,9 +519,10 @@
def reset_channel
unless @channel.open?
@channel = adapter.connection.create_channel
@is_transactional = false
@rollback_only = true if in_transaction?
+ @require_commit
end
@need_channel_reset = false
end
end