lib/bunny/concurrent/continuation_queue.rb in bunny-1.7.1 vs lib/bunny/concurrent/continuation_queue.rb in bunny-2.0.0.rc1
- old
+ new
@@ -4,38 +4,55 @@
module Concurrent
# Continuation queue implementation for MRI and Rubinius
#
# @private
class ContinuationQueue
- def initialize(*args, &block)
- @q = ::Queue.new(*args)
+ def initialize
+ @q = []
+ @lock = ::Mutex.new
+ @cond = ::ConditionVariable.new
end
- def push(*args)
- @q.push(*args)
+ def push(item)
+ @lock.synchronize do
+ @q.push(item)
+ @cond.signal
+ end
end
alias << push
def pop
- @q.pop
+ poll
end
def poll(timeout_in_ms = nil)
- if timeout_in_ms
- Bunny::Timeout.timeout(timeout_in_ms / 1000.0, ::Timeout::Error) do
- @q.pop
+ timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil
+
+ @lock.synchronize do
+ if @q.empty?
+ @cond.wait(@lock, timeout)
+ raise ::Timeout::Error if @q.empty?
end
- else
- @q.pop
+ item = @q.shift
+ @cond.signal
+
+ item
end
end
def clear
- @q.clear
+ @lock.synchronize do
+ @q.clear
+ end
end
- def method_missing(selector, *args, &block)
- @q.__send__(selector, *args, &block)
+ def empty?
+ @q.empty?
end
+
+ def size
+ @q.size
+ end
+ alias length size
end
end
end