lib/proco/queue/base.rb in proco-0.0.1 vs lib/proco/queue/base.rb in proco-0.0.2
- old
+ new
@@ -10,15 +10,16 @@
def to_s
"Queue invalidated"
end
end
- def initialize size
+ def initialize size, delay
super()
- @size = size
- @items = []
- @valid = true
+ @size = size
+ @delay = delay || 0
+ @items = []
+ @valid = true
end
def invalidate
broadcast do
@valid = false
@@ -38,13 +39,29 @@
@mtx.unlock
end
def take
@mtx.lock
+ wait_at = nil
while true
empty = @items.empty?
- break unless empty
+ unless empty
+ if wait_at && @delay > 0
+ n = Time.now
+ t = wait_at + @delay
+ t += @delay * ((n - t) / @delay).to_i if t < n
+ t += @delay if t < n
+
+ # Haven't took anything.
+ # No need to broadcast to blocked pushers
+ @mtx.unlock
+ sleep t - n
+ @mtx.lock
+ end
+ break
+ end
return nil unless @valid
+ wait_at = Time.now
@cv.wait @mtx
end
take_impl
ensure
@cv.broadcast