Sha256: d2cf7990942aeb00f1ebc3210d3ea7f2d3db08ef176d97dfa716f5e0060a1744
Contents?: true
Size: 1.2 KB
Versions: 1
Compression:
Stored size: 1.2 KB
Contents
require 'thread'

class Proco
  module Queue
    # @private
    class Base
      include Proco::MT::Base

      class Invalidated < Exception
        def to_s
          "Queue invalidated"
        end
      end

      def initialize size, delay
        super()
        @size  = size
        @delay = delay || 0
        @items = []
        @valid = true
      end

      def invalidate
        broadcast do
          @valid = false
        end
      end

      def push item
        @mtx.lock
        while true
          raise Invalidated unless @valid
          break if @items.length < @size
          @cv.wait @mtx
        end
        push_impl item
      ensure
        @cv.broadcast
        @mtx.unlock
      end

      def take
        @mtx.lock
        wait_at = nil
        while true
          empty = @items.empty?
          unless empty
            if wait_at && @delay > 0
              n = Time.now
              t = wait_at + @delay
              t += @delay * ((n - t) / @delay).to_i if t < n
              t += @delay if t < n

              # Haven't taken anything.
              # No need to broadcast to blocked pushers
              @mtx.unlock
              sleep t - n
              @mtx.lock
            end
            break
          end
          return nil unless @valid
          wait_at = Time.now
          @cv.wait @mtx
        end
        take_impl
      ensure
        @cv.broadcast
        @mtx.unlock
      end
    end#Base
  end#Queue
end
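The delay arithmetic in `take` is the least obvious part of the file. It appears to round the wake-up time up to the next multiple of `@delay` counted from the moment the taker started waiting. The sketch below isolates that calculation so it can be checked by hand. The helper name `next_wake_time` is hypothetical and not part of the gem, and it takes plain numbers (seconds) rather than `Time` objects, which is an assumption made only to keep the results exact.

```ruby
# Hypothetical helper mirroring the arithmetic in Proco::Queue::Base#take.
# wait_at: when the taker began waiting, delay: interval length,
# now: the moment an item became available (all in seconds).
def next_wake_time(wait_at, delay, now)
  t = wait_at + delay
  # Skip every whole interval that has already elapsed.
  t += delay * ((now - t) / delay).to_i if t < now
  # If still in the past, advance to the next interval boundary.
  t += delay if t < now
  t
end

before_first = next_wake_time(0.0, 0.5, 0.25) # first boundary is still ahead
mid_interval = next_wake_time(0.0, 0.5, 1.25) # two boundaries already passed
on_boundary  = next_wake_time(0.0, 0.5, 1.0)  # item arrived exactly on a boundary

puts before_first, mid_interval, on_boundary
```

In `take`, the thread then sleeps for `t - n` with the mutex released, so blocked pushers can keep filling the queue until the boundary is reached.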
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
proco-0.0.2 | lib/proco/queue/base.rb |