lib/bunny/channel.rb in bunny-0.10.1 vs lib/bunny/channel.rb in bunny-0.10.2
- old
+ new
@@ -841,10 +841,11 @@
# covers server-generated consumer tags
add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)
@last_basic_consume_ok
end
+ alias consume basic_consume
# Registers a consumer for queue as {Bunny::Consumer} instance.
#
# @param [Bunny::Consumer] consumer Consumer to register. It should already have queue name, consumer tag
# and other attributes set.
@@ -892,10 +893,11 @@
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_consume_ok
end
+ alias consume_with basic_consume_with
# Removes a consumer. Messages for this consumer will no longer be delivered. If the queue
# it was on is auto-deleted and this consumer was the last one, the queue will be deleted.
#
# @param [String] consumer_tag Consumer tag (unique identifier) to cancel
@@ -1536,14 +1538,21 @@
@continuations.push(method)
when AMQ::Protocol::Basic::ConsumeOk then
@continuations.push(method)
when AMQ::Protocol::Basic::Cancel then
if consumer = @consumers[method.consumer_tag]
- consumer.handle_cancellation(method)
+ @work_pool.submit do
+ begin
+ @consumers.delete(method.consumer_tag)
+ consumer.handle_cancellation(method)
+ rescue Exception => e
+ @logger.error "Got excepton when notifying consumer #{method.consumer_tag} about cancellation!"
+ end
+ end
+ else
+ @logger.warn "No consumer for tag #{method.consumer_tag} on channel #{@id}!"
end
-
- @consumers.delete(method.consumer_tag)
when AMQ::Protocol::Basic::CancelOk then
@continuations.push(method)
unregister_consumer(method.consumer_tag)
when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then
@continuations.push(method)
@@ -1845,15 +1854,15 @@
end # if defined?
# @private
def guarding_against_stale_delivery_tags(tag, &block)
case tag
- # if a fixnum was passed, execute unconditionally. MK.
+ # if a fixnum was passed, execute unconditionally. MK.
when Fixnum then
block.call
- # versioned delivery tags should be checked to avoid
- # sending out stale (invalid) tags after channel was reopened
- # during network failure recovery. MK.
+ # versioned delivery tags should be checked to avoid
+ # sending out stale (invalid) tags after channel was reopened
+ # during network failure recovery. MK.
when VersionedDeliveryTag then
if !tag.stale?(@recoveries_counter.get)
block.call
end
end