lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc2
- old
+ new
@@ -116,10 +116,13 @@
raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
@sub_ctx.with_channel do |ch|
queue = destination.bunny_queue(@sub_ctx.channel)
+ if options.has_key? :prefetch_size
+ ch.prefetch(options[:prefetch_size])
+ end
ack_mode = case options[:ack]
when :auto, nil
:auto
when :manual
:manual
@@ -143,24 +146,24 @@
consumer.call(message)
@sub_ctx.ack_message(message)
end
end
rescue => e
- if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
- if [:auto, :transactional].include? ack_mode
- @sub_ctx.nack_message(message, requeue: false)
+ if [:auto, :transactional].include? ack_mode
+ requeue = true
+ if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
+ requeue = false
end
- else
- if @sub_ctx.valid? && ack_mode == :auto
+ if @sub_ctx.valid?
begin
- @sub_ctx.nack_message(message, requeue: true)
+ @sub_ctx.nack_message(message, requeue: requeue)
rescue => e
logger.error exception_to_str(e)
end
end
- @error_handler.call(e, message) unless @error_handler.nil?
end
+ @error_handler.call(e, message) unless @error_handler.nil?
end
end
end
end
end
@@ -255,14 +258,16 @@
def commit_transaction(channel_commit=false)
raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit
begin
if is_transactional? && valid? && !@need_channel_reset
- if @rollback_only
- @channel.tx_rollback
- else
- @channel.tx_commit
+ handle_errors do
+ if @rollback_only
+ @channel.tx_rollback
+ else
+ @channel.tx_commit
+ end
end
end
ensure
@rollback_only = false
@in_transaction = false
@@ -339,29 +344,40 @@
unless @channel.nil?
@channel.close if @channel.open?
end
end
- def with_channel(require_commit=true)
- raise MessageDriver::TransactionRollbackOnly if @rollback_only
- raise MessageDriver::Error, "oh nos!" if !valid?
- @channel = adapter.connection.create_channel if @channel.nil?
- reset_channel if @need_channel_reset
+ def handle_errors
begin
- result = yield @channel
- commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
- result
+ yield
rescue Bunny::ChannelLevelException => e
@need_channel_reset = true
@rollback_only = true if in_transaction?
if e.kind_of? Bunny::NotFound
raise MessageDriver::QueueNotFound.new(e.to_s, e)
else
raise MessageDriver::WrappedError.new(e.to_s, e)
end
+ rescue Bunny::ChannelAlreadyClosed => e
+ @need_channel_reset = true
+ @rollback_only = true if in_transaction?
+ raise MessageDriver::WrappedError.new(e.to_s, e)
rescue *NETWORK_ERRORS => e
+ @need_channel_reset = true
@rollback_only = true if in_transaction?
raise MessageDriver::ConnectionError.new(e.to_s, e)
+ end
+ end
+
+ def with_channel(require_commit=true)
+ raise MessageDriver::TransactionRollbackOnly if @rollback_only
+ raise MessageDriver::Error, "oh nos!" if !valid?
+ @channel = adapter.connection.create_channel if @channel.nil?
+ reset_channel if @need_channel_reset
+ handle_errors do
+ result = yield @channel
+ commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
+ result
end
end
def args_to_message(delivery_info, properties, payload)
Message.new(delivery_info, properties, payload)