Sha256: 476e1e9e5645b392bd30a2fce8266bdfb800c7b651a7495477ed020b877d8803
Contents?: true
Size: 1.26 KB
Versions: 7
Compression:
Stored size: 1.26 KB
Contents
module ConcurrentWorker
  require 'thread'

  # Queue of pending requests paired with a second queue of "something was
  # popped" notifications, so a producer can block until the backlog has
  # shrunk below a threshold.
  class RequestCounter
    def initialize
      @count = Queue.new # the pending requests themselves
      @com   = Queue.new # one `true` token is pushed here per completed pop
    end

    # Enqueue a request.
    def push(args)
      @count.push(args)
    end

    # Dequeue a request (blocking while the queue is empty and open) and
    # publish a notification token for #wait_until_less_than.
    #
    # Asynchronous interrupts are deferred for the whole block so that the
    # dequeue and its notification are never split by Thread#raise/#kill.
    def pop
      Thread.handle_interrupt(Object => :never) do
        r = @count.pop
        @com.push(true)
        r
      end
    end

    # Block until fewer than +n+ requests are pending.
    #
    # Returns immediately when that is already the case; otherwise waits for
    # one notification per pop and re-checks the size after each.
    def wait_until_less_than(n)
      return if @count.size < n

      while @com.pop
        break if @count.size < n
      end
    end

    def empty?
      @count.empty?
    end

    def size
      @count.size
    end

    # Close the request queue. The notification queue is left open.
    def close
      @count.close
    end

    def closed?
      @count.closed?
    end
  end

  # Bidirectional parent/child channel built from two binary pipes, carrying
  # Marshal-serialized objects.
  #
  # The object is created in the parent; after forking, each side calls
  # #choose_io once to pick its own read/write ends.
  class IPCDuplexChannel
    def initialize
      @p_pid = Process.pid # pid of the creating process, used to tell the two sides apart
      @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') # [read_end, write_end], parent -> child
      @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') # [read_end, write_end], child -> parent
    end

    # Select the pipe ends this process uses and close the two it does not.
    #
    # The creating process writes to @p2c and reads from @c2p; any other
    # process (i.e. one whose pid differs from the creator's) does the reverse.
    def choose_io
      w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
      @wio, @rio = w_pipe[1], r_pipe[0]
      [w_pipe[0], r_pipe[1]].map(&:close)
    end

    # Serialize +obj+ with Marshal onto the write end chosen by #choose_io.
    #
    # Note: this shadows Object#send on instances of this class; use
    # +__send__+ if dynamic dispatch on a channel object is ever needed.
    def send(obj)
      Marshal.dump(obj, @wio)
    end

    # Read one Marshal-serialized object from the read end chosen by
    # #choose_io.
    #
    # Raises StopIteration when the read fails with an IOError (which includes
    # EOFError, raised once the peer has closed its write end).
    def recv
      Marshal.load(@rio)
    rescue IOError
      raise StopIteration
    end

    # Close every pipe end that is still open.
    #
    # Works on the original pipe pairs rather than on @wio/@rio so that it is
    # safe to call before #choose_io (when @wio/@rio are still nil and all four
    # descriptors are open) and safe to call more than once.
    def close
      (@p2c + @c2p).map { |io| io.close unless io.closed? }
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems