lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc2

- old
+ new

@@ -116,10 +116,13 @@ raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] @sub_ctx.with_channel do |ch| queue = destination.bunny_queue(@sub_ctx.channel) + if options.has_key? :prefetch_size + ch.prefetch(options[:prefetch_size]) + end ack_mode = case options[:ack] when :auto, nil :auto when :manual :manual @@ -143,24 +146,24 @@ consumer.call(message) @sub_ctx.ack_message(message) end end rescue => e - if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) - if [:auto, :transactional].include? ack_mode - @sub_ctx.nack_message(message, requeue: false) + if [:auto, :transactional].include? ack_mode + requeue = true + if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) + requeue = false end - else - if @sub_ctx.valid? && ack_mode == :auto + if @sub_ctx.valid? begin - @sub_ctx.nack_message(message, requeue: true) + @sub_ctx.nack_message(message, requeue: requeue) rescue => e logger.error exception_to_str(e) end end - @error_handler.call(e, message) unless @error_handler.nil? end + @error_handler.call(e, message) unless @error_handler.nil? end end end end end @@ -255,14 +258,16 @@ def commit_transaction(channel_commit=false) raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit begin if is_transactional? && valid? && !@need_channel_reset - if @rollback_only - @channel.tx_rollback - else - @channel.tx_commit + handle_errors do + if @rollback_only + @channel.tx_rollback + else + @channel.tx_commit + end end end ensure @rollback_only = false @in_transaction = false @@ -339,29 +344,40 @@ unless @channel.nil? @channel.close if @channel.open? end end - def with_channel(require_commit=true) - raise MessageDriver::TransactionRollbackOnly if @rollback_only - raise MessageDriver::Error, "oh nos!" if !valid? - @channel = adapter.connection.create_channel if @channel.nil? - reset_channel if @need_channel_reset + def handle_errors begin - result = yield @channel - commit_transaction(true) if require_commit && is_transactional? && !in_transaction? - result + yield rescue Bunny::ChannelLevelException => e @need_channel_reset = true @rollback_only = true if in_transaction? if e.kind_of? Bunny::NotFound raise MessageDriver::QueueNotFound.new(e.to_s, e) else raise MessageDriver::WrappedError.new(e.to_s, e) end + rescue Bunny::ChannelAlreadyClosed => e + @need_channel_reset = true + @rollback_only = true if in_transaction? + raise MessageDriver::WrappedError.new(e.to_s, e) rescue *NETWORK_ERRORS => e + @need_channel_reset = true @rollback_only = true if in_transaction? raise MessageDriver::ConnectionError.new(e.to_s, e) + end + end + + def with_channel(require_commit=true) + raise MessageDriver::TransactionRollbackOnly if @rollback_only + raise MessageDriver::Error, "oh nos!" if !valid? + @channel = adapter.connection.create_channel if @channel.nil? + reset_channel if @need_channel_reset + handle_errors do + result = yield @channel + commit_transaction(true) if require_commit && is_transactional? && !in_transaction? + result end end def args_to_message(delivery_info, properties, payload) Message.new(delivery_info, properties, payload)