Sha256: d4cada097095b94772cb8f2147782833f8bab19c9124fc27b8ae836a14ef8f18
Contents?: true
Size: 1.61 KB
Versions: 1
Compression:
Stored size: 1.61 KB
Contents
module Agent
  module Transport
    # A named, bounded FIFO queue whose storage lives in a class-level
    # registry keyed by name. Every Queue instance constructed with the same
    # name operates on the same underlying array, mutex and condition
    # variable, so separate objects can act as two ends of one channel.
    #
    # Note that the capacity (+max+) is stored per instance, not in the shared
    # registry entry: each instance enforces the limit it was constructed with.
    class Queue
      # name => { :que, :wait, :mutex, :cvar } shared state for each queue.
      @@registry = {}

      # name - registry key identifying the shared queue state.
      # max  - maximum number of buffered items this instance allows (> 0).
      #
      # Raises ArgumentError when max is not positive.
      def initialize(name, max = 1)
        raise ArgumentError, "queue size must be positive" unless max > 0

        @name = name
        @max = max

        # Create the shared state only for the first instance using this name;
        # later instances attach to the existing entry.
        @@registry[@name] ||= {
          :que   => [],
          :wait  => [],
          :mutex => Mutex.new,
          :cvar  => ConditionVariable.new
        }
      end

      # Shared state hash for this queue's name (nil once #close has removed it).
      def data;  @@registry[@name]; end
      def que;   data[:que];   end
      def wait;  data[:wait];  end
      def mutex; data[:mutex]; end
      def cvar;  data[:cvar];  end

      def max;    @max;     end
      def size;   que.size; end
      def length; que.size; end

      # Appends obj, blocking while the buffer holds +max+ or more items.
      def push(obj)
        enqueue(obj, false)
      end
      alias << push
      alias enq push

      # Removes and returns the oldest item, blocking while the buffer is
      # empty. Any arguments are accepted and ignored.
      def pop(*args)
        dequeue(false)
      end
      alias shift pop
      alias deq pop

      # True when this instance allows more than one buffered item.
      def async?; @max > 1; end

      # Enqueues msg. With a truthy nonblock, raises ThreadError ("buffer
      # full") instead of waiting when there is no free slot.
      #
      # NOTE: this shadows Object#send for instances of this class.
      def send(msg, nonblock = false)
        enqueue(msg, nonblock)
      end

      # Dequeues and returns the oldest item. With a truthy nonblock, raises
      # ThreadError ("buffer empty") instead of waiting when nothing is queued.
      def receive(nonblock = false)
        dequeue(nonblock)
      end

      # Removes this queue's shared state from the registry and returns it.
      def close
        @@registry.delete @name
      end

      private

      # Adds obj under the lock. The fullness check happens while holding the
      # mutex so a non-blocking caller can neither block nor race with another
      # producer between the check and the push.
      def enqueue(obj, nonblock)
        mutex.synchronize do
          if nonblock
            raise ThreadError, "buffer full" if que.length >= @max
          else
            cvar.wait(mutex) while que.length >= @max
          end

          que.push obj
          # Producers and consumers share one condition variable, so wake all
          # waiters: a single signal could land on another producer, which
          # would go back to sleep and leave a ready consumer blocked forever.
          cvar.broadcast
        end
      end

      # Removes the head item under the lock; mirrors #enqueue.
      def dequeue(nonblock)
        mutex.synchronize do
          if nonblock
            raise ThreadError, "buffer empty" if que.empty?
          else
            cvar.wait(mutex) while que.empty?
          end

          item = que.shift
          # See #enqueue for why this is broadcast rather than signal.
          cvar.broadcast
          item
        end
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
agent-0.1.0 | lib/agent/transport/queue.rb |