Sha256: efe3575bfa88e3c2d67d49286ed68a0f189b211aa4a5371921e7f7224096aec5
Contents?: true
Size: 1.71 KB
Versions: 1
Compression:
Stored size: 1.71 KB
Contents
module ConcurrentWorker
  require 'thread'

  # Tracks outstanding requests with two queues:
  # * @count holds one entry per pushed request (the request arguments).
  # * @com receives one token every time an entry is popped, so that a
  #   thread blocked in #wait_until_less_than can wake up and re-check.
  class RequestCounter
    def initialize
      @count = Queue.new
      @com = Queue.new
    end

    # Records one outstanding request.
    #
    # @param args [Object] value stored in the queue for this request
    def push(args)
      @count.push(args)
    end

    # Removes one outstanding request and signals any waiter.
    #
    # Both steps run with asynchronous interrupts deferred
    # (Object => :never) so the removal and the notification cannot be
    # separated by Thread#raise / Thread#kill.
    def pop
      Thread.handle_interrupt(Object => :never) do
        @count.pop
        @com.push(true)
      end
    end

    # Blocks the caller until fewer than +n+ requests are outstanding.
    #
    # Returns immediately when the condition already holds; otherwise waits
    # for a token from #pop and re-checks the size after each one.
    #
    # @param n [Integer] exclusive upper bound on the number of requests
    def wait_until_less_than(n)
      return if @count.size < n
      while @com.pop
        break if @count.size < n
      end
    end

    # @return [Integer] number of entries currently in the request queue
    def size
      @count.size
    end

    # Closes the request queue (see Queue#close).
    def close
      @count.close
    end

    # @return [Boolean] whether the request queue has been closed
    def closed?
      @count.closed?
    end

    # Drains the request queue and returns what was left in it.
    # Entries equal to [] are dropped from the result.
    #
    # @return [Array] remaining entries, in queue order
    def rest
      result = []
      until @count.empty?
        req = @count.pop
        next if req == []
        result.push(req)
      end
      result
    end
  end

  # Bidirectional object channel built from two binary pipes.
  #
  # The pid of the creating process is remembered; #choose_io uses it to
  # decide which pipe this process writes to and which it reads from.
  # NOTE(review): presumably the object is created before fork and
  # #choose_io is then called once in each process -- confirm with callers.
  #
  # Messages are framed as a native unsigned int byte length (pack "I")
  # followed by the Marshal dump of the object.
  class IPCDuplexChannel
    def initialize
      @p_pid = Process.pid
      @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
      @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
    end

    # Selects this process's write and read ends and closes the two ends
    # it does not use. The creating process writes to @p2c and reads from
    # @c2p; any other process does the opposite.
    def choose_io
      w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
      @wio, @rio = w_pipe[1], r_pipe[0]
      [w_pipe[0], r_pipe[1]].map(&:close)
    end

    # Serializes +obj+ and writes it as one length-prefixed message.
    #
    # The two writes run with asynchronous interrupts deferred so a frame
    # is never left half-written because of Thread#raise / Thread#kill.
    # Errno::EPIPE (the reader is gone) is deliberately swallowed; the
    # method then returns nil.
    #
    # @param obj [Object] any object Marshal can dump
    def send(obj)
      Thread.handle_interrupt(Object => :never) do
        data = Marshal.dump(obj)
        # bytesize, not size: the prefix must count bytes on the wire.
        @wio.write([data.bytesize].pack("I"))
        @wio.write(data)
      end
    rescue Errno::EPIPE
      # Best effort: the peer has closed its read end, drop the message.
    end

    # Reads one length-prefixed message and returns the decoded object.
    #
    # @return [Object] the received object, or [] when the stream has ended
    #   (clean EOF, or the peer went away in the middle of a message)
    # @raise [StopIteration] when the read end raises IOError
    #   (for example because it was closed)
    def recv
      Thread.handle_interrupt(Object => :on_blocking) do
        szdata = @rio.read(4)
        # nil means EOF; fewer than 4 bytes means the writer died mid-header.
        return [] if szdata.nil? || szdata.bytesize < 4
        size = szdata.unpack("I")[0]
        payload = @rio.read(size)
        # A short or missing payload means the writer died mid-message;
        # handing it to Marshal.load would raise TypeError/ArgumentError.
        return [] if payload.nil? || payload.bytesize < size
        # Marshal.load must only see trusted data; this pipe is created in
        # #initialize and is written by #send on the other side.
        Marshal.load(payload)
      end
    rescue IOError
      raise StopIteration
    end

    # Closes every pipe end that is still open.
    #
    # After #choose_io this closes exactly the selected write and read
    # ends. Before #choose_io it closes all four ends instead of failing
    # on the unset @wio/@rio and leaking the descriptors.
    def close
      (@p2c + @c2p).reject(&:closed?).map(&:close)
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
concurrent_worker-0.4.0 | lib/concurrent_worker/common.rb |