Sha256: 95484eb586a6c1866dbc63105d75ae6b450ad57928b8451bd2570185f2172b73
Contents?: true
Size: 1.88 KB
Versions: 14
Compression:
Stored size: 1.88 KB
Contents
require 'thread' module Skylight module Util # Simple thread-safe queue backed by a ring buffer. Will only block when # poping. class Queue def initialize(max) unless max > 0 raise ArgumentError, "queue size must be positive" end @max = max @values = [nil] * max @consume = 0 @produce = 0 @waiting = [] @mutex = Mutex.new end def empty? @mutex.synchronize { __empty? } end def length @mutex.synchronize { __length } end # Returns the number of items in the queue or nil if the queue is full def push(obj) ret = nil @mutex.synchronize do return if __length == @max @values[@produce] = obj @produce = (@produce + 1) % @max ret = __length # Wakeup a blocked thread begin t = @waiting.shift t.wakeup if t rescue ThreadError retry end end ret end def pop(timeout = nil) if timeout && timeout < 0 raise ArgumentError, "timeout must be nil or >= than 0" end @mutex.synchronize do if __empty? if !timeout || timeout > 0 t = Thread.current @waiting << t @mutex.sleep(timeout) # Ensure that the thread is not in the waiting list @waiting.delete(t) else return end end __pop unless __empty? end end private def __length ((@produce - @consume) % @max) end def __empty? @produce == @consume end def __pop i = @consume v = @values[i] @values[i] = nil @consume = (i + 1) % @max return v end end end end
Version data entries
14 entries across 14 versions & 1 rubygems