lib/bunny/channel.rb in bunny-0.9.0.pre9 vs lib/bunny/channel.rb in bunny-0.9.0.pre10

- old
+ new

@@ -9,10 +9,16 @@ require "bunny/delivery_info" require "bunny/return_info" require "bunny/message_properties" +if defined?(JRUBY_VERSION) + require "bunny/concurrent/linked_continuation_queue" +else + require "bunny/concurrent/continuation_queue" +end + module Bunny # ## What are AMQP channels # # To quote {http://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}: # @@ -167,13 +173,12 @@ @publishing_mutex = Mutex.new @consumer_mutex = Mutex.new @unconfirmed_set_mutex = Mutex.new - @continuations = ::Queue.new - @confirms_continuations = ::Queue.new - @basic_get_continuations = ::Queue.new + self.reset_continuations + # threads awaiting on continuations. Used to unblock # them when network connection goes down so that busy loops # that perform synchronous operations can work. MK. @threads_waiting_on_continuations = Set.new @threads_waiting_on_confirms_continuations = Set.new @@ -1733,7 +1738,27 @@ # @private def raise_if_no_longer_open! raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed? end - end -end + + # @api private + def reset_continuations + @continuations = new_continuation + @confirms_continuations = new_continuation + @basic_get_continuations = new_continuation + end + + + if defined?(JRUBY_VERSION) + # @api private + def new_continuation + Concurrent::LinkedContinuationQueue.new + end + else + # @api private + def new_continuation + Concurrent::ContinuationQueue.new + end + end # if defined? + end # Channel +end # Bunny