Sha256: f02514b9119cc5a34c28cc74a28ea051091dca3b1594760aecb1ac2cc44cf6ba

Contents?: true

Size: 1.21 KB

Versions: 38

Compression:

Stored size: 1.21 KB

Contents

require "thread"

module Bunny
  module Concurrent
    # Continuation queue implementation for MRI and Rubinius
    #
    # @private
    class ContinuationQueue
      def initialize
        @q    = []
        @lock = ::Mutex.new
        @cond = ::ConditionVariable.new
      end

      def push(item)
        @lock.synchronize do
          @q.push(item)
          @cond.signal
        end
      end
      alias << push

      def pop
        poll
      end

      def poll(timeout_in_ms = nil)
        timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

        @lock.synchronize do
          timeout_strikes_at = Time.now.utc + (timeout || 0)
          while @q.empty?
            wait = if timeout
                     timeout_strikes_at - Time.now.utc
                   else
                     nil
                   end
            @cond.wait(@lock, wait)
            raise ::Timeout::Error if wait && Time.now.utc >= timeout_strikes_at
          end
          item = @q.shift
          item
        end
      end

      def clear
        @lock.synchronize do
          @q.clear
        end
      end

      def empty?
        @q.empty?
      end

      def size
        @q.size
      end
      alias length size
    end
  end
end

Version data entries

38 entries across 38 versions & 2 rubygems

Version Path
bunny-2.22.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.21.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.20.3 lib/bunny/concurrent/continuation_queue.rb
bunny-2.20.2 lib/bunny/concurrent/continuation_queue.rb
bunny-2.20.1 lib/bunny/concurrent/continuation_queue.rb
bunny-2.20.0 lib/bunny/concurrent/continuation_queue.rb
garaio_bunny-2.19.2 lib/bunny/concurrent/continuation_queue.rb
garaio_bunny-2.19.1 lib/bunny/concurrent/continuation_queue.rb
bunny-2.19.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.18.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.17.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.16.1 lib/bunny/concurrent/continuation_queue.rb
bunny-2.15.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.14.4 lib/bunny/concurrent/continuation_queue.rb
bunny-2.14.3 lib/bunny/concurrent/continuation_queue.rb
bunny-2.14.2 lib/bunny/concurrent/continuation_queue.rb
bunny-2.14.1 lib/bunny/concurrent/continuation_queue.rb
bunny-2.13.0 lib/bunny/concurrent/continuation_queue.rb
bunny-2.12.1 lib/bunny/concurrent/continuation_queue.rb
bunny-2.12.0 lib/bunny/concurrent/continuation_queue.rb