lib/bunny/channel.rb in bunny-1.1.5 vs lib/bunny/channel.rb in bunny-1.1.6

- old
+ new

@@ -156,10 +156,11 @@ attr_reader :nacked_set # @return [Hash<String, Bunny::Consumer>] Consumer instances declared on this channel attr_reader :consumers DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze + SHORTSTR_LIMIT = 255 # @param [Bunny::Session] connection AMQP 0.9.1 connection # @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it # @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1 def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) @@ -514,10 +515,11 @@ # # @return [Bunny::Channel] Self # @api public def basic_publish(payload, exchange, routing_key, opts = {}) raise_if_no_longer_open! + raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT exchange_name = if exchange.respond_to?(:name) exchange.name else exchange @@ -1503,10 +1505,20 @@ # @private def increment_recoveries_counter @recoveries_counter.increment end + # @api public + def recover_cancelled_consumers! + @recover_cancelled_consumers = true + end + + # @api public + def recovers_cancelled_consumers? + !!@recover_cancelled_consumers + end + # @endgroup # @return [String] Brief human-readable representation of the channel def to_s @@ -1573,11 +1585,18 @@ @continuations.push(method) when AMQ::Protocol::Basic::Cancel then if consumer = @consumers[method.consumer_tag] @work_pool.submit do begin - @consumers.delete(method.consumer_tag) - consumer.handle_cancellation(method) + if recovers_cancelled_consumers? + consumer.handle_cancellation(method) + @logger.info "Automatically recovering cancelled consumer #{consumer.consumer_tag} on queue #{consumer.queue_name}" + + consume_with(consumer) + else + @consumers.delete(method.consumer_tag) + consumer.handle_cancellation(method) + end rescue Exception => e @logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!" @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler end end