lib/proco/queue/base.rb in proco-0.0.1 vs lib/proco/queue/base.rb in proco-0.0.2

- old
+ new

@@ -10,15 +10,16 @@ def to_s "Queue invalidated" end end - def initialize size + def initialize size, delay super() - @size = size - @items = [] - @valid = true + @size = size + @delay = delay || 0 + @items = [] + @valid = true end def invalidate broadcast do @valid = false @@ -38,13 +39,29 @@ @mtx.unlock end def take @mtx.lock + wait_at = nil while true empty = @items.empty? - break unless empty + unless empty + if wait_at && @delay > 0 + n = Time.now + t = wait_at + @delay + t += @delay * ((n - t) / @delay).to_i if t < n + t += @delay if t < n + + # Haven't took anything. + # No need to broadcast to blocked pushers + @mtx.unlock + sleep t - n + @mtx.lock + end + break + end return nil unless @valid + wait_at = Time.now @cv.wait @mtx end take_impl ensure @cv.broadcast