Sha256: 8080b8c417db29ada17d83cf6e1cc70108bbc6b103ebb433b09de547b3be2bc1

Contents?: true

Size: 801 Bytes

Versions: 1

Compression:

Stored size: 801 Bytes

Contents

require 'thread'

class Proco
module Queue
# @private
class Base
  include Proco::MT::Base

  class Invalidated < Exception
    def to_s
      "Queue invalidated"
    end
  end

  def initialize size
    super()
    @size   = size
    @items  = []
    @valid  = true
  end

  def invalidate
    broadcast do
      @valid = false
    end
  end

  def push item
    @mtx.lock
    while true
      raise Invalidated unless @valid
      break if @items.length < @size
      @cv.wait @mtx
    end
    push_impl item
  ensure
    @cv.broadcast
    @mtx.unlock
  end

  def take
    @mtx.lock
    while true
      empty = @items.empty?
      break unless empty
      return nil unless @valid
      @cv.wait @mtx
    end
    take_impl
  ensure
    @cv.broadcast
    @mtx.unlock
  end
end#Base
end#Queue
end


Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
proco-0.0.1 lib/proco/queue/base.rb