lib/bunny/concurrent/continuation_queue.rb in bunny-1.7.1 vs lib/bunny/concurrent/continuation_queue.rb in bunny-2.0.0.rc1

- old
+ new

@@ -4,38 +4,55 @@ module Concurrent # Continuation queue implementation for MRI and Rubinius # # @private class ContinuationQueue - def initialize(*args, &block) - @q = ::Queue.new(*args) + def initialize + @q = [] + @lock = ::Mutex.new + @cond = ::ConditionVariable.new end - def push(*args) - @q.push(*args) + def push(item) + @lock.synchronize do + @q.push(item) + @cond.signal + end end alias << push def pop - @q.pop + poll end def poll(timeout_in_ms = nil) - if timeout_in_ms - Bunny::Timeout.timeout(timeout_in_ms / 1000.0, ::Timeout::Error) do - @q.pop + timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil + + @lock.synchronize do + if @q.empty? + @cond.wait(@lock, timeout) + raise ::Timeout::Error if @q.empty? end - else - @q.pop + item = @q.shift + @cond.signal + + item end end def clear - @q.clear + @lock.synchronize do + @q.clear + end end - def method_missing(selector, *args, &block) - @q.__send__(selector, *args, &block) + def empty? + @q.empty? end + + def size + @q.size + end + alias length size end end end