lib/bunny/channel.rb in bunny-0.10.1 vs lib/bunny/channel.rb in bunny-0.10.2

- old
+ new

@@ -841,10 +841,11 @@ # covers server-generated consumer tags add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block) @last_basic_consume_ok end + alias consume basic_consume # Registers a consumer for queue as {Bunny::Consumer} instance. # # @param [Bunny::Consumer] consumer Consumer to register. It should already have queue name, consumer tag # and other attributes set. @@ -892,10 +893,11 @@ raise_if_continuation_resulted_in_a_channel_error! @last_basic_consume_ok end + alias consume_with basic_consume_with # Removes a consumer. Messages for this consumer will no longer be delivered. If the queue # it was on is auto-deleted and this consumer was the last one, the queue will be deleted. # # @param [String] consumer_tag Consumer tag (unique identifier) to cancel @@ -1536,14 +1538,21 @@ @continuations.push(method) when AMQ::Protocol::Basic::ConsumeOk then @continuations.push(method) when AMQ::Protocol::Basic::Cancel then if consumer = @consumers[method.consumer_tag] - consumer.handle_cancellation(method) + @work_pool.submit do + begin + @consumers.delete(method.consumer_tag) + consumer.handle_cancellation(method) + rescue Exception => e + @logger.error "Got excepton when notifying consumer #{method.consumer_tag} about cancellation!" + end + end + else + @logger.warn "No consumer for tag #{method.consumer_tag} on channel #{@id}!" end - - @consumers.delete(method.consumer_tag) when AMQ::Protocol::Basic::CancelOk then @continuations.push(method) unregister_consumer(method.consumer_tag) when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then @continuations.push(method) @@ -1845,15 +1854,15 @@ end # if defined? # @private def guarding_against_stale_delivery_tags(tag, &block) case tag - # if a fixnum was passed, execute unconditionally. MK. + # if a fixnum was passed, execute unconditionally. MK. when Fixnum then block.call - # versioned delivery tags should be checked to avoid - # sending out stale (invalid) tags after channel was reopened - # during network failure recovery. MK. + # versioned delivery tags should be checked to avoid + # sending out stale (invalid) tags after channel was reopened + # during network failure recovery. MK. when VersionedDeliveryTag then if !tag.stale?(@recoveries_counter.get) block.call end end