lib/bunny/channel.rb in bunny-0.9.0.pre3 vs lib/bunny/channel.rb in bunny-0.9.0.pre4

- old
+ new

@@ -136,11 +136,11 @@ def reject(delivery_tag, requeue = false) basic_reject(delivery_tag, requeue) end - def ack(delivery_tag, multiple) + def ack(delivery_tag, multiple = false) basic_ack(delivery_tag, multiple) end alias acknowledge ack def nack(delivery_tag, requeue, multiple = false) @@ -267,22 +267,22 @@ false, no_ack, exclusive, false, arguments)) + # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages + # in the queue already. MK. + if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING + add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block) + end + Bunny::Timer.timeout(1, ClientTimeout) do @last_basic_consume_ok = @continuations.pop end + # covers server-generated consumer tags + add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block) - @consumer_mutex.synchronize do - # make sure to use consumer tag from basic.consume-ok in case it was - # server-generated - c = Consumer.new(self, queue, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments) - c.on_delivery(&block) if block - @consumers[@last_basic_consume_ok.consumer_tag] = c - end - @last_basic_consume_ok end def basic_consume_with(consumer) raise_if_no_longer_open! @@ -294,19 +294,24 @@ false, consumer.no_ack, consumer.exclusive, false, consumer.arguments)) + + # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages + # in the queue already. MK. + if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING + register_consumer(consumer.consumer_tag, consumer) + end + Bunny::Timer.timeout(1, ClientTimeout) do @last_basic_consume_ok = @continuations.pop end + # covers server-generated consumer tags + register_consumer(@last_basic_consume_ok.consumer_tag, consumer) - @consumer_mutex.synchronize do - # update the tag in case it was server-generated - consumer.consumer_tag = @last_basic_consume_ok.consumer_tag - @consumers[@last_basic_consume_ok.consumer_tag] = consumer - end + raise_if_continuation_resulted_in_a_channel_error! @last_basic_consume_ok end def basic_cancel(consumer_tag) @@ -590,10 +595,24 @@ # # Implementation # + def register_consumer(consumer_tag, consumer) + @consumer_mutex.synchronize do + @consumers[consumer_tag] = consumer + end + end + + def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block) + @consumer_mutex.synchronize do + c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments) + c.on_delivery(&block) if block + @consumers[consumer_tag] = c + end + end + def handle_method(method) # puts "Channel#handle_frame on channel #{@id}: #{method.inspect}" case method when AMQ::Protocol::Queue::DeclareOk then @continuations.push(method) @@ -668,10 +687,13 @@ consumer = @consumers[basic_deliver.consumer_tag] if consumer @work_pool.submit do consumer.call(DeliveryInfo.new(basic_deliver), MessageProperties.new(properties), content) end + else + # TODO: log it + puts "[warning] No consumer for tag #{basic_deliver.consumer_tag}" end end def handle_basic_return(basic_return, properties, content) x = find_exchange(basic_return.exchange) @@ -728,10 +750,18 @@ def find_exchange(name) @exchanges[name] end + # Unique string supposed to be used as a consumer tag. + # + # @return [String] Unique string. + # @api plugin + def generate_consumer_tag(name = "bunny") + "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" + end + protected def closed! @status = :closed @work_pool.shutdown @@ -762,16 +792,8 @@ raise @last_channel_error if @last_channel_error end def raise_if_no_longer_open! raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed? - end - - # Unique string supposed to be used as a consumer tag. - # - # @return [String] Unique string. - # @api plugin - def generate_consumer_tag(name = "bunny") - "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end end end