lib/bunny/channel.rb in bunny-1.1.5 vs lib/bunny/channel.rb in bunny-1.1.6
- old
+ new
@@ -156,10 +156,11 @@
attr_reader :nacked_set
# @return [Hash<String, Bunny::Consumer>] Consumer instances declared on this channel
attr_reader :consumers
DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze
+ SHORTSTR_LIMIT = 255
# @param [Bunny::Session] connection AMQP 0.9.1 connection
# @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it
# @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@@ -514,10 +515,11 @@
#
# @return [Bunny::Channel] Self
# @api public
def basic_publish(payload, exchange, routing_key, opts = {})
raise_if_no_longer_open!
+ raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
@@ -1503,10 +1505,20 @@
# @private
def increment_recoveries_counter
@recoveries_counter.increment
end
+ # @api public
+ def recover_cancelled_consumers!
+ @recover_cancelled_consumers = true
+ end
+
+ # @api public
+ def recovers_cancelled_consumers?
+ !!@recover_cancelled_consumers
+ end
+
# @endgroup
# @return [String] Brief human-readable representation of the channel
def to_s
@@ -1573,11 +1585,18 @@
@continuations.push(method)
when AMQ::Protocol::Basic::Cancel then
if consumer = @consumers[method.consumer_tag]
@work_pool.submit do
begin
- @consumers.delete(method.consumer_tag)
- consumer.handle_cancellation(method)
+ if recovers_cancelled_consumers?
+ consumer.handle_cancellation(method)
+ @logger.info "Automatically recovering cancelled consumer #{consumer.consumer_tag} on queue #{consumer.queue_name}"
+
+ consume_with(consumer)
+ else
+ @consumers.delete(method.consumer_tag)
+ consumer.handle_cancellation(method)
+ end
rescue Exception => e
@logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!"
@uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
end
end