lib/bunny/channel.rb in bunny-0.9.0.pre9 vs lib/bunny/channel.rb in bunny-0.9.0.pre10
- old
+ new
@@ -9,10 +9,16 @@
require "bunny/delivery_info"
require "bunny/return_info"
require "bunny/message_properties"
+if defined?(JRUBY_VERSION)
+ require "bunny/concurrent/linked_continuation_queue"
+else
+ require "bunny/concurrent/continuation_queue"
+end
+
module Bunny
# ## What are AMQP channels
#
# To quote {http://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}:
#
@@ -167,13 +173,12 @@
@publishing_mutex = Mutex.new
@consumer_mutex = Mutex.new
@unconfirmed_set_mutex = Mutex.new
- @continuations = ::Queue.new
- @confirms_continuations = ::Queue.new
- @basic_get_continuations = ::Queue.new
+ self.reset_continuations
+
# threads awaiting on continuations. Used to unblock
# them when network connection goes down so that busy loops
# that perform synchronous operations can work. MK.
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
@@ -1733,7 +1738,27 @@
# @private
def raise_if_no_longer_open!
raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed?
end
- end
-end
+
+ # @api private
+ def reset_continuations
+ @continuations = new_continuation
+ @confirms_continuations = new_continuation
+ @basic_get_continuations = new_continuation
+ end
+
+
+ if defined?(JRUBY_VERSION)
+ # @api private
+ def new_continuation
+ Concurrent::LinkedContinuationQueue.new
+ end
+ else
+ # @api private
+ def new_continuation
+ Concurrent::ContinuationQueue.new
+ end
+ end # if defined?
+ end # Channel
+end # Bunny