lib/bunny/channel.rb in bunny-0.9.0.pre3 vs lib/bunny/channel.rb in bunny-0.9.0.pre4
- old
+ new
@@ -136,11 +136,11 @@
def reject(delivery_tag, requeue = false)
basic_reject(delivery_tag, requeue)
end
- def ack(delivery_tag, multiple)
+ def ack(delivery_tag, multiple = false)
basic_ack(delivery_tag, multiple)
end
alias acknowledge ack
def nack(delivery_tag, requeue, multiple = false)
@@ -267,22 +267,22 @@
false,
no_ack,
exclusive,
false,
arguments))
+ # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
+ # in the queue already. MK.
+ if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
+ add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
+ end
+
Bunny::Timer.timeout(1, ClientTimeout) do
@last_basic_consume_ok = @continuations.pop
end
+ # covers server-generated consumer tags
+ add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)
- @consumer_mutex.synchronize do
- # make sure to use consumer tag from basic.consume-ok in case it was
- # server-generated
- c = Consumer.new(self, queue, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments)
- c.on_delivery(&block) if block
- @consumers[@last_basic_consume_ok.consumer_tag] = c
- end
-
@last_basic_consume_ok
end
def basic_consume_with(consumer)
raise_if_no_longer_open!
@@ -294,19 +294,24 @@
false,
consumer.no_ack,
consumer.exclusive,
false,
consumer.arguments))
+
+ # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
+ # in the queue already. MK.
+ if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
+ register_consumer(consumer.consumer_tag, consumer)
+ end
+
Bunny::Timer.timeout(1, ClientTimeout) do
@last_basic_consume_ok = @continuations.pop
end
+ # covers server-generated consumer tags
+ register_consumer(@last_basic_consume_ok.consumer_tag, consumer)
- @consumer_mutex.synchronize do
- # update the tag in case it was server-generated
- consumer.consumer_tag = @last_basic_consume_ok.consumer_tag
- @consumers[@last_basic_consume_ok.consumer_tag] = consumer
- end
+ raise_if_continuation_resulted_in_a_channel_error!
@last_basic_consume_ok
end
def basic_cancel(consumer_tag)
@@ -590,10 +595,24 @@
#
# Implementation
#
+ def register_consumer(consumer_tag, consumer)
+ @consumer_mutex.synchronize do
+ @consumers[consumer_tag] = consumer
+ end
+ end
+
+ def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block)
+ @consumer_mutex.synchronize do
+ c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
+ c.on_delivery(&block) if block
+ @consumers[consumer_tag] = c
+ end
+ end
+
def handle_method(method)
# puts "Channel#handle_frame on channel #{@id}: #{method.inspect}"
case method
when AMQ::Protocol::Queue::DeclareOk then
@continuations.push(method)
@@ -668,10 +687,13 @@
consumer = @consumers[basic_deliver.consumer_tag]
if consumer
@work_pool.submit do
consumer.call(DeliveryInfo.new(basic_deliver), MessageProperties.new(properties), content)
end
+ else
+ # TODO: log it
+ puts "[warning] No consumer for tag #{basic_deliver.consumer_tag}"
end
end
def handle_basic_return(basic_return, properties, content)
x = find_exchange(basic_return.exchange)
@@ -728,10 +750,18 @@
def find_exchange(name)
@exchanges[name]
end
+ # Unique string supposed to be used as a consumer tag.
+ #
+ # @return [String] Unique string.
+ # @api plugin
+ def generate_consumer_tag(name = "bunny")
+ "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
+ end
+
protected
def closed!
@status = :closed
@work_pool.shutdown
@@ -762,16 +792,8 @@
raise @last_channel_error if @last_channel_error
end
def raise_if_no_longer_open!
raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed?
- end
-
- # Unique string supposed to be used as a consumer tag.
- #
- # @return [String] Unique string.
- # @api plugin
- def generate_consumer_tag(name = "bunny")
- "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
end
end